Singleton-backed worker pool for reading history filtering and aggregation.

/**
 * Singleton facade over the shared generic worker pool for filtering and
 * aggregating reading history. Work is dispatched to a worker when one can be
 * obtained and is otherwise computed on the calling thread.
 */
export class ReadingHistoryWorkerPool {
  private static instance: ReadingHistoryWorkerPool | null = null;

  /** Becomes true once the generic pool's `initialize()` has resolved. */
  private initialized = false;

  /**
   * Initialization currently in flight (or already completed). Shared so that
   * concurrent callers trigger the generic pool's `initialize()` only once;
   * reset to `null` when an attempt fails so a later call can retry.
   */
  private initPromise: Promise<void> | null = null;

  private readonly config: {
    maxWorkers: number;
    enableWorkers: boolean;
    fallbackToMainThread: boolean;
  };

  private constructor(config?: {
    maxWorkers?: number;
    enableWorkers?: boolean;
    fallbackToMainThread?: boolean;
  }) {
    // `??` keeps explicit falsy values such as `enableWorkers: false`.
    this.config = {
      maxWorkers: config?.maxWorkers ?? 4,
      enableWorkers: config?.enableWorkers ?? true,
      fallbackToMainThread: config?.fallbackToMainThread ?? true,
    };
  }

  /**
   * Returns the singleton instance, creating it on first access.
   * The configuration is only applied by the call that creates the instance;
   * it is ignored on every later call.
   * @param config - Optional worker pool configuration.
   * @returns Shared worker pool instance.
   * @source
   */
  static getInstance(config?: {
    maxWorkers?: number;
    enableWorkers?: boolean;
    fallbackToMainThread?: boolean;
  }): ReadingHistoryWorkerPool {
    this.instance ??= new ReadingHistoryWorkerPool(config);
    return this.instance;
  }

  /**
   * Initializes the underlying generic worker pool.
   * Safe to call repeatedly and concurrently: callers that arrive while an
   * initialization is in flight await that same attempt instead of starting
   * another one. A failed attempt is forgotten so the next call retries.
   * @source
   */
  async initialize(): Promise<void> {
    if (this.initialized) {
      return;
    }

    const pending = (this.initPromise ??= this.initializePool());
    try {
      await pending;
    } catch (error: unknown) {
      // Only clear the slot if it still holds the attempt that just failed.
      if (this.initPromise === pending) {
        this.initPromise = null;
      }
      throw error;
    }
  }

  /** Runs one initialization attempt against the generic pool. */
  private async initializePool(): Promise<void> {
    await this.getConfiguredPool().initialize();
    this.initialized = true;
  }

  /** Looks up the generic pool, passing along this instance's configuration. */
  private getConfiguredPool() {
    return getGenericWorkerPool({
      maxWorkers: this.config.maxWorkers,
      enableWorkers: this.config.enableWorkers,
      fallbackToMainThread: this.config.fallbackToMainThread,
    });
  }

