diff --git a/lib/DBSQLOperation/index.ts b/lib/DBSQLOperation/index.ts
index 08e32864..eaddac18 100644
--- a/lib/DBSQLOperation/index.ts
+++ b/lib/DBSQLOperation/index.ts
@@ -23,6 +23,7 @@ import RowSetProvider from '../result/RowSetProvider';
 import JsonResultHandler from '../result/JsonResultHandler';
 import ArrowResultHandler from '../result/ArrowResultHandler';
 import CloudFetchResultHandler from '../result/CloudFetchResultHandler';
+import ResultSlicer from '../result/ResultSlicer';
 import { definedOrError } from '../utils';
 import HiveDriverError from '../errors/HiveDriverError';
 import IClientContext from '../contracts/IClientContext';
@@ -68,7 +69,7 @@ export default class DBSQLOperation implements IOperation {
 
   private hasResultSet: boolean = false;
 
-  private resultHandler?: IResultsProvider<Array<Row>>;
+  private resultHandler?: ResultSlicer<Row>;
 
   constructor({ handle, directResults, context }: DBSQLOperationConstructorOptions) {
     this.operationHandle = handle;
@@ -107,9 +108,17 @@ export default class DBSQLOperation implements IOperation {
    */
   public async fetchAll(options?: FetchOptions): Promise<Array<Row>> {
     const data: Array<Array<Row>> = [];
+
+    const fetchChunkOptions = {
+      ...options,
+      // Tell the slicer to return raw chunks. We're going to process all of them anyway,
+      // so there is no need to additionally buffer and slice the chunks returned by the server
+      disableBuffering: true,
+    };
+
     do {
       // eslint-disable-next-line no-await-in-loop
-      const chunk = await this.fetchChunk(options);
+      const chunk = await this.fetchChunk(fetchChunkOptions);
       data.push(chunk);
     } while (await this.hasMoreRows()); // eslint-disable-line no-await-in-loop
     this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.getId()}`);
@@ -138,7 +147,10 @@ export default class DBSQLOperation implements IOperation {
     const resultHandler = await this.getResultHandler();
     await this.failIfClosed();
 
-    const result = resultHandler.fetchNext({ limit: options?.maxRows || defaultMaxRows });
+    const result = resultHandler.fetchNext({
+      limit: options?.maxRows || defaultMaxRows,
+      disableBuffering: options?.disableBuffering,
+    });
     await this.failIfClosed();
 
     this.context
@@ -335,24 +347,28 @@ export default class DBSQLOperation implements IOperation {
     return this.metadata;
   }
 
-  private async getResultHandler(): Promise<IResultsProvider<Array<Row>>> {
+  private async getResultHandler(): Promise<ResultSlicer<Row>> {
     const metadata = await this.fetchMetadata();
     const resultFormat = definedOrError(metadata.resultFormat);
 
     if (!this.resultHandler) {
+      let resultSource: IResultsProvider<Array<Row>> | undefined;
+
       switch (resultFormat) {
         case TSparkRowSetType.COLUMN_BASED_SET:
-          this.resultHandler = new JsonResultHandler(this.context, this._data, metadata.schema);
+          resultSource = new JsonResultHandler(this.context, this._data, metadata.schema);
           break;
         case TSparkRowSetType.ARROW_BASED_SET:
-          this.resultHandler = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
+          resultSource = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
           break;
         case TSparkRowSetType.URL_BASED_SET:
-          this.resultHandler = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
-          break;
-        default:
-          this.resultHandler = undefined;
+          resultSource = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
           break;
+        // no default
+      }
+
+      if (resultSource) {
+        this.resultHandler = new ResultSlicer(this.context, resultSource);
+      }
     }
diff --git a/lib/contracts/IOperation.ts b/lib/contracts/IOperation.ts
index a9ed45fe..123d4da3 100644
--- a/lib/contracts/IOperation.ts
+++ b/lib/contracts/IOperation.ts
@@ -14,6 +14,9 @@ export interface FinishedOptions extends WaitUntilReadyOptions {
 
 export interface FetchOptions extends WaitUntilReadyOptions {
   maxRows?: number;
+  // Disables the internal buffer used to ensure a consistent chunk size.
+  // When set to `true`, returned chunk sizes may vary (and may differ from `maxRows`)
+  disableBuffering?: boolean;
 }
 
 export interface GetSchemaOptions extends WaitUntilReadyOptions {
diff --git a/lib/result/ResultSlicer.ts b/lib/result/ResultSlicer.ts
new file mode 100644
index 00000000..0f640a9a
--- /dev/null
+++ b/lib/result/ResultSlicer.ts
@@ -0,0 +1,74 @@
+import IClientContext from '../contracts/IClientContext';
+import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
+
+export interface ResultSlicerFetchNextOptions extends ResultsProviderFetchNextOptions {
+  // Setting this to `true` will disable the slicer, and it will return unprocessed chunks
+  // from the underlying results provider
+  disableBuffering?: boolean;
+}
+
+export default class ResultSlicer<T> implements IResultsProvider<Array<T>> {
+  private readonly context: IClientContext;
+
+  private readonly source: IResultsProvider<Array<T>>;
+
+  private remainingResults: Array<T> = [];
+
+  constructor(context: IClientContext, source: IResultsProvider<Array<T>>) {
+    this.context = context;
+    this.source = source;
+  }
+
+  public async hasMore(): Promise<boolean> {
+    if (this.remainingResults.length > 0) {
+      return true;
+    }
+    return this.source.hasMore();
+  }
+
+  public async fetchNext(options: ResultSlicerFetchNextOptions): Promise<Array<T>> {
+    // If we're asked not to use the buffer - first try to return whatever we have in the buffer.
+    // If the buffer is empty - just proxy the call to the underlying results provider
+    if (options.disableBuffering) {
+      if (this.remainingResults.length > 0) {
+        const result = this.remainingResults;
+        this.remainingResults = [];
+        return result;
+      }
+
+      return this.source.fetchNext(options);
+    }
+
+    const result: Array<Array<T>> = [];
+    let resultsCount = 0;
+
+    // First, use remaining items from the previous fetch
+    if (this.remainingResults.length > 0) {
+      result.push(this.remainingResults);
+      resultsCount += this.remainingResults.length;
+      this.remainingResults = [];
+    }
+
+    // Fetch items from the source results provider until we reach the requested count
+    while (resultsCount < options.limit) {
+      // eslint-disable-next-line no-await-in-loop
+      const chunk = await this.source.fetchNext(options);
+      if (chunk.length === 0) {
+        break;
+      }
+
+      result.push(chunk);
+      resultsCount += chunk.length;
+    }
+
+    // If we collected more results than requested, slice off the excess items and store them for the next time
+    if (resultsCount > options.limit) {
+      const lastChunk = result.pop() ?? [];
+      const neededCount = options.limit - (resultsCount - lastChunk.length);
+      result.push(lastChunk.splice(0, neededCount));
+      this.remainingResults = lastChunk;
+    }
+
+    return result.flat();
+  }
+}
diff --git a/tests/e2e/arrow.test.js b/tests/e2e/arrow.test.js
index 08fe17d8..f513882e 100644
--- a/tests/e2e/arrow.test.js
+++ b/tests/e2e/arrow.test.js
@@ -4,6 +4,7 @@ const config = require('./utils/config');
 const logger = require('./utils/logger')(config.logger);
 const { DBSQLClient } = require('../..');
 const ArrowResultHandler = require('../../dist/result/ArrowResultHandler').default;
+const ResultSlicer = require('../../dist/result/ResultSlicer').default;
 
 const fixtures = require('../fixtures/compatibility');
 const { expected: expectedColumn } = require('../fixtures/compatibility/column');
@@ -81,7 +82,8 @@ describe('Arrow support', () => {
       expect(result).to.deep.equal(expectedColumn);
 
       const resultHandler = await operation.getResultHandler();
-      expect(resultHandler).to.be.not.instanceof(ArrowResultHandler);
+      expect(resultHandler).to.be.instanceof(ResultSlicer);
+      expect(resultHandler.source).to.be.not.instanceof(ArrowResultHandler);
 
       await operation.close();
     },
@@ -100,7 +102,8 @@ describe('Arrow support', () => {
       expect(fixArrowResult(result)).to.deep.equal(expectedArrow);
 
       const resultHandler = await operation.getResultHandler();
-      expect(resultHandler).to.be.instanceof(ArrowResultHandler);
+      expect(resultHandler).to.be.instanceof(ResultSlicer);
+      expect(resultHandler.source).to.be.instanceof(ArrowResultHandler);
 
       await operation.close();
     },
@@ -120,7 +123,8 @@ describe('Arrow support', () => {
       expect(fixArrowResult(result)).to.deep.equal(expectedArrowNativeTypes);
 
       const resultHandler = await operation.getResultHandler();
-      expect(resultHandler).to.be.instanceof(ArrowResultHandler);
+      expect(resultHandler).to.be.instanceof(ResultSlicer);
+      expect(resultHandler.source).to.be.instanceof(ArrowResultHandler);
 
       await operation.close();
     },
@@ -145,7 +149,8 @@ describe('Arrow support', () => {
 
     // We use some internals here to check that the server returned a response with multiple batches
    const resultHandler = await operation.getResultHandler();
-    expect(resultHandler).to.be.instanceof(ArrowResultHandler);
+    expect(resultHandler).to.be.instanceof(ResultSlicer);
+    expect(resultHandler.source).to.be.instanceof(ArrowResultHandler);
 
     sinon.spy(operation._data, 'fetchNext');
diff --git a/tests/e2e/batched_fetch.test.js b/tests/e2e/batched_fetch.test.js
index 2e3059a9..e22e1a8e 100644
--- a/tests/e2e/batched_fetch.test.js
+++ b/tests/e2e/batched_fetch.test.js
@@ -38,7 +38,9 @@ describe('Data fetching', () => {
     try {
       // set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults`
       const operation = await session.executeStatement(query, { maxRows: null });
-      let chunkedOp = await operation.fetchChunk({ maxRows: 10 }).catch((error) => logger(error));
+      let chunkedOp = await operation
+        .fetchChunk({ maxRows: 10, disableBuffering: true })
+        .catch((error) => logger(error));
       expect(chunkedOp.length).to.be.equal(10);
       // we explicitly requested only one chunk
       expect(session.context.driver.fetchResults.callCount).to.equal(1);
@@ -47,6 +49,33 @@ describe('Data fetching', () => {
     }
   });
 
+  it('fetch chunks should respect maxRows', async () => {
+    const session = await openSession({ arrowEnabled: false });
+
+    const chunkSize = 300;
+    const lastChunkSize = 100; // 1000 % chunkSize
+
+    try {
+      const operation = await session.executeStatement(query, { maxRows: 500 });
+
+      let hasMoreRows = true;
+      let chunkCount = 0;
+
+      while (hasMoreRows) {
+        let chunkedOp = await operation.fetchChunk({ maxRows: 300 });
+        chunkCount += 1;
+        hasMoreRows = await operation.hasMoreRows();
+
+        const isLastChunk = !hasMoreRows;
+        expect(chunkedOp.length).to.be.equal(isLastChunk ? lastChunkSize : chunkSize);
+      }
+
+      expect(chunkCount).to.be.equal(4); // 1000 = 3*300 + 1*100
+    } finally {
+      await session.close();
+    }
+  });
+
   it('fetch all should fetch all records', async () => {
     const session = await openSession({ arrowEnabled: false });
     sinon.spy(session.context.driver, 'fetchResults');
diff --git a/tests/e2e/cloudfetch.test.js b/tests/e2e/cloudfetch.test.js
index e2b564db..5573b434 100644
--- a/tests/e2e/cloudfetch.test.js
+++ b/tests/e2e/cloudfetch.test.js
@@ -4,6 +4,7 @@ const config = require('./utils/config');
 const logger = require('./utils/logger')(config.logger);
 const { DBSQLClient } = require('../..');
 const CloudFetchResultHandler = require('../../dist/result/CloudFetchResultHandler').default;
+const ResultSlicer = require('../../dist/result/ResultSlicer').default;
 
 async function openSession(customConfig) {
   const client = new DBSQLClient();
@@ -51,31 +52,34 @@ describe('CloudFetch', () => {
 
     // Check if we're actually getting data via CloudFetch
     const resultHandler = await operation.getResultHandler();
-    expect(resultHandler).to.be.instanceOf(CloudFetchResultHandler);
+    expect(resultHandler).to.be.instanceof(ResultSlicer);
+    expect(resultHandler.source).to.be.instanceOf(CloudFetchResultHandler);
+
+    const cfResultHandler = resultHandler.source;
 
     // Fetch the first chunk and check that the result handler behaves properly.
     // With the number of rows we queried, there should be at least one row set,
     // containing 8 result links. After fetching the first chunk,
     // the result handler should download 5 of them and schedule the rest
-    expect(await resultHandler.hasMore()).to.be.false;
-    expect(resultHandler.pendingLinks.length).to.be.equal(0);
-    expect(resultHandler.downloadedBatches.length).to.be.equal(0);
+    expect(await cfResultHandler.hasMore()).to.be.false;
+    expect(cfResultHandler.pendingLinks.length).to.be.equal(0);
+    expect(cfResultHandler.downloadedBatches.length).to.be.equal(0);
 
     sinon.spy(operation._data, 'fetchNext');
 
-    const chunk = await operation.fetchChunk({ maxRows: 100000 });
+    const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
     // Count links returned from server
     const resultSet = await operation._data.fetchNext.firstCall.returnValue;
     const resultLinksCount = resultSet?.resultLinks?.length ?? 0;
 
-    expect(await resultHandler.hasMore()).to.be.true;
+    expect(await cfResultHandler.hasMore()).to.be.true;
     // expected batches minus first 5 already fetched
-    expect(resultHandler.pendingLinks.length).to.be.equal(resultLinksCount - cloudFetchConcurrentDownloads);
-    expect(resultHandler.downloadedBatches.length).to.be.equal(cloudFetchConcurrentDownloads - 1);
+    expect(cfResultHandler.pendingLinks.length).to.be.equal(resultLinksCount - cloudFetchConcurrentDownloads);
+    expect(cfResultHandler.downloadedBatches.length).to.be.equal(cloudFetchConcurrentDownloads - 1);
 
     let fetchedRowCount = chunk.length;
     while (await operation.hasMoreRows()) {
-      const chunk = await operation.fetchChunk({ maxRows: 100000 });
+      const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
       fetchedRowCount += chunk.length;
     }
diff --git a/tests/unit/DBSQLOperation.test.js b/tests/unit/DBSQLOperation.test.js
index ef9c89c9..be08f527 100644
--- a/tests/unit/DBSQLOperation.test.js
+++ b/tests/unit/DBSQLOperation.test.js
@@ -9,6 +9,7 @@ const HiveDriverError = require('../../dist/errors/HiveDriverError').default;
 const JsonResultHandler = require('../../dist/result/JsonResultHandler').default;
 const ArrowResultHandler = require('../../dist/result/ArrowResultHandler').default;
 const CloudFetchResultHandler = require('../../dist/result/CloudFetchResultHandler').default;
+const ResultSlicer = require('../../dist/result/ResultSlicer').default;
 
 class OperationHandleMock {
   constructor(hasResultSet = true) {
@@ -407,7 +408,7 @@ describe('DBSQLOperation', () => {
       expect(operation.cancelled).to.be.true;
       await expectFailure(() => operation.fetchAll());
-      await expectFailure(() => operation.fetchChunk());
+      await expectFailure(() => operation.fetchChunk({ disableBuffering: true }));
       await expectFailure(() => operation.status());
       await expectFailure(() => operation.finished());
       await expectFailure(() => operation.getSchema());
@@ -533,7 +534,7 @@ describe('DBSQLOperation', () => {
       expect(operation.closed).to.be.true;
       await expectFailure(() => operation.fetchAll());
-      await expectFailure(() => operation.fetchChunk());
+      await expectFailure(() => operation.fetchChunk({ disableBuffering: true }));
       await expectFailure(() => operation.status());
       await expectFailure(() => operation.finished());
       await expectFailure(() => operation.getSchema());
@@ -885,7 +886,8 @@ describe('DBSQLOperation', () => {
         const operation = new DBSQLOperation({ handle, context });
         const resultHandler = await operation.getResultHandler();
         expect(context.driver.getResultSetMetadata.called).to.be.true;
-        expect(resultHandler).to.be.instanceOf(JsonResultHandler);
+        expect(resultHandler).to.be.instanceOf(ResultSlicer);
+        expect(resultHandler.source).to.be.instanceOf(JsonResultHandler);
       }
 
       arrowHandler: {
@@ -895,7 +897,8 @@ describe('DBSQLOperation', () => {
         const operation = new DBSQLOperation({ handle, context });
         const resultHandler = await operation.getResultHandler();
         expect(context.driver.getResultSetMetadata.called).to.be.true;
-        expect(resultHandler).to.be.instanceOf(ArrowResultHandler);
+        expect(resultHandler).to.be.instanceOf(ResultSlicer);
+        expect(resultHandler.source).to.be.instanceOf(ArrowResultHandler);
       }
 
       cloudFetchHandler: {
@@ -905,7 +908,8 @@ describe('DBSQLOperation', () => {
         const operation = new DBSQLOperation({ handle, context });
         const resultHandler = await operation.getResultHandler();
         expect(context.driver.getResultSetMetadata.called).to.be.true;
-        expect(resultHandler).to.be.instanceOf(CloudFetchResultHandler);
+        expect(resultHandler).to.be.instanceOf(ResultSlicer);
+        expect(resultHandler.source).to.be.instanceOf(CloudFetchResultHandler);
       }
     });
   });
@@ -921,7 +925,7 @@ describe('DBSQLOperation', () => {
       sinon.spy(context.driver, 'fetchResults');
 
       const operation = new DBSQLOperation({ handle, context });
-      const results = await operation.fetchChunk();
+      const results = await operation.fetchChunk({ disableBuffering: true });
 
       expect(results).to.deep.equal([]);
       expect(context.driver.getResultSetMetadata.called).to.be.false;
@@ -948,7 +952,7 @@ describe('DBSQLOperation', () => {
 
       const operation = new DBSQLOperation({ handle, context });
 
-      const results = await operation.fetchChunk();
+      const results = await operation.fetchChunk({ disableBuffering: true });
 
       expect(context.driver.getOperationStatus.called).to.be.true;
       expect(results).to.deep.equal([]);
@@ -974,7 +978,7 @@ describe('DBSQLOperation', () => {
       context.driver.fetchResultsResp.results.columns = [];
 
       const operation = new DBSQLOperation({ handle, context });
-      await operation.fetchChunk({ progress: true });
+      await operation.fetchChunk({ progress: true, disableBuffering: true });
 
       expect(context.driver.getOperationStatus.called).to.be.true;
       const request = context.driver.getOperationStatus.getCall(0).args[0];
@@ -1005,7 +1009,7 @@ describe('DBSQLOperation', () => {
 
       const callback = sinon.stub();
 
-      await operation.fetchChunk({ callback });
+      await operation.fetchChunk({ callback, disableBuffering: true });
 
       expect(context.driver.getOperationStatus.called).to.be.true;
       expect(callback.callCount).to.be.equal(attemptsUntilFinished);
@@ -1023,7 +1027,7 @@ describe('DBSQLOperation', () => {
 
       const operation = new DBSQLOperation({ handle, context });
 
-      const results = await operation.fetchChunk();
+      const results = await operation.fetchChunk({ disableBuffering: true });
 
       expect(results).to.deep.equal([{ test: 1 }, { test: 2 }, { test: 3 }]);
       expect(context.driver.getResultSetMetadata.called).to.be.true;
@@ -1060,7 +1064,7 @@ describe('DBSQLOperation', () => {
         },
       });
 
-      const results = await operation.fetchChunk();
+      const results = await operation.fetchChunk({ disableBuffering: true });
 
       expect(results).to.deep.equal([{ test: 5 }, { test: 6 }]);
       expect(context.driver.getResultSetMetadata.called).to.be.true;
@@ -1098,13 +1102,13 @@ describe('DBSQLOperation', () => {
         },
       });
 
-      const results1 = await operation.fetchChunk();
+      const results1 = await operation.fetchChunk({ disableBuffering: true });
 
       expect(results1).to.deep.equal([{ test: 5 }, { test: 6 }]);
       expect(context.driver.getResultSetMetadata.callCount).to.be.eq(1);
       expect(context.driver.fetchResults.callCount).to.be.eq(0);
 
-      const results2 = await operation.fetchChunk();
+      const results2 = await operation.fetchChunk({ disableBuffering: true });
 
       expect(results2).to.deep.equal([{ test: 1 }, { test: 2 }, { test: 3 }]);
       expect(context.driver.getResultSetMetadata.callCount).to.be.eq(1);
@@ -1125,7 +1129,7 @@ describe('DBSQLOperation', () => {
       const operation = new DBSQLOperation({ handle, context });
 
       try {
-        await operation.fetchChunk();
+        await operation.fetchChunk({ disableBuffering: true });
         expect.fail('It should throw a HiveDriverError');
       } catch (e) {
         if (e instanceof AssertionError) {
@@ -1180,7 +1184,7 @@ describe('DBSQLOperation', () => {
       const operation = new DBSQLOperation({ handle, context });
 
       expect(await operation.hasMoreRows()).to.be.false;
-      await operation.fetchChunk();
+      await operation.fetchChunk({ disableBuffering: true });
       expect(await operation.hasMoreRows()).to.be.true;
     });
 
@@ -1196,7 +1200,7 @@ describe('DBSQLOperation', () => {
       const operation = new DBSQLOperation({ handle, context });
 
       expect(await operation.hasMoreRows()).to.be.false;
-      await operation.fetchChunk();
+      await operation.fetchChunk({ disableBuffering: true });
       expect(await operation.hasMoreRows()).to.be.true;
       await operation.close();
       expect(await operation.hasMoreRows()).to.be.false;
@@ -1214,7 +1218,7 @@ describe('DBSQLOperation', () => {
       const operation = new DBSQLOperation({ handle, context });
 
       expect(await operation.hasMoreRows()).to.be.false;
-      await operation.fetchChunk();
+      await operation.fetchChunk({ disableBuffering: true });
       expect(await operation.hasMoreRows()).to.be.true;
       await operation.cancel();
       expect(await operation.hasMoreRows()).to.be.false;
@@ -1232,7 +1236,7 @@ describe('DBSQLOperation', () => {
       const operation = new DBSQLOperation({ handle, context });
 
       expect(await operation.hasMoreRows()).to.be.false;
-      await operation.fetchChunk();
+      await operation.fetchChunk({ disableBuffering: true });
       expect(await operation.hasMoreRows()).to.be.true;
     });
 
@@ -1248,7 +1252,7 @@ describe('DBSQLOperation', () => {
       const operation = new DBSQLOperation({ handle, context });
 
       expect(await operation.hasMoreRows()).to.be.false;
-      await operation.fetchChunk();
+      await operation.fetchChunk({ disableBuffering: true });
       expect(await operation.hasMoreRows()).to.be.true;
     });
 
@@ -1264,7 +1268,7 @@ describe('DBSQLOperation', () => {
       const operation = new DBSQLOperation({ handle, context });
 
       expect(await operation.hasMoreRows()).to.be.false;
-      await operation.fetchChunk();
+      await operation.fetchChunk({ disableBuffering: true });
       expect(await operation.hasMoreRows()).to.be.true;
     });
 
@@ -1281,7 +1285,7 @@ describe('DBSQLOperation', () => {
       const operation = new DBSQLOperation({ handle, context });
 
       expect(await operation.hasMoreRows()).to.be.false;
-      await operation.fetchChunk();
+      await operation.fetchChunk({ disableBuffering: true });
       expect(await operation.hasMoreRows()).to.be.false;
     });
   });
diff --git a/tests/unit/result/ResultSlicer.test.js b/tests/unit/result/ResultSlicer.test.js
new file mode 100644
index 00000000..7458b462
--- /dev/null
+++ b/tests/unit/result/ResultSlicer.test.js
@@ -0,0 +1,92 @@
+const { expect } = require('chai');
+const sinon = require('sinon');
+const ResultSlicer = require('../../../dist/result/ResultSlicer').default;
+
+class ResultsProviderMock {
+  constructor(chunks) {
+    this.chunks = chunks;
+  }
+
+  async hasMore() {
+    return this.chunks.length > 0;
+  }
+
+  async fetchNext() {
+    return this.chunks.shift() ?? [];
+  }
+}
+
+describe('ResultSlicer', () => {
+  it('should return chunks of requested size', async () => {
+    const provider = new ResultsProviderMock([
+      [10, 11, 12, 13, 14, 15],
+      [20, 21, 22, 23, 24, 25],
+      [30, 31, 32, 33, 34, 35],
+    ]);
+
+    const slicer = new ResultSlicer({}, provider);
+
+    const chunk1 = await slicer.fetchNext({ limit: 4 });
+    expect(chunk1).to.deep.eq([10, 11, 12, 13]);
+    expect(await slicer.hasMore()).to.be.true;
+
+    const chunk2 = await slicer.fetchNext({ limit: 10 });
+    expect(chunk2).to.deep.eq([14, 15, 20, 21, 22, 23, 24, 25, 30, 31]);
+    expect(await slicer.hasMore()).to.be.true;
+
+    const chunk3 = await slicer.fetchNext({ limit: 10 });
+    expect(chunk3).to.deep.eq([32, 33, 34, 35]);
+    expect(await slicer.hasMore()).to.be.false;
+  });
+
+  it('should return raw chunks', async () => {
+    const provider = new ResultsProviderMock([
+      [10, 11, 12, 13, 14, 15],
+      [20, 21, 22, 23, 24, 25],
+      [30, 31, 32, 33, 34, 35],
+    ]);
+    sinon.spy(provider, 'fetchNext');
+
+    const slicer = new ResultSlicer({}, provider);
+
+    const chunk1 = await slicer.fetchNext({ limit: 4, disableBuffering: true });
+    expect(chunk1).to.deep.eq([10, 11, 12, 13, 14, 15]);
+    expect(await slicer.hasMore()).to.be.true;
+    expect(provider.fetchNext.callCount).to.be.equal(1);
+
+    const chunk2 = await slicer.fetchNext({ limit: 10, disableBuffering: true });
+    expect(chunk2).to.deep.eq([20, 21, 22, 23, 24, 25]);
+    expect(await slicer.hasMore()).to.be.true;
+    expect(provider.fetchNext.callCount).to.be.equal(2);
+  });
+
+  it('should switch between returning sliced and raw chunks', async () => {
+    const provider = new ResultsProviderMock([
+      [10, 11, 12, 13, 14, 15],
+      [20, 21, 22, 23, 24, 25],
+      [30, 31, 32, 33, 34, 35],
+    ]);
+
+    const slicer = new ResultSlicer({}, provider);
+
+    const chunk1 = await slicer.fetchNext({ limit: 4 });
+    expect(chunk1).to.deep.eq([10, 11, 12, 13]);
+    expect(await slicer.hasMore()).to.be.true;
+
+    const chunk2 = await slicer.fetchNext({ limit: 10, disableBuffering: true });
+    expect(chunk2).to.deep.eq([14, 15]);
+    expect(await slicer.hasMore()).to.be.true;
+
+    const chunk3 = await slicer.fetchNext({ limit: 10, disableBuffering: true });
+    expect(chunk3).to.deep.eq([20, 21, 22, 23, 24, 25]);
+    expect(await slicer.hasMore()).to.be.true;
+
+    const chunk4 = await slicer.fetchNext({ limit: 4 });
+    expect(chunk4).to.deep.eq([30, 31, 32, 33]);
+    expect(await slicer.hasMore()).to.be.true;
+
+    const chunk5 = await slicer.fetchNext({ limit: 4 });
+    expect(chunk5).to.deep.eq([34, 35]);
+    expect(await slicer.hasMore()).to.be.false;
+  });
+});
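
Reviewer note: the interplay between `maxRows` and the new `disableBuffering` flag is easiest to see end-to-end. Below is a minimal usage sketch (illustrative only, not part of the patch; the connection options and statement text are placeholders):

    import { DBSQLClient } from '@databricks/sql';

    const client = new DBSQLClient();
    await client.connect({ host: '...', path: '...', token: '...' }); // placeholder credentials

    const session = await client.openSession();
    const operation = await session.executeStatement('SELECT * FROM range(1000)');

    // Default behavior: ResultSlicer buffers and re-slices chunks coming from the
    // result handler, so every chunk (except possibly the last) has exactly 300 rows.
    const sliced = await operation.fetchChunk({ maxRows: 300 });

    // Opt-out: chunks are passed through as the result handler produced them,
    // so their sizes depend on the server response rather than on `maxRows`.
    const raw = await operation.fetchChunk({ maxRows: 300, disableBuffering: true });

    await operation.close();
    await session.close();
    await client.close();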