Skip to content

Commit

Permalink
now really looking good
Browse files Browse the repository at this point in the history
  • Loading branch information
theosanderson committed Oct 8, 2024
1 parent 7f81021 commit fb08638
Showing 1 changed file with 58 additions and 50 deletions.
108 changes: 58 additions & 50 deletions taxonium_data_handling/importing.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,60 @@ import stream from "stream";
import buffer from "buffer";

class StreamSplitter extends stream.Transform {
constructor(options = {}) {
constructor(headerParser, dataParser, options = {}) {
super(options);
this.headerParser = headerParser;
this.dataParser = dataParser;
this.firstPart = true;
this.buffer = "";
this.buffer = null; // Buffer to hold partial data
}

_transform(chunk, encoding, callback) {
let data = chunk;
let newlineIndex = data.indexOf(10); // ASCII code for '\n'

if (this.firstPart) {
const chunkStr = chunk.toString();
const newlineIndex = chunkStr.indexOf("\n");
if (newlineIndex !== -1) {
this.push(chunkStr.slice(0, newlineIndex + 1));
this.firstPart = false;
this.push(null); // End the first part
// Found newline, split the data
const headerData = data.slice(0, newlineIndex);
const restData = data.slice(newlineIndex + 1);


// Start the second part
if (newlineIndex + 1 < chunkStr.length) {
this.push(chunkStr.slice(newlineIndex + 1));
// Write header data to headerParser
this.headerParser.write(headerData);
this.headerParser.end();

// Write restData to dataParser
if (restData.length > 0) {
this.dataParser.write(restData);
}

this.firstPart = false;
} else {
this.push(chunkStr);
// No newline found, store data in buffer
this.headerParser.write(data);
}
} else {
this.push(chunk);
// After header is processed, pass data to dataParser
this.dataParser.write(data);
}

callback();
}

_flush(callback) {
if (this.firstPart) {
this.push(null); // End the first part
this.push(""); // Start the second part (empty in this case)
if (this.firstPart && this.buffer) {
// No newline found in the entire stream, treat entire data as header
this.headerParser.write(this.buffer);
this.headerParser.end();
this.firstPart = false;
}
this.dataParser.end();
callback();
}
}


const roundToDp = (number, dp) => {
return Math.round(number * Math.pow(10, dp)) / Math.pow(10, dp);
};
Expand All @@ -65,30 +83,26 @@ function reduceMaxOrMin(array, accessFunction, maxOrMin) {
}
}

export const setUpStream = (the_stream, data, sendStatusMessage, parser, streamValues) => {
const splitter = new StreamSplitter();

// Custom header parser using json-stream
const headerParser = stream.pipeline(
parser({ jsonStreaming: true }),
streamValues(),
new stream.Writable({
objectMode: true,
write(chunk, encoding, callback) {
data.header = chunk.value;
data.nodes = [];
data.node_to_mut = {};
callback();
},
}),
(err) => {
if (err) {
console.error("Header parser error:", err);
}
}
);
export const setUpStream = (
the_stream,
data,
sendStatusMessage,
parser,
streamValues
) => {
// Header parser
const headerParser = parser({ jsonStreaming: true });
const headerPipeline = headerParser.pipe(streamValues());
headerPipeline.on('data', (chunk) => {
data.header = chunk.value;
data.nodes = [];
data.node_to_mut = {};
});
headerPipeline.on('error', (err) => {
console.error("Header parser error:", err);
});

// Data parser for the rest of the stream (unchanged)
// Data parser for the rest of the stream
let lineBuffer = "";
let line_number = 0;
const dataParser = new stream.Writable({
Expand Down Expand Up @@ -140,25 +154,19 @@ export const setUpStream = (the_stream, data, sendStatusMessage, parser, streamV
data.nodes.push(decoded);
}

// Set up the pipeline
the_stream
.pipe(splitter)
.on("error", (err) => console.error("Splitter error:", err));
// Set up the splitter with headerParser and dataParser
const splitter = new StreamSplitter(headerParser, dataParser);

splitter
.pipe(headerParser, { end: false })
.on("error", (err) => console.error("Header parser error:", err));
// Pipe the input stream through the splitter
the_stream.pipe(splitter).on("error", (err) => console.error("Splitter error:", err));

splitter
.pipe(dataParser)
.on("error", (err) => console.error("Data parser error:", err));

// Handle the completion of the stream
// Handle the completion of the dataParser
dataParser.on("finish", () => {
console.log("Finished processing the stream");
});
};


export const processJsonl = async (
jsonl,
sendStatusMessage,
Expand Down

0 comments on commit fb08638

Please sign in to comment.