Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Utp packet heap #702

Merged
merged 10 commits into from
Jan 13, 2025
Merged
7 changes: 5 additions & 2 deletions packages/portalnetwork/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
SyncStrategy,
} from '../networks/index.js'
import { CapacitorUDPTransportService, WebSocketTransportService } from '../transports/index.js'
import { MEGABYTE, dirSize } from '../util/index.js'
import { MEGABYTE, dirSize, shortId } from '../util/index.js'
import { PortalNetworkUTP } from '../wire/utp/PortalNetworkUtp/index.js'

import { DBManager } from './dbManager.js'
Expand Down Expand Up @@ -414,7 +414,10 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
await this.uTP.handleUtpPacket(packetBuffer, src.nodeId)
} catch (err: any) {
this.logger.extend('error')(
`handleUTP error: ${err.message}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`,

`handleUTP error: ${err.message}. SrcId: ${
src.nodeId
} MultiAddr: ${src.socketAddr.toString()}`,
)
}
}
Expand Down
3 changes: 2 additions & 1 deletion packages/portalnetwork/src/client/routingTable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { KademliaRoutingTable } from '@chainsafe/discv5'

import type { ENR, NodeId } from '@chainsafe/enr'
import type { Debugger } from 'debug'
import { shortId } from '../index.js'
export class PortalNetworkRoutingTable extends KademliaRoutingTable {
public logger?: Debugger
private radiusMap: Map<NodeId, bigint>
Expand Down Expand Up @@ -67,7 +68,7 @@ export class PortalNetworkRoutingTable extends KademliaRoutingTable {
* @param nodeId nodeId of peer to be evicted
*/
public evictNode = (nodeId: NodeId) => {
this.logger?.extend('EVICT')(nodeId)
this.logger?.extend('EVICT')(shortId(nodeId))
let enr: ENR | undefined = this.getValue(nodeId)
this.ignoreNode(nodeId)
if (enr) {
Expand Down
6 changes: 3 additions & 3 deletions packages/portalnetwork/src/networks/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ export abstract class BaseNetwork extends EventEmitter {
return enr.encode()
})
if (encodedEnrs.length > 0) {
this.logger(`Found ${encodedEnrs.length} closer to content than us`)
this.logger.extend('FINDCONTENT')(`Found ${encodedEnrs.length} closer to content`)
// TODO: Add capability to send multiple TALKRESP messages if # ENRs exceeds packet size
while (encodedEnrs.length > 0 && arrayByteLength(encodedEnrs) > MAX_PACKET_SIZE) {
// Remove ENRs until total ENRs less than 1200 bytes
Expand All @@ -691,7 +691,7 @@ export abstract class BaseNetwork extends EventEmitter {
selector: FoundContent.ENRS,
value: [],
})
this.logger(`Found no ENRs closer to content than us`)
this.logger(`Found no ENRs closer to content`)
await this.sendResponse(
src,
requestId,
Expand All @@ -712,7 +712,7 @@ export abstract class BaseNetwork extends EventEmitter {
const nodeId = enr.nodeId
// Only add node to the routing table if we have an ENR
this.routingTable.getWithPending(enr.nodeId)?.value === undefined &&
this.logger(`adding ${nodeId} to ${this.networkName} routing table`)
this.logger.extend('RoutingTable')(`adding ${shortId(nodeId)}`)
this.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
if (customPayload !== undefined) {
const decodedPayload = PingPongCustomDataType.deserialize(Uint8Array.from(customPayload))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,7 @@ export abstract class ContentRequest {
this.socket._clearTimeout()
this.socket.updateDelay(timeReceived, packet.header.timestampMicroseconds)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| pktId: ${packet.header.connectionId} ||`,
)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| seqNr: ${packet.header.seqNr} ||`,
)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| ackNr: ${packet.header.ackNr} ||`,
`|| pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr} t: ${packet.header.timestampMicroseconds}`,
)
switch (packet.header.pType) {
case PacketType.ST_SYN:
Expand Down
11 changes: 9 additions & 2 deletions packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ export class PortalNetworkUTP {
return this.requestManagers[nodeId] !== undefined && Object.keys(this.requestManagers[nodeId].requestMap).length > 0
}

openRequests(): number {
return Object.keys(this.requestManagers).reduce((acc, nodeId) => {
return acc + Object.keys(this.requestManagers[nodeId].requestMap).length
}, 0)
}

createPortalNetworkUTPSocket(
networkId: NetworkId,
requestCode: RequestCode,
Expand Down Expand Up @@ -106,9 +112,9 @@ export class PortalNetworkUTP {
content,
contentKeys,
})
this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`)
this.logger.extend('utpRequest')(`ConnectionId: ${connectionId} -- { socket.sndId: ${sndId}, socket.rcvId: ${rcvId} }`)
await this.requestManagers[enr.nodeId].handleNewRequest(connectionId, newRequest)
this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`)
this.logger.extend('utpRequest')(`Open Requests: ${this.openRequests()}`)
return newRequest
}

Expand All @@ -125,6 +131,7 @@ export class PortalNetworkUTP {
} catch (err) {
this.logger.extend('error')(`Error sending message to ${enr.nodeId}: ${err}`)
this.closeAllPeerRequests(enr.nodeId)
this.logger.extend('utpRequest')(`Open Requests: ${this.openRequests()}`)
throw err
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
import type { ContentRequest } from "./ContentRequest.js";
import { Packet , PacketType } from "../Packets/index.js";
import { Packet , PacketType, UtpSocketType } from "../Packets/index.js";
import type { Debugger } from "debug";
import type {Comparator} from "heap-js";
import { Heap} from "heap-js";

type RequestId = number

const packetComparator: Comparator<Packet<PacketType>> = (a: Packet<PacketType>, b: Packet<PacketType>) => {
// If packets belong to the same connection, sort by sequence number
if (a.header.connectionId === b.header.connectionId) {
return a.header.seqNr - b.header.seqNr;
}
// Otherwise, sort by timestamp
return a.header.timestampMicroseconds - b.header.timestampMicroseconds;
}

export class RequestManager {
peerId: string
requestMap: Record<RequestId, ContentRequest>
logger: Debugger
masterPacketQueue: Array<Packet<PacketType>>
packetHeap: Heap<Packet<PacketType>>
currentPacket: Packet<PacketType> | undefined

constructor(peerId: string, logger: Debugger) {
this.peerId = peerId
this.requestMap = {}
this.logger = logger.extend(`RequestManager`).extend(peerId.slice(0, 4))
this.masterPacketQueue = []
this.currentPacket = undefined
this.packetHeap = new Heap(packetComparator)
}

/**
Expand Down Expand Up @@ -50,43 +61,52 @@ export class RequestManager {
this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}`)
return
}
if (this.masterPacketQueue.length === 0) {
this.currentPacket = packet
return this.processCurrentPacket()
}
if (packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET) {
this.masterPacketQueue.unshift(packet)
await request.handleUtpPacket(packet)
return
} else {
this.masterPacketQueue.push(packet)
this.packetHeap.push(packet)
}
this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} packet for request ${packet.header.connectionId} to packet queue (size: ${this.masterPacketQueue.length} packets)`)
this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} [${packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`)
if (this.currentPacket === undefined) {
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
}
}

