From 88d5a55b1184d16cdf11210d1e1a3603791ab7b9 Mon Sep 17 00:00:00 2001
From: Theo Sanderson
Date: Wed, 9 Oct 2024 00:21:07 +0100
Subject: [PATCH] Split stream and process mutations with streaming JSON
 parsing (#619)

* split-stream

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* looking goood!

* ok actually it wasn't working and still isn't

* now really looking good

* move dep

* add more logging

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
 taxonium_component/package.json               |   1 +
 .../src/webworkers/localBackendWorker.js      |   7 +-
 taxonium_component/yarn.lock                  |  12 ++
 taxonium_data_handling/importing.js           | 185 +++++++++++++++---
 4 files changed, 172 insertions(+), 33 deletions(-)

diff --git a/taxonium_component/package.json b/taxonium_component/package.json
index 72d5ef5c..3766ce02 100644
--- a/taxonium_component/package.json
+++ b/taxonium_component/package.json
@@ -26,6 +26,7 @@
   },
   "dependencies": {},
   "devDependencies": {
+    "stream-json": "^1.8.0",
     "@fontsource/roboto": "^5.0.1",
     "@headlessui/react": "^1.7.17",
     "@jbrowse/core": "^2.5.0",
diff --git a/taxonium_component/src/webworkers/localBackendWorker.js b/taxonium_component/src/webworkers/localBackendWorker.js
index 9514d15a..36e030fe 100644
--- a/taxonium_component/src/webworkers/localBackendWorker.js
+++ b/taxonium_component/src/webworkers/localBackendWorker.js
@@ -7,6 +7,8 @@ import {
 import { processNewickAndMetadata } from "../utils/processNewick.js";
 import { processNextstrain } from "../utils/processNextstrain.js";
 import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
+import { parser } from "stream-json";
+import { streamValues } from "stream-json/streamers/StreamValues";
 
 console.log("worker starting");
 postMessage({ data: "Worker starting" });
@@ -211,8 +213,11 @@ onmessage = async (event) => {
     processedUploadedData = await processJsonl(
       data.data,
       sendStatusMessage,
-      ReadableWebToNodeStream
+      ReadableWebToNodeStream,
+      parser,
+      streamValues
     );
+    console.log("processedUploadedData created");
 
   } else if (
     data.type === "upload" &&
diff --git a/taxonium_component/yarn.lock b/taxonium_component/yarn.lock
index 58c83a36..fd688848 100644
--- a/taxonium_component/yarn.lock
+++ b/taxonium_component/yarn.lock
@@ -10164,6 +10164,11 @@ stream-browserify@^3.0.0:
     inherits "~2.0.4"
     readable-stream "^3.5.0"
 
+stream-chain@^2.2.5:
+  version "2.2.5"
+  resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.2.5.tgz#b30967e8f14ee033c5b9a19bbe8a2cba90ba0d09"
+  integrity sha512-1TJmBx6aSWqZ4tx7aTpBDXK0/e2hhcNSTV8+CbFJtDjbb+I1mZ8lHit0Grw9GRT+6JbIrrDd8esncgBi8aBXGA==
+
 stream-http@^3.2.0:
   version "3.2.0"
   resolved "https://registry.yarnpkg.com/stream-http/-/stream-http-3.2.0.tgz#1872dfcf24cb15752677e40e5c3f9cc1926028b5"
@@ -10174,6 +10179,13 @@ stream-http@^3.2.0:
   readable-stream "^3.6.0"
   xtend "^4.0.2"
 
+stream-json@^1.8.0:
+  version "1.8.0"
+  resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.8.0.tgz#53f486b2e3b4496c506131f8d7260ba42def151c"
+  integrity sha512-HZfXngYHUAr1exT4fxlbc1IOce1RYxp2ldeaf97LYCOPSoOqY/1Psp7iGvpb+6JIOgkra9zDYnPX01hGAHzEPw==
+  dependencies:
+    stream-chain "^2.2.5"
+
 stream-shift@^1.0.0:
   version "1.0.1"
   resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.1.tgz#d7088281559ab2778424279b0877da3c392d5a3d"
diff --git a/taxonium_data_handling/importing.js b/taxonium_data_handling/importing.js
index 2a043477..8f121922 100644
--- a/taxonium_data_handling/importing.js
+++ b/taxonium_data_handling/importing.js
@@ -1,6 +1,91 @@
 import zlib from "zlib";
 import stream from "stream";
 import buffer from "buffer";
+import { send } from "process";
+
+class ChunkCounterStream extends stream.PassThrough {
+  constructor(sendStatusMessage, options = {}) {
+    super(options);
+    this.sendStatusMessage = sendStatusMessage;
+    this.chunkCount = 0;
+  }
+
+  _transform(chunk, encoding, callback) {
+    this.chunkCount++;
+    if (this.chunkCount % 100 === 0) {
+      this.sendStatusMessage({
+        message: `Processed ${this.chunkCount} groups of mutations`,
+        count: this.chunkCount,
+      });
+    }
+
+    // Pass the chunk through unchanged
+    this.push(chunk);
+    callback();
+  }
+
+  _flush(callback) {
+    this.sendStatusMessage({
+      message: `Finished processing. Total chunks: ${this.chunkCount}`,
+      count: this.chunkCount,
+      finished: true,
+    });
+    callback();
+  }
+}
+
+class StreamSplitter extends stream.Transform {
+  constructor(headerParser, dataParser, options = {}) {
+    super(options);
+    this.headerParser = headerParser;
+    this.dataParser = dataParser;
+    this.firstPart = true;
+    this.buffer = null; // Buffer to hold partial data
+  }
+
+  _transform(chunk, encoding, callback) {
+    let data = chunk;
+    let newlineIndex = data.indexOf(10); // ASCII code for '\n'
+
+    if (this.firstPart) {
+      if (newlineIndex !== -1) {
+        // Found newline, split the data
+        const headerData = data.slice(0, newlineIndex);
+        const restData = data.slice(newlineIndex + 1);
+
+        // Write header data to headerParser
+        this.headerParser.write(headerData);
+        this.headerParser.end();
+
+        // Write restData to dataParser
+        if (restData.length > 0) {
+          this.dataParser.write(restData);
+        }
+
+        this.firstPart = false;
+      } else {
+        // No newline found, store data in buffer
+        this.headerParser.write(data);
+      }
+    } else {
+      // After header is processed, pass data to dataParser
+      this.dataParser.write(data);
+    }
+
+    callback();
+  }
+
+  _flush(callback) {
+    if (this.firstPart && this.buffer) {
+      // No newline found in the entire stream, treat entire data as header
+      this.headerParser.write(this.buffer);
+      this.headerParser.end();
+      this.firstPart = false;
+    }
+    this.dataParser.end();
+    callback();
+  }
+}
 const roundToDp = (number, dp) => {
   return Math.round(number * Math.pow(10, dp)) / Math.pow(10, dp);
 };
@@ -28,8 +113,57 @@ function reduceMaxOrMin(array, accessFunction, maxOrMin) {
   }
 }
 
-export const setUpStream = (the_stream, data, sendStatusMessage) => {
+export const setUpStream = (
+  the_stream,
+  data,
+  sendStatusMessage,
+  parser,
+  streamValues
+) => {
+  // Header parser
+  const headerParser = parser({ jsonStreaming: true });
+  const headerPipeline = headerParser.pipe(streamValues());
+  headerPipeline.on("data", (chunk) => {
+    data.header = chunk.value;
+    data.nodes = [];
+    data.node_to_mut = {};
+  });
+  headerPipeline.on("error", (err) => {
+    console.error("Header parser error:", err);
+  });
+
+  // Data parser for the rest of the stream
+  let lineBuffer = "";
+  let line_number = 0;
+  const dataParser = new stream.Writable({
+    write(chunk, encoding, callback) {
+      const chunkStr = chunk.toString();
+      let start = 0;
+      let end = chunkStr.indexOf("\n");
+
+      while (end !== -1) {
+        lineBuffer += chunkStr.slice(start, end);
+        processLine(lineBuffer, line_number);
+        line_number++;
+        lineBuffer = "";
+        start = end + 1;
+        end = chunkStr.indexOf("\n", start);
+      }
+
+      lineBuffer += chunkStr.slice(start);
+      callback();
+    },
+    final(callback) {
+      if (lineBuffer) {
+        processLine(lineBuffer, line_number);
+      }
+      callback();
+    },
+  });
+
   function processLine(line, line_number) {
+    if (line.trim() === "") return;
+
     if ((line_number % 10000 === 0 && line_number > 0) || line_number == 500) {
       console.log(`Processed ${formatNumber(line_number)} lines`);
       if (data.header.total_nodes) {
@@ -45,44 +179,31 @@ export const setUpStream = (the_stream, data, sendStatusMessage) => {
         });
       }
     }
-    // console.log("LINE",line_number,line);
     const decoded = JSON.parse(line);
-    if (line_number === 0) {
-      data.header = decoded;
-      data.nodes = [];
-      data.node_to_mut = {};
-    } else {
-      data.node_to_mut[decoded.node_id] = decoded.mutations; // this is an int to ints map
-      data.nodes.push(decoded);
-    }
+    data.node_to_mut[decoded.node_id] = decoded.mutations;
+    data.nodes.push(decoded);
   }
-  let cur_line = "";
-  let line_counter = 0;
-  the_stream.on("data", function (data) {
-    cur_line += data.toString();
-    if (cur_line.includes("\n")) {
-      const lines = cur_line.split("\n");
-      cur_line = lines.pop();
-      lines.forEach((line) => {
-        processLine(line, line_counter);
-        line_counter++;
-      });
-    }
-  });
-
-  the_stream.on("error", function (err) {
-    console.log(err);
-  });
-
-  the_stream.on("end", function () {
-    console.log("end");
+  const chunkCounterStream = new ChunkCounterStream(sendStatusMessage);
+  chunkCounterStream.pipe(headerParser);
+  const splitter = new StreamSplitter(chunkCounterStream, dataParser);
+
+  // Pipe the input stream through the splitter
+  the_stream
+    .pipe(splitter)
+    .on("error", (err) => console.error("Splitter error:", err));
+
+  // Handle the completion of the dataParser
+  dataParser.on("finish", () => {
+    console.log("Finished processing the stream");
   });
 };
 export const processJsonl = async (
   jsonl,
   sendStatusMessage,
-  ReadableWebToNodeStream
+  ReadableWebToNodeStream,
+  parser,
+  streamValues
 ) => {
   console.log(
     "Worker processJsonl" //, jsonl
   );
@@ -98,7 +219,7 @@
     the_stream = new stream.PassThrough();
   }
   let new_data = {};
-  setUpStream(the_stream, new_data, sendStatusMessage);
+  setUpStream(the_stream, new_data, sendStatusMessage, parser, streamValues);
   if (status === "loaded") {
     const dataAsArrayBuffer = data;
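
The core idea of the patch: a Taxonium .jsonl file is one JSON header line followed by one JSON node record per line, so the input can be split at the first newline, with the header routed through stream-json and everything after it through a plain line-by-line JSONL parser. Below is a minimal standalone sketch of that split, illustrative only and not code from the patch; the sample payload and variable names are made up.

    import stream from "stream";
    import { parser } from "stream-json";
    import { streamValues } from "stream-json/streamers/StreamValues";

    // First JSON value (the header line) goes through stream-json.
    const headerParser = parser({ jsonStreaming: true });
    headerParser.pipe(streamValues()).on("data", (chunk) => {
      console.log("header:", chunk.value);
    });

    // Remaining lines are plain JSONL: split on "\n", JSON.parse each line.
    let lineBuffer = "";
    const dataParser = new stream.Writable({
      write(chunk, encoding, callback) {
        lineBuffer += chunk.toString();
        const lines = lineBuffer.split("\n");
        lineBuffer = lines.pop(); // keep any trailing partial line
        for (const line of lines) {
          if (line.trim() !== "") console.log("node:", JSON.parse(line));
        }
        callback();
      },
    });

    // Split a payload at the first newline, as StreamSplitter does above.
    const payload = Buffer.from(
      '{"version":2,"total_nodes":2}\n' +
        '{"node_id":0,"mutations":[]}\n' +
        '{"node_id":1,"mutations":[3,7]}\n'
    );
    const firstNewline = payload.indexOf(10); // ASCII '\n'
    headerParser.end(payload.slice(0, firstNewline));
    dataParser.end(payload.slice(firstNewline + 1));

StreamSplitter in the patch performs the same routing incrementally as chunks arrive, which is what lets the gunzip output be consumed without buffering the whole file in memory.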
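
Note that taxonium_data_handling receives parser and streamValues as arguments rather than importing stream-json itself: the dependency lives in taxonium_component and is injected by the worker. A hedged sketch of calling the new processJsonl signature from Node follows, assuming the truncated "loaded" branch above goes on to write the ArrayBuffer into the_stream and that the promise resolves with the populated data object ({ header, nodes, node_to_mut }); the file name and import specifier are hypothetical.

    import fs from "fs";
    import { parser } from "stream-json";
    import { streamValues } from "stream-json/streamers/StreamValues";
    import { processJsonl } from "taxonium_data_handling/importing.js";

    const buf = fs.readFileSync("tree.jsonl.gz"); // hypothetical input file
    const processed = await processJsonl(
      {
        filename: "tree.jsonl.gz", // a name containing "gz" selects zlib.createGunzip()
        status: "loaded",          // the ArrayBuffer branch shown (truncated) above
        data: buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength),
      },
      (msg) => console.log("status:", msg), // sendStatusMessage
      null, // ReadableWebToNodeStream: assumed unused on the "loaded" path
      parser,
      streamValues
    );
    console.log(`loaded ${processed.nodes.length} nodes`);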