Worker message containing CSV chunk data.
Set tracking active task IDs.
Void; posts PROGRESS, CSV_COMPLETE, or ERROR.
export function handleCsvChunk(
message: CSVChunkMessage,
activeTasks: Set<string>,
): void {
const { taskId, chunk, isLastChunk } = message.payload;
const state = csvParserStates.get(taskId);
if (!state) {
if (!activeTasks.has(taskId)) {
return;
}
console.error(
`[Worker] Task ${taskId} not found - did you send CSV_START first?`,
);
globalThis.postMessage({
type: "ERROR",
payload: {
taskId,
error: {
message: `Task ${taskId} not found. Did you send CSV_START first?`,
},
},
});
return;
}
// Serialize chunk processing by chaining to the existing promise
state.processingPromise = (state.processingPromise ?? Promise.resolve())
.then(() => processCsvChunkIncremental(chunk, state))
.then(() => {
if (isLastChunk) {
// Finalize parsing and send complete results
return finalizeCsvParsing(state);
}
})
.then(() => {
if (isLastChunk) {
activeTasks.delete(taskId);
csvParserStates.delete(taskId);
}
})
.catch((error) => {
console.error(
`[Worker] Error during CSV chunk processing for task ${taskId}:`,
error,
);
globalThis.postMessage({
type: "ERROR",
payload: {
taskId,
error: getErrorDetails(error),
},
});
activeTasks.delete(taskId);
csvParserStates.delete(taskId);
})
.finally(() => {
// Clear the processing promise when the chain settles
state.processingPromise = undefined;
});
}
Handles CSV chunk messages with incremental streaming parsing. Appends data to buffer, flushes when necessary, and finalizes on last chunk.