From 1dc16ac040612fdb15f06357d46630fd1b58e5ed Mon Sep 17 00:00:00 2001
From: Levko Kravets
Date: Wed, 27 Mar 2024 14:16:24 +0200
Subject: [PATCH] [PECO-1532] Ignore the excess records in query results (#239)

* [PECO-1532] Arrow and CloudFetch result handlers: return row count with raw batch data

Signed-off-by: Levko Kravets

* Update tests

Signed-off-by: Levko Kravets

* Refactor `ArrowResultConverter` - cleanup and make it skip empty batches

Signed-off-by: Levko Kravets

* [PECO-1532] Ignore the excess records in arrow batches

Signed-off-by: Levko Kravets

* Add tests

Signed-off-by: Levko Kravets

---------

Signed-off-by: Levko Kravets
---
 lib/result/ArrowResultConverter.ts            | 115 +++++++++++-----
 lib/result/ArrowResultHandler.ts              |  23 +++-
 lib/result/CloudFetchResultHandler.ts         |  23 +++-
 lib/result/utils.ts                           |   5 +
 .../unit/result/ArrowResultConverter.test.js  | 126 +++++++++++++++---
 tests/unit/result/ArrowResultHandler.test.js  |  67 ++++++++--
 .../result/CloudFetchResultHandler.test.js    |  58 ++++++--
 tests/unit/result/compatibility.test.js       |  33 ++---
 8 files changed, 330 insertions(+), 120 deletions(-)

diff --git a/lib/result/ArrowResultConverter.ts b/lib/result/ArrowResultConverter.ts
index e1ada6f1..5d8a5b1f 100644
--- a/lib/result/ArrowResultConverter.ts
+++ b/lib/result/ArrowResultConverter.ts
@@ -16,7 +16,7 @@ import {
 import { TGetResultSetMetadataResp, TColumnDesc } from '../../thrift/TCLIService_types';
 import IClientContext from '../contracts/IClientContext';
 import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
-import { getSchemaColumns, convertThriftValue } from './utils';
+import { ArrowBatch, getSchemaColumns, convertThriftValue } from './utils';
 
 const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;
 
@@ -26,15 +26,23 @@ type ArrowSchemaField = Field<DataType<Type, TypeMap>>;
 
 export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
   protected readonly context: IClientContext;
 
-  private readonly source: IResultsProvider<Array<Buffer>>;
+  private readonly source: IResultsProvider<ArrowBatch>;
 
   private readonly schema: Array<TColumnDesc>;
 
-  private reader?: IterableIterator<RecordBatch<TypeMap>>;
+  private recordBatchReader?: IterableIterator<RecordBatch<TypeMap>>;
 
-  private pendingRecordBatch?: RecordBatch;
+  // Remaining rows in the current Arrow batch (not the record batch!)
+  private remainingRows: number = 0;
+
+  // This is the next (!!) record batch to be read. It is unset in only two cases:
+  // - prior to the first call to `fetchNext`
+  // - when no more data is available
+  // This field is primarily used by `hasMore`, so it can tell whether the next
+  // `fetchNext` will actually return a non-empty result
+  private prefetchedRecordBatch?: RecordBatch;
 
-  constructor(context: IClientContext, source: IResultsProvider<Array<Buffer>>, { schema }: TGetResultSetMetadataResp) {
+  constructor(context: IClientContext, source: IResultsProvider<ArrowBatch>, { schema }: TGetResultSetMetadataResp) {
     this.context = context;
     this.source = source;
     this.schema = getSchemaColumns(schema);
@@ -44,7 +52,7 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
     if (this.schema.length === 0) {
       return false;
     }
-    if (this.pendingRecordBatch) {
+    if (this.prefetchedRecordBatch) {
       return true;
     }
     return this.source.hasMore();
@@ -55,47 +63,80 @@ export default class ArrowResultConverter implements IResultsProvider<Array<any>> {
       return [];
     }
 
-    // eslint-disable-next-line no-constant-condition
-    while (true) {
-      // It's not possible to know if iterator has more items until trying
-      // to get the next item. But we need to know if iterator is empty right
-      // after getting the next item. Therefore, after creating the iterator,
-      // we get one item more and store it in `pendingRecordBatch`. Next time,
-      // we use that stored item, and prefetch the next one. Prefetched item
-      // is therefore the next item we are going to return, so it can be used
-      // to know if we actually can return anything next time
-      const recordBatch = this.pendingRecordBatch;
-      this.pendingRecordBatch = this.prefetch();
-
-      if (recordBatch) {
-        const table = new Table(recordBatch);
-        return this.getRows(table.schema, table.toArray());
-      }
-
-      // eslint-disable-next-line no-await-in-loop
-      const batches = await this.source.fetchNext(options);
-      if (batches.length === 0) {
-        this.reader = undefined;
-        break;
-      }
-
-      const reader = RecordBatchReader.from(batches);
-      this.reader = reader[Symbol.iterator]();
-      this.pendingRecordBatch = this.prefetch();
+    // It's not possible to know whether the iterator has more items until we try
+    // to get the next one. So each time we read one batch ahead and store it, and
+    // process the batch prefetched on the previous `fetchNext` call. Because we
+    // already have the next item, it's easy to tell whether a subsequent `fetchNext`
+    // will be able to read anything, and the `hasMore` logic becomes trivial
+
+    // This prefetch handles the first call to `fetchNext`, when the internal fields
+    // are not initialized yet. On subsequent calls to `fetchNext` it does nothing
+    await this.prefetch(options);
+
+    if (this.prefetchedRecordBatch) {
+      // Consume the record batch fetched during the previous call to `fetchNext`
+      const table = new Table(this.prefetchedRecordBatch);
+      this.prefetchedRecordBatch = undefined;
+      // Get the table rows, but no more than the remaining count
+      const arrowRows = table.toArray().slice(0, this.remainingRows);
+      const result = this.getRows(table.schema, arrowRows);
+
+      // Reduce the remaining row count by the number of rows we just processed.
+      // If the remaining count reaches zero, we're done with the current Arrow
+      // batch, so discard the batch reader
+      this.remainingRows -= result.length;
+      if (this.remainingRows === 0) {
+        this.recordBatchReader = undefined;
+      }
+
+      // Prefetch the next record batch
+      await this.prefetch(options);
+
+      return result;
     }
 
     return [];
   }
 
-  private prefetch(): RecordBatch | undefined {
-    const item = this.reader?.next() ?? { done: true, value: undefined };
-
-    if (item.done || item.value === undefined) {
-      this.reader = undefined;
-      return undefined;
-    }
-
-    return item.value;
-  }
+  // This method tries to read one more record batch and store it in the `prefetchedRecordBatch` field.
+  // If `prefetchedRecordBatch` is already set, the method does nothing.
+  // The method pulls the next item from the source if needed, initializes a record batch reader and
+  // reads from it until it either reaches the end of data or finds a non-empty record batch
+  private async prefetch(options: ResultsProviderFetchNextOptions) {
+    // This loop runs until the next non-empty record batch is retrieved
+    // (the other, implicit loop condition - end of data - is checked in the loop body)
+    while (!this.prefetchedRecordBatch) {
+      // First, try to fetch the next item from the source and initialize the record batch reader.
+      // If the source has no more data - exit early
+      if (!this.recordBatchReader) {
+        const sourceHasMore = await this.source.hasMore(); // eslint-disable-line no-await-in-loop
+        if (!sourceHasMore) {
+          return;
+        }
+
+        const arrowBatch = await this.source.fetchNext(options); // eslint-disable-line no-await-in-loop
+        if (arrowBatch.batches.length > 0 && arrowBatch.rowCount > 0) {
+          const reader = RecordBatchReader.from(arrowBatch.batches);
+          this.recordBatchReader = reader[Symbol.iterator]();
+          this.remainingRows = arrowBatch.rowCount;
+        }
+      }
+
+      // Try to get the next item from the current record batch reader. The reader may be unavailable
+      // at this point - in that case we fall back to a "done" state, and the `while` loop will do one
+      // more iteration attempting to create a new reader. Eventually it will either succeed or reach
+      // the end of the source. This scenario also handles readers which are already empty
+      const item = this.recordBatchReader?.next() ?? { done: true, value: undefined };
+      if (item.done || item.value === undefined) {
+        this.recordBatchReader = undefined;
+      } else {
+        // Skip empty batches
+        // eslint-disable-next-line no-lonely-if
+        if (item.value.numRows > 0) {
+          this.prefetchedRecordBatch = item.value;
+        }
+      }
+    }
+  }
 
   private getRows(schema: ArrowSchema, rows: Array<any>): Array<any> {
diff --git a/lib/result/ArrowResultHandler.ts b/lib/result/ArrowResultHandler.ts
index 2b9a3238..601432e8 100644
--- a/lib/result/ArrowResultHandler.ts
+++ b/lib/result/ArrowResultHandler.ts
@@ -2,9 +2,9 @@ import LZ4 from 'lz4';
 import { TGetResultSetMetadataResp, TRowSet } from '../../thrift/TCLIService_types';
 import IClientContext from '../contracts/IClientContext';
 import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
-import { hiveSchemaToArrowSchema } from './utils';
+import { ArrowBatch, hiveSchemaToArrowSchema } from './utils';
 
-export default class ArrowResultHandler implements IResultsProvider<Array<Buffer>> {
+export default class ArrowResultHandler implements IResultsProvider<ArrowBatch> {
   protected readonly context: IClientContext;
 
   private readonly source: IResultsProvider<TRowSet | undefined>;
@@ -35,22 +35,33 @@ export default class ArrowResultHandler implements IResultsProvider<Array<Buffer>> {
 
   public async fetchNext(options: ResultsProviderFetchNextOptions) {
     if (!this.arrowSchema) {
-      return [];
+      return {
+        batches: [],
+        rowCount: 0,
+      };
     }
 
     const rowSet = await this.source.fetchNext(options);
 
     const batches: Array<Buffer> = [];
-    rowSet?.arrowBatches?.forEach(({ batch }) => {
+    let totalRowCount = 0;
+    rowSet?.arrowBatches?.forEach(({ batch, rowCount }) => {
       if (batch) {
         batches.push(this.isLZ4Compressed ? LZ4.decode(batch) : batch);
+        totalRowCount += rowCount.toNumber(true);
       }
     });
 
     if (batches.length === 0) {
-      return [];
+      return {
+        batches: [],
+        rowCount: 0,
+      };
     }
 
-    return [this.arrowSchema, ...batches];
+    return {
+      batches: [this.arrowSchema, ...batches],
+      rowCount: totalRowCount,
+    };
   }
 }
diff --git a/lib/result/CloudFetchResultHandler.ts b/lib/result/CloudFetchResultHandler.ts
index 4b2b9369..39ef6f94 100644
--- a/lib/result/CloudFetchResultHandler.ts
+++ b/lib/result/CloudFetchResultHandler.ts
@@ -3,8 +3,9 @@ import fetch, { RequestInfo, RequestInit, Request } from 'node-fetch';
 import { TGetResultSetMetadataResp, TRowSet, TSparkArrowResultLink } from '../../thrift/TCLIService_types';
 import IClientContext from '../contracts/IClientContext';
 import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
+import { ArrowBatch } from './utils';
 
-export default class CloudFetchResultHandler implements IResultsProvider<Array<Buffer>> {
+export default class CloudFetchResultHandler implements IResultsProvider<ArrowBatch> {
   protected readonly context: IClientContext;
 
   private readonly source: IResultsProvider<TRowSet | undefined>;
@@ -13,7 +14,7 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<Buffer>> {
 
   private pendingLinks: Array<TSparkArrowResultLink> = [];
 
-  private downloadTasks: Array<Promise<Buffer>> = [];
+  private downloadTasks: Array<Promise<ArrowBatch>> = [];
 
   constructor(
     context: IClientContext,
@@ -49,15 +50,20 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<Buffer>> {
     }
 
     const batch = await this.downloadTasks.shift();
-    const batches = batch ? [batch] : [];
+    if (batch === undefined) {
+      return {
+        batches: [],
+        rowCount: 0,
+      };
+    }
 
     if (this.isLZ4Compressed) {
-      return batches.map((buffer) => LZ4.decode(buffer));
+      batch.batches = batch.batches.map((buffer) => LZ4.decode(buffer));
     }
-    return batches;
+    return batch;
   }
 
-  private async downloadLink(link: TSparkArrowResultLink): Promise<Buffer> {
+  private async downloadLink(link: TSparkArrowResultLink): Promise<ArrowBatch> {
     if (Date.now() >= link.expiryTime.toNumber()) {
       throw new Error('CloudFetch link has expired');
     }
@@ -68,7 +74,10 @@ export default class CloudFetchResultHandler implements IResultsProvider<Array<Buffer>> {
     }
 
     const result = await response.arrayBuffer();
-    return Buffer.from(result);
+    return {
+      batches: [Buffer.from(result)],
+      rowCount: link.rowCount.toNumber(true),
+    };
   }
 }
diff --git a/lib/result/utils.ts b/lib/result/utils.ts
--- a/lib/result/utils.ts
+++ b/lib/result/utils.ts
@@ -4,4 +4,9 @@
 
+export interface ArrowBatch {
+  batches: Array<Buffer>;
+  rowCount: number;
+}
+
 export function getSchemaColumns(schema?: TTableSchema): Array<TColumnDesc> {
   if (!schema) {
     return [];
diff --git a/tests/unit/result/ArrowResultConverter.test.js b/tests/unit/result/ArrowResultConverter.test.js
index 2cf4949f..4b53c620 100644
--- a/tests/unit/result/ArrowResultConverter.test.js
+++ b/tests/unit/result/ArrowResultConverter.test.js
@@ -1,27 +1,32 @@
 const { expect } = require('chai');
 const fs = require('fs');
 const path = require('path');
+const { tableFromArrays, tableToIPC, Table } = require('apache-arrow');
 const ArrowResultConverter = require('../../../dist/result/ArrowResultConverter').default;
 const ResultsProviderMock = require('./fixtures/ResultsProviderMock');
 
-const sampleThriftSchema = {
-  columns: [
-    {
-      columnName: '1',
-      typeDesc: {
-        types: [
-          {
-            primitiveEntry: {
-              type: 3,
-              typeQualifiers: null,
-            },
-          },
-        ],
-      },
-      position: 1,
-    },
-  ],
-};
+function createSampleThriftSchema(columnName) {
+  return {
+    columns: [
+      {
+        columnName,
+        typeDesc: {
+          types: [
+            {
+              primitiveEntry: {
+                type: 3,
+                typeQualifiers: null,
+              },
+            },
+          ],
+        },
+        position: 1,
+      },
+    ],
+  };
+}
+
+const sampleThriftSchema = createSampleThriftSchema('1');
 
 const sampleArrowSchema = Buffer.from([
   255, 255, 255, 255, 208, 0, 0, 0, 16, 0, 0, 0, 0, 0, 10, 0, 14, 0, 6, 0, 13, 0, 8, 0, 10, 0, 0, 0, 0, 0, 4, 0, 16, 0,
@@ -53,17 +58,42 @@ const arrowBatchAllNulls = [
   fs.readFileSync(path.join(__dirname, 'fixtures/dataAllNulls.arrow')),
 ];
 
-describe('ArrowResultHandler', () => {
+const emptyItem = {
+  batches: [],
+  rowCount: 0,
+};
+
+function createSampleRecordBatch(start, count) {
+  const table = tableFromArrays({
+    id: Float64Array.from({ length: count }, (unused, index) => index + start),
+  });
+  return table.batches[0];
+}
+
+function createSampleArrowBatch(...recordBatches) {
+  const table = new Table(recordBatches);
+  return tableToIPC(table);
+}
+
+describe('ArrowResultConverter', () => {
   it('should convert data', async () => {
     const context = {};
-    const rowSetProvider = new ResultsProviderMock([sampleArrowBatch]);
+    const rowSetProvider = new ResultsProviderMock(
+      [
+        {
+          batches: sampleArrowBatch,
+          rowCount: 1,
+        },
+      ],
+      emptyItem,
+    );
     const result = new ArrowResultConverter(context, rowSetProvider, { schema: sampleThriftSchema });
     expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([{ 1: 1 }]);
   });
 
   it('should return empty array if no data to process', async () => {
     const context = {};
-    const rowSetProvider = new ResultsProviderMock([], []);
+    const rowSetProvider = new ResultsProviderMock([], emptyItem);
     const result = new ArrowResultConverter(context, rowSetProvider, { schema: sampleThriftSchema });
     expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]);
     expect(await result.hasMore()).to.be.false;
@@ -71,7 +101,15 @@ describe('ArrowResultHandler', () => {
 
   it('should return empty array if no schema available', async () => {
     const context = {};
-    const rowSetProvider = new ResultsProviderMock([sampleArrowBatch]);
+    const rowSetProvider = new ResultsProviderMock(
+      [
+        {
+          batches: sampleArrowBatch,
+          rowCount: 1,
+        },
+      ],
+      emptyItem,
+    );
     const result = new ArrowResultConverter(context, rowSetProvider, {});
     expect(await result.hasMore()).to.be.false;
     expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]);
@@ -79,7 +117,15 @@ describe('ArrowResultHandler', () => {
 
   it('should detect nulls', async () => {
     const context = {};
-    const rowSetProvider = new ResultsProviderMock([arrowBatchAllNulls]);
+    const rowSetProvider = new ResultsProviderMock(
+      [
+        {
+          batches: arrowBatchAllNulls,
+          rowCount: 1,
+        },
+      ],
+      emptyItem,
+    );
     const result = new ArrowResultConverter(context, rowSetProvider, { schema: thriftSchemaAllNulls });
     expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([
       {
@@ -110,4 +156,42 @@ describe('ArrowResultHandler', () => {
       },
     ]);
   });
+
+  it('should respect row count in batch', async () => {
+    const context = {};
+
+    const rowSetProvider = new ResultsProviderMock(
+      [
+        // First Arrow batch: contains two record batches of 5 and 5 records,
+        // but the declared row count is 8. This means the result should
+        // contain all 5 records from the first record batch, but only 3 records
+        // from the second record batch
+        {
+          batches: [createSampleArrowBatch(createSampleRecordBatch(10, 5), createSampleRecordBatch(20, 5))],
+          rowCount: 8,
+        },
+        // Second Arrow batch: contains one record batch of 5 records.
+ // Declared count of rows is 2, and only 2 rows from this batch + // should be returned in result + { + batches: [createSampleArrowBatch(createSampleRecordBatch(30, 5))], + rowCount: 2, + }, + ], + emptyItem, + ); + const result = new ArrowResultConverter(context, rowSetProvider, { schema: createSampleThriftSchema('id') }); + + const rows1 = await result.fetchNext({ limit: 10000 }); + expect(rows1).to.deep.equal([{ id: 10 }, { id: 11 }, { id: 12 }, { id: 13 }, { id: 14 }]); + expect(await result.hasMore()).to.be.true; + + const rows2 = await result.fetchNext({ limit: 10000 }); + expect(rows2).to.deep.equal([{ id: 20 }, { id: 21 }, { id: 22 }]); + expect(await result.hasMore()).to.be.true; + + const rows3 = await result.fetchNext({ limit: 10000 }); + expect(rows3).to.deep.equal([{ id: 30 }, { id: 31 }]); + expect(await result.hasMore()).to.be.false; + }); }); diff --git a/tests/unit/result/ArrowResultHandler.test.js b/tests/unit/result/ArrowResultHandler.test.js index 74bf37c3..92cb573d 100644 --- a/tests/unit/result/ArrowResultHandler.test.js +++ b/tests/unit/result/ArrowResultHandler.test.js @@ -1,4 +1,5 @@ const { expect } = require('chai'); +const Int64 = require('node-int64'); const LZ4 = require('lz4'); const ArrowResultHandler = require('../../../dist/result/ArrowResultHandler').default; const ResultsProviderMock = require('./fixtures/ResultsProviderMock'); @@ -21,16 +22,16 @@ const sampleArrowBatch = { 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, ]), - rowCount: 1, + rowCount: new Int64(1), }; const sampleRowSet1 = { - startRowOffset: 0, + startRowOffset: new Int64(0), arrowBatches: [sampleArrowBatch], }; const sampleRowSet1LZ4Compressed = { - startRowOffset: 0, + startRowOffset: new Int64(0), arrowBatches: sampleRowSet1.arrowBatches.map((item) => ({ ...item, batch: LZ4.encode(item.batch), @@ -38,21 +39,21 @@ const sampleRowSet1LZ4Compressed = { }; const sampleRowSet2 = { - startRowOffset: 0, + startRowOffset: new Int64(0), arrowBatches: undefined, }; const sampleRowSet3 = { - startRowOffset: 0, + startRowOffset: new Int64(0), arrowBatches: [], }; const sampleRowSet4 = { - startRowOffset: 0, + startRowOffset: new Int64(0), arrowBatches: [ { batch: undefined, - rowCount: 0, + rowCount: new Int64(0), }, ], }; @@ -63,7 +64,7 @@ describe('ArrowResultHandler', () => { const rowSetProvider = new ResultsProviderMock([sampleRowSet1]); const result = new ArrowResultHandler(context, rowSetProvider, { arrowSchema: sampleArrowSchema }); - const batches = await result.fetchNext({ limit: 10000 }); + const { batches } = await result.fetchNext({ limit: 10000 }); expect(await rowSetProvider.hasMore()).to.be.false; expect(await result.hasMore()).to.be.false; @@ -79,7 +80,7 @@ describe('ArrowResultHandler', () => { lz4Compressed: true, }); - const batches = await result.fetchNext({ limit: 10000 }); + const { batches } = await result.fetchNext({ limit: 10000 }); expect(await rowSetProvider.hasMore()).to.be.false; expect(await result.hasMore()).to.be.false; @@ -101,32 +102,67 @@ describe('ArrowResultHandler', () => { it('should return empty array if no data to process', async () => { const context = {}; + + const expectedResult = { + batches: [], + rowCount: 0, + }; + case1: { const rowSetProvider = new ResultsProviderMock(); const result = new ArrowResultHandler(context, rowSetProvider, { arrowSchema: sampleArrowSchema }); - expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); 
+ expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq(expectedResult); expect(await result.hasMore()).to.be.false; } case2: { const rowSetProvider = new ResultsProviderMock([sampleRowSet2]); const result = new ArrowResultHandler(context, rowSetProvider, { arrowSchema: sampleArrowSchema }); - expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq(expectedResult); expect(await result.hasMore()).to.be.false; } case3: { const rowSetProvider = new ResultsProviderMock([sampleRowSet3]); const result = new ArrowResultHandler(context, rowSetProvider, { arrowSchema: sampleArrowSchema }); - expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq(expectedResult); expect(await result.hasMore()).to.be.false; } case4: { const rowSetProvider = new ResultsProviderMock([sampleRowSet4]); const result = new ArrowResultHandler(context, rowSetProvider, { arrowSchema: sampleArrowSchema }); - expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq(expectedResult); expect(await result.hasMore()).to.be.false; } }); + it('should return a proper row count in a batch', async () => { + const context = {}; + const rowSetProvider = new ResultsProviderMock([ + { + ...sampleRowSet1, + arrowBatches: [ + { + batch: Buffer.alloc(0), + rowCount: new Int64(2), + }, + { + batch: Buffer.alloc(0), + rowCount: new Int64(0), + }, + { + batch: Buffer.alloc(0), + rowCount: new Int64(3), + }, + ], + }, + ]); + const result = new ArrowResultHandler(context, rowSetProvider, { arrowSchema: sampleArrowSchema }); + + const { rowCount } = await result.fetchNext({ limit: 10000 }); + expect(await rowSetProvider.hasMore()).to.be.false; + expect(await result.hasMore()).to.be.false; + expect(rowCount).to.equal(5); + }); + it('should infer arrow schema from thrift schema', async () => { const context = {}; const rowSetProvider = new ResultsProviderMock([sampleRowSet2]); @@ -158,7 +194,10 @@ describe('ArrowResultHandler', () => { const context = {}; const rowSetProvider = new ResultsProviderMock([sampleRowSet2]); const result = new ArrowResultHandler(context, rowSetProvider, {}); - expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq([]); + expect(await result.fetchNext({ limit: 10000 })).to.be.deep.eq({ + batches: [], + rowCount: 0, + }); expect(await result.hasMore()).to.be.false; }); }); diff --git a/tests/unit/result/CloudFetchResultHandler.test.js b/tests/unit/result/CloudFetchResultHandler.test.js index 3d9c67ea..d2899ccd 100644 --- a/tests/unit/result/CloudFetchResultHandler.test.js +++ b/tests/unit/result/CloudFetchResultHandler.test.js @@ -32,47 +32,54 @@ const sampleRowSet1 = { { fileLink: 'http://example.com/result/1', expiryTime: new Int64(defaultLinkExpiryTime), + rowCount: new Int64(1), }, { fileLink: 'http://example.com/result/2', expiryTime: new Int64(defaultLinkExpiryTime), + rowCount: new Int64(1), }, ], }; const sampleRowSet2 = { - startRowOffset: 0, + startRowOffset: new Int64(0), resultLinks: [ { fileLink: 'http://example.com/result/3', expiryTime: new Int64(defaultLinkExpiryTime), + rowCount: new Int64(1), }, { fileLink: 'http://example.com/result/4', expiryTime: new Int64(defaultLinkExpiryTime), + rowCount: new Int64(1), }, { fileLink: 'http://example.com/result/5', expiryTime: new Int64(defaultLinkExpiryTime), + rowCount: new Int64(1), }, ], }; const sampleEmptyRowSet = { - 
startRowOffset: 0, + startRowOffset: new Int64(0), resultLinks: undefined, }; const sampleExpiredRowSet = { - startRowOffset: 0, + startRowOffset: new Int64(0), resultLinks: [ { fileLink: 'http://example.com/result/6', expiryTime: new Int64(defaultLinkExpiryTime), + rowCount: new Int64(1), }, { fileLink: 'http://example.com/result/7', expiryTime: new Int64(Date.now() - 24 * 60 * 60 * 1000), // 24hr in past + rowCount: new Int64(1), }, ], }; @@ -165,7 +172,7 @@ describe('CloudFetchResultHandler', () => { const clientConfig = context.getConfig(); const rowSet = { - startRowOffset: 0, + startRowOffset: new Int64(0), resultLinks: [...sampleRowSet1.resultLinks, ...sampleRowSet2.resultLinks], }; const expectedLinksCount = rowSet.resultLinks.length; // 5 @@ -187,8 +194,8 @@ describe('CloudFetchResultHandler', () => { initialFetch: { // `cloudFetchConcurrentDownloads` out of `expectedLinksCount` links should be scheduled immediately // first one should be `await`-ed and returned from `fetchNext` - const items = await result.fetchNext({ limit: 10000 }); - expect(items.length).to.be.gt(0); + const { batches } = await result.fetchNext({ limit: 10000 }); + expect(batches.length).to.be.gt(0); expect(await rowSetProvider.hasMore()).to.be.false; // it should use retry policy for all requests @@ -200,8 +207,8 @@ describe('CloudFetchResultHandler', () => { secondFetch: { // It should return previously fetched batch, and schedule one more - const items = await result.fetchNext({ limit: 10000 }); - expect(items.length).to.be.gt(0); + const { batches } = await result.fetchNext({ limit: 10000 }); + expect(batches.length).to.be.gt(0); expect(await rowSetProvider.hasMore()).to.be.false; // it should use retry policy for all requests @@ -215,8 +222,8 @@ describe('CloudFetchResultHandler', () => { thirdFetch: { // Now buffer should be empty, and it should fetch next batches - const items = await result.fetchNext({ limit: 10000 }); - expect(items.length).to.be.gt(0); + const { batches } = await result.fetchNext({ limit: 10000 }); + expect(batches.length).to.be.gt(0); expect(await rowSetProvider.hasMore()).to.be.false; // it should use retry policy for all requests @@ -229,6 +236,33 @@ describe('CloudFetchResultHandler', () => { } }); + it('should return a proper row count in a batch', async () => { + const context = new ClientContextMock(); + + const rowSetProvider = new ResultsProviderMock([sampleRowSet1]); + + const result = new CloudFetchResultHandler(context, rowSetProvider, { lz4Compressed: false }); + + context.fetchHandler.returns( + Promise.resolve({ + ok: true, + status: 200, + statusText: 'OK', + arrayBuffer: async () => Buffer.alloc(0), + }), + ); + + expect(await rowSetProvider.hasMore()).to.be.true; + + const { rowCount } = await result.fetchNext({ limit: 10000 }); + expect(await rowSetProvider.hasMore()).to.be.false; + + // it should use retry policy for all requests + expect(context.connectionProvider.getRetryPolicy.called).to.be.true; + expect(context.fetchHandler.called).to.be.true; + expect(rowCount).to.equal(1); + }); + it('should handle LZ4 compressed data', async () => { const context = new ClientContextMock(); @@ -249,13 +283,13 @@ describe('CloudFetchResultHandler', () => { expect(await rowSetProvider.hasMore()).to.be.true; - const items = await result.fetchNext({ limit: 10000 }); + const { batches } = await result.fetchNext({ limit: 10000 }); expect(await rowSetProvider.hasMore()).to.be.false; // it should use retry policy for all requests 
expect(context.connectionProvider.getRetryPolicy.called).to.be.true; expect(context.fetchHandler.called).to.be.true; - expect(items).to.deep.eq([expectedBatch]); + expect(batches).to.deep.eq([expectedBatch]); }); it('should handle HTTP errors', async () => { diff --git a/tests/unit/result/compatibility.test.js b/tests/unit/result/compatibility.test.js index eb4119b5..1fe22cbd 100644 --- a/tests/unit/result/compatibility.test.js +++ b/tests/unit/result/compatibility.test.js @@ -2,7 +2,6 @@ const { expect } = require('chai'); const ArrowResultHandler = require('../../../dist/result/ArrowResultHandler').default; const ArrowResultConverter = require('../../../dist/result/ArrowResultConverter').default; const JsonResultHandler = require('../../../dist/result/JsonResultHandler').default; -const ResultSlicer = require('../../../dist/result/ResultSlicer').default; const { fixArrowResult } = require('../../fixtures/compatibility'); const fixtureColumn = require('../../fixtures/compatibility/column'); @@ -15,10 +14,7 @@ describe('Result handlers compatibility tests', () => { it('colum-based data', async () => { const context = {}; const rowSetProvider = new ResultsProviderMock(fixtureColumn.rowSets); - const result = new ResultSlicer( - context, - new JsonResultHandler(context, rowSetProvider, { schema: fixtureColumn.schema }), - ); + const result = new JsonResultHandler(context, rowSetProvider, { schema: fixtureColumn.schema }); const rows = await result.fetchNext({ limit: 10000 }); expect(rows).to.deep.equal(fixtureColumn.expected); }); @@ -26,13 +22,10 @@ describe('Result handlers compatibility tests', () => { it('arrow-based data without native types', async () => { const context = {}; const rowSetProvider = new ResultsProviderMock(fixtureArrow.rowSets); - const result = new ResultSlicer( + const result = new ArrowResultConverter( context, - new ArrowResultConverter( - context, - new ArrowResultHandler(context, rowSetProvider, { arrowSchema: fixtureArrow.arrowSchema }), - { schema: fixtureArrow.schema }, - ), + new ArrowResultHandler(context, rowSetProvider, { arrowSchema: fixtureArrow.arrowSchema }), + { schema: fixtureArrow.schema }, ); const rows = await result.fetchNext({ limit: 10000 }); expect(fixArrowResult(rows)).to.deep.equal(fixtureArrow.expected); @@ -41,13 +34,10 @@ describe('Result handlers compatibility tests', () => { it('arrow-based data with native types', async () => { const context = {}; const rowSetProvider = new ResultsProviderMock(fixtureArrowNT.rowSets); - const result = new ResultSlicer( + const result = new ArrowResultConverter( context, - new ArrowResultConverter( - context, - new ArrowResultHandler(context, rowSetProvider, { arrowSchema: fixtureArrowNT.arrowSchema }), - { schema: fixtureArrowNT.schema }, - ), + new ArrowResultHandler(context, rowSetProvider, { arrowSchema: fixtureArrowNT.arrowSchema }), + { schema: fixtureArrowNT.schema }, ); const rows = await result.fetchNext({ limit: 10000 }); expect(fixArrowResult(rows)).to.deep.equal(fixtureArrowNT.expected); @@ -56,13 +46,10 @@ describe('Result handlers compatibility tests', () => { it('should infer arrow schema from thrift schema', async () => { const context = {}; const rowSetProvider = new ResultsProviderMock(fixtureArrow.rowSets); - const result = new ResultSlicer( + const result = new ArrowResultConverter( context, - new ArrowResultConverter( - context, - new ArrowResultHandler(context, rowSetProvider, { schema: fixtureArrow.schema }), - { schema: fixtureArrow.schema }, - ), + new ArrowResultHandler(context, 
rowSetProvider, { schema: fixtureArrow.schema }), + { schema: fixtureArrow.schema }, ); const rows = await result.fetchNext({ limit: 10000 }); expect(fixArrowResult(rows)).to.deep.equal(fixtureArrow.expected);
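
Editor's note: the core of this change is that an `ArrowBatch.rowCount` may be smaller than the number of rows physically present in the Arrow IPC buffers, and the excess records must be ignored. The following standalone sketch is hypothetical code, not part of the patch; it only reuses apache-arrow calls that appear in the diff above (`RecordBatchReader.from`, `new Table(...)`, `toArray()`), and the helper name `readTrimmed` is invented for illustration:

import { tableFromArrays, tableToIPC, RecordBatchReader, Table } from 'apache-arrow';

// Same shape as the `ArrowBatch` interface added to lib/result/utils.ts
interface ArrowBatch {
  batches: Array<Buffer>;
  rowCount: number;
}

// Read record batches, but stop once the declared row count is exhausted
function readTrimmed(batch: ArrowBatch): Array<any> {
  const rows: Array<any> = [];
  let remaining = batch.rowCount;
  for (const recordBatch of RecordBatchReader.from(batch.batches)) {
    if (remaining <= 0) break; // excess records are ignored
    const taken = new Table(recordBatch).toArray().slice(0, remaining);
    rows.push(...taken);
    remaining -= taken.length;
  }
  return rows;
}

// 5 physical rows, but a declared count of 3 - only 3 rows come back
const table = tableFromArrays({ id: Float64Array.from([1, 2, 3, 4, 5]) });
const ipc = Buffer.from(tableToIPC(table));
console.log(readTrimmed({ batches: [ipc], rowCount: 3 }).length); // 3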
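The `hasMore`/`prefetch` interplay in `ArrowResultConverter` is an instance of a generic "read one item ahead" technique: a plain iterator cannot be peeked, so one item is buffered to make emptiness checks reliable. A minimal sketch of the pattern, with hypothetical names and a plain iterator standing in for the record batch reader:

// Wraps an iterator so emptiness can be checked without losing items
class Lookahead<T> {
  private buffered?: T;

  constructor(private readonly source: Iterator<T>) {}

  // Analogous to ArrowResultConverter.hasMore(): answers without
  // consuming data from the caller's point of view
  hasMore(): boolean {
    this.fill();
    return this.buffered !== undefined;
  }

  // Analogous to fetchNext(): returns the buffered item, then refills
  next(): T | undefined {
    this.fill();
    const item = this.buffered;
    this.buffered = undefined;
    return item;
  }

  private fill(): void {
    if (this.buffered === undefined) {
      const { done, value } = this.source.next();
      if (!done) this.buffered = value;
    }
  }
}

const it = new Lookahead([10, 20, 30][Symbol.iterator]());
while (it.hasMore()) console.log(it.next()); // 10, 20, 30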
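Finally, Thrift transports per-batch row counts as 64-bit integers, which is why the updated tests wrap counts in `new Int64(...)` and the handler calls `rowCount.toNumber(true)`. A hedged sketch of that aggregation (the interface and helper below are hypothetical, assuming only the `node-int64` API already used in the patch; importing the CJS module this way assumes `esModuleInterop`):

import Int64 from 'node-int64';

interface ArrowBatchEntry {
  batch?: Buffer; // IPC payload; may be absent in degenerate row sets
  rowCount: Int64; // Thrift i64
}

// Sum row counts, skipping entries without a payload - modeled on the
// forEach loop in ArrowResultHandler.fetchNext()
function totalRowCount(entries: ArrowBatchEntry[]): number {
  let total = 0;
  for (const { batch, rowCount } of entries) {
    if (batch) {
      total += rowCount.toNumber(true); // `true` permits lossy conversion above 2^53
    }
  }
  return total;
}

console.log(
  totalRowCount([
    { batch: Buffer.alloc(0), rowCount: new Int64(2) },
    { rowCount: new Int64(5) }, // ignored: no batch data
    { batch: Buffer.alloc(0), rowCount: new Int64(3) },
  ]),
); // 5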