Skip to content

Commit

Permalink
Merge pull request #20 from drift-labs/master
Browse files Browse the repository at this point in the history
trades publisher
  • Loading branch information
NourAlharithi authored Nov 17, 2023
2 parents 4151076 + e97f319 commit 37b991b
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 32 deletions.
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"main": "lib/index.js",
"license": "Apache-2.0",
"dependencies": {
"@drift-labs/sdk": "2.43.0-beta.17",
"@drift-labs/sdk": "2.45.0-beta.0",
"@opentelemetry/api": "^1.1.0",
"@opentelemetry/auto-instrumentations-node": "^0.31.1",
"@opentelemetry/exporter-prometheus": "^0.31.0",
Expand All @@ -27,6 +27,7 @@
"prom-client": "^15.0.0",
"redis": "^4.6.10",
"response-time": "^2.3.2",
"rxjs": "^7.8.1",
"socket.io-redis": "^6.1.1",
"typescript": "4.5.4",
"winston": "^3.8.1",
Expand All @@ -50,7 +51,8 @@
"clean": "rm -rf lib",
"start": "node lib/index.js",
"dev": "ts-node src/index.ts",
"ws-publish": "ts-node src/wsPublish.ts",
"dlob-publish": "ts-node src/publishers/dlobPublisher.ts",
"trades-publish": "ts-node src/publishers/tradesPublisher.ts",
"ws-manager": "ts-node src/wsConnectionManager.ts",
"dev:inspect": "yarn build && node --inspect ./lib/index.js",
"dev:debug": "yarn build && node --inspect-brk --inspect=2230 ./lib/index.js",
Expand Down
13 changes: 9 additions & 4 deletions src/dlob-subscriber/DLOBSubscriberIO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import {
MainnetSpotMarkets,
MarketType,
groupL2,
isVariant,
} from '@drift-labs/sdk';
import { getOracleForMarket, l2WithBNToStrings } from '../utils/utils';
import { RedisClient } from '../utils/redisClient';
import { driftEnv } from '../wsPublish';
import { driftEnv } from '../publishers/dlobPublisher';

type wsMarketL2Args = {
marketIndex: number;
Expand Down Expand Up @@ -82,6 +83,7 @@ export class DLOBSubscriberIO extends DLOBSubscriber {
const grouping = l2Args.grouping;
const { marketName, ...l2FuncArgs } = l2Args;
const l2 = this.getL2(l2FuncArgs);
const marketType = isVariant(l2Args.marketType, 'perp') ? 'perp' : 'spot';
let l2Formatted: any;
if (grouping) {
const groupingBN = new BN(grouping);
Expand All @@ -101,14 +103,17 @@ export class DLOBSubscriberIO extends DLOBSubscriber {
this.lastSeenL2Formatted
.get(l2Args.marketType)
?.set(l2Args.marketIndex, JSON.stringify(l2Formatted));
l2Formatted['marketName'] = marketName;
l2Formatted['marketType'] = l2Args.marketType;
l2Formatted['marketName'] = marketName?.toUpperCase();
l2Formatted['marketType'] = marketType?.toLowerCase();
l2Formatted['marketIndex'] = l2Args.marketIndex;
l2Formatted['oracle'] = getOracleForMarket(
this.driftClient,
l2Args.marketType,
l2Args.marketIndex
);
this.redisClient.client.publish(marketName, JSON.stringify(l2Formatted));
this.redisClient.client.publish(
`orderbook_${marketType}_${l2Args.marketIndex}`,
JSON.stringify(l2Formatted)
);
}
}
8 changes: 4 additions & 4 deletions src/wsPublish.ts → src/publishers/dlobPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import {
BulkAccountLoader,
} from '@drift-labs/sdk';

import { logger, setLogLevel } from './utils/logger';
import { sleep } from './utils/utils';
import { DLOBSubscriberIO } from './dlob-subscriber/DLOBSubscriberIO';
import { RedisClient } from './utils/redisClient';
import { logger, setLogLevel } from '../utils/logger';
import { sleep } from '../utils/utils';
import { DLOBSubscriberIO } from '../dlob-subscriber/DLOBSubscriberIO';
import { RedisClient } from '../utils/redisClient';

require('dotenv').config();
const driftEnv = (process.env.ENV || 'devnet') as DriftEnv;
Expand Down
208 changes: 208 additions & 0 deletions src/publishers/tradesPublisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import { program } from 'commander';

import { Connection, Commitment, PublicKey, Keypair } from '@solana/web3.js';

import {
DriftClient,
initialize,
DriftEnv,
SlotSubscriber,
UserMap,
Wallet,
BulkAccountLoader,
EventSubscriber,
OrderAction,
convertToNumber,
BASE_PRECISION,
QUOTE_PRECISION,
PRICE_PRECISION,
getVariant,
} from '@drift-labs/sdk';

import { logger, setLogLevel } from '../utils/logger';
import { sleep } from '../utils/utils';
import { RedisClient } from '../utils/redisClient';
import { fromEvent, filter, map } from 'rxjs';

require('dotenv').config();
const driftEnv = (process.env.ENV || 'devnet') as DriftEnv;
const commitHash = process.env.COMMIT;
const REDIS_HOST = process.env.REDIS_HOST || 'localhost';
const REDIS_PORT = process.env.REDIS_PORT || '6379';
const REDIS_PASSWORD = process.env.REDIS_PASSWORD;

//@ts-ignore
const sdkConfig = initialize({ env: process.env.ENV });

const stateCommitment: Commitment = 'processed';
const ORDERBOOK_UPDATE_INTERVAL = 1000;

let driftClient: DriftClient;

const opts = program.opts();
setLogLevel(opts.debug ? 'debug' : 'info');

const endpoint = process.env.ENDPOINT;
const wsEndpoint = process.env.WS_ENDPOINT;
logger.info(`RPC endpoint: ${endpoint}`);
logger.info(`WS endpoint: ${wsEndpoint}`);
logger.info(`DriftEnv: ${driftEnv}`);
logger.info(`Commit: ${commitHash}`);

const main = async () => {
const wallet = new Wallet(new Keypair());
const clearingHousePublicKey = new PublicKey(sdkConfig.DRIFT_PROGRAM_ID);

const connection = new Connection(endpoint, {
wsEndpoint: wsEndpoint,
commitment: stateCommitment,
});

const bulkAccountLoader = new BulkAccountLoader(
connection,
stateCommitment,
ORDERBOOK_UPDATE_INTERVAL
);

driftClient = new DriftClient({
connection,
wallet,
programID: clearingHousePublicKey,
accountSubscription: {
type: 'polling',
accountLoader: bulkAccountLoader,
},
env: driftEnv,
userStats: true,
});

const slotSubscriber = new SlotSubscriber(connection, {});

const lamportsBalance = await connection.getBalance(wallet.publicKey);
logger.info(
`DriftClient ProgramId: ${driftClient.program.programId.toBase58()}`
);
logger.info(`Wallet pubkey: ${wallet.publicKey.toBase58()}`);
logger.info(` . SOL balance: ${lamportsBalance / 10 ** 9}`);

await driftClient.subscribe();
driftClient.eventEmitter.on('error', (e) => {
logger.info('clearing house error');
logger.error(e);
});

await slotSubscriber.subscribe();

const userMap = new UserMap(
driftClient,
driftClient.userAccountSubscriptionConfig,
false
);
await userMap.subscribe();

const redisClient = new RedisClient(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD);
await redisClient.connect();

const eventSubscriber = new EventSubscriber(connection, driftClient.program, {
maxTx: 8192,
maxEventsPerType: 4096,
orderBy: 'client',
commitment: 'confirmed',
logProviderConfig: {
type: 'polling',
frequency: 1000,
},
});

await eventSubscriber.subscribe();

const eventObservable = fromEvent(eventSubscriber.eventEmitter, 'newEvent');
eventObservable
.pipe(
filter(
(event) =>
event.eventType === 'OrderActionRecord' &&
JSON.stringify(event.action) === JSON.stringify(OrderAction.FILL)
),
map((fill) => {
return {
ts: fill.ts.toNumber(),
marketIndex: fill.marketIndex,
marketType: getVariant(fill.marketType),
filler: fill.filler?.toBase58(),
takerFee: convertToNumber(fill.takerFee, QUOTE_PRECISION),
makerFee: convertToNumber(fill.makerFee, QUOTE_PRECISION),
quoteAssetAmountSurplus: convertToNumber(
fill.quoteAssetAmountSurplus,
QUOTE_PRECISION
),
baseAssetAmountFilled: convertToNumber(
fill.baseAssetAmountFilled,
BASE_PRECISION
),
quoteAssetAmountFilled: convertToNumber(
fill.quoteAssetAmountFilled,
QUOTE_PRECISION
),
taker: fill.taker?.toBase58(),
takerOrderId: fill.takerOrderId,
takerOrderDirection: getVariant(fill.takerOrderDirection),
takerOrderBaseAssetAmount: convertToNumber(
fill.takerOrderBaseAssetAmount,
BASE_PRECISION
),
takerOrderCumulativeBaseAssetAmountFilled: convertToNumber(
fill.takerOrderCumulativeBaseAssetAmountFilled,
BASE_PRECISION
),
takerOrderCumulativeQuoteAssetAmountFilled: convertToNumber(
fill.takerOrderCumulativeQuoteAssetAmountFilled,
QUOTE_PRECISION
),
maker: fill.maker?.toBase58(),
makerOrderId: fill.makerOrderId,
makerOrderDirection: getVariant(fill.makerOrderDirection),
makerOrderBaseAssetAmount: convertToNumber(
fill.makerOrderBaseAssetAmount,
BASE_PRECISION
),
makerOrderCumulativeBaseAssetAmountFilled: convertToNumber(
fill.makerOrderCumulativeBaseAssetAmountFilled,
BASE_PRECISION
),
makerOrderCumulativeQuoteAssetAmountFilled: convertToNumber(
fill.makerOrderCumulativeQuoteAssetAmountFilled,
QUOTE_PRECISION
),
oraclePrice: convertToNumber(fill.oraclePrice, PRICE_PRECISION),
txSig: fill.txSig,
slot: fill.slot,
action: 'fill',
actionExplanation: getVariant(fill.actionExplanation),
referrerReward: convertToNumber(fill.referrerReward, QUOTE_PRECISION),
};
})
)
.subscribe((fillEvent) => {
redisClient.client.publish(
`trades_${fillEvent.marketType}_${fillEvent.marketIndex}`,
JSON.stringify(fillEvent)
);
});

console.log('Publishing trades');
};

async function recursiveTryCatch(f: () => void) {
try {
await f();
} catch (e) {
console.error(e);
await sleep(15000);
await recursiveTryCatch(f);
}
}

recursiveTryCatch(() => main());

export { sdkConfig, endpoint, wsEndpoint, driftEnv, commitHash, driftClient };
Loading

0 comments on commit 37b991b

Please sign in to comment.