Skip to content

Commit

Permalink
add support for retry after header
Browse files Browse the repository at this point in the history
  • Loading branch information
belopash committed Dec 17, 2024
1 parent 263bab1 commit c2dfddd
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/http-client",
"comment": "add `retry-after` header support",
"type": "minor"
}
],
"packageName": "@subsquid/http-client"
}
5 changes: 4 additions & 1 deletion evm/evm-processor/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ export interface PortalSettings {
* Request timeout in ms
*/
requestTimeout?: number

retryAttempts?: number

bufferThreshold?: number

Expand Down Expand Up @@ -542,7 +544,8 @@ export class EvmBatchProcessor<F extends FieldSelection = {}> {
new PortalClient({
http,
url: archive.url,
queryTimeout: archive.requestTimeout,
requestTimeout: archive.requestTimeout,
retryAttempts: archive.retryAttempts,
bufferThreshold: archive.bufferThreshold,
newBlockTimeout: archive.newBlockTimeout,
log,
Expand Down
5 changes: 4 additions & 1 deletion substrate/substrate-processor/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ export interface PortalSettings {
* Request timeout in ms
*/
requestTimeout?: number

retryAttempts?: number

bufferThreshold?: number

Expand Down Expand Up @@ -535,7 +537,8 @@ export class SubstrateBatchProcessor<F extends FieldSelection = {}> {
client: new PortalClient({
http,
url: options.url,
queryTimeout: options.requestTimeout,
requestTimeout: options.requestTimeout,
retryAttempts: options.retryAttempts,
bufferThreshold: options.bufferThreshold,
newBlockTimeout: options.newBlockTimeout,
log,
Expand Down
25 changes: 23 additions & 2 deletions util/http-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,13 @@ export class HttpClient {
let res: HttpResponse | Error = await this.performRequestWithTimeout(req).catch(ensureError)
if (res instanceof Error || !res.ok) {
if (retryAttempts > retries && this.isRetryableError(res, req)) {
let pause = retrySchedule.length
let pause = asRetryAfterPause(res)
if (pause == null) {
pause = retrySchedule.length
? retrySchedule[Math.min(retries, retrySchedule.length - 1)]
: 1000
}
// FIXME: should we count retries if there is retry-after header in response?
retries += 1
this.beforeRetryPause(req, res, pause)
await wait(pause, req.signal)
Expand Down Expand Up @@ -334,7 +338,7 @@ export class HttpClient {
case 524:
return true
default:
return false
return error.headers.has('retry-after')
}
}
return false
Expand Down Expand Up @@ -478,3 +482,20 @@ export function isHttpConnectionError(err: unknown): boolean {
&& err.type == 'system'
&& (err.message.startsWith('request to') || err.code == 'ERR_STREAM_PREMATURE_CLOSE')
}


export function asRetryAfterPause(res: HttpResponse | Error): number | undefined {
if (res instanceof HttpError) {
res = res.response
}
if (res instanceof HttpResponse) {
let retryAfter = res.headers.get('retry-after')
if (retryAfter == null) return undefined
if (/^\d+$/.test(retryAfter)) return parseInt(retryAfter, 10) * 1000
if (HTTP_DATE_REGEX.test(retryAfter)) return Math.max(0, new Date(retryAfter).getTime() - Date.now())
}

return undefined
}

const HTTP_DATE_REGEX = /^(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun), (\d{2}) (Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) (\d{4}) (\d{2}):(\d{2}):(\d{2}) GMT$/;
25 changes: 14 additions & 11 deletions util/portal-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,29 @@ export interface PortalClientOptions {
url: string
http: HttpClient
log?: Logger
queryTimeout?: number
requestTimeout?: number
retryAttempts?: number
bufferThreshold?: number
newBlockTimeout?: number
}

export class PortalClient {
private url: URL
private http: HttpClient
private queryTimeout: number
private requestTimeout: number
private bufferThreshold: number
private newBlockTimeout: number
private retryAttempts: number
private log?: Logger

constructor(options: PortalClientOptions) {
this.url = new URL(options.url)
this.log = options.log
this.http = options.http
this.queryTimeout = options.queryTimeout ?? 180_000
this.requestTimeout = options.requestTimeout ?? 180_000
this.bufferThreshold = options.bufferThreshold ?? 10 * 1024 * 1024
this.newBlockTimeout = options.newBlockTimeout ?? 120_000
this.retryAttempts = options.retryAttempts ?? Infinity
}

private getDatasetUrl(path: string): string {
Expand All @@ -60,8 +63,8 @@ export class PortalClient {

async getMetadata(): Promise<Metadata> {
let res: {real_time: boolean} = await this.http.get(this.getDatasetUrl('metadata'), {
retryAttempts: 3,
httpTimeout: 10_000,
retryAttempts: this.retryAttempts,
httpTimeout: this.requestTimeout,
})
return {
isRealTime: !!res.real_time,
Expand All @@ -70,8 +73,8 @@ export class PortalClient {

async getFinalizedHeight(): Promise<number> {
let res: string = await this.http.get(this.getDatasetUrl('finalized-stream/height'), {
retryAttempts: 3,
httpTimeout: 10_000,
retryAttempts: this.retryAttempts,
httpTimeout: this.requestTimeout,
})
let height = parseInt(res)
assert(Number.isSafeInteger(height))
Expand All @@ -83,8 +86,8 @@ export class PortalClient {
return this.http
.request<Buffer>('POST', this.getDatasetUrl(`finalized-stream`), {
json: query,
retryAttempts: 3,
httpTimeout: this.queryTimeout,
retryAttempts: this.retryAttempts,
httpTimeout: this.requestTimeout,
})
.catch(
withErrorContext({
Expand Down Expand Up @@ -154,8 +157,8 @@ export class PortalClient {
let res = await this.http
.request<Readable>('POST', this.getDatasetUrl(`finalized-stream`), {
json: archiveQuery,
retryAttempts: 3,
httpTimeout: this.queryTimeout,
retryAttempts: this.retryAttempts,
httpTimeout: this.requestTimeout,
stream: true,
})
.catch(
Expand Down

0 comments on commit c2dfddd

Please sign in to comment.