diff --git a/db/migrations/sdp-migrations/2024-06-27.0-alter-circle-transfers-add-reconciliation-tracking.sql b/db/migrations/sdp-migrations/2024-06-27.0-alter-circle-transfers-add-reconciliation-tracking.sql index 7296a04b1..8216ab068 100644 --- a/db/migrations/sdp-migrations/2024-06-27.0-alter-circle-transfers-add-reconciliation-tracking.sql +++ b/db/migrations/sdp-migrations/2024-06-27.0-alter-circle-transfers-add-reconciliation-tracking.sql @@ -1,6 +1,6 @@ -- +migrate Up ALTER TABLE circle_transfer_requests - ADD COLUMN sync_attempts INT DEFAULT 0, + ADD COLUMN sync_attempts INT NOT NULL DEFAULT 0, ADD COLUMN last_sync_attempt_at TIMESTAMPTZ; -- +migrate Down diff --git a/internal/circle/client_test.go b/internal/circle/client_test.go index 01195262e..f21f682a6 100644 --- a/internal/circle/client_test.go +++ b/internal/circle/client_test.go @@ -117,7 +117,7 @@ func Test_Client_PostTransfer(t *testing.T) { Once() transfer, err := cc.PostTransfer(ctx, validTransferReq) - assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[]") + assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[], StatusCode=401") assert.Nil(t, transfer) }) @@ -173,7 +173,7 @@ func Test_Client_GetTransferByID(t *testing.T) { Once() transfer, err := cc.GetTransferByID(ctx, "test-id") - assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[]") + assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[], StatusCode=401") assert.Nil(t, transfer) }) @@ -239,7 +239,7 @@ func Test_Client_GetWalletByID(t *testing.T) { Once() transfer, err := cc.GetWalletByID(ctx, "test-id") - assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[]") + assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[], StatusCode=401") assert.Nil(t, transfer) }) diff --git a/internal/circle/errors.go b/internal/circle/errors.go index 9dcf4ffa3..7cd0e8af9 100644 --- a/internal/circle/errors.go +++ b/internal/circle/errors.go @@ -9,9 +9,12 @@ import ( // APIError represents the error response from Circle APIs. type APIError struct { + // Code is the Circle API error code. Code int `json:"code"` Message string `json:"message"` Errors []APIErrorDetail `json:"errors,omitempty"` + // StatusCode is the HTTP status code. + StatusCode int `json:"status_code,omitempty"` } // APIErrorDetail represents the detailed error information. @@ -25,7 +28,7 @@ type APIErrorDetail struct { // Error implements the error interface for APIError. func (e APIError) Error() string { - return fmt.Sprintf("APIError: Code=%d, Message=%s, Errors=%v", e.Code, e.Message, e.Errors) + return fmt.Sprintf("APIError: Code=%d, Message=%s, Errors=%v, StatusCode=%d", e.Code, e.Message, e.Errors, e.StatusCode) } // parseAPIError parses the error response from Circle APIs. @@ -41,6 +44,7 @@ func parseAPIError(resp *http.Response) (*APIError, error) { if err = json.Unmarshal(body, &apiErr); err != nil { return nil, fmt.Errorf("unmarshalling error response body: %w", err) } + apiErr.StatusCode = resp.StatusCode return &apiErr, nil } diff --git a/internal/circle/transfer.go b/internal/circle/transfer.go index 127005569..09adef2f8 100644 --- a/internal/circle/transfer.go +++ b/internal/circle/transfer.go @@ -132,7 +132,7 @@ func (tr TransferRequest) validate() error { func parseTransferResponse(resp *http.Response) (*Transfer, error) { var transferResponse TransferResponse if err := json.NewDecoder(resp.Body).Decode(&transferResponse); err != nil { - return nil, err + return nil, fmt.Errorf("decoding transfer response: %w", err) } return &transferResponse.Data, nil diff --git a/internal/data/circle_transfer_requests.go b/internal/data/circle_transfer_requests.go index 09db3b82c..6a8ea308e 100644 --- a/internal/data/circle_transfer_requests.go +++ b/internal/data/circle_transfer_requests.go @@ -126,6 +126,8 @@ const ( batchSize = 10 ) +// GetPendingReconciliation returns the pending Circle transfer requests that are in `pending` status and have not +// reached the maximum sync attempts. func (m CircleTransferRequestModel) GetPendingReconciliation(ctx context.Context, sqlExec db.SQLExecuter) ([]*CircleTransferRequest, error) { queryParams := QueryParams{ Filters: map[FilterKey]interface{}{ diff --git a/internal/data/fixtures.go b/internal/data/fixtures.go index caabde00e..c0353bf80 100644 --- a/internal/data/fixtures.go +++ b/internal/data/fixtures.go @@ -363,12 +363,6 @@ func CreateReceiverVerificationFixture(t *testing.T, ctx context.Context, sqlExe return &verification } -func DeleteAllCircleTransferRequestsFixtures(t *testing.T, ctx context.Context, sqlExec db.SQLExecuter) { - const query = "DELETE FROM circle_transfer_requests" - _, err := sqlExec.ExecContext(ctx, query) - require.NoError(t, err) -} - func CreateCircleTransferRequestFixture(t *testing.T, ctx context.Context, sqlExec db.SQLExecuter, insert CircleTransferRequest) *CircleTransferRequest { const query = ` INSERT INTO circle_transfer_requests diff --git a/internal/services/circle_reconciliation_service.go b/internal/services/circle_reconciliation_service.go index e130127d4..11bb9a098 100644 --- a/internal/services/circle_reconciliation_service.go +++ b/internal/services/circle_reconciliation_service.go @@ -3,7 +3,9 @@ package services import ( "context" "encoding/json" + "errors" "fmt" + "net/http" "time" "github.com/stellar/go/support/log" @@ -12,6 +14,7 @@ import ( "github.com/stellar/stellar-disbursement-platform-backend/internal/circle" "github.com/stellar/stellar-disbursement-platform-backend/internal/data" "github.com/stellar/stellar-disbursement-platform-backend/internal/transactionsubmission/engine/signing" + "github.com/stellar/stellar-disbursement-platform-backend/internal/utils" "github.com/stellar/stellar-disbursement-platform-backend/pkg/schema" "github.com/stellar/stellar-disbursement-platform-backend/stellar-multitenant/pkg/tenant" ) @@ -27,6 +30,10 @@ type CircleReconciliationService struct { DistAccountResolver signing.DistributionAccountResolver } +// Reconcile reconciles the pending Circle transfer requests for the tenant in the context. It fetches the rows from +// circte_transfer_request where status is set to pending, and then fetches the transfer details from Circle API. It +// updates the status of the transfer request in the DB based on the status of the transfer in Circle. If the transfer +// reached a successful/failure status, it updates the payment status in the DB as well to reflect that. func (s *CircleReconciliationService) Reconcile(ctx context.Context) error { // Step 1: Get the tenant from the context. tnt, outerErr := tenant.GetTenantFromContext(ctx) @@ -48,6 +55,8 @@ func (s *CircleReconciliationService) Reconcile(ctx context.Context) error { return nil } + var reconciliationErrors []error + var reconciliationCount int outerErr = db.RunInTransaction(ctx, s.Models.DBConnectionPool, nil, func(dbTx db.DBTransaction) error { // Step 3: Get pending Circle transfer requests. circleRequests, err := s.Models.CircleTransferRequests.GetPendingReconciliation(ctx, dbTx) @@ -61,68 +70,105 @@ func (s *CircleReconciliationService) Reconcile(ctx context.Context) error { } // Step 4: Reconcile the pending Circle transfer requests. + reconciliationCount = len(circleRequests) for _, circleRequest := range circleRequests { - // 4.1. get the Circle transfer by ID - transfer, err := s.CircleService.GetTransferByID(ctx, *circleRequest.CircleTransferID) + err = s.reconcileTransferRequest(ctx, dbTx, tnt, circleRequest) if err != nil { - return fmt.Errorf("getting Circle transfer by ID %q: %w", *circleRequest.CircleTransferID, err) - } - jsonBody, err := json.Marshal(transfer) - if err != nil { - return fmt.Errorf("converting transfer body to json: %w", err) + err = fmt.Errorf("reconciling Circle transfer request: %w", err) + reconciliationErrors = append(reconciliationErrors, err) } + } - // 4.2. update the circle transfer request entry in the DB. - newStatus := data.CircleTransferStatus(transfer.Status) - if *circleRequest.Status == newStatus { - log.Ctx(ctx).Debugf("[tenant=%s] Circle transfer request %q is already in status %q, skipping reconciliation...", tnt.Name, circleRequest.IdempotencyKey, newStatus) - continue - } + return nil + }) + if outerErr != nil { + return fmt.Errorf("running Circle reconciliation for tenant %q: %w", tnt.Name, outerErr) + } - now := time.Now() - var completedAt *time.Time - if newStatus.IsCompleted() { - completedAt = &now - } - circleRequest, err = s.Models.CircleTransferRequests.Update(ctx, dbTx, circleRequest.IdempotencyKey, data.CircleTransferRequestUpdate{ - Status: newStatus, - CompletedAt: completedAt, - LastSyncAttemptAt: &now, - SyncAttempts: circleRequest.SyncAttempts + 1, - ResponseBody: jsonBody, - }) - if err != nil { - return fmt.Errorf("updating Circle transfer request: %w", err) - } + if len(reconciliationErrors) > 0 { + return fmt.Errorf("attempted to reconcyle %d circle requests but failed on %d reconciliations: %v", reconciliationCount, len(reconciliationErrors), reconciliationErrors) + } - // 4.3. update the payment status in the DB. - newPaymentStatus, err := transfer.Status.ToPaymentStatus() - if err != nil { - return fmt.Errorf("converting Circle transfer status to Payment status: %w", err) - } - var statusMsg string - switch newStatus { - case data.CircleTransferStatusSuccess: - statusMsg = fmt.Sprintf("Circle transfer completed successfully with the Stellar transaction hash: %q", transfer.TransactionHash) - case data.CircleTransferStatusFailed: - statusMsg = fmt.Sprintf("Circle transfer failed with error: %q", transfer.ErrorCode) - default: - return fmt.Errorf("unexpected Circle transfer status: %q", newStatus) - } + return nil +} - err = s.Models.Payment.UpdateStatus(ctx, dbTx, circleRequest.PaymentID, newPaymentStatus, &statusMsg, transfer.TransactionHash) - if err != nil { - return fmt.Errorf("updating payment status: %w", err) +// reconcileTransferRequest reconciles a Circle transfer request and updates the payment status in the DB. It returns an +// error if the reconciliation fails. +func (s *CircleReconciliationService) reconcileTransferRequest(ctx context.Context, dbTx db.DBTransaction, tnt *tenant.Tenant, circleRequest *data.CircleTransferRequest) error { + // 4.1. get the Circle transfer by ID + transfer, err := s.CircleService.GetTransferByID(ctx, *circleRequest.CircleTransferID) + if err != nil { + var cAPIErr *circle.APIError + if errors.As(err, &cAPIErr) && cAPIErr.StatusCode == http.StatusBadRequest { + // if the the Circle API returns a 400, increment the sync attempts and update the last sync + errJSONBody, marshalErr := json.Marshal(cAPIErr) + if marshalErr != nil { + log.Ctx(ctx).Errorf("marshalling Circle APIError: %v", marshalErr) } - log.Ctx(ctx).Infof("[tenant=%s] Reconciled Circle transfer request %q with status %q", tnt.Name, *circleRequest.CircleTransferID, newStatus) + // increment the sync attempts and update the last sync attempt time. + var updateErr error + circleRequest, updateErr = s.Models.CircleTransferRequests.Update(ctx, dbTx, circleRequest.IdempotencyKey, data.CircleTransferRequestUpdate{ + LastSyncAttemptAt: utils.TimePtr(time.Now()), + SyncAttempts: circleRequest.SyncAttempts + 1, + ResponseBody: errJSONBody, + }) + if updateErr != nil { + return fmt.Errorf("updating Circle transfer request sync attempts: %w", updateErr) + } } + return fmt.Errorf("getting Circle transfer by ID %q: %w", *circleRequest.CircleTransferID, err) + } + jsonBody, err := json.Marshal(transfer) + if err != nil { + return fmt.Errorf("converting transfer body to json: %w", err) + } + // 4.2. update the circle transfer request entry in the DB. + newStatus := data.CircleTransferStatus(transfer.Status) + if *circleRequest.Status == newStatus { + // this condition should be unrechable, but we're adding this log just in case... + log.Ctx(ctx).Debugf("[tenant=%s] Circle transfer request %q is already in status %q, skipping reconciliation...", tnt.Name, circleRequest.IdempotencyKey, newStatus) return nil + } + + now := time.Now() + var completedAt *time.Time + if newStatus.IsCompleted() { + completedAt = &now + } + circleRequest, err = s.Models.CircleTransferRequests.Update(ctx, dbTx, circleRequest.IdempotencyKey, data.CircleTransferRequestUpdate{ + Status: newStatus, + CompletedAt: completedAt, + LastSyncAttemptAt: &now, + SyncAttempts: circleRequest.SyncAttempts + 1, + ResponseBody: jsonBody, }) - if outerErr != nil { - return fmt.Errorf("running Circle reconciliation for tenant %q: %w", tnt.Name, outerErr) + if err != nil { + return fmt.Errorf("updating Circle transfer request: %w", err) + } + + // 4.3. update the payment status in the DB. + newPaymentStatus, err := transfer.Status.ToPaymentStatus() + if err != nil { + return fmt.Errorf("converting Circle transfer status to Payment status: %w", err) } + var statusMsg string + switch newStatus { + case data.CircleTransferStatusSuccess: + statusMsg = fmt.Sprintf("Circle transfer completed successfully with the Stellar transaction hash: %q", transfer.TransactionHash) + case data.CircleTransferStatusFailed: + statusMsg = fmt.Sprintf("Circle transfer failed with error: %q", transfer.ErrorCode) + default: + return fmt.Errorf("unexpected Circle transfer status: %q", newStatus) + } + + err = s.Models.Payment.UpdateStatus(ctx, dbTx, circleRequest.PaymentID, newPaymentStatus, &statusMsg, transfer.TransactionHash) + if err != nil { + return fmt.Errorf("updating payment status: %w", err) + } + + log.Ctx(ctx).Infof("[tenant=%s] Reconciled Circle transfer request %q with status %q", tnt.Name, *circleRequest.CircleTransferID, newStatus) return nil } diff --git a/internal/services/circle_reconciliation_service_test.go b/internal/services/circle_reconciliation_service_test.go index af51c30f9..c89549b67 100644 --- a/internal/services/circle_reconciliation_service_test.go +++ b/internal/services/circle_reconciliation_service_test.go @@ -2,6 +2,8 @@ package services import ( "context" + "errors" + "net/http" "testing" "github.com/sirupsen/logrus" @@ -149,7 +151,7 @@ func Test_NewCircleReconciliationService_Reconcile_failure(t *testing.T) { } } -func Test_NewCircleReconciliationService_Reconcile_success(t *testing.T) { +func Test_NewCircleReconciliationService_Reconcile_partialSuccess(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) @@ -173,14 +175,6 @@ func Test_NewCircleReconciliationService_Reconcile_success(t *testing.T) { Status: schema.AccountStatusActive, } - // database cleanup - defer data.DeleteAllDisbursementFixtures(t, ctx, dbConnectionPool) - defer data.DeleteAllReceiversFixtures(t, ctx, dbConnectionPool) - defer data.DeleteAllReceiverVerificationFixtures(t, ctx, dbConnectionPool) - defer data.DeleteAllReceiverWalletsFixtures(t, ctx, dbConnectionPool) - defer data.DeleteAllPaymentsFixtures(t, ctx, dbConnectionPool) - defer data.DeleteAllCircleTransferRequestsFixtures(t, ctx, dbConnectionPool) - disbursement := data.CreateDisbursementFixture(t, ctx, dbConnectionPool, models.Disbursements, &data.Disbursement{ Name: "disbursement", Status: data.StartedDisbursementStatus, @@ -192,46 +186,60 @@ func Test_NewCircleReconciliationService_Reconcile_success(t *testing.T) { receiverWallet := data.CreateReceiverWalletFixture(t, ctx, dbConnectionPool, receiver.ID, wallet.ID, data.RegisteredReceiversWalletStatus) // Create payments with Circle transfer requests - p1StaysPending := data.CreatePaymentFixture(t, ctx, dbConnectionPool, models.Payment, &data.Payment{ + circlePendingStatus := data.CircleTransferStatusPending + + p1WillThrowAnError := data.CreatePaymentFixture(t, ctx, dbConnectionPool, models.Payment, &data.Payment{ ReceiverWallet: receiverWallet, Disbursement: disbursement, Asset: *asset, Amount: "100", Status: data.PendingPaymentStatus, }) - circlePendingStatus := data.CircleTransferStatusPending - circleReq1StaysPending := data.CreateCircleTransferRequestFixture(t, ctx, dbConnectionPool, data.CircleTransferRequest{ - PaymentID: p1StaysPending.ID, + circleReq1WillThrowAnError := data.CreateCircleTransferRequestFixture(t, ctx, dbConnectionPool, data.CircleTransferRequest{ + PaymentID: p1WillThrowAnError.ID, Status: &circlePendingStatus, CircleTransferID: utils.StringPtr("circle-transfer-id-1"), }) - p2WillSucceed := data.CreatePaymentFixture(t, ctx, dbConnectionPool, models.Payment, &data.Payment{ + p2StaysPending := data.CreatePaymentFixture(t, ctx, dbConnectionPool, models.Payment, &data.Payment{ ReceiverWallet: receiverWallet, Disbursement: disbursement, Asset: *asset, Amount: "100", Status: data.PendingPaymentStatus, }) - circleReq2WillSucceed := data.CreateCircleTransferRequestFixture(t, ctx, dbConnectionPool, data.CircleTransferRequest{ - PaymentID: p2WillSucceed.ID, + circleReq2StaysPending := data.CreateCircleTransferRequestFixture(t, ctx, dbConnectionPool, data.CircleTransferRequest{ + PaymentID: p2StaysPending.ID, Status: &circlePendingStatus, CircleTransferID: utils.StringPtr("circle-transfer-id-2"), }) - p3WillFail := data.CreatePaymentFixture(t, ctx, dbConnectionPool, models.Payment, &data.Payment{ + p3WillSucceed := data.CreatePaymentFixture(t, ctx, dbConnectionPool, models.Payment, &data.Payment{ ReceiverWallet: receiverWallet, Disbursement: disbursement, Asset: *asset, Amount: "100", Status: data.PendingPaymentStatus, }) - circleReq3WillFail := data.CreateCircleTransferRequestFixture(t, ctx, dbConnectionPool, data.CircleTransferRequest{ - PaymentID: p3WillFail.ID, + circleReq3WillSucceed := data.CreateCircleTransferRequestFixture(t, ctx, dbConnectionPool, data.CircleTransferRequest{ + PaymentID: p3WillSucceed.ID, Status: &circlePendingStatus, CircleTransferID: utils.StringPtr("circle-transfer-id-3"), }) + p4WillFail := data.CreatePaymentFixture(t, ctx, dbConnectionPool, models.Payment, &data.Payment{ + ReceiverWallet: receiverWallet, + Disbursement: disbursement, + Asset: *asset, + Amount: "100", + Status: data.PendingPaymentStatus, + }) + circleReq4WillFail := data.CreateCircleTransferRequestFixture(t, ctx, dbConnectionPool, data.CircleTransferRequest{ + PaymentID: p4WillFail.ID, + Status: &circlePendingStatus, + CircleTransferID: utils.StringPtr("circle-transfer-id-4"), + }) + // prepare mocks mDistAccountResolver := sigMocks.NewMockDistributionAccountResolver(t) mDistAccountResolver. @@ -240,21 +248,24 @@ func Test_NewCircleReconciliationService_Reconcile_success(t *testing.T) { Once() mCircleService := circle.NewMockService(t) mCircleService. - On("GetTransferByID", mock.Anything, *circleReq1StaysPending.CircleTransferID). + On("GetTransferByID", mock.Anything, *circleReq1WillThrowAnError.CircleTransferID). + Return(nil, errors.New("something went wrong")). + Once(). + On("GetTransferByID", mock.Anything, *circleReq2StaysPending.CircleTransferID). Return(&circle.Transfer{ - ID: *circleReq1StaysPending.CircleTransferID, + ID: *circleReq2StaysPending.CircleTransferID, Status: circle.TransferStatusPending, }, nil). Once(). - On("GetTransferByID", mock.Anything, *circleReq2WillSucceed.CircleTransferID). + On("GetTransferByID", mock.Anything, *circleReq3WillSucceed.CircleTransferID). Return(&circle.Transfer{ - ID: *circleReq2WillSucceed.CircleTransferID, + ID: *circleReq3WillSucceed.CircleTransferID, Status: circle.TransferStatusComplete, }, nil). Once(). - On("GetTransferByID", mock.Anything, *circleReq3WillFail.CircleTransferID). + On("GetTransferByID", mock.Anything, *circleReq4WillFail.CircleTransferID). Return(&circle.Transfer{ - ID: *circleReq3WillFail.CircleTransferID, + ID: *circleReq4WillFail.CircleTransferID, Status: circle.TransferStatusFailed, }, nil). Once() @@ -267,7 +278,8 @@ func Test_NewCircleReconciliationService_Reconcile_success(t *testing.T) { DistAccountResolver: mDistAccountResolver, } err = svc.Reconcile(ctx) - assert.NoError(t, err) + assert.Error(t, err) + assert.EqualError(t, err, "attempted to reconcyle 4 circle requests but failed on 1 reconciliations: [reconciling Circle transfer request: getting Circle transfer by ID \"circle-transfer-id-1\": something went wrong]") // assert logs entries := getEntries() @@ -275,11 +287,11 @@ func Test_NewCircleReconciliationService_Reconcile_success(t *testing.T) { for _, entry := range entries { messages = append(messages, entry.Message) } - assert.Contains(t, messages, `[tenant=test-tenant] Reconciled Circle transfer request "circle-transfer-id-2" with status "complete"`) - assert.Contains(t, messages, `[tenant=test-tenant] Reconciled Circle transfer request "circle-transfer-id-3" with status "failed"`) + assert.Contains(t, messages, `[tenant=test-tenant] Reconciled Circle transfer request "circle-transfer-id-3" with status "complete"`) + assert.Contains(t, messages, `[tenant=test-tenant] Reconciled Circle transfer request "circle-transfer-id-4" with status "failed"`) // assert results - updatedCircleRequestAndPayment := func(paymentID string) (*data.CircleTransferRequest, *data.Payment) { + getPaymentAndCircleRequestFromDB := func(paymentID string) (*data.CircleTransferRequest, *data.Payment) { updatedCircleRequest, err := models.CircleTransferRequests.Get(ctx, dbConnectionPool, data.QueryParams{Filters: map[data.FilterKey]interface{}{data.FilterKeyPaymentID: paymentID}}) require.NoError(t, err) @@ -288,16 +300,208 @@ func Test_NewCircleReconciliationService_Reconcile_success(t *testing.T) { return updatedCircleRequest, updatedPayment } - // p1StaysPending - updatedCircleReq1, updatedPayment1 := updatedCircleRequestAndPayment(p1StaysPending.ID) + // p1WillThrowAnError + updatedCircleReq1, updatedPayment1 := getPaymentAndCircleRequestFromDB(p1WillThrowAnError.ID) assert.Equal(t, data.CircleTransferStatusPending, *updatedCircleReq1.Status) assert.Equal(t, data.PendingPaymentStatus, updatedPayment1.Status) - // p2WillSucceed - updatedCircleReq2, updatedPayment2 := updatedCircleRequestAndPayment(p2WillSucceed.ID) - assert.Equal(t, data.CircleTransferStatusSuccess, *updatedCircleReq2.Status) - assert.Equal(t, data.SuccessPaymentStatus, updatedPayment2.Status) - // p3WillFail - updatedCircleReq3, updatedPayment3 := updatedCircleRequestAndPayment(p3WillFail.ID) - assert.Equal(t, data.CircleTransferStatusFailed, *updatedCircleReq3.Status) - assert.Equal(t, data.FailedPaymentStatus, updatedPayment3.Status) + // p2StaysPending + updatedCircleReq2, updatedPayment2 := getPaymentAndCircleRequestFromDB(p2StaysPending.ID) + assert.Equal(t, data.CircleTransferStatusPending, *updatedCircleReq2.Status) + assert.Equal(t, data.PendingPaymentStatus, updatedPayment2.Status) + // p3WillSucceed + updatedCircleReq3, updatedPayment3 := getPaymentAndCircleRequestFromDB(p3WillSucceed.ID) + assert.Equal(t, data.CircleTransferStatusSuccess, *updatedCircleReq3.Status) + assert.Equal(t, data.SuccessPaymentStatus, updatedPayment3.Status) + // p4WillFail + updatedCircleReq4, updatedPayment4 := getPaymentAndCircleRequestFromDB(p4WillFail.ID) + assert.Equal(t, data.CircleTransferStatusFailed, *updatedCircleReq4.Status) + assert.Equal(t, data.FailedPaymentStatus, updatedPayment4.Status) +} + +func Test_NewCircleReconciliationService_reconcileTransferRequest(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + tnt := &tenant.Tenant{ID: "95e788b6-c80e-4975-9d12-141001fe6e44", Name: "test-tenant"} + ctx := tenant.SaveTenantInContext(context.Background(), tnt) + + models, err := data.NewModels(dbConnectionPool) + require.NoError(t, err) + + asset := data.CreateAssetFixture(t, ctx, dbConnectionPool, assets.EURCAssetCode, assets.EURCAssetTestnet.Issuer) + country := data.CreateCountryFixture(t, ctx, dbConnectionPool, "FRA", "France") + wallet := data.CreateWalletFixture(t, ctx, dbConnectionPool, "My Wallet", "https://www.wallet.com", "www.wallet.com", "wallet1://") + + disbursement := data.CreateDisbursementFixture(t, ctx, dbConnectionPool, models.Disbursements, &data.Disbursement{ + Name: "disbursement", + Status: data.StartedDisbursementStatus, + Asset: asset, + Wallet: wallet, + Country: country, + }) + receiver := data.CreateReceiverFixture(t, ctx, dbConnectionPool, &data.Receiver{}) + receiverWallet := data.CreateReceiverWalletFixture(t, ctx, dbConnectionPool, receiver.ID, wallet.ID, data.RegisteredReceiversWalletStatus) + + // Create payments with Circle transfer requests + circlePendingStatus := data.CircleTransferStatusPending + + payment := data.CreatePaymentFixture(t, ctx, dbConnectionPool, models.Payment, &data.Payment{ + ReceiverWallet: receiverWallet, + Disbursement: disbursement, + Asset: *asset, + Amount: "100", + Status: data.PendingPaymentStatus, + }) + circleRequest := data.CreateCircleTransferRequestFixture(t, ctx, dbConnectionPool, data.CircleTransferRequest{ + PaymentID: payment.ID, + Status: &circlePendingStatus, + CircleTransferID: utils.StringPtr("circle-transfer-id"), + }) + + testCases := []struct { + name string + setupMocksAndDBFn func(t *testing.T, mCircleService *circle.MockService) + wantErrorContains []string + shouldIncrementSyncAttempts bool + assertLogsFn func(entries []logrus.Entry) + }{ + { + name: "401 should be logged and an error should be returned", + setupMocksAndDBFn: func(t *testing.T, mCircleService *circle.MockService) { + mCircleService. + On("GetTransferByID", mock.Anything, "circle-transfer-id"). + Return(nil, &circle.APIError{StatusCode: http.StatusUnauthorized}). + Once() + }, + wantErrorContains: []string{"getting Circle transfer by ID", "APIError", "StatusCode=401"}, + }, + { + name: "403 should be logged and an error should be returned", + setupMocksAndDBFn: func(t *testing.T, mCircleService *circle.MockService) { + mCircleService. + On("GetTransferByID", mock.Anything, "circle-transfer-id"). + Return(nil, &circle.APIError{StatusCode: http.StatusForbidden}). + Once() + }, + wantErrorContains: []string{"getting Circle transfer by ID", "APIError", "StatusCode=403"}, + }, + { + name: "404 should be logged and an error should be returned", + setupMocksAndDBFn: func(t *testing.T, mCircleService *circle.MockService) { + mCircleService. + On("GetTransferByID", mock.Anything, "circle-transfer-id"). + Return(nil, &circle.APIError{StatusCode: http.StatusNotFound}). + Once() + }, + wantErrorContains: []string{"getting Circle transfer by ID", "APIError", "StatusCode=404"}, + }, + { + name: "429 should be logged and an error should be returned", + setupMocksAndDBFn: func(t *testing.T, mCircleService *circle.MockService) { + mCircleService. + On("GetTransferByID", mock.Anything, "circle-transfer-id"). + Return(nil, &circle.APIError{StatusCode: http.StatusTooManyRequests}). + Once() + }, + wantErrorContains: []string{"getting Circle transfer by ID", "APIError", "StatusCode=429"}, + }, + { + name: "5xx should be logged and an error should be returned", + setupMocksAndDBFn: func(t *testing.T, mCircleService *circle.MockService) { + mCircleService. + On("GetTransferByID", mock.Anything, "circle-transfer-id"). + Return(nil, &circle.APIError{StatusCode: http.StatusInternalServerError}). + Once() + }, + wantErrorContains: []string{"getting Circle transfer by ID", "APIError", "StatusCode=500"}, + }, + { + name: "non-API error should be logged and an error should be returned", + setupMocksAndDBFn: func(t *testing.T, mCircleService *circle.MockService) { + mCircleService. + On("GetTransferByID", mock.Anything, "circle-transfer-id"). + Return(nil, errors.New("test-error")). + Once() + }, + wantErrorContains: []string{"getting Circle transfer by ID", "test-error"}, + }, + { + name: "400 should increment the sync attempts and an error should be returned", + setupMocksAndDBFn: func(t *testing.T, mCircleService *circle.MockService) { + mCircleService. + On("GetTransferByID", mock.Anything, "circle-transfer-id"). + Return(nil, &circle.APIError{Message: "foo bar", StatusCode: http.StatusBadRequest}). + Once() + }, + wantErrorContains: []string{"getting Circle transfer by ID", "APIError", "StatusCode=400"}, + shouldIncrementSyncAttempts: true, + }, + { + name: "200 should increment the sync attempts and return nil", + setupMocksAndDBFn: func(t *testing.T, mCircleService *circle.MockService) { + mCircleService. + On("GetTransferByID", mock.Anything, "circle-transfer-id"). + Return(&circle.Transfer{ + ID: *circleRequest.CircleTransferID, + Status: circle.TransferStatusComplete, + }, nil). + Once() + }, + shouldIncrementSyncAttempts: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dbTx, err := dbConnectionPool.BeginTxx(ctx, nil) + require.NoError(t, err) + defer func() { + err = dbTx.Rollback() + require.NoError(t, err) + }() + + // prepare mocks + mCircleService := circle.NewMockService(t) + tc.setupMocksAndDBFn(t, mCircleService) + svc := CircleReconciliationService{ + Models: models, + CircleService: mCircleService, + } + err = svc.reconcileTransferRequest(ctx, dbTx, tnt, circleRequest) + + // get the updated CircleRequestTransfer and Payment from the DB + circleReqFromDB, dbErr := models.CircleTransferRequests.Get(ctx, dbTx, data.QueryParams{ + Filters: map[data.FilterKey]interface{}{ + data.FilterKeyPaymentID: circleRequest.PaymentID, + }, + }) + require.NoError(t, dbErr) + paymentFromDB, dbErr := models.Payment.Get(ctx, circleRequest.PaymentID, dbTx) + require.NoError(t, dbErr) + + if len(tc.wantErrorContains) != 0 { + require.Error(t, err) + for _, wantErrorContains := range tc.wantErrorContains { + assert.ErrorContains(t, err, wantErrorContains) + } + } else { + require.NoError(t, err) + assert.Equal(t, data.CircleTransferStatusSuccess, *circleReqFromDB.Status) + assert.Equal(t, data.SuccessPaymentStatus, paymentFromDB.Status) + } + + if tc.shouldIncrementSyncAttempts { + assert.Equal(t, circleRequest.SyncAttempts+1, circleReqFromDB.SyncAttempts) + assert.NotNil(t, circleReqFromDB.LastSyncAttemptAt) + assert.NotNil(t, circleReqFromDB.ResponseBody) + } else { + assert.Equal(t, circleRequest.SyncAttempts, circleReqFromDB.SyncAttempts) + assert.Nil(t, circleReqFromDB.LastSyncAttemptAt) + assert.Nil(t, circleReqFromDB.ResponseBody) + } + }) + } }