Skip to content

Commit

Permalink
[PECO-1532] Ignore the excess records in arrow batches
Browse files Browse the repository at this point in the history
Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Mar 25, 2024
1 parent 374af38 commit 560ffca
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions lib/result/ArrowResultConverter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>

private recordBatchReader?: IterableIterator<RecordBatch<TypeMap>>;

// Remaining rows in current Arrow batch (not the record batch!)
private remainingRows: number = 0;

// This is the next (!!) record batch to be read. It is unset only in two cases:
// - prior to the first call to `fetchNext`
// - when no more data available
Expand Down Expand Up @@ -71,13 +74,25 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
await this.prefetch(options);

if (this.prefetchedRecordBatch) {
// Here we consume a record batch fetched during previous call to `fetchNext`, and prefetch the next batch
const previousRecordBatch = this.prefetchedRecordBatch;
// Consume a record batch fetched during previous call to `fetchNext`
const table = new Table(this.prefetchedRecordBatch);
this.prefetchedRecordBatch = undefined;
// Get table rows, but not more than remaining count
const arrowRows = table.toArray().slice(0, this.remainingRows);
const result = this.getRows(table.schema, arrowRows);

// Reduce remaining rows count by a count of rows we just processed.
// If the remaining count reached zero - we're done with current arrow
// batch, so discard the batch reader
this.remainingRows -= result.length;
if (this.remainingRows === 0) {
this.recordBatchReader = undefined;
}

// Prefetch the next record batch
await this.prefetch(options);

const table = new Table(previousRecordBatch);
return this.getRows(table.schema, table.toArray());
return result;
}

return [];
Expand All @@ -103,6 +118,7 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>
if (arrowBatch.batches.length > 0 && arrowBatch.rowCount > 0) {
const reader = RecordBatchReader.from<TypeMap>(arrowBatch.batches);
this.recordBatchReader = reader[Symbol.iterator]();
this.remainingRows = arrowBatch.rowCount;
}
}

Expand Down

0 comments on commit 560ffca

Please sign in to comment.