From 2a673e65c41011ac94e972df77794cc97199dacf Mon Sep 17 00:00:00 2001 From: abretonc7s <107169956+abretonc7s@users.noreply.github.com> Date: Thu, 29 Feb 2024 17:01:00 +0800 Subject: [PATCH] feat: socket io debug setup (#717) * feat: socket io logging and admin ui * feat: dynamic enablment of rate limiter * feat: socketio config * chore: cleanup --- packages/sdk-socket-server-next/.env.sample | 6 + .../sdk-socket-server-next/docker-compose.yml | 61 ++++++++- packages/sdk-socket-server-next/package.json | 1 + .../sdk-socket-server-next/src/api-config.ts | 45 +++++-- packages/sdk-socket-server-next/src/index.ts | 35 +++-- packages/sdk-socket-server-next/src/logger.ts | 55 +++++++- .../src/socket-config.ts | 127 ++++++++++++------ packages/sdk-socket-server-next/src/utils.ts | 27 +++- yarn.lock | 3 +- 9 files changed, 280 insertions(+), 80 deletions(-) diff --git a/packages/sdk-socket-server-next/.env.sample b/packages/sdk-socket-server-next/.env.sample index 56e7f8ba3..bcf021e1a 100644 --- a/packages/sdk-socket-server-next/.env.sample +++ b/packages/sdk-socket-server-next/.env.sample @@ -1,4 +1,10 @@ SEGMENT_API_KEY_PRODUCTION=123456789 SEGMENT_API_KEY_DEBUG=123456789 +# REDIS_URL=redis://redis:6379/0 +# Example REDIS_NODES format: "redis://host1:6379,redis://host2:6379" +# REDIS_NODES=redis://localhost:6380,redis://localhost:6381,redis://localhost:6382 REDIS_URL=redis://redis:6379/0 RATE_LIMITER=false +RATE_LIMITER_HTTP_WINDOW_MINUTE=1 +RATE_LIMITER_HTTP_LIMIT=100000 +ADMIN_UI=false diff --git a/packages/sdk-socket-server-next/docker-compose.yml b/packages/sdk-socket-server-next/docker-compose.yml index 159c0c215..86e4dac94 100644 --- a/packages/sdk-socket-server-next/docker-compose.yml +++ b/packages/sdk-socket-server-next/docker-compose.yml @@ -1,6 +1,16 @@ version: '3.9' services: + + app: + image: node:latest + volumes: + - ./:/usr/src/app + working_dir: /usr/src/app + command: yarn debug:redis + # ports: + # - "4000:4000" + app1: build: context: . @@ -40,8 +50,57 @@ services: depends_on: - cache + redis-master1: + # build: + # context: . + # dockerfile: Dockerfile.redis + image: redis:7.2-alpine + command: redis-server --appendonly yes --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --port 6380 + environment: + - REDIS_ROLE=master + ports: + - "6380:6380" + + redis-master2: + image: redis:7.2-alpine + # build: + # context: . + # dockerfile: Dockerfile.redis + command: redis-server --appendonly yes --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --port 6381 + environment: + - REDIS_ROLE=master + ports: + - "6381:6381" + + redis-master3: + image: redis:7.2-alpine + # build: + # context: . + # dockerfile: Dockerfile.redis + command: redis-server --appendonly yes --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --port 6382 + environment: + - REDIS_ROLE=master + ports: + - "6382:6382" + + redis-cluster-init: + image: redis:7.2-alpine + volumes: + - ./init-redis-cluster.sh:/usr/local/bin/init-redis-cluster.sh + depends_on: + - redis-master1 + - redis-master2 + - redis-master3 + entrypoint: ["/bin/sh", "/usr/local/bin/init-redis-cluster.sh"] + # environment: + # - "REDISCLI_AUTH=yourpassword" # Optional: Include if your Redis nodes are password-protected + # To connect and debug the cluster use + # docker run -it --network sdk-socket-server-next_default --rm redis redis-cli -c -p 6379 -h redis-master1 + # set mykey "Hello, Redis Cluster!" + + # cache is used if want to simulate single node redis architecture cache: - image: redis:latest + image: redis:7.2-alpine command: redis-server --maxmemory 100mb --maxmemory-policy volatile-lru --loglevel debug ports: - '6379:6379' diff --git a/packages/sdk-socket-server-next/package.json b/packages/sdk-socket-server-next/package.json index 66a6d2c87..9f61f5b2a 100644 --- a/packages/sdk-socket-server-next/package.json +++ b/packages/sdk-socket-server-next/package.json @@ -44,6 +44,7 @@ "express-rate-limit": "^7.1.5", "helmet": "^5.1.1", "ioredis": "^5.3.2", + "logform": "^2.6.0", "lru-cache": "^10.0.0", "rate-limiter-flexible": "^2.3.8", "redis": "^4.6.12", diff --git a/packages/sdk-socket-server-next/src/api-config.ts b/packages/sdk-socket-server-next/src/api-config.ts index bf0010222..b4df7826a 100644 --- a/packages/sdk-socket-server-next/src/api-config.ts +++ b/packages/sdk-socket-server-next/src/api-config.ts @@ -15,25 +15,50 @@ import { isDevelopment, isDevelopmentServer } from '.'; const redisUrl = process.env.REDIS_URL || 'redis://localhost:6379'; export const redisClient = createClient({ url: redisUrl }); const THIRTY_DAYS_IN_SECONDS = 30 * 24 * 60 * 60; // expiration time of entries in Redis +const hasRateLimit = process.env.RATE_LIMITER === 'true'; const app = express(); -const limiter = rateLimit({ - windowMs: 5 * 60 * 1000, // 5 minutes - limit: 100, // Limit each IP to 100 requests per `window` (here, per 5 minutes). - standardHeaders: 'draft-7', // draft-6: `RateLimit-*` headers; draft-7: combined `RateLimit` header - legacyHeaders: false, // Disable the `X-RateLimit-*` headers. - // store: ... , // Use an external store for consistency across multiple server instances. -}); - app.use(bodyParser.urlencoded({ extended: true })); app.use(bodyParser.json()); app.use(cors()); app.options('*', cors()); app.use(helmet()); app.disable('x-powered-by'); -// Apply the rate limiting middleware to all requests. -app.use(limiter); + +if (hasRateLimit) { + // Conditionally apply the rate limiting middleware to all requests. + let windowMin = 1; // every 1minute + try { + if (process.env.RATE_LIMITER_HTTP_LIMIT) { + windowMin = parseInt( + process.env.RATE_LIMITER_HTTP_WINDOW_MINUTE ?? '1', + 10, + ); + } + } catch (error) { + // Ignore parsing errors + } + let limit = 100_000; // 100,000 requests per minute by default (unlimited...) + try { + if (process.env.RATE_LIMITER_HTTP_LIMIT) { + limit = parseInt(process.env.RATE_LIMITER_HTTP_LIMIT, 10); + } + } catch (error) { + // Ignore parsing errors + } + + const limiterConfig = { + windowMs: windowMin * 60 * 1000, + limit, + legacyHeaders: false, // Disable the `X-RateLimit-*` headers. + // store: ... , // Use an external store for consistency across multiple server instances. + }; + const limiter = rateLimit(limiterConfig); + + logger.info('Rate limiter enabled', limiterConfig); + app.use(limiter); +} async function inspectRedis(key?: string) { if (key && typeof key === 'string') { diff --git a/packages/sdk-socket-server-next/src/index.ts b/packages/sdk-socket-server-next/src/index.ts index 9c0a8faab..f3b29fdf7 100644 --- a/packages/sdk-socket-server-next/src/index.ts +++ b/packages/sdk-socket-server-next/src/index.ts @@ -5,30 +5,41 @@ import dotenv from 'dotenv'; // Dotenv must be loaded before importing local files dotenv.config(); +import { instrument } from '@socket.io/admin-ui'; import { analytics, app } from './api-config'; import { logger } from './logger'; -import { cleanupAndExit } from './utils'; -import { configureSocketServer } from './socket-config'; import { extractMetrics } from './metrics'; +import { configureSocketServer } from './socket-config'; +import { cleanupAndExit } from './utils'; export const isDevelopment: boolean = process.env.NODE_ENV === 'development'; export const isDevelopmentServer: boolean = process.env.ENVIRONMENT === 'development'; +export const withAdminUI: boolean = process.env.ADMIN_UI === 'true'; const server = http.createServer(app); -configureSocketServer(server) - .then((ioServer) => { - logger.info('INFO> isDevelopment?', isDevelopment); +// Register event listeners for process termination events +process.on('SIGINT', async () => { + await cleanupAndExit(server, analytics); +}); - // Register event listeners for process termination events - process.on('SIGINT', async () => { - await cleanupAndExit(server, analytics); - }); +process.on('SIGTERM', async () => { + await cleanupAndExit(server, analytics); +}); - process.on('SIGTERM', async () => { - await cleanupAndExit(server, analytics); - }); +configureSocketServer(server) + .then((ioServer) => { + logger.info( + `socker.io server started development=${isDevelopment} adminUI=${withAdminUI}`, + ); + + if (isDevelopmentServer && withAdminUI) { + instrument(ioServer, { + auth: false, + mode: 'development', + }); + } // Make sure to protect the endpoint to be only available within the cluster for prometheus app.get('/metrics', async (_req, res) => { diff --git a/packages/sdk-socket-server-next/src/logger.ts b/packages/sdk-socket-server-next/src/logger.ts index 0e411171a..0faf713c0 100644 --- a/packages/sdk-socket-server-next/src/logger.ts +++ b/packages/sdk-socket-server-next/src/logger.ts @@ -1,14 +1,57 @@ -import winston from 'winston'; +import winston, { format } from 'winston'; const isDevelopment: boolean = process.env.NODE_ENV === 'development'; +const customFormat = format.printf((ti) => { + const { level, message, timestamp } = ti; + const args = ti[Symbol.for('splat')]; + + const color = { + info: '\x1b[36m', + error: '\x1b[31m', + warn: '\x1b[33m', + debug: '\x1b[32m', + }; + + // add color to the level of the log + // let msg = `${timestamp} [${level}] : `; + let printTsLevel = level; + if (color[level as keyof typeof color]) { + printTsLevel = `${ + color[level as keyof typeof color] + }${timestamp} [${level}]\x1b[0m`; + } + let msg = `${printTsLevel}: `; + + const extras = + args + ?.map((arg: unknown) => { + if (typeof arg === 'object' && arg !== null) { + return JSON.stringify(args); + } + return arg; + }) + .join(' ') ?? ''; + + if (isDevelopment) { + const searchContext = message; + if (searchContext.indexOf('wallet') !== -1) { + msg += `\x1b[36m${message} ${extras}\x1b[0m`; + // eslint-disable-next-line no-negated-condition + } else if (searchContext.indexOf('dapp') !== -1) { + msg += `\x1b[35m${message} ${extras}\x1b[0m`; + } else { + msg += `${message} ${extras}`; + } + } else { + msg += `${message} ${extras}`; + } + return msg; +}); + export const logger = winston.createLogger({ level: 'debug', - format: winston.format.combine( - winston.format.timestamp(), - isDevelopment ? winston.format.colorize() : winston.format.uncolorize(), - winston.format.simple(), - ), + format: winston.format.combine(winston.format.timestamp(), customFormat), transports: [ new winston.transports.Console(), // You can also add file transport or any other transport here diff --git a/packages/sdk-socket-server-next/src/socket-config.ts b/packages/sdk-socket-server-next/src/socket-config.ts index 42f502b60..103749199 100644 --- a/packages/sdk-socket-server-next/src/socket-config.ts +++ b/packages/sdk-socket-server-next/src/socket-config.ts @@ -17,6 +17,8 @@ import { export const MAX_CLIENTS_PER_ROOM = 2; +export const MISSING_CONTEXT = "___MISSING_CONTEXT___"; + export const configureSocketServer = async ( server: HTTPServer, ): Promise => { @@ -60,6 +62,7 @@ export const configureSocketServer = async ( io.of('/').adapter.on('leave-room', async (roomId, socketId) => { if (!validate(roomId)) { + // Ignore invalid room IDs return; } @@ -78,6 +81,8 @@ export const configureSocketServer = async ( console.log( `'leave-room' Room ${roomId} kept alive with ${channelOccupancy} clients`, ); + // Inform the room of the disconnection + io.to(roomId).emit(`clients_disconnected-${roomId}`); } }); @@ -93,8 +98,13 @@ export const configureSocketServer = async ( host, }); - socket.on('create_channel', async (channelId: string) => { - logger.info('create_channel', { id: channelId, socketId, clientIp }); + socket.on('create_channel', async (channelId: string, context: string) => { + let from = context ?? MISSING_CONTEXT; + if (context === 'metamask-mobile') { + from = 'wallet'; + } else if (context === 'dapp') { + from = 'dapp'; + } try { if (hasRateLimit) { @@ -102,7 +112,14 @@ export const configureSocketServer = async ( } if (!validate(channelId)) { - logger.info(`ERROR > create_channel id = ${channelId} invalid`); + logger.error( + `create_channel from=${from} id = ${channelId} invalid`, + { + id: channelId, + socketId, + clientIp, + }, + ); return socket.emit(`message-${channelId}`, { error: 'must specify a valid id', }); @@ -111,7 +128,12 @@ export const configureSocketServer = async ( const room = io.sockets.adapter.rooms.get(channelId); if (!channelId) { logger.error( - `ERROR > create_channel id = ${channelId} not specified`, + `create_channel from=${from} id = ${channelId} not specified`, + { + id: channelId, + socketId, + clientIp, + }, ); return socket.emit(`message-${channelId}`, { error: 'must specify an id', @@ -120,7 +142,12 @@ export const configureSocketServer = async ( if (room) { logger.error( - `ERROR > create_channel id = ${channelId} room already created`, + `create_channel from=${from} id = ${channelId} room already created`, + { + id: channelId, + socketId, + clientIp, + }, ); return socket.emit(`message-${channelId}`, { error: 'room already created', @@ -131,7 +158,12 @@ export const configureSocketServer = async ( resetRateLimits(); } - logger.debug(`joining channel ${channelId} + emit channel_created`); + logger.info(`create_channel from=${from}`, { + id: channelId, + socketId, + clientIp, + }); + // Make sure to join both so that disconnection events are handled properl socket.join(channelId); // socket.broadcast.socketsJoin(channelId); @@ -143,7 +175,7 @@ export const configureSocketServer = async ( } catch (error) { setLastConnectionErrorTimestamp(Date.now()); // increaseRateLimits(90); - logger.error('ERROR> Error on create_channel', error); + logger.error(`ERROR> Error on create_channel from=${from}`, error); // emit an error message back to the client, if appropriate return socket.emit(`error`, { error: (error as Error).message }); } @@ -158,41 +190,36 @@ export const configureSocketServer = async ( plaintext: string; }) => { const { id, message, context, plaintext } = msg; - const isMobile = context === 'metamask-mobile'; + let from = context ?? MISSING_CONTEXT; + if (context === 'metamask-mobile') { + from = 'wallet'; + } else if (context === 'dapp') { + from = 'dapp'; + } - logger.debug(`received message`, msg); + // logger.debug(`received message`, msg); try { if (hasRateLimit) { await rateLimiterMessage.consume(socket.handshake.address); } - const logContext: { - id: string; - message?: string; - plaintext?: string; - context?: string; - } = { - id, - context, - }; - - if (isDevelopment) { - logContext.plaintext = plaintext; - } - if (hasRateLimit) { resetRateLimits(); } + let formatted = plaintext; + const protocol = + typeof message === 'object' ? message : '__ENCRYPTED__'; + if (isDevelopment && formatted) { + formatted = JSON.stringify(JSON.parse(plaintext), null, 2); + } + logger.info( - `message-${id} received from { ${isMobile ? 'wallet' : 'dapp'} }`, - { ...logContext, socketId, clientIp }, + `message-${id} received from=${from}`, + formatted, + protocol, ); - if (isDevelopment) { - logger.debug(`message context`, JSON.stringify(context)); - } - return socket.broadcast.to(id).emit(`message-${id}`, { id, message }); } catch (error) { setLastConnectionErrorTimestamp(Date.now()); @@ -232,12 +259,19 @@ export const configureSocketServer = async ( }, ); - socket.on('join_channel', async (channelId: string) => { + socket.on('join_channel', async (channelId: string, context: string) => { + let from = context ?? MISSING_CONTEXT; + if (context === 'metamask-mobile') { + from = 'wallet'; + } else if (context === 'dapp') { + from = 'dapp'; + } + if (hasRateLimit) { try { await rateLimiter.consume(socket.handshake.address); } catch (e) { - logger.error('ERROR> Error while consuming rate limiter:', e); + logger.error('Error while consuming rate limiter:', e); return; } } @@ -255,9 +289,12 @@ export const configureSocketServer = async ( channelId, ); let channelOccupancy = 0; - logger.debug( - `join_channel ${channelId} from ${socketId} sRedisChannelOccupancy=${sRedisChannelOccupancy}`, - ); + logger.debug(`join_channel from=${from} ${channelId}`, { + context, + socketId, + channelOccupancy, + sRedisChannelOccupancy, + }); if (sRedisChannelOccupancy) { channelOccupancy = parseInt(sRedisChannelOccupancy, 10); @@ -284,13 +321,15 @@ export const configureSocketServer = async ( } if (roomOccupancy >= MAX_CLIENTS_PER_ROOM) { - logger.warn(`join_channel ${channelId} room already full`); + logger.warn(`join_channel from=${from} ${channelId} room already full`); socket.emit(`message-${channelId}`, { error: 'room already full' }); return; } if (channelOccupancy >= MAX_CLIENTS_PER_ROOM) { - logger.warn(`join_channel ${channelId} redis channel appears full`); + logger.warn( + `join_channel from=${from} ${channelId} redis channel appears full`, + ); socket.emit(`message-${channelId}`, { error: 'room already full' }); return; } @@ -310,7 +349,7 @@ export const configureSocketServer = async ( } logger.info( - `Client ${socketId} joined channel ${channelId}. Occupancy: ${channelOccupancy}`, + `join_channel from=${from} ${channelId}. Occupancy: ${channelOccupancy}`, { id: channelId, socketId, @@ -330,12 +369,14 @@ export const configureSocketServer = async ( } socket.on('disconnect', async (error) => { - logger.info(`disconnect from room ${channelId}`, { - id: channelId, - socketId, - clientIp, - error, - }); + logger.info( + `disconnect event from=${from} room ${channelId} ${error}`, + { + id: channelId, + socketId, + clientIp, + }, + ); // Inform the room of the disconnection socket.broadcast diff --git a/packages/sdk-socket-server-next/src/utils.ts b/packages/sdk-socket-server-next/src/utils.ts index 42dbffcb4..0145b13d7 100644 --- a/packages/sdk-socket-server-next/src/utils.ts +++ b/packages/sdk-socket-server-next/src/utils.ts @@ -1,4 +1,5 @@ import { Server as HttpServer } from 'http'; +import { logger } from './logger'; export type FlushResponse = { batch: any; @@ -52,18 +53,30 @@ export const cleanupAndExit = async ( analytics: Analytics, ): Promise => { if (isShuttingDown) { + logger.info(`cleanupAndExit already in progress`); return; } isShuttingDown = true; - const serverCloseResult = await closeServer(server); - const flushAnalyticsResult = await flushAnalytics(analytics); + try { + const flushAnalyticsResult = await flushAnalytics(analytics); + logger.info(`flushAnalyticsResult: ${flushAnalyticsResult}`); - if ((serverCloseResult as any) instanceof Error) { - throw new Error(`Error during server shutdown: ${serverCloseResult}`); - } + // CloseServer will block until all clients have disconnected. + const serverCloseResult = await closeServer(server); + logger.info(`serverCloseResult: ${serverCloseResult}`); - if (flushAnalyticsResult instanceof Error) { - throw new Error(`Error on exitGracefully: ${flushAnalyticsResult}`); + if ((serverCloseResult as any) instanceof Error) { + throw new Error(`Error during server shutdown: ${serverCloseResult}`); + } + + if (flushAnalyticsResult instanceof Error) { + throw new Error(`Error on exitGracefully: ${flushAnalyticsResult}`); + } + } catch (error) { + logger.error(`cleanupAndExit error: ${error}`); + } finally { + logger.info(`cleanupAndExit done`); + process.exit(0); } }; diff --git a/yarn.lock b/yarn.lock index 9c0a811d6..d8139247c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8281,6 +8281,7 @@ __metadata: helmet: ^5.1.1 ioredis: ^5.3.2 jest: ^29.6.4 + logform: ^2.6.0 lru-cache: ^10.0.0 nodemon: ^3.0.2 prettier: ^2.8.8 @@ -30754,7 +30755,7 @@ __metadata: languageName: node linkType: hard -"logform@npm:^2.3.2, logform@npm:^2.4.0": +"logform@npm:^2.3.2, logform@npm:^2.4.0, logform@npm:^2.6.0": version: 2.6.0 resolution: "logform@npm:2.6.0" dependencies: