Initializes the worker pool for title normalization. Calling it again after a successful initialization is a no-op.
/**
 * Initializes the generic worker pool used for title normalization.
 *
 * Returns immediately if this wrapper has already been initialized; otherwise
 * it obtains the pool via `getGenericWorkerPool()`, awaits its `initialize()`,
 * and only then marks this wrapper as initialized (so a failed pool
 * initialization leaves `initialized` false and a later call will retry).
 *
 * @returns A promise that settles once the pool has been initialized.
 */
async initialize(): Promise<void> {
  if (this.initialized) {
    console.debug("[TitleNormalizationWorkerPool] â Already initialized");
    return;
  }
  console.info("[TitleNormalizationWorkerPool] đ§ Initializing pool...");
  const pool = getGenericWorkerPool();
  await pool.initialize();
  this.initialized = true;
  // Previously this literal was split across two lines by a mis-decoded emoji
  // byte (a syntax error); it is restored to a single-line string here.
  console.info("[TitleNormalizationWorkerPool] ✅ Pool initialized");
}
Indicates whether the underlying pool is initialized and has workers.
Returns: true if normalization workers are available.
/**
 * Reports whether title normalization can currently be dispatched.
 *
 * Availability requires both that this wrapper has been initialized and that
 * the generic worker pool reports itself available. When unavailable, a
 * warning is logged with both contributing flags.
 *
 * @returns `true` if this wrapper is initialized and the pool is available.
 */
isAvailable(): boolean {
  const pool = getGenericWorkerPool();
  // Query the pool once so the logged flag always matches the value that
  // actually determined the return value (the pool state could otherwise
  // change between two separate calls).
  const poolAvailable = pool.isAvailable();
  const available = this.initialized && poolAvailable;
  if (!available) {
    console.warn(
      `[TitleNormalizationWorkerPool] â ī¸ Pool not available (initialized: ${this.initialized}, pool available: ${poolAvailable})`,
    );
  }
  return available;
}
Normalizes titles using the specified algorithms on a worker.
titles: Titles to normalize.
algorithms: Algorithms to apply for normalization (defaults to ["normalizeForMatching"]).
progressCallback (optional): NormalizationProgressCallback — callback for progress updates.
taskId (optional): string — task identifier.
Returns: a normalization cache result with timing and deltas.
/**
 * Normalizes titles on a worker taken from the generic worker pool.
 *
 * Auto-initializes this wrapper on first use. It then selects a worker,
 * registers a task (forwarding well-formed progress messages to
 * `progressCallback`) and posts a `TITLE_NORMALIZATION` message to that worker.
 *
 * @param titles - Titles to normalize.
 * @param algorithms - Algorithms to apply; defaults to `["normalizeForMatching"]`.
 * @param progressCallback - Optional callback invoked as
 *   `(algorithm, current, total)` for each `TITLE_NORMALIZATION_PROGRESS`
 *   message whose payload has a string `algorithm` and numeric `current`/`total`.
 * @param taskId - Optional task identifier; a generated id is used when it is
 *   omitted or empty.
 * @returns The worker's normalization result. A missing `caches` field
 *   defaults to `{}` and a missing `timing` field defaults to zeroed timing.
 * @throws Error (as a rejection) when the pool is unavailable, no worker is
 *   free, the selected worker cannot be acquired, the task is rejected, or
 *   posting the message to the worker fails.
 */
