Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring: Convert global config to client config and make it available through client context #202

Merged
merged 3 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import TCLIService from '../thrift/TCLIService';
import { TProtocolVersion } from '../thrift/TCLIService_types';
import IDBSQLClient, { ClientOptions, ConnectionOptions, OpenSessionRequest } from './contracts/IDBSQLClient';
import IDriver from './contracts/IDriver';
import IClientContext from './contracts/IClientContext';
import IClientContext, { ClientConfig } from './contracts/IClientContext';
import HiveDriver from './hive/HiveDriver';
import { Int64 } from './hive/Types';
import DBSQLSession from './DBSQLSession';
Expand Down Expand Up @@ -46,6 +46,8 @@ function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
export default class DBSQLClient extends EventEmitter implements IDBSQLClient, IClientContext {
private static defaultLogger?: IDBSQLLogger;

private readonly config: ClientConfig;

private connectionProvider?: IConnectionProvider;

private authProvider?: IAuthentication;
Expand All @@ -69,8 +71,25 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
return this.defaultLogger;
}

private static getDefaultConfig(): ClientConfig {
return {
arrowEnabled: true,
useArrowNativeTypes: true,
socketTimeout: 15 * 60 * 1000, // 15 minutes

retryMaxAttempts: 30,
retriesTimeout: 900 * 1000,
retryDelayMin: 1 * 1000,
retryDelayMax: 60 * 1000,

useCloudFetch: false,
cloudFetchConcurrentDownloads: 10,
};
}

constructor(options?: ClientOptions) {
super();
this.config = DBSQLClient.getDefaultConfig();
this.logger = options?.logger ?? DBSQLClient.getDefaultLogger();
this.logger.log(LogLevel.info, 'Created DBSQLClient');
}
Expand Down Expand Up @@ -129,7 +148,7 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
public async connect(options: ConnectionOptions, authProvider?: IAuthentication): Promise<IDBSQLClient> {
this.authProvider = this.initAuthProvider(options, authProvider);

this.connectionProvider = new HttpConnection(this.getConnectionOptions(options));
this.connectionProvider = new HttpConnection(this.getConnectionOptions(options), this);

const thriftConnection = await this.connectionProvider.getThriftConnection();

Expand Down Expand Up @@ -196,6 +215,10 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient, I
this.authProvider = undefined;
}

public getConfig(): ClientConfig {
return this.config;
}

public getLogger(): IDBSQLLogger {
return this.logger;
}
Expand Down
12 changes: 6 additions & 6 deletions lib/DBSQLSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ import { definedOrError } from './utils';
import CloseableCollection from './utils/CloseableCollection';
import { LogLevel } from './contracts/IDBSQLLogger';
import HiveDriverError from './errors/HiveDriverError';
import globalConfig from './globalConfig';
import StagingError from './errors/StagingError';
import { DBSQLParameter, DBSQLParameterValue } from './DBSQLParameter';
import ParameterError from './errors/ParameterError';
import IClientContext from './contracts/IClientContext';
import IClientContext, { ClientConfig } from './contracts/IClientContext';

const defaultMaxRows = 100000;

Expand All @@ -59,11 +58,11 @@ function getDirectResultsOptions(maxRows: number | null = defaultMaxRows) {
};
}

