diff --git a/packages/core/src/__tests__/destination-kit.test.ts b/packages/core/src/__tests__/destination-kit.test.ts index 84914be375..3e153e3d8a 100644 --- a/packages/core/src/__tests__/destination-kit.test.ts +++ b/packages/core/src/__tests__/destination-kit.test.ts @@ -10,7 +10,9 @@ import { TransactionContext, AuthenticationScheme, RefreshAccessTokenResult, - AudienceDestinationDefinition + AudienceDestinationDefinition, + OAuth2Authentication, + OAuthManagedAuthentication } from '../destination-kit' import { JSONObject } from '../json-object' import { SegmentEvent } from '../segment-event' @@ -374,7 +376,7 @@ const multiStatusCompatibleDestination: DestinationDefinition = { message: 'success' } }, - performBatch: (_request, { payload }) => { + performBatch: (_request, { payload, auth }) => { const response = new MultiStatusResponse() payload.forEach((event) => { // Emulate an API error @@ -392,6 +394,21 @@ const multiStatusCompatibleDestination: DestinationDefinition = { return } + // Emulate Auth error + if (auth?.accessToken === 'OldToken') { + response.pushErrorResponse({ + status: 401, + errortype: ErrorCodes.INVALID_AUTHENTICATION, + errormessage: 'Invalid Auth', + sent: event, + body: { + events_processed: 0, + message: 'Invalid Auth' + } + }) + return + } + if (event?.email) { response.pushSuccessResponse({ body: {}, @@ -1919,5 +1936,323 @@ describe('destination kit', () => { ] `) }) + test('should refresh access token and retry events in case multistatus response contains 401 for oauth2 destinations', async () => { + const mockRefreshToken = jest.fn().mockReturnValue({ + accessToken: 'new-access-token' + }) + const mockOnTokenRefresh = jest.fn().mockReturnValue(Promise.resolve()) + const destinationWithOAuth = { + ...multiStatusCompatibleDestination, + authentication: { + scheme: 'oauth2', + fields: {}, + refreshAccessToken: mockRefreshToken + } as OAuth2Authentication + } + const multiStatusDestination = new Destination(destinationWithOAuth) + + const receivedAt = '2024-08-03T17:40:04.055Z' + + const events: SegmentEvent[] = [ + { + event: 'Add to Cart', + type: 'track', + properties: { + email: 'user.one@example.com' + }, + receivedAt + }, + { + // Missing required fields + event: 'Add to Cart', + type: 'track', + properties: {}, + receivedAt + } + ] + + const settings = { + apiSecret: 'test_key', + oauth: { + access_token: 'OldToken' + }, + subscription: { + subscribe: 'type = "track" and event != "Order Completed"', + partnerAction: 'trackEvent', + mapping: { + name: { '@path': '$.event' }, + email: { '@path': '$.properties.email' }, + phone: { '@path': '$.properties.phone' } + } + } + } + + multiStatusDestination.refreshAccessToken = mockRefreshToken + + const response = await multiStatusDestination.onBatch(events, settings, { + onTokenRefresh: mockOnTokenRefresh + }) + // assert that the refresh token was called once + expect(mockRefreshToken).toHaveBeenCalledTimes(1) + expect(mockOnTokenRefresh).toHaveBeenCalledWith(expect.objectContaining({ accessToken: 'new-access-token' })) + expect(response).toMatchInlineSnapshot(` + Array [ + Object { + "multistatus": Array [ + Object { + "body": Object {}, + "sent": Object {}, + "status": 200, + }, + Object { + "errormessage": "Email is required", + "errorreporter": "INTEGRATIONS", + "errortype": "PAYLOAD_VALIDATION_FAILED", + "status": 400, + }, + ], + }, + ] + `) + }) + test('should refresh access token and retry events in case multistatus response contains 401 for oauth-managed destinations', async () => { + const mockRefreshToken = jest.fn().mockReturnValue({ + accessToken: 'new-access-token' + }) + const mockOnTokenRefresh = jest.fn().mockReturnValue(Promise.resolve()) + const destinationWithOAuth = { + ...multiStatusCompatibleDestination, + authentication: { + scheme: 'oauth-managed', + fields: {}, + refreshAccessToken: mockRefreshToken + } as OAuthManagedAuthentication + } + const multiStatusDestination = new Destination(destinationWithOAuth) + + const receivedAt = '2024-08-03T17:40:04.055Z' + + const events: SegmentEvent[] = [ + { + event: 'Add to Cart', + type: 'track', + properties: { + email: 'user.one@example.com' + }, + receivedAt + }, + { + // Missing required fields + event: 'Add to Cart', + type: 'track', + properties: {}, + receivedAt + } + ] + + const settings = { + apiSecret: 'test_key', + oauth: { + access_token: 'OldToken' + }, + subscription: { + subscribe: 'type = "track" and event != "Order Completed"', + partnerAction: 'trackEvent', + mapping: { + name: { '@path': '$.event' }, + email: { '@path': '$.properties.email' }, + phone: { '@path': '$.properties.phone' } + } + } + } + + multiStatusDestination.refreshAccessToken = mockRefreshToken + + const response = await multiStatusDestination.onBatch(events, settings, { + onTokenRefresh: mockOnTokenRefresh + }) + // assert that the refresh token was called once + expect(mockRefreshToken).toHaveBeenCalledTimes(1) + // assert that the onTokenRefresh was called once + expect(mockOnTokenRefresh).toHaveBeenCalledWith(expect.objectContaining({ accessToken: 'new-access-token' })) + expect(response).toMatchInlineSnapshot(` + Array [ + Object { + "multistatus": Array [ + Object { + "body": Object {}, + "sent": Object {}, + "status": 200, + }, + Object { + "errormessage": "Email is required", + "errorreporter": "INTEGRATIONS", + "errortype": "PAYLOAD_VALIDATION_FAILED", + "status": 400, + }, + ], + }, + ] + `) + }) + test('should not retry events in case multistatus response doesnot contain 401 errors for oauth destinations', async () => { + const mockRefreshToken = jest.fn().mockReturnValue({ + accessToken: 'new-access-token' + }) + const destinationWithOAuth = { + ...multiStatusCompatibleDestination, + authentication: { + scheme: 'oauth-managed', + fields: {}, + refreshAccessToken: mockRefreshToken + } as OAuthManagedAuthentication + } + const multiStatusDestination = new Destination(destinationWithOAuth) + + const receivedAt = '2024-08-03T17:40:04.055Z' + + const events: SegmentEvent[] = [ + { + event: 'Add to Cart', + type: 'track', + properties: { + email: 'user.one@example.com' + }, + receivedAt + }, + { + // Missing required fields + event: 'Add to Cart', + type: 'track', + properties: {}, + receivedAt + } + ] + + const settings = { + apiSecret: 'test_key', + oauth: {}, + subscription: { + subscribe: 'type = "track" and event != "Order Completed"', + partnerAction: 'trackEvent', + mapping: { + name: { '@path': '$.event' }, + email: { '@path': '$.properties.email' }, + phone: { '@path': '$.properties.phone' } + } + } + } + + const response = await multiStatusDestination.onBatch(events, settings, {}) + expect(mockRefreshToken).not.toHaveBeenCalled() + expect(response).toMatchInlineSnapshot(` + Array [ + Object { + "multistatus": Array [ + Object { + "body": Object {}, + "sent": Object {}, + "status": 200, + }, + Object { + "errormessage": "Email is required", + "errorreporter": "INTEGRATIONS", + "errortype": "PAYLOAD_VALIDATION_FAILED", + "status": 400, + }, + ], + }, + ] + `) + }) + test('should not retry events more than max retry attempts for 401 errors', async () => { + const mockRefreshToken = jest.fn().mockReturnValue({ + accessToken: 'OldToken' + }) + const destinationWithOAuth = { + ...multiStatusCompatibleDestination, + authentication: { + scheme: 'oauth-managed', + fields: {}, + refreshAccessToken: mockRefreshToken + } as OAuthManagedAuthentication + } + const multiStatusDestination = new Destination(destinationWithOAuth) + + const receivedAt = '2024-08-03T17:40:04.055Z' + + const events: SegmentEvent[] = [ + { + event: 'Add to Cart', + type: 'track', + properties: { + email: 'user.one@example.com' + }, + receivedAt + }, + { + // Missing required fields + event: 'Add to Cart', + type: 'track', + properties: {}, + receivedAt + } + ] + + const settings = { + apiSecret: 'test_key', + oauth: { + access_token: 'OldToken' + }, + subscription: { + subscribe: 'type = "track" and event != "Order Completed"', + partnerAction: 'trackEvent', + mapping: { + name: { '@path': '$.event' }, + email: { '@path': '$.properties.email' }, + phone: { '@path': '$.properties.phone' } + } + } + } + + const response = await multiStatusDestination.onBatch(events, settings, {}) + // Default retry attempts is 2 + expect(mockRefreshToken).toHaveBeenCalledTimes(2) + expect(response).toMatchInlineSnapshot(` + Array [ + Object { + "multistatus": Array [ + Object { + "body": Object { + "events_processed": 0, + "message": "Invalid Auth", + }, + "errormessage": "Invalid Auth", + "errorreporter": "DESTINATION", + "errortype": "INVALID_AUTHENTICATION", + "sent": Object { + "email": "user.one@example.com", + "name": "Add to Cart", + }, + "status": 401, + }, + Object { + "body": Object { + "events_processed": 0, + "message": "Invalid Auth", + }, + "errormessage": "Invalid Auth", + "errorreporter": "DESTINATION", + "errortype": "INVALID_AUTHENTICATION", + "sent": Object { + "name": "Add to Cart", + }, + "status": 401, + }, + ], + }, + ] + `) + }) }) }) diff --git a/packages/core/src/destination-kit/index.ts b/packages/core/src/destination-kit/index.ts index 350117829a..da7c211505 100644 --- a/packages/core/src/destination-kit/index.ts +++ b/packages/core/src/destination-kit/index.ts @@ -483,8 +483,8 @@ export class Destination { return await audienceConfig?.createAudience(requestClient, createAudienceInput) } - const onFailedAttempt = async (error: ResponseError & HTTPError) => { - settings = await this.handleAuthError(error, settings) + const onFailedAttempt = async (error: ResponseError | HTTPError) => { + settings = await this.handleError(error, settings) } return await retry(run, { retries: 2, onFailedAttempt }) } @@ -509,8 +509,8 @@ export class Destination { return await audienceConfig?.getAudience(requestClient, getAudienceInput) } - const onFailedAttempt = async (error: ResponseError & HTTPError) => { - settings = await this.handleAuthError(error, settings) + const onFailedAttempt = async (error: ResponseError | HTTPError) => { + settings = await this.handleError(error, settings) } return await retry(run, { retries: 2, onFailedAttempt }) @@ -901,8 +901,8 @@ export class Destination { return result } - const onFailedAttempt = async (error: ResponseError & HTTPError) => { - settings = await this.handleAuthError(error, settings, options) + const onFailedAttempt = async (error: ResponseError | HTTPError) => { + settings = await this.handleError(error, settings, options) } return await retry(run, { retries: 2, onFailedAttempt }) @@ -928,12 +928,39 @@ export class Destination { return ([] as Result[]).concat(...results) } + const MAX_ATTEMPTS = 2 + // eslint-disable-next-line @typescript-eslint/no-explicit-any const onFailedAttempt = async (error: any) => { - settings = await this.handleAuthError(error, settings, options) + settings = await this.handleError(error, settings, options) } - return await retry(run, { retries: 2, onFailedAttempt }) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const shouldRetry = async (response: any, attemptCount: number) => { + const results = response as Result[] + /* + Here, we iterate over results array. Each result in the array is a response from a single subscription. + However, we always execute one subscription at a time despite receiving an array of subscriptions as input. + So, results array will always have a single result. + TODO: Get rid of onSubscriptions method to reflect execution model in the code accurately. + */ + for (const result of results) { + /* + If the multistatus response contains a 401 status code, we should retry the request + if we haven't already retried the request the maximum number of times. + So, we throw an InvalidAuthenticationError to retry the request. + */ + const has401Errors = result?.multistatus?.some((event) => event.status === 401) + const isOAuthDestination = + this.authentication?.scheme === 'oauth2' || this.authentication?.scheme === 'oauth-managed' + if (attemptCount <= MAX_ATTEMPTS && has401Errors && isOAuthDestination) { + await this.handleAuthError(settings, options) + return false + } + } + return true + } + return await retry(run, { retries: MAX_ATTEMPTS, onFailedAttempt, shouldRetry }) } private getSubscriptions(settings: JSONObject): Subscription[] { @@ -961,19 +988,33 @@ export class Destination { /** * Handles the failed attempt by checking if reauthentication is needed and updating the token if necessary. - * @param {ResponseError & HTTPError} error - The error object from the failed attempt. + * @param {ResponseError | HTTPError} error - The error object from the failed attempt. * @param {JSONObject} settings - The current settings object. * @returns {Promise} - The updated settings object. - * @throws {ResponseError & HTTPError} - If reauthentication is not needed or token refresh fails. + * @throws {ResponseError | HTTPError} - If reauthentication is not needed or token refresh fails. */ - async handleAuthError(error: ResponseError & HTTPError, settings: JSONObject, options?: OnEventOptions) { - const statusCode = error?.status ?? error?.response?.status ?? 500 + async handleError( + error: ResponseError | HTTPError, + settings: JSONObject, + options?: OnEventOptions + ): Promise { + const statusCode = (error as ResponseError).status ?? (error as HTTPError)?.response?.status ?? 500 const needsReauthentication = statusCode === 401 && (this.authentication?.scheme === 'oauth2' || this.authentication?.scheme === 'oauth-managed') if (!needsReauthentication) { throw error } + return this.handleAuthError(settings, options) + } + + /** + * Handles the authentication error by refreshing the token and updating the settings. + * @param {JSONObject} settings - The current settings object. + * @returns {Promise} - The updated settings object. + * @returns + */ + async handleAuthError(settings: JSONObject, options?: OnEventOptions) { const newTokens = await this.refreshTokenAndGetNewToken(settings, options) // Update new access-token in cache and in settings. await options?.onTokenRefresh?.(newTokens) diff --git a/packages/core/src/retry.ts b/packages/core/src/retry.ts index 101accfbd8..23123a907b 100644 --- a/packages/core/src/retry.ts +++ b/packages/core/src/retry.ts @@ -2,6 +2,8 @@ interface RetryOptions { retries?: number // eslint-disable-next-line @typescript-eslint/no-explicit-any onFailedAttempt?: (error: any, attemptCount: number) => PromiseLike | void + // eslint-disable-next-line @typescript-eslint/no-explicit-any + shouldRetry?: (response: any, attemptCount: number) => PromiseLike | boolean } const DEFAULT_RETRY_ATTEMPTS = 2 @@ -14,7 +16,14 @@ export async function retry( for (let attemptCount = 1; attemptCount <= retries; attemptCount++) { try { - return await input(attemptCount) + const response = await input(attemptCount) + if (options?.shouldRetry) { + const success = await options.shouldRetry(response, attemptCount) + if (!success && attemptCount < retries) { + continue + } + } + return response } catch (error) { if (options?.onFailedAttempt) { await options.onFailedAttempt(error, attemptCount)