Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utp packet heap #702

Merged
merged 10 commits into from
Jan 13, 2025
Merged
4 changes: 3 additions & 1 deletion packages/portalnetwork/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,9 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
await this.uTP.handleUtpPacket(packetBuffer, src.nodeId)
} catch (err: any) {
this.logger.extend('error')(
`handleUTP error: ${err.message}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`,

`handleUTP error: ${err.message}. SrcId: ${src.nodeId
} MultiAddr: ${src.socketAddr.toString()}`,
)
}
}
Expand Down
3 changes: 2 additions & 1 deletion packages/portalnetwork/src/client/routingTable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { KademliaRoutingTable } from '@chainsafe/discv5'

import type { ENR, NodeId } from '@chainsafe/enr'
import type { Debugger } from 'debug'
import { shortId } from '../index.js'
export class PortalNetworkRoutingTable extends KademliaRoutingTable {
public logger?: Debugger
private radiusMap: Map<NodeId, bigint>
Expand Down Expand Up @@ -67,7 +68,7 @@ export class PortalNetworkRoutingTable extends KademliaRoutingTable {
* @param nodeId nodeId of peer to be evicted
*/
public evictNode = (nodeId: NodeId) => {
this.logger?.extend('EVICT')(nodeId)
this.logger?.extend('EVICT')(shortId(nodeId))
let enr: ENR | undefined = this.getValue(nodeId)
this.ignoreNode(nodeId)
if (enr) {
Expand Down
6 changes: 3 additions & 3 deletions packages/portalnetwork/src/networks/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ export abstract class BaseNetwork extends EventEmitter {
return enr.encode()
})
if (encodedEnrs.length > 0) {
this.logger(`Found ${encodedEnrs.length} closer to content than us`)
this.logger.extend('FINDCONTENT')(`Found ${encodedEnrs.length} closer to content`)
// TODO: Add capability to send multiple TALKRESP messages if # ENRs exceeds packet size
while (encodedEnrs.length > 0 && arrayByteLength(encodedEnrs) > MAX_PACKET_SIZE) {
// Remove ENRs until total ENRs less than 1200 bytes
Expand All @@ -691,7 +691,7 @@ export abstract class BaseNetwork extends EventEmitter {
selector: FoundContent.ENRS,
value: [],
})
this.logger(`Found no ENRs closer to content than us`)
this.logger(`Found no ENRs closer to content`)
await this.sendResponse(
src,
requestId,
Expand All @@ -712,7 +712,7 @@ export abstract class BaseNetwork extends EventEmitter {
const nodeId = enr.nodeId
// Only add node to the routing table if we have an ENR
this.routingTable.getWithPending(enr.nodeId)?.value === undefined &&
this.logger(`adding ${nodeId} to ${this.networkName} routing table`)
this.logger.extend('RoutingTable')(`adding ${shortId(nodeId)}`)
this.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
if (customPayload !== undefined) {
const decodedPayload = PingPongCustomDataType.deserialize(Uint8Array.from(customPayload))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,7 @@ export abstract class ContentRequest {
this.socket._clearTimeout()
this.socket.updateDelay(timeReceived, packet.header.timestampMicroseconds)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| pktId: ${packet.header.connectionId} ||`,
)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| seqNr: ${packet.header.seqNr} ||`,
)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| ackNr: ${packet.header.ackNr} ||`,
`|| pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr} t: ${packet.header.timestampMicroseconds}`,
)
switch (packet.header.pType) {
case PacketType.ST_SYN:
Expand Down Expand Up @@ -248,7 +242,7 @@ export class FoundContentWriteRequest extends ContentWriteRequest {
async _handleStatePacket(packet: StatePacket): Promise<void> {
await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds)
if (this.socket.state === ConnectionState.Closed) {
await this.requestManager.closeRequest(packet.header.connectionId)
this.requestManager.closeRequest(packet.header.connectionId)
}
}
}
Expand Down Expand Up @@ -311,7 +305,7 @@ export class OfferWriteRequest extends ContentWriteRequest {
}
await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds)
if (this.socket.state === ConnectionState.Closed) {
await this.requestManager.closeRequest(packet.header.connectionId)
this.requestManager.closeRequest(packet.header.connectionId)
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ export class PortalNetworkUTP {
return this.requestManagers[nodeId] !== undefined && Object.keys(this.requestManagers[nodeId].requestMap).length > 0
}

openRequests(): number {
return Object.keys(this.requestManagers).reduce((acc, nodeId) => {
return acc + Object.keys(this.requestManagers[nodeId].requestMap).length
}, 0)
}

createPortalNetworkUTPSocket(
networkId: NetworkId,
requestCode: RequestCode,
Expand Down Expand Up @@ -106,9 +112,9 @@ export class PortalNetworkUTP {
content,
contentKeys,
})
this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`)
this.logger.extend('utpRequest')(`ConnectionId: ${connectionId} -- { socket.sndId: ${sndId}, socket.rcvId: ${rcvId} }`)
await this.requestManagers[enr.nodeId].handleNewRequest(connectionId, newRequest)
this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`)
this.logger.extend('utpRequest')(`Open Requests: ${this.openRequests()}`)
return newRequest
}

Expand All @@ -125,6 +131,7 @@ export class PortalNetworkUTP {
} catch (err) {
this.logger.extend('error')(`Error sending message to ${enr.nodeId}: ${err}`)
this.closeAllPeerRequests(enr.nodeId)
this.logger.extend('utpRequest')(`Open Requests: ${this.openRequests()}`)
throw err
}
}
Expand Down
100 changes: 75 additions & 25 deletions packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
import type { ContentRequest } from "./ContentRequest.js";
import { Packet , PacketType } from "../Packets/index.js";
import { Packet, PacketType, UtpSocketType } from "../Packets/index.js";
import type { Debugger } from "debug";
import type { Comparator } from "heap-js";
import { Heap } from "heap-js";
import { MAX_IN_FLIGHT_PACKETS, type RequestId } from "./types.js";

type RequestId = number

const packetComparator: Comparator<Packet<PacketType>> = (a: Packet<PacketType>, b: Packet<PacketType>) => {
// If packets belong to the same connection, sort by sequence number
if (a.header.connectionId === b.header.connectionId) {
return a.header.seqNr - b.header.seqNr;
}
// Otherwise, sort by timestamp
return a.header.timestampMicroseconds - b.header.timestampMicroseconds;
}
export class RequestManager {
peerId: string
requestMap: Record<RequestId, ContentRequest>
logger: Debugger
masterPacketQueue: Array<Packet<PacketType>>
packetHeap: Heap<Packet<PacketType>>
currentPacket: Packet<PacketType> | undefined

constructor(peerId: string, logger: Debugger) {
this.peerId = peerId
this.requestMap = {}
this.logger = logger.extend(`RequestManager`).extend(peerId.slice(0, 4))
this.masterPacketQueue = []
this.currentPacket = undefined
this.packetHeap = new Heap(packetComparator)
}

/**
Expand All @@ -29,12 +38,37 @@ export class RequestManager {
return this.requestMap[connectionId] ?? this.requestMap[connectionId - 1] ?? this.requestMap[connectionId + 1]
}

/**
* Finds the number of packets in the packet heap for a given request
* @param connectionId connectionId of the request to get the packet count for
* @returns the number of packets in the packet heap for a given request
*/
getPacketCount(connectionId: number): number {
return this.packetHeap.heapArray.filter((packet) => packet.header.connectionId === connectionId).length
}

/**
* Removes all packets from the packet heap for a given request
* @param connectionId connectionId of the request to remove packets for
*/
removeRequestPackets(connectionId: number) {
const comparator = (packet: Packet<PacketType>) => packet.header.connectionId === connectionId
const packet = new Packet({
header: {
connectionId,
} as any,
})
while (this.packetHeap.remove(packet, comparator)) {
continue
}
}

/**
* Adds a new uTP request to the peer's request manager.
* @param connectionId connectionId from uTP initialization
* @param request new ContentRequest
*/
async handleNewRequest(connectionId: number,request: ContentRequest) {
async handleNewRequest(connectionId: number, request: ContentRequest) {
this.requestMap[connectionId] = request
await request.init()
}
Expand All @@ -50,65 +84,81 @@ export class RequestManager {
this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}`)
return
}
if (this.masterPacketQueue.length === 0) {
this.currentPacket = packet
return this.processCurrentPacket()
}
if (packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET) {
this.masterPacketQueue.unshift(packet)
await request.handleUtpPacket(packet)
return
} else {
this.masterPacketQueue.push(packet)
this.packetHeap.push(packet)
}
this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} packet for request ${packet.header.connectionId} to packet queue (size: ${this.masterPacketQueue.length} packets)`)
this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} [${packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`)
if (this.currentPacket === undefined) {
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
}
}