  /**
   * Filters and optionally aggregates reading history using workers when available.
   * Falls back to the main thread when the pool reports it is unavailable,
   * workers are disabled in this instance's config, no worker can be selected,
   * or the selected worker cannot be retrieved. This fallback is applied
   * regardless of `fallbackToMainThread`, which is only forwarded to the
   * generic pool.
   * @param history - Reading history to filter.
   * @param dateRange - Inclusive date range to consider (Date or epoch milliseconds).
   * @param aggregationType - Aggregation granularity; omitted means "none".
   * @param progressCallback - Optional callback for progress updates. Only
   * invoked for progress messages received on the worker path; the main-thread
   * path reports no progress.
   * @param taskId - Optional existing task identifier; a new one is generated
   * when it is omitted or empty.
   * @returns Filtered result including stats and timing.
   * @source
   */
  async filterReadingHistory(
    history: ReadingHistory,
    dateRange: {
      start: Date | number;
      end: Date | number;
    },
    aggregationType?: "daily" | "weekly" | "none",
    progressCallback?: (
      stage: string,
      progress: number,
      message: string,
    ) => void,
    taskId?: string,
  ): Promise<ReadingHistoryFilterResult> {
    if (!this.initialized) {
      await this.initialize();
    }

    // `||` is deliberate: an empty-string id is treated like a missing one.
    const mainTaskId = taskId || generateUUID();
    const pool = this.getConfiguredPool();
    await pool.ensureInitialized();

    // Normalize date range to timestamps
    const toMs = (value: Date | number): number =>
      value instanceof Date ? value.getTime() : value;
    const startMs = toMs(dateRange.start);
    const endMs = toMs(dateRange.end);
    const granularity = aggregationType || "none";

    // Single definition of the fallback shared by every "no worker" branch.
    const runOnMainThread = (): Promise<ReadingHistoryFilterResult> =>
      this.filterReadingHistoryMainThread(history, startMs, endMs, granularity);

    if (!pool.isAvailable() || !this.config.enableWorkers) {
      return runOnMainThread();
    }

    const workerIndex = pool.selectWorker();
    if (workerIndex === -1) {
      return runOnMainThread();
    }

    const worker = pool.getWorker(workerIndex);
    if (!worker) {
      return runOnMainThread();
    }

    // The worker reports back through the task's callbacks, so this is the one
    // place where a hand-built Promise is needed.
    return new Promise<ReadingHistoryFilterResult>((resolve, reject) => {
      const task: WorkerTask = {
        taskId: mainTaskId,
        resolve: (result: Record<string, unknown>) => {
          // The task contract is untyped; only the four known fields are
          // forwarded and any extra properties are dropped.
          const typedResult = result as unknown as ReadingHistoryFilterResult;
          resolve({
            filteredEntries: typedResult.filteredEntries,
            stats: typedResult.stats,
            aggregatedData: typedResult.aggregatedData,
            timing: typedResult.timing,
          });
        },
        reject,
        isCancelled: false,
        workerIndex,
        onProgress: (message: WorkerMessage) => {
          if (message.type !== "READING_HISTORY_FILTER_PROGRESS") {
            return;
          }
          if (!progressCallback) {
            return;
          }
          if (typeof message.payload !== "object" || message.payload === null) {
            return;
          }
          const payload = message.payload as {
            stage?: string;
            progress?: number;
            message?: string;
          };
          // Missing or malformed fields are replaced by neutral defaults.
          progressCallback(
            payload.stage ?? "progress",
            typeof payload.progress === "number" ? payload.progress : 0,
            payload.message ?? "",
          );
        },
      };

      // Register before posting so a reply can always find its task.
      pool.registerTask(mainTaskId, task);

      // NOTE(review): if postMessage throws, the promise rejects but the task
      // stays registered with the pool — confirm whether cleanup is needed.
      worker.postMessage({
        type: "READING_HISTORY_FILTER",
        payload: {
          taskId: mainTaskId,
          history,
          dateRange: {
            start: startMs,
            end: endMs,
          },
          aggregationType: granularity,
        },
      });

      console.info(
        `[ReadingHistoryWorkerPool] Dispatched reading history filter to worker ${workerIndex}`,
      );
    });
  }

  /**
   * Returns the date portion of the UTC ISO-8601 representation of `date`,
   * i.e. everything before the "T" separator.
   */
  private static utcDayKey(date: Date): string {
    const iso = date.toISOString();
    return iso.slice(0, iso.indexOf("T"));
  }

  /** Returns the UTC day key of the Monday that starts the week containing `date`. */
  private static utcWeekStartKey(date: Date): string {
    const day = date.getUTCDay();
    // Sunday (0) belongs to the week that began six days earlier.
    const mondayOfMonth = date.getUTCDate() - day + (day === 0 ? -6 : 1);
    // Date.UTC normalizes day numbers that fall outside the current month.
    const weekStart = new Date(
      Date.UTC(date.getUTCFullYear(), date.getUTCMonth(), mondayOfMonth),
    );
    return ReadingHistoryWorkerPool.utcDayKey(weekStart);
  }

