[PECO-969] Make sure that DBSQLOperation.fetchChunk returns chunks of requested size #200

Merged · 12 commits · Nov 28, 2023
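Summary of the user-facing change: `fetchChunk` now buffers rows internally so that every chunk it returns (except possibly the last) contains exactly `maxRows` rows, regardless of how the server batches the result set. A minimal sketch of that behavior, assuming a connected client from this package (host, path, and token are placeholders):

    import { DBSQLClient } from '@databricks/sql';

    async function demo() {
      const client = new DBSQLClient();
      await client.connect({
        host: 'xxx.cloud.databricks.com', // placeholder
        path: '/sql/1.0/endpoints/xxx',   // placeholder
        token: 'dapi-xxx',                // placeholder
      });

      const session = await client.openSession();
      const operation = await session.executeStatement('SELECT * FROM range(1000)');

      do {
        // Before this PR, chunk sizes depended on how the server batched rows;
        // now each chunk has exactly 300 rows, except the final 100-row remainder.
        const chunk = await operation.fetchChunk({ maxRows: 300 });
        console.log(chunk.length); // 300, 300, 300, 100
      } while (await operation.hasMoreRows());

      await operation.close();
      await session.close();
      await client.close();
    }

    demo().catch(console.error);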
36 changes: 26 additions & 10 deletions lib/DBSQLOperation/index.ts
@@ -23,6 +23,7 @@ import RowSetProvider from '../result/RowSetProvider';
 import JsonResultHandler from '../result/JsonResultHandler';
 import ArrowResultHandler from '../result/ArrowResultHandler';
 import CloudFetchResultHandler from '../result/CloudFetchResultHandler';
+import ResultSlicer from '../result/ResultSlicer';
 import { definedOrError } from '../utils';
 import HiveDriverError from '../errors/HiveDriverError';
 import IClientContext from '../contracts/IClientContext';
@@ -68,7 +69,7 @@ export default class DBSQLOperation implements IOperation {

   private hasResultSet: boolean = false;

-  private resultHandler?: IResultsProvider<Array<any>>;
+  private resultHandler?: ResultSlicer<any>;

   constructor({ handle, directResults, context }: DBSQLOperationConstructorOptions) {
     this.operationHandle = handle;
@@ -107,9 +108,17 @@
    */
   public async fetchAll(options?: FetchOptions): Promise<Array<object>> {
     const data: Array<Array<object>> = [];
+
+    const fetchChunkOptions = {
+      ...options,
+      // Tell slicer to return raw chunks. We're going to process all of them anyway,
+      // so no need to additionally buffer and slice chunks returned by server
+      disableBuffering: true,
+    };
+
     do {
       // eslint-disable-next-line no-await-in-loop
-      const chunk = await this.fetchChunk(options);
+      const chunk = await this.fetchChunk(fetchChunkOptions);
       data.push(chunk);
     } while (await this.hasMoreRows()); // eslint-disable-line no-await-in-loop
     this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.getId()}`);
@@ -138,7 +147,10 @@
     const resultHandler = await this.getResultHandler();
     await this.failIfClosed();

-    const result = resultHandler.fetchNext({ limit: options?.maxRows || defaultMaxRows });
+    const result = resultHandler.fetchNext({
+      limit: options?.maxRows || defaultMaxRows,
+      disableBuffering: options?.disableBuffering,
+    });
     await this.failIfClosed();

     this.context
@@ -335,24 +347,28 @@
     return this.metadata;
   }

-  private async getResultHandler(): Promise<IResultsProvider<Array<any>>> {
+  private async getResultHandler(): Promise<ResultSlicer<any>> {
     const metadata = await this.fetchMetadata();
     const resultFormat = definedOrError(metadata.resultFormat);

     if (!this.resultHandler) {
+      let resultSource: IResultsProvider<Array<any>> | undefined;
+
       switch (resultFormat) {
         case TSparkRowSetType.COLUMN_BASED_SET:
-          this.resultHandler = new JsonResultHandler(this.context, this._data, metadata.schema);
+          resultSource = new JsonResultHandler(this.context, this._data, metadata.schema);
           break;
         case TSparkRowSetType.ARROW_BASED_SET:
-          this.resultHandler = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
+          resultSource = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
           break;
         case TSparkRowSetType.URL_BASED_SET:
-          this.resultHandler = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
-          break;
-        default:
-          this.resultHandler = undefined;
+          resultSource = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
+          break;
+        // no default
       }
+
+      if (resultSource) {
+        this.resultHandler = new ResultSlicer(this.context, resultSource);
+      }
     }
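The net effect of the `getResultHandler` change: whatever the result format, the cached handler is now a `ResultSlicer` wrapping the format-specific provider. The e2e tests below assert exactly this through the slicer's `source` property (a private field, which the plain-JS tests can reach directly; TypeScript needs a cast). A sketch, assuming `operation` and the handler classes are in scope as in the diffs here:

    const handler = await operation.getResultHandler();
    console.log(handler instanceof ResultSlicer); // always true now

    // The format-specific provider sits behind the slicer
    const source = (handler as any).source;
    console.log(source instanceof JsonResultHandler); // true for COLUMN_BASED_SET results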
3 changes: 3 additions & 0 deletions lib/contracts/IOperation.ts
@@ -14,6 +14,9 @@ export interface FinishedOptions extends WaitUntilReadyOptions {

 export interface FetchOptions extends WaitUntilReadyOptions {
   maxRows?: number;
+  // Disables internal buffer used to ensure a consistent chunks size.
+  // When set to `true`, returned chunks size may vary (and may differ from `maxRows`)
+  disableBuffering?: boolean;
 }

 export interface GetSchemaOptions extends WaitUntilReadyOptions {
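For callers that consume every row anyway (as `fetchAll` now does internally), buffering only adds copying, so it can be switched off per call. A short sketch of both modes, assuming an `operation` obtained as in the example above:

    // Default: chunks are buffered and sliced to exactly `maxRows` rows
    const exact = await operation.fetchChunk({ maxRows: 300 });

    // Opt out: chunks are passed through as the server returned them,
    // so their size may vary and may differ from `maxRows`
    const raw = await operation.fetchChunk({ maxRows: 300, disableBuffering: true });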
74 changes: 74 additions & 0 deletions lib/result/ResultSlicer.ts
@@ -0,0 +1,74 @@
+import IClientContext from '../contracts/IClientContext';
+import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
+
+export interface ResultSlicerFetchNextOptions extends ResultsProviderFetchNextOptions {
+  // Setting this to `true` will disable slicer, and it will return unprocessed chunks
+  // from underlying results provider
+  disableBuffering?: boolean;
+}
+
+export default class ResultSlicer<T> implements IResultsProvider<Array<T>> {
+  private readonly context: IClientContext;
+
+  private readonly source: IResultsProvider<Array<T>>;
+
+  private remainingResults: Array<T> = [];
+
+  constructor(context: IClientContext, source: IResultsProvider<Array<T>>) {
+    this.context = context;
+    this.source = source;
+  }
+
+  public async hasMore(): Promise<boolean> {
+    if (this.remainingResults.length > 0) {
+      return true;
+    }
+    return this.source.hasMore();
+  }
+
+  public async fetchNext(options: ResultSlicerFetchNextOptions): Promise<Array<T>> {
+    // If we're asked to not use buffer - first try to return whatever we have in buffer.
+    // If buffer is empty - just proxy the call to underlying results provider
+    if (options.disableBuffering) {
+      if (this.remainingResults.length > 0) {
+        const result = this.remainingResults;
+        this.remainingResults = [];
+        return result;
+      }
+
+      return this.source.fetchNext(options);
+    }
+
+    const result: Array<Array<T>> = [];
+    let resultsCount = 0;
+
+    // First, use remaining items from the previous fetch
+    if (this.remainingResults.length > 0) {
+      result.push(this.remainingResults);
+      resultsCount += this.remainingResults.length;
+      this.remainingResults = [];
+    }
+
+    // Fetch items from source results provider until we reach a requested count
+    while (resultsCount < options.limit) {
+      // eslint-disable-next-line no-await-in-loop
+      const chunk = await this.source.fetchNext(options);
+      if (chunk.length === 0) {
+        break;
+      }
+
+      result.push(chunk);
+      resultsCount += chunk.length;
+    }
+
+    // If we collected more results than requested, slice the excess items and store them for the next time
+    if (resultsCount > options.limit) {
+      const lastChunk = result.pop() ?? [];
+      const neededCount = options.limit - (resultsCount - lastChunk.length);
+      result.push(lastChunk.splice(0, neededCount));
+      this.remainingResults = lastChunk;
+    }
+
+    return result.flat();
+  }
+}
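To see the slicing logic in isolation, the sketch below drives `ResultSlicer` with a stub results provider that returns uneven chunks; the slicer re-batches them to the requested `limit`. The relative import paths and the empty-object stand-in for `IClientContext` (which the slicer only stores) are assumptions for illustration:

    import ResultSlicer from './ResultSlicer';
    import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';

    // Stub source that returns pre-baked chunks of uneven size
    class StubProvider implements IResultsProvider<Array<number>> {
      private chunks = [[1, 2, 3, 4, 5], [6, 7], [8, 9, 10]];

      public async hasMore(): Promise<boolean> {
        return this.chunks.length > 0;
      }

      public async fetchNext(options: ResultsProviderFetchNextOptions): Promise<Array<number>> {
        return this.chunks.shift() ?? []; // ignores `limit`, like a server batch would
      }
    }

    async function demo() {
      const slicer = new ResultSlicer<number>({} as any, new StubProvider());

      console.log(await slicer.fetchNext({ limit: 4 })); // [1, 2, 3, 4] (excess row 5 is buffered)
      console.log(await slicer.fetchNext({ limit: 4 })); // [5, 6, 7, 8] (buffer, then two more source fetches)
      console.log(await slicer.fetchNext({ limit: 4 })); // [9, 10] (source exhausted, short final chunk)
      console.log(await slicer.hasMore());               // false
    }

    demo().catch(console.error);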
13 changes: 9 additions & 4 deletions tests/e2e/arrow.test.js
@@ -4,6 +4,7 @@ const config = require('./utils/config');
 const logger = require('./utils/logger')(config.logger);
 const { DBSQLClient } = require('../..');
 const ArrowResultHandler = require('../../dist/result/ArrowResultHandler').default;
+const ResultSlicer = require('../../dist/result/ResultSlicer').default;

 const fixtures = require('../fixtures/compatibility');
 const { expected: expectedColumn } = require('../fixtures/compatibility/column');
@@ -81,7 +82,8 @@
        expect(result).to.deep.equal(expectedColumn);

        const resultHandler = await operation.getResultHandler();
-       expect(resultHandler).to.be.not.instanceof(ArrowResultHandler);
+       expect(resultHandler).to.be.instanceof(ResultSlicer);
+       expect(resultHandler.source).to.be.not.instanceof(ArrowResultHandler);

        await operation.close();
      },
Expand All @@ -100,7 +102,8 @@ describe('Arrow support', () => {
expect(fixArrowResult(result)).to.deep.equal(expectedArrow);

const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResultHandler);
expect(resultHandler).to.be.instanceof(ResultSlicer);
expect(resultHandler.source).to.be.instanceof(ArrowResultHandler);

await operation.close();
},
@@ -120,7 +123,8 @@
        expect(fixArrowResult(result)).to.deep.equal(expectedArrowNativeTypes);

        const resultHandler = await operation.getResultHandler();
-       expect(resultHandler).to.be.instanceof(ArrowResultHandler);
+       expect(resultHandler).to.be.instanceof(ResultSlicer);
+       expect(resultHandler.source).to.be.instanceof(ArrowResultHandler);

        await operation.close();
      },
@@ -145,7 +149,8 @@

     // We use some internals here to check that server returned response with multiple batches
     const resultHandler = await operation.getResultHandler();
-    expect(resultHandler).to.be.instanceof(ArrowResultHandler);
+    expect(resultHandler).to.be.instanceof(ResultSlicer);
+    expect(resultHandler.source).to.be.instanceof(ArrowResultHandler);

     sinon.spy(operation._data, 'fetchNext');

31 changes: 30 additions & 1 deletion tests/e2e/batched_fetch.test.js
@@ -38,7 +38,9 @@ describe('Data fetching', () => {
     try {
       // set `maxRows` to null to disable direct results so all the data are fetched through `driver.fetchResults`
       const operation = await session.executeStatement(query, { maxRows: null });
-      let chunkedOp = await operation.fetchChunk({ maxRows: 10 }).catch((error) => logger(error));
+      let chunkedOp = await operation
+        .fetchChunk({ maxRows: 10, disableBuffering: true })
+        .catch((error) => logger(error));
       expect(chunkedOp.length).to.be.equal(10);
       // we explicitly requested only one chunk
       expect(session.context.driver.fetchResults.callCount).to.equal(1);
@@ -47,6 +49,33 @@
     }
   });

+  it('fetch chunks should respect maxRows', async () => {
+    const session = await openSession({ arrowEnabled: false });
+
+    const chunkSize = 300;
+    const lastChunkSize = 100; // 1000 % chunkSize
+
+    try {
+      const operation = await session.executeStatement(query, { maxRows: 500 });
+
+      let hasMoreRows = true;
+      let chunkCount = 0;
+
+      while (hasMoreRows) {
+        let chunkedOp = await operation.fetchChunk({ maxRows: 300 });
+        chunkCount += 1;
+        hasMoreRows = await operation.hasMoreRows();
+
+        const isLastChunk = !hasMoreRows;
+        expect(chunkedOp.length).to.be.equal(isLastChunk ? lastChunkSize : chunkSize);
+      }
+
+      expect(chunkCount).to.be.equal(4); // 1000 = 3*300 + 1*100
+    } finally {
+      await session.close();
+    }
+  });
+
   it('fetch all should fetch all records', async () => {
     const session = await openSession({ arrowEnabled: false });
     sinon.spy(session.context.driver, 'fetchResults');
22 changes: 13 additions & 9 deletions tests/e2e/cloudfetch.test.js
@@ -4,6 +4,7 @@ const config = require('./utils/config');
 const logger = require('./utils/logger')(config.logger);
 const { DBSQLClient } = require('../..');
 const CloudFetchResultHandler = require('../../dist/result/CloudFetchResultHandler').default;
+const ResultSlicer = require('../../dist/result/ResultSlicer').default;

 async function openSession(customConfig) {
   const client = new DBSQLClient();
@@ -51,31 +52,34 @@

     // Check if we're actually getting data via CloudFetch
     const resultHandler = await operation.getResultHandler();
-    expect(resultHandler).to.be.instanceOf(CloudFetchResultHandler);
+    expect(resultHandler).to.be.instanceof(ResultSlicer);
+    expect(resultHandler.source).to.be.instanceOf(CloudFetchResultHandler);
+
+    const cfResultHandler = resultHandler.source;

     // Fetch first chunk and check if result handler behaves properly.
     // With the count of rows we queried, there should be at least one row set,
     // containing 8 result links. After fetching the first chunk,
     // result handler should download 5 of them and schedule the rest
-    expect(await resultHandler.hasMore()).to.be.false;
-    expect(resultHandler.pendingLinks.length).to.be.equal(0);
-    expect(resultHandler.downloadedBatches.length).to.be.equal(0);
+    expect(await cfResultHandler.hasMore()).to.be.false;
+    expect(cfResultHandler.pendingLinks.length).to.be.equal(0);
+    expect(cfResultHandler.downloadedBatches.length).to.be.equal(0);

     sinon.spy(operation._data, 'fetchNext');

-    const chunk = await operation.fetchChunk({ maxRows: 100000 });
+    const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
     // Count links returned from server
     const resultSet = await operation._data.fetchNext.firstCall.returnValue;
     const resultLinksCount = resultSet?.resultLinks?.length ?? 0;

-    expect(await resultHandler.hasMore()).to.be.true;
+    expect(await cfResultHandler.hasMore()).to.be.true;
     // expected batches minus first 5 already fetched
-    expect(resultHandler.pendingLinks.length).to.be.equal(resultLinksCount - cloudFetchConcurrentDownloads);
-    expect(resultHandler.downloadedBatches.length).to.be.equal(cloudFetchConcurrentDownloads - 1);
+    expect(cfResultHandler.pendingLinks.length).to.be.equal(resultLinksCount - cloudFetchConcurrentDownloads);
+    expect(cfResultHandler.downloadedBatches.length).to.be.equal(cloudFetchConcurrentDownloads - 1);

     let fetchedRowCount = chunk.length;
     while (await operation.hasMoreRows()) {
-      const chunk = await operation.fetchChunk({ maxRows: 100000 });
+      const chunk = await operation.fetchChunk({ maxRows: 100000, disableBuffering: true });
       fetchedRowCount += chunk.length;
     }