From 560ffca0280ef894b2b53ca636a6e6249625bbe9 Mon Sep 17 00:00:00 2001 From: Levko Kravets Date: Mon, 25 Mar 2024 13:48:23 +0200 Subject: [PATCH] [PECO-1532] Ignore the excess records in arrow batches Signed-off-by: Levko Kravets --- lib/result/ArrowResultConverter.ts | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/lib/result/ArrowResultConverter.ts b/lib/result/ArrowResultConverter.ts index fc035028..5d8a5b1f 100644 --- a/lib/result/ArrowResultConverter.ts +++ b/lib/result/ArrowResultConverter.ts @@ -32,6 +32,9 @@ export default class ArrowResultConverter implements IResultsProvider private recordBatchReader?: IterableIterator>; + // Remaining rows in current Arrow batch (not the record batch!) + private remainingRows: number = 0; + // This is the next (!!) record batch to be read. It is unset only in two cases: // - prior to the first call to `fetchNext` // - when no more data available @@ -71,13 +74,25 @@ export default class ArrowResultConverter implements IResultsProvider await this.prefetch(options); if (this.prefetchedRecordBatch) { - // Here we consume a record batch fetched during previous call to `fetchNext`, and prefetch the next batch - const previousRecordBatch = this.prefetchedRecordBatch; + // Consume a record batch fetched during previous call to `fetchNext` + const table = new Table(this.prefetchedRecordBatch); this.prefetchedRecordBatch = undefined; + // Get table rows, but not more than remaining count + const arrowRows = table.toArray().slice(0, this.remainingRows); + const result = this.getRows(table.schema, arrowRows); + + // Reduce remaining rows count by a count of rows we just processed. + // If the remaining count reached zero - we're done with current arrow + // batch, so discard the batch reader + this.remainingRows -= result.length; + if (this.remainingRows === 0) { + this.recordBatchReader = undefined; + } + + // Prefetch the next record batch await this.prefetch(options); - const table = new Table(previousRecordBatch); - return this.getRows(table.schema, table.toArray()); + return result; } return []; @@ -103,6 +118,7 @@ export default class ArrowResultConverter implements IResultsProvider if (arrowBatch.batches.length > 0 && arrowBatch.rowCount > 0) { const reader = RecordBatchReader.from(arrowBatch.batches); this.recordBatchReader = reader[Symbol.iterator](); + this.remainingRows = arrowBatch.rowCount; } }