Skip to content

Commit

Permalink
[SDP-1255] handle partial failures when reconciling Circle transfers (#…
Browse files Browse the repository at this point in the history
…347)

### What

Handle partial failures when reconciling Circle transfers.

### Why

Address https://stellarorg.atlassian.net/browse/SDP-1255
  • Loading branch information
marcelosalloum authored Jul 11, 2024
1 parent ec9837b commit 1bb04a3
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 99 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- +migrate Up
ALTER TABLE circle_transfer_requests
ADD COLUMN sync_attempts INT DEFAULT 0,
ADD COLUMN sync_attempts INT NOT NULL DEFAULT 0,
ADD COLUMN last_sync_attempt_at TIMESTAMPTZ;

-- +migrate Down
Expand Down
6 changes: 3 additions & 3 deletions internal/circle/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func Test_Client_PostTransfer(t *testing.T) {
Once()

transfer, err := cc.PostTransfer(ctx, validTransferReq)
assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[]")
assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[], StatusCode=401")
assert.Nil(t, transfer)
})

Expand Down Expand Up @@ -173,7 +173,7 @@ func Test_Client_GetTransferByID(t *testing.T) {
Once()

transfer, err := cc.GetTransferByID(ctx, "test-id")
assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[]")
assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[], StatusCode=401")
assert.Nil(t, transfer)
})

Expand Down Expand Up @@ -239,7 +239,7 @@ func Test_Client_GetWalletByID(t *testing.T) {
Once()

transfer, err := cc.GetWalletByID(ctx, "test-id")
assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[]")
assert.EqualError(t, err, "API error: APIError: Code=401, Message=Malformed key. Does it contain three parts?, Errors=[], StatusCode=401")
assert.Nil(t, transfer)
})

