diff --git a/taxonium_data_handling/importing.js b/taxonium_data_handling/importing.js
index acbf08b2..9cbe3878 100644
--- a/taxonium_data_handling/importing.js
+++ b/taxonium_data_handling/importing.js
@@ -8,7 +8,6 @@ class StreamSplitter extends stream.Transform {
     this.headerParser = headerParser;
     this.dataParser = dataParser;
     this.firstPart = true;
-    this.buffer = null; // Buffer to hold partial data
   }
 
   _transform(chunk, encoding, callback) {
@@ -93,15 +92,35 @@ export const setUpStream = (
   // Header parser
   const headerParser = parser({ jsonStreaming: true });
   const headerPipeline = headerParser.pipe(streamValues());
+
+  let headerBytesProcessed = 0;
+  const HEADER_PROGRESS_INTERVAL = 1024 * 1024; // 1MB
+
+  headerParser.on('data', (chunk) => {
+    headerBytesProcessed += chunk.length;
+    if (headerBytesProcessed >= HEADER_PROGRESS_INTERVAL) {
+      sendStatusMessage({
+        message: `Processing header: ${(headerBytesProcessed / (1024 * 1024)).toFixed(2)} MB processed`,
+      });
+      headerBytesProcessed = 0; // Reset the counter
+    }
+  });
+
   headerPipeline.on('data', (chunk) => {
     data.header = chunk.value;
     data.nodes = [];
     data.node_to_mut = {};
+    sendStatusMessage({
+      message: `Header processed successfully`,
+    });
   });
+
   headerPipeline.on('error', (err) => {
     console.error("Header parser error:", err);
+    sendStatusMessage({
+      error: `Header parser error: ${err.message}`,
+    });
   });
-
   // Data parser for the rest of the stream
   let lineBuffer = "";
   let line_number = 0;