  /**
   * Filters and aggregates reading history on the main thread.
   * Days and weeks are bucketed in UTC; weeks start on Monday.
   * @param history - Reading history to process.
   * @param startMs - Start timestamp in milliseconds (inclusive).
   * @param endMs - End timestamp in milliseconds (inclusive).
   * @param aggregationType - Aggregation granularity or "none".
   * @returns Filter result with stats and timing. `aggregatedData` and
   * `timing.aggregationTimeMs` are undefined when `aggregationType` is "none".
   * @source
   */
  private async filterReadingHistoryMainThread(
    history: ReadingHistory,
    startMs: number,
    endMs: number,
    aggregationType: "daily" | "weekly" | "none",
  ): Promise<ReadingHistoryFilterResult> {
    const startTime = performance.now();

    // Filter entries by date range (both bounds inclusive)
    const filterStartTime = performance.now();
    const filteredEntries = history.entries.filter(
      (entry) => entry.timestamp >= startMs && entry.timestamp <= endMs,
    );
    const filteringTimeMs = performance.now() - filterStartTime;

    // The aggregation timer also covers the statistics computed below.
    const aggregationStartTime = performance.now();

    const uniqueManga = new Set(filteredEntries.map((e) => e.mangaId));

    const totalChapters = filteredEntries.reduce(
      (sum, e) => sum + e.chaptersRead,
      0,
    );

    // Distinct UTC calendar days with at least one entry
    const activeDays = new Set(
      filteredEntries.map((e) =>
        ReadingHistoryWorkerPool.utcDayKey(new Date(e.timestamp)),
      ),
    ).size;

    // Average over active days only, rounded to two decimals
    const averageChaptersPerDay =
      activeDays > 0 ? Math.round((totalChapters / activeDays) * 100) / 100 : 0;

    let aggregatedData:
      | Array<{
          date: string;
          chaptersRead: number;
          entriesCount: number;
        }>
      | undefined;

    if (aggregationType !== "none") {
      const buckets = new Map<
        string,
        { chaptersRead: number; entriesCount: number }
      >();

      for (const entry of filteredEntries) {
        const when = new Date(entry.timestamp);
        const key =
          aggregationType === "daily"
            ? ReadingHistoryWorkerPool.utcDayKey(when)
            : ReadingHistoryWorkerPool.utcWeekStartKey(when);

        const current = buckets.get(key) ?? {
          chaptersRead: 0,
          entriesCount: 0,
        };
        buckets.set(key, {
          chaptersRead: current.chaptersRead + entry.chaptersRead,
          entriesCount: current.entriesCount + 1,
        });
      }

      // Emit buckets ordered by their date key
      aggregatedData = Array.from(buckets.entries())
        .sort(([dateA], [dateB]) => dateA.localeCompare(dateB))
        .map(([date, data]) => ({
          date,
          chaptersRead: data.chaptersRead,
          entriesCount: data.entriesCount,
        }));
    }

    const aggregationTimeMs = performance.now() - aggregationStartTime;
    const totalTimeMs = performance.now() - startTime;

    return {
      filteredEntries,
      stats: {
        totalEntries: filteredEntries.length,
        totalChaptersRead: totalChapters,
        uniqueMangaCount: uniqueManga.size,
        dateRange: {
          start: startMs,
          end: endMs,
        },
        activeDays,
        averageChaptersPerDay,
      },
      aggregatedData,
      timing: {
        filteringTimeMs,
        aggregationTimeMs:
          aggregationType === "none" ? undefined : aggregationTimeMs,
        totalTimeMs,
      },
    };
  }

  /**
   * Cancels an in-flight filtering operation by task ID.
   * Delegates to the generic pool's `cancelTask`.
   * @param taskId - Identifier of the worker task to cancel.
   * @source
   */
  cancelFilter(taskId: string): void {
    const pool = getGenericWorkerPool();
    pool.cancelTask(taskId);
  }

  /**
   * Returns statistics for the underlying generic worker pool.
   * @returns Pool statistics including workers and active tasks.
   * @source
   */
  getStats(): {
    totalWorkers: number;
    activeWorkers: number;
    activeTasks: number;
  } {
    const pool = getGenericWorkerPool();
    return pool.getStats();
  }

  /**
   * Terminates the worker pool if needed. Currently a no-op.
   * @source
   */
  terminate(): void {
    // No-op for now
  }

  /**
   * Returns the number of currently available workers.
   * @returns Count of available workers as reported by the generic pool.
   * @source
   */
  getAvailableWorkerCount(): number {
    const pool = getGenericWorkerPool();
    return pool.getAvailableWorkerCount();
  }
}

