Skip to content

Commit

Permalink
Convert Json/Arrow/CloudFetch result handlers to implement result provider interface
Browse files Browse the repository at this point in the history

Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Oct 7, 2023
1 parent d292824 commit 3da3e4a
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 96 deletions.
33 changes: 13 additions & 20 deletions lib/DBSQLOperation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ import {
import Status from '../dto/Status';
import { LogLevel } from '../contracts/IDBSQLLogger';
import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError';
import IOperationResult from '../result/IOperationResult';
import IResultsProvider from '../result/IResultsProvider';
import RowSetProvider from '../result/RowSetProvider';
import JsonResult from '../result/JsonResult';
import ArrowResult from '../result/ArrowResult';
import CloudFetchResult from '../result/CloudFetchResult';
import JsonResultHandler from '../result/JsonResultHandler';
import ArrowResultHandler from '../result/ArrowResultHandler';
import CloudFetchResultHandler from '../result/CloudFetchResultHandler';
import { definedOrError } from '../utils';
import HiveDriverError from '../errors/HiveDriverError';
import IClientContext from '../contracts/IClientContext';
Expand Down Expand Up @@ -68,7 +68,7 @@ export default class DBSQLOperation implements IOperation {

private hasResultSet: boolean = false;

private resultHandler?: IOperationResult;
private resultHandler?: IResultsProvider<Array<any>>;

constructor({ handle, directResults, context }: DBSQLOperationConstructorOptions) {
this.operationHandle = handle;
Expand Down Expand Up @@ -135,14 +135,12 @@ export default class DBSQLOperation implements IOperation {

await this.waitUntilReady(options);

const [resultHandler, data] = await Promise.all([
this.getResultHandler(),
this._data.fetchNext({ limit: options?.maxRows || defaultMaxRows }),
]);
const resultHandler = await this.getResultHandler();
await this.failIfClosed();

const result = resultHandler.fetchNext({ limit: options?.maxRows || defaultMaxRows });
await this.failIfClosed();

const result = await resultHandler.getValue(data ? [data] : []);
this.context
.getLogger()
.log(
Expand Down Expand Up @@ -234,14 +232,9 @@ export default class DBSQLOperation implements IOperation {
return false;
}

// Return early if there are still data available for fetching
if (this._data.hasMoreRows) {
return true;
}

// If we fetched all the data from server - check if there's anything buffered in result handler
const resultHandler = await this.getResultHandler();
return resultHandler.hasPendingData();
return resultHandler.hasMore();
}

public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
Expand Down Expand Up @@ -342,20 +335,20 @@ export default class DBSQLOperation implements IOperation {
return this.metadata;
}

private async getResultHandler(): Promise<IOperationResult> {
private async getResultHandler(): Promise<IResultsProvider<Array<any>>> {
const metadata = await this.fetchMetadata();
const resultFormat = definedOrError(metadata.resultFormat);

if (!this.resultHandler) {
switch (resultFormat) {
case TSparkRowSetType.COLUMN_BASED_SET:
this.resultHandler = new JsonResult(this.context, metadata.schema);
this.resultHandler = new JsonResultHandler(this.context, this._data, metadata.schema);
break;
case TSparkRowSetType.ARROW_BASED_SET:
this.resultHandler = new ArrowResult(this.context, metadata.schema, metadata.arrowSchema);
this.resultHandler = new ArrowResultHandler(this.context, this._data, metadata.schema, metadata.arrowSchema);
break;
case TSparkRowSetType.URL_BASED_SET:
this.resultHandler = new CloudFetchResult(this.context, metadata.schema);
this.resultHandler = new CloudFetchResultHandler(this.context, this._data, metadata.schema);
break;
default:
this.resultHandler = undefined;
Expand Down
23 changes: 18 additions & 5 deletions lib/result/ArrowResult.ts → lib/result/ArrowResultHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,42 @@ import {
} from 'apache-arrow';
import { TRowSet, TTableSchema, TColumnDesc } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IOperationResult from './IOperationResult';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { getSchemaColumns, convertThriftValue } from './utils';

const { isArrowBigNumSymbol, bigNumToBigInt } = arrowUtils;

type ArrowSchema = Schema<TypeMap>;
type ArrowSchemaField = Field<DataType<Type, TypeMap>>;

export default class ArrowResult implements IOperationResult {
export default class ArrowResultHandler implements IResultsProvider<Array<any>> {
protected readonly context: IClientContext;

private readonly source: IResultsProvider<TRowSet | undefined>;

private readonly schema: Array<TColumnDesc>;

private readonly arrowSchema?: Buffer;

constructor(context: IClientContext, schema?: TTableSchema, arrowSchema?: Buffer) {
constructor(
context: IClientContext,
source: IResultsProvider<TRowSet | undefined>,
schema?: TTableSchema,
arrowSchema?: Buffer,
) {
this.context = context;
this.source = source;
this.schema = getSchemaColumns(schema);
this.arrowSchema = arrowSchema;
}

async hasPendingData() {
return false;
async hasMore() {
return this.source.hasMore();
}

async fetchNext(options: ResultsProviderFetchNextOptions) {
const data = await this.source.fetchNext(options);
return this.getValue(data ? [data] : []);
}

async getValue(data?: Array<TRowSet>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@ import { Buffer } from 'buffer';
import fetch, { RequestInfo, RequestInit } from 'node-fetch';
import { TRowSet, TSparkArrowResultLink, TTableSchema } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import ArrowResult from './ArrowResult';
import IResultsProvider from './IResultsProvider';
import ArrowResultHandler from './ArrowResultHandler';
import globalConfig from '../globalConfig';

export default class CloudFetchResult extends ArrowResult {
export default class CloudFetchResultHandler extends ArrowResultHandler {
private pendingLinks: Array<TSparkArrowResultLink> = [];

private downloadedBatches: Array<Buffer> = [];

constructor(context: IClientContext, schema?: TTableSchema) {
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, schema?: TTableSchema) {
// Arrow schema returned in metadata is not needed for CloudFetch results:
// each batch already contains schema and could be decoded as is
super(context, schema, Buffer.alloc(0));
super(context, source, schema, Buffer.alloc(0));
}

async hasPendingData() {
return this.pendingLinks.length > 0 || this.downloadedBatches.length > 0;
async hasMore() {
if (this.pendingLinks.length > 0 || this.downloadedBatches.length > 0) {
return true;
}
return super.hasMore();
}

protected async getBatches(data: Array<TRowSet>): Promise<Array<Buffer>> {
Expand Down
7 changes: 0 additions & 7 deletions lib/result/IOperationResult.ts

This file was deleted.

18 changes: 13 additions & 5 deletions lib/result/JsonResult.ts → lib/result/JsonResultHandler.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
import { ColumnCode } from '../hive/Types';
import { TRowSet, TTableSchema, TColumn, TColumnDesc } from '../../thrift/TCLIService_types';
import IClientContext from '../contracts/IClientContext';
import IOperationResult from './IOperationResult';
import IResultsProvider, { ResultsProviderFetchNextOptions } from './IResultsProvider';
import { getSchemaColumns, convertThriftValue } from './utils';

export default class JsonResult implements IOperationResult {
export default class JsonResultHandler implements IResultsProvider<Array<any>> {
private readonly context: IClientContext;

private readonly source: IResultsProvider<TRowSet | undefined>;

private readonly schema: Array<TColumnDesc>;

constructor(context: IClientContext, schema?: TTableSchema) {
constructor(context: IClientContext, source: IResultsProvider<TRowSet | undefined>, schema?: TTableSchema) {
this.context = context;
this.source = source;
this.schema = getSchemaColumns(schema);
}

async hasPendingData() {
return false;
async hasMore() {
return this.source.hasMore();
}

async fetchNext(options: ResultsProviderFetchNextOptions) {
const data = await this.source.fetchNext(options);
return this.getValue(data ? [data] : []);
}

async getValue(data?: Array<TRowSet>): Promise<Array<object>> {
Expand Down
12 changes: 6 additions & 6 deletions tests/e2e/arrow.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ const { expect } = require('chai');
const config = require('./utils/config');
const logger = require('./utils/logger')(config.logger);
const { DBSQLClient } = require('../..');
const ArrowResult = require('../../dist/result/ArrowResult').default;
const ArrowResultHandler = require('../../dist/result/ArrowResultHandler').default;
const globalConfig = require('../../dist/globalConfig').default;

const fixtures = require('../fixtures/compatibility');
Expand Down Expand Up @@ -76,7 +76,7 @@ describe('Arrow support', () => {
expect(result).to.deep.equal(expectedColumn);

const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.not.instanceof(ArrowResult);
expect(resultHandler).to.be.not.instanceof(ArrowResultHandler);

await operation.close();
}),
Expand All @@ -93,7 +93,7 @@ describe('Arrow support', () => {
expect(fixArrowResult(result)).to.deep.equal(expectedArrow);

const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResult);
expect(resultHandler).to.be.instanceof(ArrowResultHandler);

await operation.close();
}),
Expand All @@ -110,7 +110,7 @@ describe('Arrow support', () => {
expect(fixArrowResult(result)).to.deep.equal(expectedArrowNativeTypes);

const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResult);
expect(resultHandler).to.be.instanceof(ArrowResultHandler);

await operation.close();
}),
Expand All @@ -130,9 +130,9 @@ describe('Arrow support', () => {

// We use some internals here to check that server returned response with multiple batches
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceof(ArrowResult);
expect(resultHandler).to.be.instanceof(ArrowResultHandler);

const rawData = await operation._data.fetch(rowsCount);
const rawData = await operation._data.fetchNext({ limit: rowsCount });
// We don't know exact count of batches returned, it depends on server's configuration,
// but with much enough rows there should be more than one result batch
expect(rawData.arrowBatches?.length).to.be.gt(1);
Expand Down
12 changes: 6 additions & 6 deletions tests/e2e/cloudfetch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const sinon = require('sinon');
const config = require('./utils/config');
const logger = require('./utils/logger')(config.logger);
const { DBSQLClient } = require('../..');
const CloudFetchResult = require('../../dist/result/CloudFetchResult').default;
const CloudFetchResultHandler = require('../../dist/result/CloudFetchResultHandler').default;
const globalConfig = require('../../dist/globalConfig').default;

const openSession = async () => {
Expand Down Expand Up @@ -57,24 +57,24 @@ describe('CloudFetch', () => {

// Check if we're actually getting data via CloudFetch
const resultHandler = await operation.getResultHandler();
expect(resultHandler).to.be.instanceOf(CloudFetchResult);
expect(resultHandler).to.be.instanceOf(CloudFetchResultHandler);

// Fetch first chunk and check if result handler behaves properly.
// With the count of rows we queried, there should be at least one row set,
// containing 8 result links. After fetching the first chunk,
// result handler should download 5 of them and schedule the rest
expect(await resultHandler.hasPendingData()).to.be.false;
expect(await resultHandler.hasMore()).to.be.false;
expect(resultHandler.pendingLinks.length).to.be.equal(0);
expect(resultHandler.downloadedBatches.length).to.be.equal(0);

sinon.spy(operation._data, 'fetch');
sinon.spy(operation._data, 'fetchNext');

const chunk = await operation.fetchChunk({ maxRows: 100000 });
// Count links returned from server
const resultSet = await operation._data.fetch.firstCall.returnValue;
const resultSet = await operation._data.fetchNext.firstCall.returnValue;
const resultLinksCount = resultSet?.resultLinks?.length ?? 0;

expect(await resultHandler.hasPendingData()).to.be.true;
expect(await resultHandler.hasMore()).to.be.true;
// expected batches minus first 5 already fetched
expect(resultHandler.pendingLinks.length).to.be.equal(
resultLinksCount - globalConfig.cloudFetchConcurrentDownloads,
Expand Down
12 changes: 6 additions & 6 deletions tests/unit/DBSQLOperation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ const DBSQLOperation = require('../../dist/DBSQLOperation').default;
const StatusError = require('../../dist/errors/StatusError').default;
const OperationStateError = require('../../dist/errors/OperationStateError').default;
const HiveDriverError = require('../../dist/errors/HiveDriverError').default;
const JsonResult = require('../../dist/result/JsonResult').default;
const ArrowResult = require('../../dist/result/ArrowResult').default;
const CloudFetchResult = require('../../dist/result/CloudFetchResult').default;
const JsonResultHandler = require('../../dist/result/JsonResultHandler').default;
const ArrowResultHandler = require('../../dist/result/ArrowResultHandler').default;
const CloudFetchResultHandler = require('../../dist/result/CloudFetchResultHandler').default;

class OperationHandleMock {
constructor(hasResultSet = true) {
Expand Down Expand Up @@ -885,7 +885,7 @@ describe('DBSQLOperation', () => {
const operation = new DBSQLOperation({ handle, context });
const resultHandler = await operation.getResultHandler();
expect(context.driver.getResultSetMetadata.called).to.be.true;
expect(resultHandler).to.be.instanceOf(JsonResult);
expect(resultHandler).to.be.instanceOf(JsonResultHandler);
}

arrowHandler: {
Expand All @@ -895,7 +895,7 @@ describe('DBSQLOperation', () => {
const operation = new DBSQLOperation({ handle, context });
const resultHandler = await operation.getResultHandler();
expect(context.driver.getResultSetMetadata.called).to.be.true;
expect(resultHandler).to.be.instanceOf(ArrowResult);
expect(resultHandler).to.be.instanceOf(ArrowResultHandler);
}

cloudFetchHandler: {
Expand All @@ -905,7 +905,7 @@ describe('DBSQLOperation', () => {
const operation = new DBSQLOperation({ handle, context });
const resultHandler = await operation.getResultHandler();
expect(context.driver.getResultSetMetadata.called).to.be.true;
expect(resultHandler).to.be.instanceOf(CloudFetchResult);
expect(resultHandler).to.be.instanceOf(CloudFetchResultHandler);
}
});
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
const { expect } = require('chai');
const fs = require('fs');
const path = require('path');
const ArrowResult = require('../../../dist/result/ArrowResult').default;
const ArrowResultHandler = require('../../../dist/result/ArrowResultHandler').default;
const RowSetProviderMock = require('./fixtures/RowSetProviderMock');

const sampleThriftSchema = {
columns: [
Expand Down Expand Up @@ -84,17 +85,19 @@ const rowSetAllNulls = {
],
};

describe('ArrowResult', () => {
describe('ArrowResultHandler', () => {
it('should not buffer any data', async () => {
const context = {};
const result = new ArrowResult(context, sampleThriftSchema, sampleArrowSchema);
const rowSetProvider = new RowSetProviderMock();
const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
await result.getValue([sampleRowSet1]);
expect(await result.hasPendingData()).to.be.false;
expect(await result.hasMore()).to.be.false;
});

it('should convert data', async () => {
const context = {};
const result = new ArrowResult(context, sampleThriftSchema, sampleArrowSchema);
const rowSetProvider = new RowSetProviderMock();
const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
expect(await result.getValue([sampleRowSet1])).to.be.deep.eq([]);
expect(await result.getValue([sampleRowSet2])).to.be.deep.eq([]);
expect(await result.getValue([sampleRowSet3])).to.be.deep.eq([]);
Expand All @@ -103,20 +106,23 @@ describe('ArrowResult', () => {

it('should return empty array if no data to process', async () => {
const context = {};
const result = new ArrowResult(context, sampleThriftSchema, sampleArrowSchema);
const rowSetProvider = new RowSetProviderMock();
const result = new ArrowResultHandler(context, rowSetProvider, sampleThriftSchema, sampleArrowSchema);
expect(await result.getValue()).to.be.deep.eq([]);
expect(await result.getValue([])).to.be.deep.eq([]);
});

it('should return empty array if no schema available', async () => {
const context = {};
const result = new ArrowResult(context);
const rowSetProvider = new RowSetProviderMock();
const result = new ArrowResultHandler(context, rowSetProvider);
expect(await result.getValue([sampleRowSet4])).to.be.deep.eq([]);
});

it('should detect nulls', async () => {
const context = {};
const result = new ArrowResult(context, thriftSchemaAllNulls, arrowSchemaAllNulls);
const rowSetProvider = new RowSetProviderMock();
const result = new ArrowResultHandler(context, rowSetProvider, thriftSchemaAllNulls, arrowSchemaAllNulls);
expect(await result.getValue([rowSetAllNulls])).to.be.deep.eq([
{
boolean_field: null,
Expand Down
Loading

0 comments on commit 3da3e4a

Please sign in to comment.