Expand Down
6 changes: 5 additions & 1 deletion internal/circle/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (

// APIError represents the error response from Circle APIs.
type APIError struct {
// Code is the Circle API error code.
Code int `json:"code"`
Message string `json:"message"`
Errors []APIErrorDetail `json:"errors,omitempty"`
// StatusCode is the HTTP status code.
StatusCode int `json:"status_code,omitempty"`
}

// APIErrorDetail represents the detailed error information.
Expand All @@ -25,7 +28,7 @@ type APIErrorDetail struct {

// Error implements the error interface for APIError.
func (e APIError) Error() string {
return fmt.Sprintf("APIError: Code=%d, Message=%s, Errors=%v", e.Code, e.Message, e.Errors)
return fmt.Sprintf("APIError: Code=%d, Message=%s, Errors=%v, StatusCode=%d", e.Code, e.Message, e.Errors, e.StatusCode)
}

// parseAPIError parses the error response from Circle APIs.
Expand All @@ -41,6 +44,7 @@ func parseAPIError(resp *http.Response) (*APIError, error) {
if err = json.Unmarshal(body, &apiErr); err != nil {
return nil, fmt.Errorf("unmarshalling error response body: %w", err)
}
apiErr.StatusCode = resp.StatusCode

return &apiErr, nil
}
2 changes: 1 addition & 1 deletion internal/circle/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (tr TransferRequest) validate() error {
func parseTransferResponse(resp *http.Response) (*Transfer, error) {
var transferResponse TransferResponse
if err := json.NewDecoder(resp.Body).Decode(&transferResponse); err != nil {
return nil, err
return nil, fmt.Errorf("decoding transfer response: %w", err)
}

return &transferResponse.Data, nil
Expand Down
2 changes: 2 additions & 0 deletions internal/data/circle_transfer_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ const (
batchSize = 10
)

// GetPendingReconciliation returns the pending Circle transfer requests that are in `pending` status and have not
// reached the maximum sync attempts.
func (m CircleTransferRequestModel) GetPendingReconciliation(ctx context.Context, sqlExec db.SQLExecuter) ([]*CircleTransferRequest, error) {
queryParams := QueryParams{
Filters: map[FilterKey]interface{}{
Expand Down
6 changes: 0 additions & 6 deletions internal/data/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,6 @@ func CreateReceiverVerificationFixture(t *testing.T, ctx context.Context, sqlExe
return &verification
}

func DeleteAllCircleTransferRequestsFixtures(t *testing.T, ctx context.Context, sqlExec db.SQLExecuter) {
const query = "DELETE FROM circle_transfer_requests"
_, err := sqlExec.ExecContext(ctx, query)
require.NoError(t, err)
}

func CreateCircleTransferRequestFixture(t *testing.T, ctx context.Context, sqlExec db.SQLExecuter, insert CircleTransferRequest) *CircleTransferRequest {
const query = `
INSERT INTO circle_transfer_requests
Expand Down
142 changes: 94 additions & 48 deletions internal/services/circle_reconciliation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package services
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"

"github.com/stellar/go/support/log"
Expand All @@ -12,6 +14,7 @@ import (
"github.com/stellar/stellar-disbursement-platform-backend/internal/circle"
"github.com/stellar/stellar-disbursement-platform-backend/internal/data"
"github.com/stellar/stellar-disbursement-platform-backend/internal/transactionsubmission/engine/signing"
"github.com/stellar/stellar-disbursement-platform-backend/internal/utils"
"github.com/stellar/stellar-disbursement-platform-backend/pkg/schema"
"github.com/stellar/stellar-disbursement-platform-backend/stellar-multitenant/pkg/tenant"
)
Expand All @@ -27,6 +30,10 @@ type CircleReconciliationService struct {
DistAccountResolver signing.DistributionAccountResolver
}

// Reconcile reconciles the pending Circle transfer requests for the tenant in the context. It fetches the rows from
// circte_transfer_request where status is set to pending, and then fetches the transfer details from Circle API. It
// updates the status of the transfer request in the DB based on the status of the transfer in Circle. If the transfer
// reached a successful/failure status, it updates the payment status in the DB as well to reflect that.
func (s *CircleReconciliationService) Reconcile(ctx context.Context) error {
// Step 1: Get the tenant from the context.
tnt, outerErr := tenant.GetTenantFromContext(ctx)
Expand All @@ -48,6 +55,8 @@ func (s *CircleReconciliationService) Reconcile(ctx context.Context) error {
return nil
}

var reconciliationErrors []error
var reconciliationCount int
outerErr = db.RunInTransaction(ctx, s.Models.DBConnectionPool, nil, func(dbTx db.DBTransaction) error {
// Step 3: Get pending Circle transfer requests.
circleRequests, err := s.Models.CircleTransferRequests.GetPendingReconciliation(ctx, dbTx)
Expand All @@ -61,68 +70,105 @@ func (s *CircleReconciliationService) Reconcile(ctx context.Context) error {
}

// Step 4: Reconcile the pending Circle transfer requests.
reconciliationCount = len(circleRequests)
for _, circleRequest := range circleRequests {
// 4.1. get the Circle transfer by ID
transfer, err := s.CircleService.GetTransferByID(ctx, *circleRequest.CircleTransferID)
err = s.reconcileTransferRequest(ctx, dbTx, tnt, circleRequest)
if err != nil {
return fmt.Errorf("getting Circle transfer by ID %q: %w", *circleRequest.CircleTransferID, err)
}
jsonBody, err := json.Marshal(transfer)
if err != nil {
return fmt.Errorf("converting transfer body to json: %w", err)
err = fmt.Errorf("reconciling Circle transfer request: %w", err)
reconciliationErrors = append(reconciliationErrors, err)
}
}

// 4.2. update the circle transfer request entry in the DB.
newStatus := data.CircleTransferStatus(transfer.Status)
if *circleRequest.Status == newStatus {
log.Ctx(ctx).Debugf("[tenant=%s] Circle transfer request %q is already in status %q, skipping reconciliation...", tnt.Name, circleRequest.IdempotencyKey, newStatus)
continue
}
return nil
})
if outerErr != nil {
return fmt.Errorf("running Circle reconciliation for tenant %q: %w", tnt.Name, outerErr)
}

now := time.Now()
var completedAt *time.Time
if newStatus.IsCompleted() {
completedAt = &now
}
circleRequest, err = s.Models.CircleTransferRequests.Update(ctx, dbTx, circleRequest.IdempotencyKey, data.CircleTransferRequestUpdate{
Status: newStatus,
CompletedAt: completedAt,
LastSyncAttemptAt: &now,
SyncAttempts: circleRequest.SyncAttempts + 1,
ResponseBody: jsonBody,
})
if err != nil {
return fmt.Errorf("updating Circle transfer request: %w", err)
}
if len(reconciliationErrors) > 0 {
return fmt.Errorf("attempted to reconcyle %d circle requests but failed on %d reconciliations: %v", reconciliationCount, len(reconciliationErrors), reconciliationErrors)
}

// 4.3. update the payment status in the DB.
newPaymentStatus, err := transfer.Status.ToPaymentStatus()
if err != nil {
return fmt.Errorf("converting Circle transfer status to Payment status: %w", err)
}
var statusMsg string
switch newStatus {
case data.CircleTransferStatusSuccess:
statusMsg = fmt.Sprintf("Circle transfer completed successfully with the Stellar transaction hash: %q", transfer.TransactionHash)
case data.CircleTransferStatusFailed:
statusMsg = fmt.Sprintf("Circle transfer failed with error: %q", transfer.ErrorCode)
default:
return fmt.Errorf("unexpected Circle transfer status: %q", newStatus)
}
return nil
}

err = s.Models.Payment.UpdateStatus(ctx, dbTx, circleRequest.PaymentID, newPaymentStatus, &statusMsg, transfer.TransactionHash)
if err != nil {
return fmt.Errorf("updating payment status: %w", err)
// reconcileTransferRequest reconciles a Circle transfer request and updates the payment status in the DB. It returns an
// error if the reconciliation fails.
func (s *CircleReconciliationService) reconcileTransferRequest(ctx context.Context, dbTx db.DBTransaction, tnt *tenant.Tenant, circleRequest *data.CircleTransferRequest) error {
// 4.1. get the Circle transfer by ID
transfer, err := s.CircleService.GetTransferByID(ctx, *circleRequest.CircleTransferID)
if err != nil {
var cAPIErr *circle.APIError
if errors.As(err, &cAPIErr) && cAPIErr.StatusCode == http.StatusBadRequest {
// if the the Circle API returns a 400, increment the sync attempts and update the last sync
errJSONBody, marshalErr := json.Marshal(cAPIErr)
if marshalErr != nil {
log.Ctx(ctx).Errorf("marshalling Circle APIError: %v", marshalErr)
}

log.Ctx(ctx).Infof("[tenant=%s] Reconciled Circle transfer request %q with status %q", tnt.Name, *circleRequest.CircleTransferID, newStatus)
// increment the sync attempts and update the last sync attempt time.
var updateErr error
circleRequest, updateErr = s.Models.CircleTransferRequests.Update(ctx, dbTx, circleRequest.IdempotencyKey, data.CircleTransferRequestUpdate{
LastSyncAttemptAt: utils.TimePtr(time.Now()),
SyncAttempts: circleRequest.SyncAttempts + 1,
ResponseBody: errJSONBody,
})
if updateErr != nil {
return fmt.Errorf("updating Circle transfer request sync attempts: %w", updateErr)
}
}
return fmt.Errorf("getting Circle transfer by ID %q: %w", *circleRequest.CircleTransferID, err)
}
jsonBody, err := json.Marshal(transfer)
if err != nil {
return fmt.Errorf("converting transfer body to json: %w", err)
}

// 4.2. update the circle transfer request entry in the DB.
newStatus := data.CircleTransferStatus(transfer.Status)
if *circleRequest.Status == newStatus {
// this condition should be unrechable, but we're adding this log just in case...
log.Ctx(ctx).Debugf("[tenant=%s] Circle transfer request %q is already in status %q, skipping reconciliation...", tnt.Name, circleRequest.IdempotencyKey, newStatus)
return nil
}

now := time.Now()
var completedAt *time.Time
if newStatus.IsCompleted() {
completedAt = &now
}
circleRequest, err = s.Models.CircleTransferRequests.Update(ctx, dbTx, circleRequest.IdempotencyKey, data.CircleTransferRequestUpdate{
Status: newStatus,
CompletedAt: completedAt,
LastSyncAttemptAt: &now,
SyncAttempts: circleRequest.SyncAttempts + 1,
ResponseBody: jsonBody,
})
if outerErr != nil {
return fmt.Errorf("running Circle reconciliation for tenant %q: %w", tnt.Name, outerErr)
if err != nil {
return fmt.Errorf("updating Circle transfer request: %w", err)
}

// 4.3. update the payment status in the DB.
newPaymentStatus, err := transfer.Status.ToPaymentStatus()
if err != nil {
return fmt.Errorf("converting Circle transfer status to Payment status: %w", err)
}
var statusMsg string
switch newStatus {
case data.CircleTransferStatusSuccess:
statusMsg = fmt.Sprintf("Circle transfer completed successfully with the Stellar transaction hash: %q", transfer.TransactionHash)
case data.CircleTransferStatusFailed:
statusMsg = fmt.Sprintf("Circle transfer failed with error: %q", transfer.ErrorCode)
default:
return fmt.Errorf("unexpected Circle transfer status: %q", newStatus)
}

err = s.Models.Payment.UpdateStatus(ctx, dbTx, circleRequest.PaymentID, newPaymentStatus, &statusMsg, transfer.TransactionHash)
if err != nil {
return fmt.Errorf("updating payment status: %w", err)
}

log.Ctx(ctx).Infof("[tenant=%s] Reconciled Circle transfer request %q with status %q", tnt.Name, *circleRequest.CircleTransferID, newStatus)

return nil
}
Loading

0 comments on commit 1bb04a3

Please sign in to comment.