async processCurrentPacket() {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.masterPacketQueue.length}`)
async processCurrentPacket(): Promise<void> {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.packetHeap.size()}`)
if (this.currentPacket === undefined) {
if (this.masterPacketQueue.length === 0) {
if (this.packetHeap.size() === 0) {
this.logger.extend('PROCESS_CURRENT_PACKET')(`No packets to process`)
return
}
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
return
}
this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} packet for request ${this.currentPacket.header.connectionId}`)
this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`)
const request = this.lookupRequest(this.currentPacket.header.connectionId)
if (request === undefined) {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`)
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
return
}
if (request.socket.type === UtpSocketType.READ && request.socket.reader !== undefined) {
if (this.currentPacket.header.seqNr < request.socket.reader!.nextDataNr) {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet ${this.currentPacket.header.seqNr} already processed.`)
this.currentPacket = this.packetHeap.pop()
return this.processCurrentPacket()
} else if (this.currentPacket.header.seqNr > request.socket.reader!.nextDataNr) {
acolytec3 marked this conversation as resolved.
Show resolved Hide resolved
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}`)
this.packetHeap.push(this.currentPacket)
this.currentPacket = undefined
return
}
}
await request.handleUtpPacket(this.currentPacket)
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
}

Expand All @@ -108,7 +128,7 @@ export class RequestManager {
for (const id of Object.keys(this.requestMap)) {
delete this.requestMap[Number(id)]
}
this.masterPacketQueue = []
this.packetHeap = new Heap(packetComparator)
}


Expand Down
8 changes: 1 addition & 7 deletions packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,7 @@ export abstract class UtpSocket {
async sendPacket<T extends PacketType>(packet: Packet<T>): Promise<Buffer> {
const msg = packet.encode()
this.logger.extend('SEND').extend(PacketType[packet.header.pType])(
`|| pktId: ${packet.header.connectionId}`,
)
this.logger.extend('SEND').extend(PacketType[packet.header.pType])(
`|| seqNr: ${packet.header.seqNr}`,
)
this.logger.extend('SEND').extend(PacketType[packet.header.pType])(
`|| ackNr: ${packet.header.ackNr}`,
`pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr}`,
)
try {
await this.utp.send(this.remoteAddress, msg, this.networkId)
Expand Down
6 changes: 3 additions & 3 deletions packages/portalnetwork/test/wire/utp/utp.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,10 @@ describe('RequestManager', () => {
},
})
void mgr.handleNewRequest(req1.connectionId, req1)
mgr.masterPacketQueue.push(packet2)
mgr.packetHeap.push(packet2)
mgr.currentPacket = packet3
void mgr.handlePacket(packet1.encode())
assert.equal(mgr.masterPacketQueue.length, 2)
assert.deepEqual(mgr.masterPacketQueue[0], packet2)
assert.equal(mgr.packetHeap.size(), 2)
assert.deepEqual(mgr.packetHeap.peek(), packet2)
})
})
Loading