Methods

  • Returns the singleton instance, creating it on first access.

    Parameters

    • Optional config: { maxWorkers?: number; enableWorkers?: boolean; fallbackToMainThread?: boolean }

      Optional worker pool configuration.

    Returns ReadingHistoryWorkerPool

    Shared worker pool instance.

      /**
       * Hands out the shared pool instance, building it from `config` the
       * first time it is requested. Later calls return the existing instance
       * and do not use `config`.
       */
      static getInstance(config?: {
        maxWorkers?: number;
        enableWorkers?: boolean;
        fallbackToMainThread?: boolean;
      }): ReadingHistoryWorkerPool {
        if (this.instance === null || this.instance === undefined) {
          this.instance = new ReadingHistoryWorkerPool(config);
        }
        return this.instance;
      }
  • Initializes the underlying generic worker pool.

    Returns Promise<void>

      /**
       * Initializes the generic worker pool with this instance's settings and
       * records completion. Does nothing once a previous call has finished.
       */
      async initialize(): Promise<void> {
        if (!this.initialized) {
          const { maxWorkers, enableWorkers, fallbackToMainThread } =
            this.config;
          const genericPool = getGenericWorkerPool({
            maxWorkers,
            enableWorkers,
            fallbackToMainThread,
          });
          await genericPool.initialize();
          // Only flagged after the pool's own initialization has resolved.
          this.initialized = true;
        }
      }
  • Filters and optionally aggregates reading history using workers when available. Falls back to main thread if workers are unavailable or disabled.

    Parameters

    • history: ReadingHistory

      Reading history to filter.

    • dateRange: { start: number | Date; end: number | Date }

      Inclusive date range to consider.

    • Optional aggregationType: "none" | "daily" | "weekly"

      Aggregation granularity or "none".

    • Optional progressCallback: (stage: string, progress: number, message: string) => void

      Optional callback for progress updates.

    • Optional taskId: string

      Optional existing task identifier.

    Returns Promise<ReadingHistoryFilterResult>

    Filtered result including stats and timing.

      /**
       * Filters reading history to a date range and optionally aggregates it,
       * handing the job to a pool worker when one can be obtained and
       * otherwise computing the result on the main thread. Progress updates
       * are only forwarded for messages received from a worker.
       */
      async filterReadingHistory(
        history: ReadingHistory,
        dateRange: {
          start: Date | number;
          end: Date | number;
        },
        aggregationType?: "daily" | "weekly" | "none",
        progressCallback?: (
          stage: string,
          progress: number,
          message: string,
        ) => void,
        taskId?: string,
      ): Promise<ReadingHistoryFilterResult> {
        if (!this.initialized) {
          await this.initialize();
        }

        // An empty task id is treated the same as a missing one.
        const mainTaskId = taskId || generateUUID();
        const { maxWorkers, enableWorkers, fallbackToMainThread } = this.config;
        const pool = getGenericWorkerPool({
          maxWorkers,
          enableWorkers,
          fallbackToMainThread,
        });
        await pool.ensureInitialized();

        // Accept either Date objects or epoch milliseconds.
        const asMillis = (value: Date | number): number =>
          value instanceof Date ? value.getTime() : value;
        const startMs = asMillis(dateRange.start);
        const endMs = asMillis(dateRange.end);
        const granularity = aggregationType || "none";

        // One fallback shared by every branch that cannot use a worker.
        const computeLocally = (): Promise<ReadingHistoryFilterResult> =>
          this.filterReadingHistoryMainThread(
            history,
            startMs,
            endMs,
            granularity,
          );

        if (!pool.isAvailable() || !this.config.enableWorkers) {
          return computeLocally();
        }

        const workerIndex = pool.selectWorker();
        if (workerIndex === -1) {
          return computeLocally();
        }

        const worker = pool.getWorker(workerIndex);
        if (!worker) {
          return computeLocally();
        }

        return new Promise<ReadingHistoryFilterResult>((resolve, reject) => {
          const forwardProgress = (message: WorkerMessage): void => {
            const relevant =
              message.type === "READING_HISTORY_FILTER_PROGRESS" &&
              progressCallback &&
              typeof message.payload === "object" &&
              message.payload !== null;
            if (!relevant) {
              return;
            }
            const details = message.payload as {
              stage?: string;
              progress?: number;
              message?: string;
            };
            const percent =
              typeof details.progress === "number" ? details.progress : 0;
            progressCallback(
              details.stage ?? "progress",
              percent,
              details.message ?? "",
            );
          };

          const task: WorkerTask = {
            taskId: mainTaskId,
            resolve: (result: Record<string, unknown>) => {
              // Forward only the four known result fields.
              const { filteredEntries, stats, aggregatedData, timing } =
                result as unknown as ReadingHistoryFilterResult;
              resolve({ filteredEntries, stats, aggregatedData, timing });
            },
            reject,
            isCancelled: false,
            workerIndex,
            onProgress: forwardProgress,
          };

          // Registration precedes dispatch so the reply can find its task.
          pool.registerTask(mainTaskId, task);

          worker.postMessage({
            type: "READING_HISTORY_FILTER",
            payload: {
              taskId: mainTaskId,
              history,
              dateRange: { start: startMs, end: endMs },
              aggregationType: granularity,
            },
          });

          console.info(
            `[ReadingHistoryWorkerPool] Dispatched reading history filter to worker ${workerIndex}`,
          );
        });
      }
  • Cancels an in-flight filtering operation by task ID.

    Parameters

    • taskId: string

      Identifier of the worker task to cancel.

    Returns void

      /** Asks the generic worker pool to cancel the task with the given id. */
      cancelFilter(taskId: string): void {
        getGenericWorkerPool().cancelTask(taskId);
      }
  • Returns statistics for the underlying generic worker pool.

    Returns { totalWorkers: number; activeWorkers: number; activeTasks: number }

    Pool statistics including workers and active tasks.

      /** Reports the generic worker pool's own statistics unchanged. */
      getStats(): {
        totalWorkers: number;
        activeWorkers: number;
        activeTasks: number;
      } {
        return getGenericWorkerPool().getStats();
      }
  • Returns the number of currently available workers.

    Returns number

    Count of available workers.

      /** Reports how many workers the generic pool currently counts as available. */
      getAvailableWorkerCount(): number {
        return getGenericWorkerPool().getAvailableWorkerCount();
      }