Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into utp-limit
Browse files Browse the repository at this point in the history
  • Loading branch information
acolytec3 committed Jan 13, 2025
2 parents d0af365 + 2b78274 commit 25d242d
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 49 deletions.
4 changes: 3 additions & 1 deletion packages/portalnetwork/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,9 @@ export class PortalNetwork extends EventEmitter<PortalNetworkEvents> {
await this.uTP.handleUtpPacket(packetBuffer, src.nodeId)
} catch (err: any) {
this.logger.extend('error')(
`handleUTP error: ${err.message}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`,

`handleUTP error: ${err.message}. SrcId: ${src.nodeId
} MultiAddr: ${src.socketAddr.toString()}`,
)
}
}
Expand Down
3 changes: 2 additions & 1 deletion packages/portalnetwork/src/client/routingTable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { KademliaRoutingTable } from '@chainsafe/discv5'

import type { ENR, NodeId } from '@chainsafe/enr'
import type { Debugger } from 'debug'
import { shortId } from '../index.js'
export class PortalNetworkRoutingTable extends KademliaRoutingTable {
public logger?: Debugger
private radiusMap: Map<NodeId, bigint>
Expand Down Expand Up @@ -67,7 +68,7 @@ export class PortalNetworkRoutingTable extends KademliaRoutingTable {
* @param nodeId nodeId of peer to be evicted
*/
public evictNode = (nodeId: NodeId) => {
this.logger?.extend('EVICT')(nodeId)
this.logger?.extend('EVICT')(shortId(nodeId))
let enr: ENR | undefined = this.getValue(nodeId)
this.ignoreNode(nodeId)
if (enr) {
Expand Down
6 changes: 3 additions & 3 deletions packages/portalnetwork/src/networks/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ export abstract class BaseNetwork extends EventEmitter {
return enr.encode()
})
if (encodedEnrs.length > 0) {
this.logger(`Found ${encodedEnrs.length} closer to content than us`)
this.logger.extend('FINDCONTENT')(`Found ${encodedEnrs.length} closer to content`)
// TODO: Add capability to send multiple TALKRESP messages if # ENRs exceeds packet size
while (encodedEnrs.length > 0 && arrayByteLength(encodedEnrs) > MAX_PACKET_SIZE) {
// Remove ENRs until total ENRs less than 1200 bytes
Expand All @@ -697,7 +697,7 @@ export abstract class BaseNetwork extends EventEmitter {
selector: FoundContent.ENRS,
value: [],
})
this.logger(`Found no ENRs closer to content than us`)
this.logger(`Found no ENRs closer to content`)
await this.sendResponse(
src,
requestId,
Expand All @@ -718,7 +718,7 @@ export abstract class BaseNetwork extends EventEmitter {
const nodeId = enr.nodeId
// Only add node to the routing table if we have an ENR
this.routingTable.getWithPending(enr.nodeId)?.value === undefined &&
this.logger(`adding ${nodeId} to ${this.networkName} routing table`)
this.logger.extend('RoutingTable')(`adding ${shortId(nodeId)}`)
this.routingTable.insertOrUpdate(enr, EntryStatus.Connected)
if (customPayload !== undefined) {
const decodedPayload = PingPongCustomDataType.deserialize(Uint8Array.from(customPayload))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,7 @@ export abstract class ContentRequest {
this.socket._clearTimeout()
this.socket.updateDelay(timeReceived, packet.header.timestampMicroseconds)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| pktId: ${packet.header.connectionId} ||`,
)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| seqNr: ${packet.header.seqNr} ||`,
)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| ackNr: ${packet.header.ackNr} ||`,
`|| pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr} t: ${packet.header.timestampMicroseconds}`,
)
switch (packet.header.pType) {
case PacketType.ST_SYN:
Expand Down Expand Up @@ -248,7 +242,7 @@ export class FoundContentWriteRequest extends ContentWriteRequest {
async _handleStatePacket(packet: StatePacket): Promise<void> {
await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds)
if (this.socket.state === ConnectionState.Closed) {
await this.requestManager.closeRequest(packet.header.connectionId)
this.requestManager.closeRequest(packet.header.connectionId)
}
}
}
Expand Down Expand Up @@ -311,7 +305,7 @@ export class OfferWriteRequest extends ContentWriteRequest {
}
await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds)
if (this.socket.state === ConnectionState.Closed) {
await this.requestManager.closeRequest(packet.header.connectionId)
this.requestManager.closeRequest(packet.header.connectionId)
}
}
}
Expand Down
100 changes: 75 additions & 25 deletions packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
import type { ContentRequest } from "./ContentRequest.js";
import { Packet , PacketType } from "../Packets/index.js";
import { Packet, PacketType, UtpSocketType } from "../Packets/index.js";
import type { Debugger } from "debug";
import type { Comparator } from "heap-js";
import { Heap } from "heap-js";
import { MAX_IN_FLIGHT_PACKETS, type RequestId } from "./types.js";

type RequestId = number

const packetComparator: Comparator<Packet<PacketType>> = (a: Packet<PacketType>, b: Packet<PacketType>) => {
// If packets belong to the same connection, sort by sequence number
if (a.header.connectionId === b.header.connectionId) {
return a.header.seqNr - b.header.seqNr;
}
// Otherwise, sort by timestamp
return a.header.timestampMicroseconds - b.header.timestampMicroseconds;
}
export class RequestManager {
peerId: string
requestMap: Record<RequestId, ContentRequest>
logger: Debugger
masterPacketQueue: Array<Packet<PacketType>>
packetHeap: Heap<Packet<PacketType>>
currentPacket: Packet<PacketType> | undefined

constructor(peerId: string, logger: Debugger) {
this.peerId = peerId
this.requestMap = {}
this.logger = logger.extend(`RequestManager`).extend(peerId.slice(0, 4))
this.masterPacketQueue = []
this.currentPacket = undefined
this.packetHeap = new Heap(packetComparator)
}

/**
Expand All @@ -29,12 +38,37 @@ export class RequestManager {
return this.requestMap[connectionId] ?? this.requestMap[connectionId - 1] ?? this.requestMap[connectionId + 1]
}

/**
* Finds the number of packets in the packet heap for a given request
* @param connectionId connectionId of the request to get the packet count for
* @returns the number of packets in the packet heap for a given request
*/
getPacketCount(connectionId: number): number {
return this.packetHeap.heapArray.filter((packet) => packet.header.connectionId === connectionId).length
}

/**
* Removes all packets from the packet heap for a given request
* @param connectionId connectionId of the request to remove packets for
*/
removeRequestPackets(connectionId: number) {
const comparator = (packet: Packet<PacketType>) => packet.header.connectionId === connectionId
const packet = new Packet({
header: {
connectionId,
} as any,
})
while (this.packetHeap.remove(packet, comparator)) {
continue
}
}

/**
* Adds a new uTP request to the peer's request manager.
* @param connectionId connectionId from uTP initialization
* @param request new ContentRequest
*/
async handleNewRequest(connectionId: number,request: ContentRequest) {
async handleNewRequest(connectionId: number, request: ContentRequest) {
this.requestMap[connectionId] = request
await request.init()
}
Expand All @@ -50,65 +84,81 @@ export class RequestManager {
this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}`)
return
}
if (this.masterPacketQueue.length === 0) {
this.currentPacket = packet
return this.processCurrentPacket()
}
if (packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET) {
this.masterPacketQueue.unshift(packet)
await request.handleUtpPacket(packet)
return
} else {
this.masterPacketQueue.push(packet)
this.packetHeap.push(packet)
}
this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} packet for request ${packet.header.connectionId} to packet queue (size: ${this.masterPacketQueue.length} packets)`)
this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} [${packet.header.seqNr}] for Req:${packet.header.connectionId} to queue (size: ${this.packetHeap.size()} packets)`)
if (this.currentPacket === undefined) {
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
}
}

async processCurrentPacket() {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.masterPacketQueue.length}`)
async processCurrentPacket(): Promise<void> {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.packetHeap.size()}`)
if (this.currentPacket === undefined) {
if (this.masterPacketQueue.length === 0) {
if (this.packetHeap.size() === 0) {
this.logger.extend('PROCESS_CURRENT_PACKET')(`No packets to process`)
return
}
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
return
}
this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} packet for request ${this.currentPacket.header.connectionId}`)
this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} [${this.currentPacket.header.seqNr}] for Req:${this.currentPacket.header.connectionId}`)
const request = this.lookupRequest(this.currentPacket.header.connectionId)
if (request === undefined) {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`)
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
return
}
if (request.socket.type === UtpSocketType.READ && request.socket.reader !== undefined) {
if (this.currentPacket.header.seqNr < request.socket.reader!.nextDataNr) {
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet ${this.currentPacket.header.seqNr} already processed.`)
this.currentPacket = this.packetHeap.pop()
return this.processCurrentPacket()
} else if (this.currentPacket.header.seqNr > request.socket.reader!.nextDataNr) {
if (this.getPacketCount(this.currentPacket.header.connectionId) < MAX_IN_FLIGHT_PACKETS) {
// Requeue packet. Optimistically assume expected packet has arrived out of order.
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Pushing packet back to heap.`)
this.packetHeap.push(this.currentPacket)
this.currentPacket = undefined
return
} else {
// Treat expected packet as lost. Process next packet (should trigger SELECTIVE_ACK)
this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet is ahead of current reader position - seqNr: ${this.currentPacket.header.seqNr} > ${request.socket.reader?.nextDataNr}. Treating expected packet as lost.`)
}
}
}
await request.handleUtpPacket(this.currentPacket)
this.currentPacket = this.masterPacketQueue.shift()
this.currentPacket = this.packetHeap.pop()
await this.processCurrentPacket()
}

