diff --git a/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts b/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts index fb8364bb2..3a50070db 100644 --- a/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts +++ b/packages/node/src/plugins/segmentio/__tests__/publisher.test.ts @@ -17,6 +17,51 @@ const testClient = new TestFetchClient() const makeReqSpy = jest.spyOn(testClient, 'makeRequest') const getLastRequest = () => makeReqSpy.mock.lastCall![0] +class TestHeaders implements Headers { + private headers: Record + + constructor() { + this.headers = {} + } + + append(name: string, value: string): void { + if (this.headers[name]) { + this.headers[name] += `, ${value}` + } else { + this.headers[name] = value + } + } + + delete(name: string): void { + delete this.headers[name] + } + + get(name: string): string | null { + return this.headers[name] || null + } + + has(name: string): boolean { + return name in this.headers + } + + set(name: string, value: string): void { + this.headers[name] = value + } + + forEach( + callback: (value: string, name: string, parent: Headers) => void + ): void { + for (const name in this.headers) { + callback(this.headers[name], name, this) + } + } + + getSetCookie(): string[] { + // Implement the getSetCookie method here + return [] + } +} + const createTestNodePlugin = (props: Partial = {}) => createConfiguredNodePlugin( { @@ -306,6 +351,36 @@ describe('error handling', () => { `) }) + it('delays retrying 429 errors', async () => { + jest.useRealTimers() + const headers = new TestHeaders() + const resetTime = Date.now() + 350 + headers.set('x-ratelimit-reset', resetTime.toString()) + makeReqSpy + .mockReturnValueOnce( + createError({ + status: 429, + statusText: 'Too Many Requests', + ...headers, + }) + ) + .mockReturnValue(createSuccess()) + + const { plugin: segmentPlugin } = createTestNodePlugin({ + maxRetries: 3, + flushAt: 1, + }) + + const context = new Context(eventFactory.alias('to', 'from')) + const pendingContext = segmentPlugin.alias(context) + validateMakeReqInputs(context) + expect(await pendingContext).toBe(context) + expect(makeReqSpy).toHaveBeenCalledTimes(2) + // Check that we've waited until roughly the reset time. + expect(Date.now()).toBeLessThanOrEqual(resetTime + 20) + expect(Date.now()).toBeGreaterThanOrEqual(resetTime - 20) + }) + it.each([ { status: 500, statusText: 'Internal Server Error' }, { status: 300, statusText: 'Multiple Choices' }, diff --git a/packages/node/src/plugins/segmentio/publisher.ts b/packages/node/src/plugins/segmentio/publisher.ts index 5c63f9ee8..49c0c509e 100644 --- a/packages/node/src/plugins/segmentio/publisher.ts +++ b/packages/node/src/plugins/segmentio/publisher.ts @@ -215,6 +215,7 @@ export class Publisher { while (currentAttempt < maxAttempts) { currentAttempt++ + let requestedRetryTimeout: number | undefined let failureReason: unknown try { if (this._disable) { @@ -279,6 +280,20 @@ export class Publisher { new Error(`[${response.status}] ${response.statusText}`) ) return + } else if (response.status === 429) { + // Rate limited, wait for the reset time + if (response.headers && 'x-ratelimit-reset' in response.headers) { + const rateLimitResetTimestamp = parseInt( + response.headers['x-ratelimit-reset'], + 10 + ) + if (isFinite(rateLimitResetTimestamp)) { + requestedRetryTimeout = rateLimitResetTimestamp - Date.now() + } + } + failureReason = new Error( + `[${response.status}] ${response.statusText}` + ) } else { // Treat other errors as transient and retry. failureReason = new Error( @@ -298,11 +313,13 @@ export class Publisher { // Retry after attempt-based backoff. await sleep( - backoff({ - attempt: currentAttempt, - minTimeout: 25, - maxTimeout: 1000, - }) + requestedRetryTimeout + ? requestedRetryTimeout + : backoff({ + attempt: currentAttempt, + minTimeout: 25, + maxTimeout: 1000, + }) ) } }