From ca8a41d8e6b09b90ed8c2bdf9ae6eb44c755686b Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 3 Jan 2025 10:11:08 -0700 Subject: [PATCH 01/10] improve logs --- packages/portalnetwork/src/client/client.ts | 7 +++++-- packages/portalnetwork/src/client/routingTable.ts | 3 ++- packages/portalnetwork/src/networks/network.ts | 6 +++--- .../src/wire/utp/PortalNetworkUtp/ContentRequest.ts | 8 +------- packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts | 8 +------- 5 files changed, 12 insertions(+), 20 deletions(-) diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index 02a33ce73..12ae44595 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -14,7 +14,7 @@ import { SyncStrategy, } from '../networks/index.js' import { CapacitorUDPTransportService, WebSocketTransportService } from '../transports/index.js' -import { MEGABYTE, dirSize } from '../util/index.js' +import { MEGABYTE, dirSize, shortId } from '../util/index.js' import { PortalNetworkUTP } from '../wire/utp/PortalNetworkUtp/index.js' import { DBManager } from './dbManager.js' @@ -414,7 +414,10 @@ export class PortalNetwork extends EventEmitter { await this.uTP.handleUtpPacket(packetBuffer, src.nodeId) } catch (err: any) { this.logger.extend('error')( - `handleUTP error: ${err.message}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`, + + `handleUTP error: ${err.message}. SrcId: ${ + src.nodeId + } MultiAddr: ${src.socketAddr.toString()}`, ) } } diff --git a/packages/portalnetwork/src/client/routingTable.ts b/packages/portalnetwork/src/client/routingTable.ts index fb3fd84f7..52231e8c5 100644 --- a/packages/portalnetwork/src/client/routingTable.ts +++ b/packages/portalnetwork/src/client/routingTable.ts @@ -2,6 +2,7 @@ import { KademliaRoutingTable } from '@chainsafe/discv5' import type { ENR, NodeId } from '@chainsafe/enr' import type { Debugger } from 'debug' +import { shortId } from '../index.js' export class PortalNetworkRoutingTable extends KademliaRoutingTable { public logger?: Debugger private radiusMap: Map @@ -67,7 +68,7 @@ export class PortalNetworkRoutingTable extends KademliaRoutingTable { * @param nodeId nodeId of peer to be evicted */ public evictNode = (nodeId: NodeId) => { - this.logger?.extend('EVICT')(nodeId) + this.logger?.extend('EVICT')(shortId(nodeId)) let enr: ENR | undefined = this.getValue(nodeId) this.ignoreNode(nodeId) if (enr) { diff --git a/packages/portalnetwork/src/networks/network.ts b/packages/portalnetwork/src/networks/network.ts index 2c69e5380..66584b1cc 100644 --- a/packages/portalnetwork/src/networks/network.ts +++ b/packages/portalnetwork/src/networks/network.ts @@ -671,7 +671,7 @@ export abstract class BaseNetwork extends EventEmitter { return enr.encode() }) if (encodedEnrs.length > 0) { - this.logger(`Found ${encodedEnrs.length} closer to content than us`) + this.logger.extend('FINDCONTENT')(`Found ${encodedEnrs.length} closer to content`) // TODO: Add capability to send multiple TALKRESP messages if # ENRs exceeds packet size while (encodedEnrs.length > 0 && arrayByteLength(encodedEnrs) > MAX_PACKET_SIZE) { // Remove ENRs until total ENRs less than 1200 bytes @@ -691,7 +691,7 @@ export abstract class BaseNetwork extends EventEmitter { selector: FoundContent.ENRS, value: [], }) - this.logger(`Found no ENRs closer to content than us`) + this.logger(`Found no ENRs closer to content`) await this.sendResponse( src, requestId, @@ -712,7 +712,7 @@ export abstract class BaseNetwork extends EventEmitter { const nodeId = enr.nodeId // Only add node to the routing table if we have an ENR this.routingTable.getWithPending(enr.nodeId)?.value === undefined && - this.logger(`adding ${nodeId} to ${this.networkName} routing table`) + this.logger.extend('RoutingTable')(`adding ${shortId(nodeId)}`) this.routingTable.insertOrUpdate(enr, EntryStatus.Connected) if (customPayload !== undefined) { const decodedPayload = PingPongCustomDataType.deserialize(Uint8Array.from(customPayload)) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts index d68635bfa..79e0e7ee5 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts @@ -88,13 +88,7 @@ export abstract class ContentRequest { this.socket._clearTimeout() this.socket.updateDelay(timeReceived, packet.header.timestampMicroseconds) this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])( - `|| pktId: ${packet.header.connectionId} ||`, - ) - this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])( - `|| seqNr: ${packet.header.seqNr} ||`, - ) - this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])( - `|| ackNr: ${packet.header.ackNr} ||`, + `|| pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr} t: ${packet.header.timestampMicroseconds}`, ) switch (packet.header.pType) { case PacketType.ST_SYN: diff --git a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts index f9a5912a0..709f380f8 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts @@ -84,13 +84,7 @@ export abstract class UtpSocket { async sendPacket(packet: Packet): Promise { const msg = packet.encode() this.logger.extend('SEND').extend(PacketType[packet.header.pType])( - `|| pktId: ${packet.header.connectionId}`, - ) - this.logger.extend('SEND').extend(PacketType[packet.header.pType])( - `|| seqNr: ${packet.header.seqNr}`, - ) - this.logger.extend('SEND').extend(PacketType[packet.header.pType])( - `|| ackNr: ${packet.header.ackNr}`, + `pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr}`, ) try { await this.utp.send(this.remoteAddress, msg, this.networkId) From 9abba78ccae39835a129cffeaa11c38c13005884 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 3 Jan 2025 10:12:09 -0700 Subject: [PATCH 02/10] uTP: track and log open request total --- .../src/wire/utp/PortalNetworkUtp/index.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts index 0373c1039..57b925fb8 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts @@ -39,6 +39,12 @@ export class PortalNetworkUTP { return this.requestManagers[nodeId] !== undefined && Object.keys(this.requestManagers[nodeId].requestMap).length > 0 } + openRequests(): number { + return Object.keys(this.requestManagers).reduce((acc, nodeId) => { + return acc + Object.keys(this.requestManagers[nodeId].requestMap).length + }, 0) + } + createPortalNetworkUTPSocket( networkId: NetworkId, requestCode: RequestCode, @@ -106,9 +112,9 @@ export class PortalNetworkUTP { content, contentKeys, }) - this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`) - this.logger.extend('utpRequest')(`ConnectionId: ${connectionId} -- { socket.sndId: ${sndId}, socket.rcvId: ${rcvId} }`) await this.requestManagers[enr.nodeId].handleNewRequest(connectionId, newRequest) + this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`) + this.logger.extend('utpRequest')(`Open Requests: ${this.openRequests()}`) return newRequest } @@ -125,6 +131,7 @@ export class PortalNetworkUTP { } catch (err) { this.logger.extend('error')(`Error sending message to ${enr.nodeId}: ${err}`) this.closeAllPeerRequests(enr.nodeId) + this.logger.extend('utpRequest')(`Open Requests: ${this.openRequests()}`) throw err } } From bd7d6065c4dca91f414df80e70d06a48f9c797fc Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Thu, 9 Jan 2025 17:23:35 -0700 Subject: [PATCH 03/10] uTP: replace packet queue with packetHeap --- .../utp/PortalNetworkUtp/requestManager.ts | 46 +++++++++++-------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index e3994e295..f86d4c917 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -1,22 +1,33 @@ import type { ContentRequest } from "./ContentRequest.js"; -import { Packet , PacketType } from "../Packets/index.js"; +import { Packet , PacketType, UtpSocketType } from "../Packets/index.js"; import type { Debugger } from "debug"; +import type {Comparator} from "heap-js"; +import { Heap} from "heap-js"; type RequestId = number +const packetComparator: Comparator> = (a: Packet, b: Packet) => { + // If packets belong to the same connection, sort by sequence number + if (a.header.connectionId === b.header.connectionId) { + return a.header.seqNr - b.header.seqNr; + } + // Otherwise, sort by timestamp + return a.header.timestampMicroseconds - b.header.timestampMicroseconds; +} + export class RequestManager { peerId: string requestMap: Record logger: Debugger - masterPacketQueue: Array> + packetHeap: Heap> currentPacket: Packet | undefined constructor(peerId: string, logger: Debugger) { this.peerId = peerId this.requestMap = {} this.logger = logger.extend(`RequestManager`).extend(peerId.slice(0, 4)) - this.masterPacketQueue = [] this.currentPacket = undefined + this.packetHeap = new Heap(packetComparator) } /** @@ -50,43 +61,40 @@ export class RequestManager { this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}`) return } - if (this.masterPacketQueue.length === 0) { - this.currentPacket = packet - return this.processCurrentPacket() - } if (packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET) { - this.masterPacketQueue.unshift(packet) + await request.handleUtpPacket(packet) + return } else { - this.masterPacketQueue.push(packet) + this.packetHeap.push(packet) } - this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} packet for request ${packet.header.connectionId} to packet queue (size: ${this.masterPacketQueue.length} packets)`) + this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} [${packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`) if (this.currentPacket === undefined) { - this.currentPacket = this.masterPacketQueue.shift() + this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() } } - async processCurrentPacket() { - this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.masterPacketQueue.length}`) + async processCurrentPacket(): Promise { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.packetHeap.size()}`) if (this.currentPacket === undefined) { - if (this.masterPacketQueue.length === 0) { + if (this.packetHeap.size() === 0) { this.logger.extend('PROCESS_CURRENT_PACKET')(`No packets to process`) return } - this.currentPacket = this.masterPacketQueue.shift() + this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() return } - this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} packet for request ${this.currentPacket.header.connectionId}`) + this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`) const request = this.lookupRequest(this.currentPacket.header.connectionId) if (request === undefined) { this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`) - this.currentPacket = this.masterPacketQueue.shift() + this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() return } await request.handleUtpPacket(this.currentPacket) - this.currentPacket = this.masterPacketQueue.shift() + this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() } @@ -108,7 +116,7 @@ export class RequestManager { for (const id of Object.keys(this.requestMap)) { delete this.requestMap[Number(id)] } - this.masterPacketQueue = [] + this.packetHeap = new Heap(packetComparator) } From 5429f5ab027d0b6dc4eb0fff86d84bf4e1fc32d1 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Thu, 9 Jan 2025 17:26:54 -0700 Subject: [PATCH 04/10] uTP: skip or requeue OOP --- .../src/wire/utp/PortalNetworkUtp/requestManager.ts | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index f86d4c917..6c31242df 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -93,6 +93,18 @@ export class RequestManager { await this.processCurrentPacket() return } + if (request.socket.type === UtpSocketType.READ && request.socket.reader !== undefined) { + if (this.currentPacket.header.seqNr < request.socket.reader!.nextDataNr) { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet ${this.currentPacket.header.seqNr} already processed.`) + this.currentPacket = this.packetHeap.pop() + return this.processCurrentPacket() + } else if (this.currentPacket.header.seqNr > request.socket.reader!.nextDataNr) { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}`) + this.packetHeap.push(this.currentPacket) + this.currentPacket = undefined + return + } + } await request.handleUtpPacket(this.currentPacket) this.currentPacket = this.packetHeap.pop() await this.processCurrentPacket() From 7e0ca7a9effc31a304a191550a30fcad4b902818 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Thu, 9 Jan 2025 17:27:31 -0700 Subject: [PATCH 05/10] update test --- packages/portalnetwork/test/wire/utp/utp.spec.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/portalnetwork/test/wire/utp/utp.spec.ts b/packages/portalnetwork/test/wire/utp/utp.spec.ts index f4924dda9..74573270b 100644 --- a/packages/portalnetwork/test/wire/utp/utp.spec.ts +++ b/packages/portalnetwork/test/wire/utp/utp.spec.ts @@ -280,10 +280,10 @@ describe('RequestManager', () => { }, }) void mgr.handleNewRequest(req1.connectionId, req1) - mgr.masterPacketQueue.push(packet2) + mgr.packetHeap.push(packet2) mgr.currentPacket = packet3 void mgr.handlePacket(packet1.encode()) - assert.equal(mgr.masterPacketQueue.length, 2) - assert.deepEqual(mgr.masterPacketQueue[0], packet2) + assert.equal(mgr.packetHeap.size(), 2) + assert.deepEqual(mgr.packetHeap.peek(), packet2) }) }) From 774053c2626a66fb1a08ae9ca00c1a265a23cef9 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 10 Jan 2025 13:42:49 -0700 Subject: [PATCH 06/10] uTP: process OOP if 3 OOP in heap --- .../utp/PortalNetworkUtp/requestManager.ts | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index 6c31242df..cb460374f 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -40,6 +40,15 @@ export class RequestManager { return this.requestMap[connectionId] ?? this.requestMap[connectionId - 1] ?? this.requestMap[connectionId + 1] } + /** + * Finds the number of packets in the packet heap for a given request + * @param connectionId connectionId of the request to get the packet count for + * @returns the number of packets in the packet heap for a given request + */ + getPacketCount(connectionId: number): number { + return this.packetHeap.heapArray.filter((packet) => packet.header.connectionId === connectionId).length + } + /** * Adds a new uTP request to the peer's request manager. * @param connectionId connectionId from uTP initialization @@ -99,10 +108,14 @@ export class RequestManager { this.currentPacket = this.packetHeap.pop() return this.processCurrentPacket() } else if (this.currentPacket.header.seqNr > request.socket.reader!.nextDataNr) { - this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}`) - this.packetHeap.push(this.currentPacket) - this.currentPacket = undefined - return + if (this.getPacketCount(this.currentPacket.header.connectionId) < 3) { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Pushing packet back to heap.`) + this.packetHeap.push(this.currentPacket) + this.currentPacket = undefined + return + } else { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Treating expected packet as lost.`) + } } } await request.handleUtpPacket(this.currentPacket) From 5619502dcc467204bdb588e2ff96e2cba575ae3c Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 10 Jan 2025 14:14:25 -0700 Subject: [PATCH 07/10] uTP: add method to remove request packets from heap --- .../utp/PortalNetworkUtp/ContentRequest.ts | 4 ++-- .../utp/PortalNetworkUtp/requestManager.ts | 21 +++++++++++++++++-- 2 files changed, 21 insertions(+), 4 deletions(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts index 79e0e7ee5..6aec30b7c 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts @@ -242,7 +242,7 @@ export class FoundContentWriteRequest extends ContentWriteRequest { async _handleStatePacket(packet: StatePacket): Promise { await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds) if (this.socket.state === ConnectionState.Closed) { - await this.requestManager.closeRequest(packet.header.connectionId) + this.requestManager.closeRequest(packet.header.connectionId) } } } @@ -305,7 +305,7 @@ export class OfferWriteRequest extends ContentWriteRequest { } await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds) if (this.socket.state === ConnectionState.Closed) { - await this.requestManager.closeRequest(packet.header.connectionId) + this.requestManager.closeRequest(packet.header.connectionId) } } } diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index cb460374f..e051d6eaf 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -49,6 +49,22 @@ export class RequestManager { return this.packetHeap.heapArray.filter((packet) => packet.header.connectionId === connectionId).length } + /** + * Removes all packets from the packet heap for a given request + * @param connectionId connectionId of the request to remove packets for + */ + removeRequestPackets(connectionId: number) { + const comparator = (packet: Packet) => packet.header.connectionId === connectionId + const packet = new Packet({ + header: { + connectionId, + } as any, + }) + while (this.packetHeap.remove(packet, comparator)) { + continue + } + } + /** * Adds a new uTP request to the peer's request manager. * @param connectionId connectionId from uTP initialization @@ -127,19 +143,20 @@ export class RequestManager { * Closes a uTP request and processes the next request in the queue. * @param connectionId connectionId of the request to close */ - async closeRequest(connectionId: number) { + closeRequest(connectionId: number) { const request = this.lookupRequest(connectionId) if (request === undefined) { return } this.logger.extend('CLOSE_REQUEST')(`Closing request ${connectionId}`) + this.removeRequestPackets(connectionId) delete this.requestMap[connectionId] } closeAllRequests() { this.logger.extend('CLOSE_REQUEST')(`Closing all requests for peer ${this.peerId}`) for (const id of Object.keys(this.requestMap)) { - delete this.requestMap[Number(id)] + this.closeRequest(Number(id)) } this.packetHeap = new Heap(packetComparator) } From eb6df1af0735e840a7afdb3e35b05773f0cafae1 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Mon, 13 Jan 2025 11:42:21 -0700 Subject: [PATCH 08/10] uTP: Add comments and constant to OOP packet conditional --- .../src/wire/utp/PortalNetworkUtp/requestManager.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index e051d6eaf..b4c07b5cc 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -15,6 +15,8 @@ const packetComparator: Comparator> = (a: Packet, return a.header.timestampMicroseconds - b.header.timestampMicroseconds; } +const MAX_IN_FLIGHT_PACKETS = 3 + export class RequestManager { peerId: string requestMap: Record @@ -124,12 +126,14 @@ export class RequestManager { this.currentPacket = this.packetHeap.pop() return this.processCurrentPacket() } else if (this.currentPacket.header.seqNr > request.socket.reader!.nextDataNr) { - if (this.getPacketCount(this.currentPacket.header.connectionId) < 3) { + if (this.getPacketCount(this.currentPacket.header.connectionId) < MAX_IN_FLIGHT_PACKETS) { + // Requeue packet. Optimistically assume expected packet has arrived out of order. this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Pushing packet back to heap.`) this.packetHeap.push(this.currentPacket) this.currentPacket = undefined return } else { + // Treat expected packet as lost. Process next packet (should trigger SELECTIVE_ACK) this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Treating expected packet as lost.`) } } From 3ab0f4f0a33359facf9288c81835a407936522ab Mon Sep 17 00:00:00 2001 From: acolytec3 <17355484+acolytec3@users.noreply.github.com> Date: Mon, 13 Jan 2025 15:23:12 -0500 Subject: [PATCH 09/10] lint --- packages/portalnetwork/src/client/client.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index 12ae44595..1b581d6e9 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -14,7 +14,7 @@ import { SyncStrategy, } from '../networks/index.js' import { CapacitorUDPTransportService, WebSocketTransportService } from '../transports/index.js' -import { MEGABYTE, dirSize, shortId } from '../util/index.js' +import { MEGABYTE, dirSize } from '../util/index.js' import { PortalNetworkUTP } from '../wire/utp/PortalNetworkUtp/index.js' import { DBManager } from './dbManager.js' @@ -414,9 +414,8 @@ export class PortalNetwork extends EventEmitter { await this.uTP.handleUtpPacket(packetBuffer, src.nodeId) } catch (err: any) { this.logger.extend('error')( - - `handleUTP error: ${err.message}. SrcId: ${ - src.nodeId + + `handleUTP error: ${err.message}. SrcId: ${src.nodeId } MultiAddr: ${src.socketAddr.toString()}`, ) } From 539a70866194f8400f9f08054967d44b6e7d3b73 Mon Sep 17 00:00:00 2001 From: acolytec3 <17355484+acolytec3@users.noreply.github.com> Date: Mon, 13 Jan 2025 15:25:02 -0500 Subject: [PATCH 10/10] move types to types --- .../utp/PortalNetworkUtp/requestManager.ts | 24 ++++++++----------- .../src/wire/utp/PortalNetworkUtp/types.ts | 5 ++++ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts index b4c07b5cc..65a8117a9 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -1,10 +1,9 @@ import type { ContentRequest } from "./ContentRequest.js"; -import { Packet , PacketType, UtpSocketType } from "../Packets/index.js"; +import { Packet, PacketType, UtpSocketType } from "../Packets/index.js"; import type { Debugger } from "debug"; -import type {Comparator} from "heap-js"; -import { Heap} from "heap-js"; - -type RequestId = number +import type { Comparator } from "heap-js"; +import { Heap } from "heap-js"; +import { MAX_IN_FLIGHT_PACKETS, type RequestId } from "./types.js"; const packetComparator: Comparator> = (a: Packet, b: Packet) => { // If packets belong to the same connection, sort by sequence number @@ -14,9 +13,6 @@ const packetComparator: Comparator> = (a: Packet, // Otherwise, sort by timestamp return a.header.timestampMicroseconds - b.header.timestampMicroseconds; } - -const MAX_IN_FLIGHT_PACKETS = 3 - export class RequestManager { peerId: string requestMap: Record @@ -59,11 +55,11 @@ export class RequestManager { const comparator = (packet: Packet) => packet.header.connectionId === connectionId const packet = new Packet({ header: { - connectionId, + connectionId, } as any, - }) + }) while (this.packetHeap.remove(packet, comparator)) { - continue + continue } } @@ -72,7 +68,7 @@ export class RequestManager { * @param connectionId connectionId from uTP initialization * @param request new ContentRequest */ - async handleNewRequest(connectionId: number,request: ContentRequest) { + async handleNewRequest(connectionId: number, request: ContentRequest) { this.requestMap[connectionId] = request await request.init() } @@ -124,7 +120,7 @@ export class RequestManager { if (this.currentPacket.header.seqNr < request.socket.reader!.nextDataNr) { this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet ${this.currentPacket.header.seqNr} already processed.`) this.currentPacket = this.packetHeap.pop() - return this.processCurrentPacket() + return this.processCurrentPacket() } else if (this.currentPacket.header.seqNr > request.socket.reader!.nextDataNr) { if (this.getPacketCount(this.currentPacket.header.connectionId) < MAX_IN_FLIGHT_PACKETS) { // Requeue packet. Optimistically assume expected packet has arrived out of order. @@ -156,7 +152,7 @@ export class RequestManager { this.removeRequestPackets(connectionId) delete this.requestMap[connectionId] } - + closeAllRequests() { this.logger.extend('CLOSE_REQUEST')(`Closing all requests for peer ${this.peerId}`) for (const id of Object.keys(this.requestMap)) { diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts index 364a7049f..7003d1f9a 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts @@ -23,3 +23,8 @@ export interface INewRequest { requestCode: RequestCode contents?: Uint8Array } + + +export const MAX_IN_FLIGHT_PACKETS = 3 + +export type RequestId = number \ No newline at end of file