Optionalconfig: Partial<WorkerPoolConfig>Lazily creates workers and enables main-thread fallback on failure.
async initialize(): Promise<void> {
if (this.initialized) {
return;
}
if (!this.config.enableWorkers) {
console.warn(
"[WorkerPool] ⚠️ Workers disabled, using main thread fallback",
);
this.useFallback = true;
this.initialized = true;
return;
}
try {
console.info("[WorkerPool] 📦 Initializing worker pool...");
for (let i = 0; i < this.config.maxWorkers; i++) {
try {
const worker = new Worker();
worker.onmessage = (event: MessageEvent<WorkerMessage>) => {
this.handleWorkerMessage(i, event.data);
};
worker.onerror = (error: ErrorEvent) => {
console.error(`[WorkerPool] ❌ Worker ${i} error:`, error);
this.handleWorkerError(i, error);
};
this.workers.push(worker);
this.workerBusy.push(false);
console.debug(`[WorkerPool] 🔧 Worker ${i} created`);
} catch (error) {
console.error(`[WorkerPool] ❌ Failed to create worker ${i}:`, error);
throw error;
}
}
this.initialized = true;
console.info(
`[WorkerPool] ✅ Initialized with ${this.workers.length} workers`,
);
} catch (error) {
console.error("[WorkerPool] ❌ Failed to initialize worker pool:", error);
this.terminate();
if (this.config.fallbackToMainThread) {
console.warn("[WorkerPool] ⚠️ Falling back to main thread execution");
this.useFallback = true;
this.initialized = true;
} else {
throw error;
}
}
}
Ensures the worker pool is initialized and ready, avoiding redundant initialization calls.
Registers a task so responses can be routed correctly.
Clears tracking for a task and frees its worker.
completeTask(taskId: string): WorkerTask | undefined {
const task = this.tasks.get(taskId);
if (task?.workerIndex !== undefined) {
this.workerBusy[task.workerIndex] = false;
}
// Clear any pending cancel timeout to prevent it from firing after task completion
if (task?.cancelTimeoutHandle) {
clearTimeout(task.cancelTimeoutHandle);
}
this.tasks.delete(taskId);
return task;
}
Requests cancellation for a task and forces completion on timeout.
cancelTask(taskId: string): void {
const task = this.tasks.get(taskId);
if (task?.workerIndex !== undefined) {
// Mark task as cancelled
task.isCancelled = true;
// Post CANCEL message to worker
this.workers[task.workerIndex].postMessage({
type: "CANCEL",
payload: { taskId },
});
// Set a timeout fallback to ensure task doesn't hang indefinitely
// If no terminal message within 5 seconds, force completion
const timeoutHandle = setTimeout(() => {
const stillExistingTask = this.tasks.get(taskId);
if (stillExistingTask) {
const timeoutContext = {
taskId,
processedItems: stillExistingTask.processedItems ?? 0,
totalItems: stillExistingTask.totalItems ?? 0,
};
let resolution: Record<string, unknown>;
if (stillExistingTask.buildTimeoutResult) {
resolution = stillExistingTask.buildTimeoutResult(timeoutContext);
} else if (stillExistingTask.cancellationMessageType) {
resolution = this.formatCancellationResult(
stillExistingTask.cancellationMessageType,
timeoutContext,
);
} else {
resolution = timeoutContext;
}
stillExistingTask.resolve(resolution);
this.completeTask(taskId);
console.warn(
`[WorkerPool] ⏱️ Cancelled task ${taskId} timed out after 5s, forcing cancellation completion`,
);
}
}, 5000);
// Store timeout handle for potential cleanup
task.cancelTimeoutHandle = timeoutHandle;
}
}
Returns snapshot metrics for workers and active tasks.
Coordinates worker lifecycle, task routing, cancellation, and fallbacks.
Source