async processCurrentPacket() {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.masterPacketQueue.length}`)
async processCurrentPacket(): Promise<void> {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.packetHeap.size()}`)
if (this.currentPacket === undefined) {
if (this.masterPacketQueue.length === 0) {
if (this.packetHeap.size() === 0) {
this.logger.extend('PROCESS_CURRENT_PACKET')(`No packets to process`)
return
}
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
return
}
this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} packet for request ${this.currentPacket.header.connectionId}`)
this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`)
const request = this.lookupRequest(this.currentPacket.header.connectionId)
if (request === undefined) {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`)
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
return
}
if (request.socket.type === UtpSocketType.READ && request.socket.reader !== undefined) {
if (this.currentPacket.header.seqNr < request.socket.reader!.nextDataNr) {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet ${this.currentPacket.header.seqNr} already processed.`)
this.currentPacket = this.packetHeap.pop()
return this.processCurrentPacket()
} else if (this.currentPacket.header.seqNr > request.socket.reader!.nextDataNr) {
acolytec3 marked this conversation as resolved.
Show resolved Hide resolved
if (this.getPacketCount(this.currentPacket.header.connectionId) < MAX_IN_FLIGHT_PACKETS) {
// Requeue packet. Optimistically assume expected packet has arrived out of order.
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Pushing packet back to heap.`)
this.packetHeap.push(this.currentPacket)
this.currentPacket = undefined
return
} else {
// Treat expected packet as lost. Process next packet (should trigger SELECTIVE_ACK)
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Treating expected packet as lost.`)
}
}
}
await request.handleUtpPacket(this.currentPacket)
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
}