function getArrowOptions(): {
function getArrowOptions(config: ClientConfig): {
canReadArrowResult: boolean;
useArrowNativeTypes?: TSparkArrowTypes;
} {
const { arrowEnabled = true, useArrowNativeTypes = true } = globalConfig;
const { arrowEnabled = true, useArrowNativeTypes = true } = config;

if (!arrowEnabled) {
return {
Expand Down Expand Up @@ -187,14 +186,15 @@ export default class DBSQLSession implements IDBSQLSession {
public async executeStatement(statement: string, options: ExecuteStatementOptions = {}): Promise<IOperation> {
await this.failIfClosed();
const driver = await this.context.getDriver();
const clientConfig = this.context.getConfig();
const operationPromise = driver.executeStatement({
sessionHandle: this.sessionHandle,
statement,
queryTimeout: options.queryTimeout,
runAsync: true,
...getDirectResultsOptions(options.maxRows),
...getArrowOptions(),
canDownloadResult: options.useCloudFetch ?? globalConfig.useCloudFetch,
...getArrowOptions(clientConfig),
canDownloadResult: options.useCloudFetch ?? clientConfig.useCloudFetch,
parameters: getQueryParameters(this.sessionHandle, options.namedParameters, options.ordinalParameters),
});
const response = await this.handleResponse(operationPromise);
Expand Down
13 changes: 9 additions & 4 deletions lib/connection/connections/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,24 @@ import { ProxyAgent } from 'proxy-agent';

import IConnectionProvider from '../contracts/IConnectionProvider';
import IConnectionOptions, { ProxyOptions } from '../contracts/IConnectionOptions';
import globalConfig from '../../globalConfig';
import IClientContext from '../../contracts/IClientContext';

import ThriftHttpConnection from './ThriftHttpConnection';

export default class HttpConnection implements IConnectionProvider {
private readonly options: IConnectionOptions;

private readonly context: IClientContext;

private headers: HeadersInit = {};

private connection?: ThriftHttpConnection;

private agent?: http.Agent;

constructor(options: IConnectionOptions) {
constructor(options: IConnectionOptions, context: IClientContext) {
this.options = options;
this.context = context;
}

public setHeaders(headers: HeadersInit) {
Expand All @@ -44,11 +47,12 @@ export default class HttpConnection implements IConnectionProvider {
}

private getAgentDefaultOptions(): http.AgentOptions {
const clientConfig = this.context.getConfig();
return {
keepAlive: true,
maxSockets: 5,
keepAliveMsecs: 10000,
timeout: this.options.socketTimeout ?? globalConfig.socketTimeout,
timeout: this.options.socketTimeout ?? clientConfig.socketTimeout,
};
}

Expand Down Expand Up @@ -89,6 +93,7 @@ export default class HttpConnection implements IConnectionProvider {
public async getThriftConnection(): Promise<any> {
if (!this.connection) {
const { options } = this;
const clientConfig = this.context.getConfig();
const agent = await this.getAgent();

this.connection = new ThriftHttpConnection(
Expand All @@ -99,7 +104,7 @@ export default class HttpConnection implements IConnectionProvider {
},
{
agent,
timeout: options.socketTimeout ?? globalConfig.socketTimeout,
timeout: options.socketTimeout ?? clientConfig.socketTimeout,
headers: {
...options.headers,
...this.headers,
Expand Down
16 changes: 16 additions & 0 deletions lib/contracts/IClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,23 @@ import IDriver from './IDriver';
import IConnectionProvider from '../connection/contracts/IConnectionProvider';
import TCLIService from '../../thrift/TCLIService';

export interface ClientConfig {
arrowEnabled?: boolean;
useArrowNativeTypes?: boolean;
socketTimeout: number;

retryMaxAttempts: number;
retriesTimeout: number; // in milliseconds
retryDelayMin: number; // in milliseconds
retryDelayMax: number; // in milliseconds

useCloudFetch: boolean;
cloudFetchConcurrentDownloads: number;
}

export default interface IClientContext {
getConfig(): ClientConfig;

getLogger(): IDBSQLLogger;

getConnectionProvider(): Promise<IConnectionProvider>;
Expand Down
27 changes: 0 additions & 27 deletions lib/globalConfig.ts

This file was deleted.

19 changes: 12 additions & 7 deletions lib/hive/Commands/BaseCommand.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Thrift } from 'thrift';
import TCLIService from '../../../thrift/TCLIService';
import HiveDriverError from '../../errors/HiveDriverError';
import globalConfig from '../../globalConfig';
import IClientContext, { ClientConfig } from '../../contracts/IClientContext';

interface CommandExecutionInfo {
startTime: number; // in milliseconds
attempt: number;
}

function getRetryDelay(attempt: number): number {
function getRetryDelay(attempt: number, config: ClientConfig): number {
const scale = Math.max(1, 1.5 ** (attempt - 1)); // ensure scale >= 1
return Math.min(globalConfig.retryDelayMin * scale, globalConfig.retryDelayMax);
return Math.min(config.retryDelayMin * scale, config.retryDelayMax);
}

function delay(milliseconds: number): Promise<void> {
Expand All @@ -22,8 +22,11 @@ function delay(milliseconds: number): Promise<void> {
export default abstract class BaseCommand {
protected client: TCLIService.Client;

constructor(client: TCLIService.Client) {
protected context: IClientContext;

constructor(client: TCLIService.Client, context: IClientContext) {
this.client = client;
this.context = context;
}

protected executeCommand<Response>(request: object, command: Function | void): Promise<Response> {
Expand All @@ -49,19 +52,21 @@ export default abstract class BaseCommand {
case 503: // Service Unavailable
info.attempt += 1;

const clientConfig = this.context.getConfig();

// Delay interval depends on current attempt - the more attempts we do
// the longer the interval will be
// TODO: Respect `Retry-After` header (PECO-729)
const retryDelay = getRetryDelay(info.attempt);
const retryDelay = getRetryDelay(info.attempt, clientConfig);

const attemptsExceeded = info.attempt >= globalConfig.retryMaxAttempts;
const attemptsExceeded = info.attempt >= clientConfig.retryMaxAttempts;
if (attemptsExceeded) {
throw new HiveDriverError(
`Hive driver: ${error.statusCode} when connecting to resource. Max retry count exceeded.`,
);
}

const timeoutExceeded = Date.now() - info.startTime + retryDelay >= globalConfig.retriesTimeout;
const timeoutExceeded = Date.now() - info.startTime + retryDelay >= clientConfig.retriesTimeout;
if (timeoutExceeded) {
throw new HiveDriverError(
`Hive driver: ${error.statusCode} when connecting to resource. Retry timeout exceeded.`,
Expand Down
Loading
Loading