diff --git a/taxonium_data_handling/importing.js b/taxonium_data_handling/importing.js
index acbf08b2..d01c8dfd 100644
--- a/taxonium_data_handling/importing.js
+++ b/taxonium_data_handling/importing.js
@@ -1,6 +1,60 @@
 import zlib from "zlib";
 import stream from "stream";
 import buffer from "buffer";
+// NOTE(review): `send` is not referenced in any hunk visible in this patch;
+// confirm it is needed, otherwise drop this import.
+import { send } from "process";
+
+/**
+ * Pass-through stream that counts the chunks flowing through it and reports
+ * progress via a caller-supplied callback.
+ *
+ * A status message is sent after every 100th chunk, and a final message with
+ * `finished: true` is sent when the stream is flushed. Chunks are forwarded
+ * unchanged.
+ */
+class ChunkCounterStream extends stream.PassThrough {
+  /**
+   * @param {Function} [sendStatusMessage] - Called with a status object
+   *   (`{ message, count }`, plus `finished: true` on flush). When it is not
+   *   a function, status reporting is skipped instead of throwing mid-stream.
+   * @param {object} [options] - Options forwarded to `stream.PassThrough`.
+   */
+  constructor(sendStatusMessage, options = {}) {
+    super(options);
+    this.sendStatusMessage =
+      typeof sendStatusMessage === "function" ? sendStatusMessage : () => {};
+    this.chunkCount = 0;
+  }
+
+  /**
+   * Counts the chunk, reports progress every 100 chunks, and forwards it.
+   */
+  _transform(chunk, encoding, callback) {
+    this.chunkCount++;
+    if (this.chunkCount % 100 === 0) {
+      this.sendStatusMessage({
+        message: `Processed ${this.chunkCount} groups of mutations`,
+        count: this.chunkCount,
+      });
+    }
+
+    // Pass the chunk through unchanged
+    callback(null, chunk);
+  }
+
+  /**
+   * Sends the final status message once all chunks have been seen.
+   */
+  _flush(callback) {
+    this.sendStatusMessage({
+      message: `Finished processing. Total chunks: ${this.chunkCount}`,
+      count: this.chunkCount,
+      finished: true,
+    });
+    callback();
+  }
+}
 
 class StreamSplitter extends stream.Transform {
   constructor(headerParser, dataParser, options = {}) {
@@ -153,9 +207,11 @@ export const setUpStream = (
     data.node_to_mut[decoded.node_id] = decoded.mutations;
     data.nodes.push(decoded);
   }
-
-  // Set up the splitter with headerParser and dataParser
-  const splitter = new StreamSplitter(headerParser, dataParser);
+  // The counter takes headerParser's place in the splitter and pipes
+  // everything it receives on to the real headerParser.
+  const chunkCounterStream = new ChunkCounterStream(sendStatusMessage);
+  chunkCounterStream.pipe(headerParser);
+  const splitter = new StreamSplitter(chunkCounterStream, dataParser);
 
   // Pipe the input stream through the splitter
   the_stream.pipe(splitter).on("error", (err) => console.error("Splitter error:", err));