/**
* Closes a uTP request and processes the next request in the queue.
* @param connectionId connectionId of the request to close
*/
async closeRequest(connectionId: number) {
closeRequest(connectionId: number) {
const request = this.lookupRequest(connectionId)
if (request === undefined) {
return
}
this.logger.extend('CLOSE_REQUEST')(`Closing request ${connectionId}`)
this.removeRequestPackets(connectionId)
delete this.requestMap[connectionId]
}

closeAllRequests() {
this.logger.extend('CLOSE_REQUEST')(`Closing all requests for peer ${this.peerId}`)
for (const id of Object.keys(this.requestMap)) {
delete this.requestMap[Number(id)]
this.closeRequest(Number(id))
}
this.masterPacketQueue = []
this.packetHeap = new Heap(packetComparator)
}


Expand Down
5 changes: 5 additions & 0 deletions packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ export interface INewRequest {
requestCode: RequestCode
contents?: Uint8Array
}


export const MAX_IN_FLIGHT_PACKETS = 3

export type RequestId = number
8 changes: 1 addition & 7 deletions packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,7 @@ export abstract class UtpSocket {
async sendPacket<T extends PacketType>(packet: Packet<T>): Promise<Buffer> {
const msg = packet.encode()
this.logger.extend('SEND').extend(PacketType[packet.header.pType])(
`|| pktId: ${packet.header.connectionId}`,
)
this.logger.extend('SEND').extend(PacketType[packet.header.pType])(
`|| seqNr: ${packet.header.seqNr}`,
)
this.logger.extend('SEND').extend(PacketType[packet.header.pType])(
`|| ackNr: ${packet.header.ackNr}`,
`pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr}`,
)
try {
await this.utp.send(this.remoteAddress, msg, this.networkId)
Expand Down
6 changes: 3 additions & 3 deletions packages/portalnetwork/test/wire/utp/utp.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,10 @@ describe('RequestManager', () => {
},
})
void mgr.handleNewRequest(req1.connectionId, req1)
mgr.masterPacketQueue.push(packet2)
mgr.packetHeap.push(packet2)
mgr.currentPacket = packet3
void mgr.handlePacket(packet1.encode())
assert.equal(mgr.masterPacketQueue.length, 2)
assert.deepEqual(mgr.masterPacketQueue[0], packet2)
assert.equal(mgr.packetHeap.size(), 2)
assert.deepEqual(mgr.packetHeap.peek(), packet2)
})
})
Loading