From 2b78274483a5f04b7c59adf20e4e305702047b7e Mon Sep 17 00:00:00 2001 From: Scotty <66335769+ScottyPoi@users.noreply.github.com> Date: Mon, 13 Jan 2025 13:27:42 -0700 Subject: [PATCH] Utp packet heap (#702) * improve logs * uTP: track and log open request total * uTP: replace packet queue with packetHeap * uTP: skip or requeue OOP * update test * uTP: process OOP if 3 OOP in heap * uTP: add method to remove request packets from heap * uTP: Add comments and constant to OOP packet conditional * lint * move types to types --------- Co-authored-by: acolytec3 <17355484+acolytec3@users.noreply.github.com> --- packages/portalnetwork/src/client/client.ts | 4 +- .../portalnetwork/src/client/routingTable.ts | 3 +- .../portalnetwork/src/networks/network.ts | 6 +- .../utp/PortalNetworkUtp/ContentRequest.ts | 12 +-- .../src/wire/utp/PortalNetworkUtp/index.ts | 11 +- .../utp/PortalNetworkUtp/requestManager.ts | 100 +++++++++++++----- .../src/wire/utp/PortalNetworkUtp/types.ts | 5 + .../src/wire/utp/Socket/UtpSocket.ts | 8 +- .../portalnetwork/test/wire/utp/utp.spec.ts | 6 +- 9 files changed, 104 insertions(+), 51 deletions(-) diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index 02a33ce73..1b581d6e9 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -414,7 +414,9 @@ export class PortalNetwork extends EventEmitter { await this.uTP.handleUtpPacket(packetBuffer, src.nodeId) } catch (err: any) { this.logger.extend('error')( - `handleUTP error: ${err.message}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`, + + `handleUTP error: ${err.message}. SrcId: ${src.nodeId + } MultiAddr: ${src.socketAddr.toString()}`, ) } } diff --git a/packages/portalnetwork/src/client/routingTable.ts b/packages/portalnetwork/src/client/routingTable.ts index fb3fd84f7..52231e8c5 100644 --- a/packages/portalnetwork/src/client/routingTable.ts +++ b/packages/portalnetwork/src/client/routingTable.ts @@ -2,6 +2,7 @@ import { KademliaRoutingTable } from '@chainsafe/discv5' import type { ENR, NodeId } from '@chainsafe/enr' import type { Debugger } from 'debug' +import { shortId } from '../index.js' export class PortalNetworkRoutingTable extends KademliaRoutingTable { public logger?: Debugger private radiusMap: Map @@ -67,7 +68,7 @@ export class PortalNetworkRoutingTable extends KademliaRoutingTable { * @param nodeId nodeId of peer to be evicted */ public evictNode = (nodeId: NodeId) => { - this.logger?.extend('EVICT')(nodeId) + this.logger?.extend('EVICT')(shortId(nodeId)) let enr: ENR | undefined = this.getValue(nodeId) this.ignoreNode(nodeId) if (enr) { diff --git a/packages/portalnetwork/src/networks/network.ts b/packages/portalnetwork/src/networks/network.ts index 2c69e5380..66584b1cc 100644 --- a/packages/portalnetwork/src/networks/network.ts +++ b/packages/portalnetwork/src/networks/network.ts @@ -671,7 +671,7 @@ export abstract class BaseNetwork extends EventEmitter { return enr.encode() }) if (encodedEnrs.length > 0) { - this.logger(`Found ${encodedEnrs.length} closer to content than us`) + this.logger.extend('FINDCONTENT')(`Found ${encodedEnrs.length} closer to content`) // TODO: Add capability to send multiple TALKRESP messages if # ENRs exceeds packet size while (encodedEnrs.length > 0 && arrayByteLength(encodedEnrs) > MAX_PACKET_SIZE) { // Remove ENRs until total ENRs less than 1200 bytes @@ -691,7 +691,7 @@ export abstract class BaseNetwork extends EventEmitter { selector: FoundContent.ENRS, value: [], }) - this.logger(`Found no ENRs closer to content than us`) + this.logger(`Found no ENRs closer to content`) await this.sendResponse( src, requestId, @@ -712,7 +712,7 @@ export abstract class BaseNetwork extends EventEmitter { const nodeId = enr.nodeId // Only add node to the routing table if we have an ENR this.routingTable.getWithPending(enr.nodeId)?.value === undefined && - this.logger(`adding ${nodeId} to ${this.networkName} routing table`) + this.logger.extend('RoutingTable')(`adding ${shortId(nodeId)}`) this.routingTable.insertOrUpdate(enr, EntryStatus.Connected) if (customPayload !== undefined) { const decodedPayload = PingPongCustomDataType.deserialize(Uint8Array.from(customPayload)) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts index d68635bfa..6aec30b7c 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts @@ -88,13 +88,7 @@ export abstract class ContentRequest { this.socket._clearTimeout() this.socket.updateDelay(timeReceived, packet.header.timestampMicroseconds) this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])( - `|| pktId: ${packet.header.connectionId} ||`, - ) - this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])( - `|| seqNr: ${packet.header.seqNr} ||`, - ) - this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])( - `|| ackNr: ${packet.header.ackNr} ||`, + `|| pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr} t: ${packet.header.timestampMicroseconds}`, ) switch (packet.header.pType) { case PacketType.ST_SYN: @@ -248,7 +242,7 @@ export class FoundContentWriteRequest extends ContentWriteRequest { async _handleStatePacket(packet: StatePacket): Promise { await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds) if (this.socket.state === ConnectionState.Closed) { - await this.requestManager.closeRequest(packet.header.connectionId) + this.requestManager.closeRequest(packet.header.connectionId) } } } @@ -311,7 +305,7 @@ export class OfferWriteRequest extends ContentWriteRequest { } await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds) if (this.socket.state === ConnectionState.Closed) { - await this.requestManager.closeRequest(packet.header.connectionId) + this.requestManager.closeRequest(packet.header.connectionId) } } } diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts index 0373c1039..57b925fb8 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts @@ -39,6 +39,12 @@ export class PortalNetworkUTP { return this.requestManagers[nodeId] !== undefined && Object.keys(this.requestManagers[nodeId].requestMap).length > 0 } + openRequests(): number { + return Object.keys(this.requestManagers).reduce((acc, nodeId) => { + return acc + Object.keys(this.requestManagers[nodeId].requestMap).length + }, 0) + } + createPortalNetworkUTPSocket( networkId: NetworkId, requestCode: RequestCode, @@ -106,9 +112,9 @@ export class PortalNetworkUTP { content, contentKeys, }) - this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`) - this.logger.extend('utpRequest')(`ConnectionId: ${connectionId} -- { socket.sndId: ${sndId}, socket.rcvId: ${rcvId} }`) await this.requestManagers[enr.nodeId].handleNewRequest(connectionId, newRequest) + this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`) + this.logger.extend('utpRequest')(`Open Requests: ${this.openRequests()}`) return newRequest } @@ -125,6 +131,7 @@ export class PortalNetworkUTP { } catch (err) { this.logger.extend('error')(`Error sending message to ${enr.nodeId}: ${err}`) this.closeAllPeerRequests(enr.nodeId) + this.logger.extend('utpRequest')(`Open Requests: ${this.openRequests()}`) throw err } } diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index e3994e295..65a8117a9 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -1,22 +1,31 @@ import type { ContentRequest } from "./ContentRequest.js"; -import { Packet , PacketType } from "../Packets/index.js"; +import { Packet, PacketType, UtpSocketType } from "../Packets/index.js"; import type { Debugger } from "debug"; +import type { Comparator } from "heap-js"; +import { Heap } from "heap-js"; +import { MAX_IN_FLIGHT_PACKETS, type RequestId } from "./types.js"; -type RequestId = number - +const packetComparator: Comparator> = (a: Packet, b: Packet) => { + // If packets belong to the same connection, sort by sequence number + if (a.header.connectionId === b.header.connectionId) { + return a.header.seqNr - b.header.seqNr; + } + // Otherwise, sort by timestamp + return a.header.timestampMicroseconds - b.header.timestampMicroseconds; +} export class RequestManager { peerId: string requestMap: Record logger: Debugger - masterPacketQueue: Array> + packetHeap: Heap> currentPacket: Packet | undefined constructor(peerId: string, logger: Debugger) { this.peerId = peerId this.requestMap = {} this.logger = logger.extend(`RequestManager`).extend(peerId.slice(0, 4)) - this.masterPacketQueue = [] this.currentPacket = undefined + this.packetHeap = new Heap(packetComparator) } /** @@ -29,12 +38,37 @@ export class RequestManager { return this.requestMap[connectionId] ?? this.requestMap[connectionId - 1] ?? this.requestMap[connectionId + 1] } + /** + * Finds the number of packets in the packet heap for a given request + * @param connectionId connectionId of the request to get the packet count for + * @returns the number of packets in the packet heap for a given request + */ + getPacketCount(connectionId: number): number { + return this.packetHeap.heapArray.filter((packet) => packet.header.connectionId === connectionId).length + } + + /** + * Removes all packets from the packet heap for a given request + * @param connectionId connectionId of the request to remove packets for + */ + removeRequestPackets(connectionId: number) { + const comparator = (packet: Packet) => packet.header.connectionId === connectionId + const packet = new Packet({ + header: { + connectionId, + } as any, + }) + while (this.packetHeap.remove(packet, comparator)) { + continue + } + } + /** * Adds a new uTP request to the peer's request manager. * @param connectionId connectionId from uTP initialization * @param request new ContentRequest */ - async handleNewRequest(connectionId: number,request: ContentRequest) { + async handleNewRequest(connectionId: number, request: ContentRequest) { this.requestMap[connectionId] = request await request.init() } @@ -50,43 +84,58 @@ export class RequestManager { this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}`) return } - if (this.masterPacketQueue.length === 0) { - this.currentPacket = packet - return this.processCurrentPacket() - } if (packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET) { - this.masterPacketQueue.unshift(packet) + await request.handleUtpPacket(packet) + return } else { - this.masterPacketQueue.push(packet) + this.packetHeap.push(packet) } - this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} packet for request ${packet.header.connectionId} to packet queue (size: ${this.masterPacketQueue.length} packets)`) + this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} [${packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`) if (this.currentPacket === undefined) { - this.currentPacket = this.masterPacketQueue.shift() + this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() } } - async processCurrentPacket() { - this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.masterPacketQueue.length}`) + async processCurrentPacket(): Promise { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.packetHeap.size()}`) if (this.currentPacket === undefined) { - if (this.masterPacketQueue.length === 0) { + if (this.packetHeap.size() === 0) { this.logger.extend('PROCESS_CURRENT_PACKET')(`No packets to process`) return } - this.currentPacket = this.masterPacketQueue.shift() + this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() return } - this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} packet for request ${this.currentPacket.header.connectionId}`) + this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`) const request = this.lookupRequest(this.currentPacket.header.connectionId) if (request === undefined) { this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`) - this.currentPacket = this.masterPacketQueue.shift() + this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() return } + if (request.socket.type === UtpSocketType.READ && request.socket.reader !== undefined) { + if (this.currentPacket.header.seqNr < request.socket.reader!.nextDataNr) { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet ${this.currentPacket.header.seqNr} already processed.`) + this.currentPacket = this.packetHeap.pop() + return this.processCurrentPacket() + } else if (this.currentPacket.header.seqNr > request.socket.reader!.nextDataNr) { + if (this.getPacketCount(this.currentPacket.header.connectionId) < MAX_IN_FLIGHT_PACKETS) { + // Requeue packet. Optimistically assume expected packet has arrived out of order. + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Pushing packet back to heap.`) + this.packetHeap.push(this.currentPacket) + this.currentPacket = undefined + return + } else { + // Treat expected packet as lost. Process next packet (should trigger SELECTIVE_ACK) + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Treating expected packet as lost.`) + } + } + } await request.handleUtpPacket(this.currentPacket) - this.currentPacket = this.masterPacketQueue.shift() + this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() } @@ -94,21 +143,22 @@ export class RequestManager { * Closes a uTP request and processes the next request in the queue. * @param connectionId connectionId of the request to close */ - async closeRequest(connectionId: number) { + closeRequest(connectionId: number) { const request = this.lookupRequest(connectionId) if (request === undefined) { return } this.logger.extend('CLOSE_REQUEST')(`Closing request ${connectionId}`) + this.removeRequestPackets(connectionId) delete this.requestMap[connectionId] } - + closeAllRequests() { this.logger.extend('CLOSE_REQUEST')(`Closing all requests for peer ${this.peerId}`) for (const id of Object.keys(this.requestMap)) { - delete this.requestMap[Number(id)] + this.closeRequest(Number(id)) } - this.masterPacketQueue = [] + this.packetHeap = new Heap(packetComparator) } diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts index 364a7049f..7003d1f9a 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts @@ -23,3 +23,8 @@ export interface INewRequest { requestCode: RequestCode contents?: Uint8Array } + + +export const MAX_IN_FLIGHT_PACKETS = 3 + +export type RequestId = number \ No newline at end of file diff --git a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts index f9a5912a0..709f380f8 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts @@ -84,13 +84,7 @@ export abstract class UtpSocket { async sendPacket(packet: Packet): Promise { const msg = packet.encode() this.logger.extend('SEND').extend(PacketType[packet.header.pType])( - `|| pktId: ${packet.header.connectionId}`, - ) - this.logger.extend('SEND').extend(PacketType[packet.header.pType])( - `|| seqNr: ${packet.header.seqNr}`, - ) - this.logger.extend('SEND').extend(PacketType[packet.header.pType])( - `|| ackNr: ${packet.header.ackNr}`, + `pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr}`, ) try { await this.utp.send(this.remoteAddress, msg, this.networkId) diff --git a/packages/portalnetwork/test/wire/utp/utp.spec.ts b/packages/portalnetwork/test/wire/utp/utp.spec.ts index f4924dda9..74573270b 100644 --- a/packages/portalnetwork/test/wire/utp/utp.spec.ts +++ b/packages/portalnetwork/test/wire/utp/utp.spec.ts @@ -280,10 +280,10 @@ describe('RequestManager', () => { }, }) void mgr.handleNewRequest(req1.connectionId, req1) - mgr.masterPacketQueue.push(packet2) + mgr.packetHeap.push(packet2) mgr.currentPacket = packet3 void mgr.handlePacket(packet1.encode()) - assert.equal(mgr.masterPacketQueue.length, 2) - assert.deepEqual(mgr.masterPacketQueue[0], packet2) + assert.equal(mgr.packetHeap.size(), 2) + assert.deepEqual(mgr.packetHeap.peek(), packet2) }) })