async normalizeTitles(
  titles: string[],
  algorithms: Array<"normalizeForMatching" | "processTitle"> = [
    "normalizeForMatching",
  ],
  progressCallback?: NormalizationProgressCallback,
  taskId?: string,
): Promise<NormalizationCacheResult> {
  if (!this.initialized) {
    console.debug("[TitleNormalizationWorkerPool] đ Auto-initializing pool");
    await this.initialize();
  }
  const pool = getGenericWorkerPool();
  // `||` rather than `??`: an empty-string id is treated as "not provided".
  const effectiveTaskId = taskId || this.generateTaskId();
  console.info(
    `[TitleNormalizationWorkerPool] đ normalizeTitles called: ${titles.length} titles, algorithms: [${algorithms.join(", ")}], taskId: ${effectiveTaskId}`,
  );
  return new Promise((resolve, reject) => {
    if (!pool.isAvailable()) {
      console.error(
        "[TitleNormalizationWorkerPool] â Worker pool not available",
      );
      reject(
        new Error(
          "Worker pool is not available for title normalization. Check if workers are enabled.",
        ),
      );
      return;
    }
    const workerIndex = pool.selectWorker();
    if (workerIndex === -1) {
      console.error(
        "[TitleNormalizationWorkerPool] â No available workers (all busy)",
      );
      reject(new Error("No available workers for title normalization"));
      return;
    }
    console.debug(
      `[TitleNormalizationWorkerPool] đ Selected worker index: ${workerIndex}`,
    );
    const worker = pool.getWorker(workerIndex);
    if (!worker) {
      console.error(
        `[TitleNormalizationWorkerPool] â Failed to acquire worker at index ${workerIndex}`,
      );
      reject(new Error("Failed to acquire worker for title normalization"));
      return;
    }
    // Register task with progress handler
    const task = {
      taskId: effectiveTaskId,
      type: "normalization" as const,
      resolve: (result: unknown) => {
        // Guard against a null/undefined payload: property access on it would
        // throw inside this callback before `resolve` is ever reached.
        const typedResult = (result ?? {}) as Partial<NormalizationCacheResult>;
        // This literal was previously split across two lines by a mis-decoded
        // emoji byte (a syntax error); restored to a single-line template.
        console.info(
          `[TitleNormalizationWorkerPool] ✅ Task ${effectiveTaskId} resolved with result`,
        );
        resolve({
          caches: typedResult.caches || {},
          deltas: typedResult.deltas,
          timing: typedResult.timing || {
            processingTimeMs: 0,
            totalTitlesProcessed: 0,
          },
        });
      },
      reject: (error: Error) => {
        console.error(
          `[TitleNormalizationWorkerPool] â Task ${effectiveTaskId} rejected:`,
          error.message,
        );
        reject(error);
      },
      isCancelled: false,
      onProgress: (message: unknown) => {
        const msgWithType = message as {
          type?: string;
          payload?: { algorithm?: string; current?: number; total?: number };
        };
        if (
          msgWithType.type === "TITLE_NORMALIZATION_PROGRESS" &&
          progressCallback &&
          msgWithType.payload
        ) {
          const { algorithm, current, total } = msgWithType.payload;
          // Only forward fully-typed progress payloads to the caller.
          if (
            typeof algorithm === "string" &&
            typeof current === "number" &&
            typeof total === "number"
          ) {
            console.debug(
              `[TitleNormalizationWorkerPool] đ Progress for task ${effectiveTaskId} - ${algorithm}: ${current}/${total}`,
            );
            progressCallback(algorithm, current, total);
          }
        }
      },
      workerIndex,
    };
    pool.registerTask(effectiveTaskId, task);
    console.debug(
      `[TitleNormalizationWorkerPool] đ Task ${effectiveTaskId} registered with worker ${workerIndex}`,
    );
    // Send normalization message to worker
    const message: TitleNormalizationMessage = {
      type: "TITLE_NORMALIZATION",
      payload: {
        taskId: effectiveTaskId,
        titles,
        algorithms,
      },
    };
    try {
      worker.postMessage(message);
      console.info(
        `[TitleNormalizationWorkerPool] đ¤ Dispatched title normalization to worker ${workerIndex}: ${titles.length} titles, taskId: ${effectiveTaskId}`,
      );
    } catch (error: unknown) {
      console.error(
        `[TitleNormalizationWorkerPool] â Failed to post message to worker:`,
        error,
      );
      // Release the registered task so it does not linger in the pool.
      pool.completeTask(effectiveTaskId);
      // Normalize to an Error so callers can rely on `.message`.
      reject(error instanceof Error ? error : new Error(String(error)));
    }
  });
}
Cancels an in-progress normalization task.
taskId: Identifier of the task to cancel.
Coordinates title normalization tasks via the generic worker pool. Provides cache seeding and incremental update support.
Source