From fb08638b4aae9de4fba3f2957a40729509a4ea47 Mon Sep 17 00:00:00 2001 From: Theo Sanderson Date: Tue, 8 Oct 2024 18:13:40 +0100 Subject: [PATCH] now really looking good --- taxonium_data_handling/importing.js | 108 +++++++++++++++------------- 1 file changed, 58 insertions(+), 50 deletions(-) diff --git a/taxonium_data_handling/importing.js b/taxonium_data_handling/importing.js index 6ab33881..acbf08b2 100644 --- a/taxonium_data_handling/importing.js +++ b/taxonium_data_handling/importing.js @@ -3,42 +3,60 @@ import stream from "stream"; import buffer from "buffer"; class StreamSplitter extends stream.Transform { - constructor(options = {}) { + constructor(headerParser, dataParser, options = {}) { super(options); + this.headerParser = headerParser; + this.dataParser = dataParser; this.firstPart = true; - this.buffer = ""; + this.buffer = null; // Buffer to hold partial data } + _transform(chunk, encoding, callback) { + let data = chunk; + let newlineIndex = data.indexOf(10); // ASCII code for '\n' + if (this.firstPart) { - const chunkStr = chunk.toString(); - const newlineIndex = chunkStr.indexOf("\n"); if (newlineIndex !== -1) { - this.push(chunkStr.slice(0, newlineIndex + 1)); - this.firstPart = false; - this.push(null); // End the first part + // Found newline, split the data + const headerData = data.slice(0, newlineIndex); + const restData = data.slice(newlineIndex + 1); + - // Start the second part - if (newlineIndex + 1 < chunkStr.length) { - this.push(chunkStr.slice(newlineIndex + 1)); + // Write header data to headerParser + this.headerParser.write(headerData); + this.headerParser.end(); + + // Write restData to dataParser + if (restData.length > 0) { + this.dataParser.write(restData); } + + this.firstPart = false; } else { - this.push(chunkStr); + // No newline found, store data in buffer + this.headerParser.write(data); } } else { - this.push(chunk); + // After header is processed, pass data to dataParser + this.dataParser.write(data); } callback(); } + _flush(callback) { - if (this.firstPart) { - this.push(null); // End the first part - this.push(""); // Start the second part (empty in this case) + if (this.firstPart && this.buffer) { + // No newline found in the entire stream, treat entire data as header + this.headerParser.write(this.buffer); + this.headerParser.end(); + this.firstPart = false; } + this.dataParser.end(); callback(); } } + const roundToDp = (number, dp) => { return Math.round(number * Math.pow(10, dp)) / Math.pow(10, dp); }; @@ -65,30 +83,26 @@ function reduceMaxOrMin(array, accessFunction, maxOrMin) { } } -export const setUpStream = (the_stream, data, sendStatusMessage, parser, streamValues) => { - const splitter = new StreamSplitter(); - - // Custom header parser using json-stream - const headerParser = stream.pipeline( - parser({ jsonStreaming: true }), - streamValues(), - new stream.Writable({ - objectMode: true, - write(chunk, encoding, callback) { - data.header = chunk.value; - data.nodes = []; - data.node_to_mut = {}; - callback(); - }, - }), - (err) => { - if (err) { - console.error("Header parser error:", err); - } - } - ); +export const setUpStream = ( + the_stream, + data, + sendStatusMessage, + parser, + streamValues +) => { + // Header parser + const headerParser = parser({ jsonStreaming: true }); + const headerPipeline = headerParser.pipe(streamValues()); + headerPipeline.on('data', (chunk) => { + data.header = chunk.value; + data.nodes = []; + data.node_to_mut = {}; + }); + headerPipeline.on('error', (err) => { + console.error("Header parser error:", err); + }); - // Data parser for the rest of the stream (unchanged) + // Data parser for the rest of the stream let lineBuffer = ""; let line_number = 0; const dataParser = new stream.Writable({ @@ -140,25 +154,19 @@ export const setUpStream = (the_stream, data, sendStatusMessage, parser, streamV data.nodes.push(decoded); } - // Set up the pipeline - the_stream - .pipe(splitter) - .on("error", (err) => console.error("Splitter error:", err)); + // Set up the splitter with headerParser and dataParser + const splitter = new StreamSplitter(headerParser, dataParser); - splitter - .pipe(headerParser, { end: false }) - .on("error", (err) => console.error("Header parser error:", err)); + // Pipe the input stream through the splitter + the_stream.pipe(splitter).on("error", (err) => console.error("Splitter error:", err)); - splitter - .pipe(dataParser) - .on("error", (err) => console.error("Data parser error:", err)); - - // Handle the completion of the stream + // Handle the completion of the dataParser dataParser.on("finish", () => { console.log("Finished processing the stream"); }); }; + export const processJsonl = async ( jsonl, sendStatusMessage,