Static get — Returns the singleton instance, creating it on first access.
Optional config: { maxWorkers?: number; enableWorkers?: boolean; fallbackToMainThread?: boolean } — Optional worker pool configuration.
Shared worker pool instance.
Initializes the underlying generic worker pool.
/**
 * Prepares the generic worker pool for use by this instance.
 *
 * When the instance is already flagged as initialized this is a no-op.
 * Otherwise the pool is obtained from `getGenericWorkerPool` using this
 * instance's `maxWorkers`, `enableWorkers` and `fallbackToMainThread`
 * settings, its `initialize()` is awaited, and the instance is then
 * flagged as initialized.
 *
 * @returns A promise that settles once the pool's `initialize()` has finished.
 */
async initialize(): Promise<void> {
  if (!this.initialized) {
    const { maxWorkers, enableWorkers, fallbackToMainThread } = this.config;
    const workerPool = getGenericWorkerPool({
      maxWorkers,
      enableWorkers,
      fallbackToMainThread,
    });

    await workerPool.initialize();

    // Only flag success after the pool reported it is ready; a rejected
    // initialize() leaves the flag unset so a later call can retry.
    this.initialized = true;
  }
}
Filters and optionally aggregates reading history using workers when available. Falls back to main thread if workers are unavailable or disabled.
Reading history to filter.
Inclusive date range to consider.
Optional aggregationType: "none" | "daily" | "weekly" — Aggregation granularity or "none".
Optional progressCallback: (stage: string, progress: number, message: string) => void — Optional callback for progress updates.
Optional taskId: string — Optional existing task identifier.
Filtered result including stats and timing.
/**
 * Filters (and optionally aggregates) reading history, preferring a pool
 * worker and falling back to `filterReadingHistoryMainThread` otherwise.
 *
 * The main-thread path is taken when the pool reports it is not available,
 * when `config.enableWorkers` is falsy, when `selectWorker()` returns `-1`,
 * or when `getWorker()` yields no worker.
 *
 * @param history - Reading history forwarded unchanged to the worker or the
 *   main-thread implementation.
 * @param dateRange - Range bounds; `Date` values are converted with
 *   `getTime()`, numeric values are passed through as-is.
 * @param aggregationType - Aggregation mode; a missing value is sent as
 *   `"none"`.
 * @param progressCallback - Invoked for `READING_HISTORY_FILTER_PROGRESS`
 *   messages that carry an object payload. Missing fields default to
 *   `"progress"`, `0` and `""` respectively.
 * @param taskId - Identifier to register the task under; when falsy a new
 *   one is produced by `generateUUID()`.
 * @returns On the worker path, an object with `filteredEntries`, `stats`,
 *   `aggregatedData` and `timing` copied from the worker result; on the
 *   fallback path, whatever `filterReadingHistoryMainThread` resolves with.
 */
async filterReadingHistory(
  history: ReadingHistory,
  dateRange: {
    start: Date | number;
    end: Date | number;
  },
  aggregationType?: "daily" | "weekly" | "none",
  progressCallback?: (
    stage: string,
    progress: number,
    message: string,
  ) => void,
  taskId?: string,
): Promise<ReadingHistoryFilterResult> {
  // Bring the pool up lazily on first use.
  if (!this.initialized) {
    await this.initialize();
  }

  const mainTaskId = taskId || generateUUID();

  const { maxWorkers, enableWorkers, fallbackToMainThread } = this.config;
  const pool = getGenericWorkerPool({
    maxWorkers,
    enableWorkers,
    fallbackToMainThread,
  });

  // The pool itself must be ready before workers can be selected.
  await pool.ensureInitialized();

  // Collapse Date | number bounds into plain millisecond timestamps.
  const toTimestamp = (bound: Date | number): number =>
    bound instanceof Date ? bound.getTime() : bound;
  const startMs = toTimestamp(dateRange.start);
  const endMs = toTimestamp(dateRange.end);

  const granularity = aggregationType || "none";

  return new Promise<ReadingHistoryFilterResult>((resolve, reject) => {
    // Single fallback used by every "no usable worker" branch below.
    const runOnMainThread = (): void => {
      this.filterReadingHistoryMainThread(history, startMs, endMs, granularity)
        .then(resolve)
        .catch(reject);
    };

    // Pool unavailable or workers switched off: stay on the main thread.
    if (!(pool.isAvailable() && this.config.enableWorkers)) {
      runOnMainThread();
      return;
    }

    // -1 means the pool has no worker to offer; a missing worker object is
    // treated the same way.
    const workerIndex = pool.selectWorker();
    const worker =
      workerIndex === -1 ? undefined : pool.getWorker(workerIndex);
    if (!worker) {
      runOnMainThread();
      return;
    }

    const task: WorkerTask = {
      taskId: mainTaskId,
      resolve: (result: Record<string, unknown>) => {
        // Forward only the fields that make up the filter result.
        const { filteredEntries, stats, aggregatedData, timing } =
          result as unknown as ReadingHistoryFilterResult;
        resolve({ filteredEntries, stats, aggregatedData, timing });
      },
      reject,
      isCancelled: false,
      workerIndex,
      onProgress: (message: WorkerMessage) => {
        if (message.type !== "READING_HISTORY_FILTER_PROGRESS") {
          return;
        }
        if (!progressCallback) {
          return;
        }
        const rawPayload = message.payload;
        if (typeof rawPayload !== "object" || rawPayload === null) {
          return;
        }

        const {
          stage,
          progress,
          message: text,
        } = rawPayload as {
          stage?: string;
          progress?: number;
          message?: string;
        };
        progressCallback(
          stage ?? "progress",
          typeof progress === "number" ? progress : 0,
          text ?? "",
        );
      },
    };

    // Register before dispatching so replies can be routed to this task.
    pool.registerTask(mainTaskId, task);

    worker.postMessage({
      type: "READING_HISTORY_FILTER",
      payload: {
        taskId: mainTaskId,
        history,
        dateRange: {
          start: startMs,
          end: endMs,
        },
        aggregationType: granularity,
      },
    });

    console.info(
      `[ReadingHistoryWorkerPool] Dispatched reading history filter to worker ${workerIndex}`,
    );
  });
}
Returns statistics for the underlying generic worker pool.
Pool statistics including workers and active tasks.
Singleton-backed worker pool for reading history filtering and aggregation.
Source