/**
* Closes a uTP request and processes the next request in the queue.
* @param connectionId connectionId of the request to close
*/
async closeRequest(connectionId: number) {
closeRequest(connectionId: number) {
const request = this.lookupRequest(connectionId)
if (request === undefined) {
return
}
this.logger.extend('CLOSE_REQUEST')(`Closing request ${connectionId}`)
this.removeRequestPackets(connectionId)
delete this.requestMap[connectionId]
}

closeAllRequests() {
this.logger.extend('CLOSE_REQUEST')(`Closing all requests for peer ${this.peerId}`)
for (const id of Object.keys(this.requestMap)) {
delete this.requestMap[Number(id)]
this.closeRequest(Number(id))
}
this.masterPacketQueue = []
this.packetHeap = new Heap(packetComparator)
}


Expand Down
5 changes: 5 additions & 0 deletions packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,8 @@ export interface INewRequest {
requestCode: RequestCode
contents?: Uint8Array
}


export const MAX_IN_FLIGHT_PACKETS = 3

export type RequestId = number
8 changes: 1 addition & 7 deletions packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,7 @@ export abstract class UtpSocket {
async sendPacket<T extends PacketType>(packet: Packet<T>): Promise<Buffer> {
const msg = packet.encode()
this.logger.extend('SEND').extend(PacketType[packet.header.pType])(
`|| pktId: ${packet.header.connectionId}`,
)
this.logger.extend('SEND').extend(PacketType[packet.header.pType])(
`|| seqNr: ${packet.header.seqNr}`,
)
this.logger.extend('SEND').extend(PacketType[packet.header.pType])(
`|| ackNr: ${packet.header.ackNr}`,
`pid: ${packet.header.connectionId} sNr: ${packet.header.seqNr} aNr: ${packet.header.ackNr}`,
)
try {
await this.utp.send(this.remoteAddress, msg, this.networkId)
Expand Down
6 changes: 3 additions & 3 deletions packages/portalnetwork/test/wire/utp/utp.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,10 @@ describe('RequestManager', () => {
},
})
void mgr.handleNewRequest(req1.connectionId, req1)
mgr.masterPacketQueue.push(packet2)
mgr.packetHeap.push(packet2)
mgr.currentPacket = packet3
void mgr.handlePacket(packet1.encode())
assert.equal(mgr.masterPacketQueue.length, 2)
assert.deepEqual(mgr.masterPacketQueue[0], packet2)
assert.equal(mgr.packetHeap.size(), 2)
assert.deepEqual(mgr.packetHeap.peek(), packet2)
})
})

0 comments on commit 25d242d

Please sign in to comment.