From 578557854dd35c4ea397ddea1746d6a327e58a94 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 10 Jan 2025 13:00:55 -0500 Subject: [PATCH 01/16] add heartbeat monitoring to rpc service --- internal/services/mocks.go | 5 +++++ internal/services/rpc_service.go | 37 ++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/internal/services/mocks.go b/internal/services/mocks.go index 5b02b4e..e9ecdf1 100644 --- a/internal/services/mocks.go +++ b/internal/services/mocks.go @@ -12,6 +12,11 @@ type RPCServiceMock struct { var _ RPCService = (*RPCServiceMock)(nil) +func (r *RPCServiceMock) GetHeartbeatChannel() chan entities.RPCGetHealthResult { + args := r.Called() + return args.Get(0).(chan entities.RPCGetHealthResult) +} + func (r *RPCServiceMock) SendTransaction(transactionXdr string) (entities.RPCSendTransactionResult, error) { args := r.Called(transactionXdr) return args.Get(0).(entities.RPCSendTransactionResult), args.Error(1) diff --git a/internal/services/rpc_service.go b/internal/services/rpc_service.go index 3279baf..8ca1140 100644 --- a/internal/services/rpc_service.go +++ b/internal/services/rpc_service.go @@ -2,11 +2,14 @@ package services import ( "bytes" + "context" "encoding/json" "errors" "fmt" "io" + "time" + "github.com/stellar/go/support/log" "github.com/stellar/wallet-backend/internal/entities" "github.com/stellar/wallet-backend/internal/utils" ) @@ -22,11 +25,13 @@ type RPCService interface { GetHealth() (entities.RPCGetHealthResult, error) GetLedgerEntries(keys []string) (entities.RPCGetLedgerEntriesResult, error) GetAccountLedgerSequence(address string) (int64, error) + GetHeartbeatChannel() chan entities.RPCGetHealthResult } type rpcService struct { rpcURL string httpClient utils.HTTPClient + heartbeatChannel chan entities.RPCGetHealthResult } var PageLimit = 200 @@ -47,6 +52,10 @@ func NewRPCService(rpcURL string, httpClient utils.HTTPClient) (*rpcService, err }, nil } +func (r *rpcService) GetHeartbeatChannel() chan entities.RPCGetHealthResult { + return r.heartbeatChannel +} + func (r *rpcService) GetTransaction(transactionHash string) (entities.RPCGetTransactionResult, error) { resultBytes, err := r.sendRPCRequest("getTransaction", entities.RPCParams{Hash: transactionHash}) @@ -153,6 +162,34 @@ func (r *rpcService) GetAccountLedgerSequence(address string) (int64, error) { return int64(accountEntry.SeqNum), nil } +func (r *rpcService) trackRPCServiceHealth(ctx context.Context) { + healthCheckTicker := time.NewTicker(rpcHealthCheckSleepTime) + warningTicker := time.NewTicker(rpcHealthCheckMaxWaitTime) + defer func() { + healthCheckTicker.Stop() + warningTicker.Stop() + close(r.heartbeatChannel) + }() + + for { + select { + case <-ctx.Done(): + return + case <-warningTicker.C: + log.Warn(fmt.Sprintf("rpc service unhealthy for over %s", rpcHealthCheckMaxWaitTime)) + warningTicker.Reset(rpcHealthCheckMaxWaitTime) + case <-healthCheckTicker.C: + result, err := r.GetHealth() + if err != nil { + log.Warnf("rpc health check failed: %v", err) + continue + } + r.heartbeatChannel <- result + warningTicker.Reset(rpcHealthCheckMaxWaitTime) + } + } +} + func (r *rpcService) sendRPCRequest(method string, params entities.RPCParams) (json.RawMessage, error) { payload := map[string]interface{}{ From ef07ab9f82085c386e2417e59c110b284853cbd9 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 10 Jan 2025 13:22:23 -0500 Subject: [PATCH 02/16] Add heartbeat monitoring to rpc - 2 --- internal/services/rpc_service.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/services/rpc_service.go b/internal/services/rpc_service.go index 8ca1140..dc0651b 100644 --- a/internal/services/rpc_service.go +++ b/internal/services/rpc_service.go @@ -46,10 +46,14 @@ func NewRPCService(rpcURL string, httpClient utils.HTTPClient) (*rpcService, err return nil, errors.New("httpClient cannot be nil") } - return &rpcService{ + heartbeatChannel := make(chan entities.RPCGetHealthResult, 1) + rpcService := &rpcService{ rpcURL: rpcURL, httpClient: httpClient, - }, nil + heartbeatChannel: heartbeatChannel, + } + go rpcService.trackRPCServiceHealth(context.Background()) + return rpcService, nil } func (r *rpcService) GetHeartbeatChannel() chan entities.RPCGetHealthResult { From f49d5e42423864f5b593523dfe39ed13d4457726 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 10 Jan 2025 13:23:22 -0500 Subject: [PATCH 03/16] move constants from ingest to rpc service --- internal/services/rpc_service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/services/rpc_service.go b/internal/services/rpc_service.go index dc0651b..39886b5 100644 --- a/internal/services/rpc_service.go +++ b/internal/services/rpc_service.go @@ -15,6 +15,8 @@ import ( ) const ( + rpcHealthCheckSleepTime = 5 * time.Second + rpcHealthCheckMaxWaitTime = 60 * time.Second getHealthMethodName = "getHealth" ) From 408ec131d4feacae5aa002450d303900339d0ac3 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 10 Jan 2025 15:44:44 -0500 Subject: [PATCH 04/16] Update ingestion workflow with new rpcService health tracking --- internal/services/ingest.go | 49 +--------- internal/services/ingest_test.go | 155 ++++++++++++++++++++++--------- 2 files changed, 117 insertions(+), 87 deletions(-) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index eac9e77..0351260 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -21,8 +21,6 @@ import ( ) const ( - rpcHealthCheckSleepTime = 5 * time.Second - rpcHealthCheckMaxWaitTime = 60 * time.Second ingestHealthCheckMaxWaitTime = 90 * time.Second ) @@ -79,12 +77,9 @@ func NewIngestService( } func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger uint32) error { - rpcHeartbeat := make(chan entities.RPCGetHealthResult, 1) - ingestHeartbeat := make(chan any, 1) - - // Start service health trackers - go trackRPCServiceHealth(ctx, rpcHeartbeat, m.appTracker, m.rpcService) - go trackIngestServiceHealth(ctx, ingestHeartbeat, m.appTracker) + ingestHeartbeatChannel := make(chan any, 1) + rpcHeartbeatChannel := m.rpcService.GetHeartbeatChannel() + go trackIngestServiceHealth(ctx, ingestHeartbeatChannel, m.appTracker) if startLedger == 0 { var err error @@ -99,7 +94,7 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u select { case <-ctx.Done(): return fmt.Errorf("context cancelled: %w", ctx.Err()) - case resp := <-rpcHeartbeat: + case resp := <-rpcHeartbeatChannel: switch { // Case-1: wallet-backend is running behind rpc's oldest ledger. In this case, we start // ingestion from rpc's oldest ledger. @@ -120,7 +115,7 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u log.Error("getTransactions: %w", err) continue } - ingestHeartbeat <- true + ingestHeartbeatChannel <- true err = m.ingestPayments(ctx, ledgerTransactions) if err != nil { return fmt.Errorf("error ingesting payments: %w", err) @@ -278,40 +273,6 @@ func (m *ingestService) processTSSTransactions(ctx context.Context, ledgerTransa return nil } -func trackRPCServiceHealth(ctx context.Context, heartbeat chan entities.RPCGetHealthResult, tracker apptracker.AppTracker, rpcService RPCService) { - healthCheckTicker := time.NewTicker(rpcHealthCheckSleepTime) - warningTicker := time.NewTicker(rpcHealthCheckMaxWaitTime) - defer func() { - healthCheckTicker.Stop() - warningTicker.Stop() - close(heartbeat) - }() - - for { - select { - case <-ctx.Done(): - return - case <-warningTicker.C: - warn := fmt.Sprintf("rpc service unhealthy for over %s", rpcHealthCheckMaxWaitTime) - log.Warn(warn) - if tracker != nil { - tracker.CaptureMessage(warn) - } else { - log.Warn("App Tracker is nil") - } - warningTicker.Reset(rpcHealthCheckMaxWaitTime) - case <-healthCheckTicker.C: - result, err := rpcService.GetHealth() - if err != nil { - log.Warnf("rpc health check failed: %v", err) - continue - } - heartbeat <- result - warningTicker.Reset(rpcHealthCheckMaxWaitTime) - } - } -} - func trackIngestServiceHealth(ctx context.Context, heartbeat chan any, tracker apptracker.AppTracker) { ticker := time.NewTicker(ingestHealthCheckMaxWaitTime) defer func() { diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index e37325c..0433bcb 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -3,14 +3,15 @@ package services import ( "bytes" "context" - "errors" + "io" + "net/http" "os" "testing" "time" "github.com/stellar/go/keypair" - "github.com/stellar/go/support/log" "github.com/stellar/go/network" + "github.com/stellar/go/support/log" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" @@ -25,6 +26,7 @@ import ( "github.com/stellar/wallet-backend/internal/tss" tssrouter "github.com/stellar/wallet-backend/internal/tss/router" tssstore "github.com/stellar/wallet-backend/internal/tss/store" + "github.com/stellar/wallet-backend/internal/utils" ) func TestGetLedgerTransactions(t *testing.T) { @@ -364,17 +366,32 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { tssStore, err := tssstore.NewStore(dbConnectionPool) require.NoError(t, err) + // Create and set up the heartbeat channel + heartbeatChan := make(chan entities.RPCGetHealthResult, 1) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + health, _ := mockRPCService.GetHealth() + heartbeatChan <- health + time.Sleep(10 * time.Second) + } + } + }() + mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 50, + }, nil).Once() + ingestService, err := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) require.NoError(t, err) srcAccount := keypair.MustRandom().Address() destAccount := keypair.MustRandom().Address() - mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ - Status: "healthy", - LatestLedger: 100, - OldestLedger: 50, - }, nil) paymentOp := txnbuild.Payment{ SourceAccount: srcAccount, Destination: destAccount, @@ -413,6 +430,7 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { OldestLedgerCloseTime: int64(1), } mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once() + mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) err = ingestService.Run(ctx, uint32(49), uint32(50)) require.NoError(t, err) @@ -449,18 +467,30 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { ingestService, err := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) require.NoError(t, err) - // First call shows RPC is behind + // Create and set up the heartbeat channel + heartbeatChan := make(chan entities.RPCGetHealthResult, 1) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + health, _ := mockRPCService.GetHealth() + heartbeatChan <- health + time.Sleep(10 * time.Second) + } + } + }() + mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ Status: "healthy", LatestLedger: 50, OldestLedger: 1, }, nil).Once() - - // Second call after sleep shows RPC has caught up mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ Status: "healthy", - LatestLedger: 100, // RPC has caught up to ledger 100 - OldestLedger: 1, + LatestLedger: 100, + OldestLedger: 50, }, nil).Once() // Capture debug logs to verify waiting message @@ -513,9 +543,9 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { } func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { - mockRPCService := &RPCServiceMock{} - mockAppTracker := &apptracker.MockAppTracker{} - heartbeat := make(chan entities.RPCGetHealthResult, 1) + mockHTTPClient := &utils.MockHTTPClient{} + rpcService, err := NewRPCService("http://test-url", mockHTTPClient) + require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -526,50 +556,89 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { OldestLedger: 1, LedgerRetentionWindow: 0, } - mockRPCService.On("GetHealth").Return(healthResult, nil).Once().Run(func(args mock.Arguments) { - cancel() - }) - trackRPCServiceHealth(ctx, heartbeat, mockAppTracker, mockRPCService) + // Mock the HTTP response for GetHealth + mockResponse := &http.Response{ + Body: io.NopCloser(bytes.NewBuffer([]byte(`{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "status": "healthy", + "latestLedger": 100, + "oldestLedger": 1, + "ledgerRetentionWindow": 0 + } + }`))), + } + mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything).Return(mockResponse, nil).Once() - assert.Equal(t, healthResult, <-heartbeat) - mockRPCService.AssertExpectations(t) - mockAppTracker.AssertNotCalled(t, "CaptureMessage") + // Start tracking health in background + go rpcService.trackRPCServiceHealth(ctx) + + // Get result from heartbeat channel + select { + case result := <-rpcService.GetHeartbeatChannel(): + assert.Equal(t, healthResult, result) + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for heartbeat") + } + + mockHTTPClient.AssertExpectations(t) } func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { + var logBuffer bytes.Buffer + log.DefaultLogger.SetOutput(&logBuffer) + defer log.DefaultLogger.SetOutput(os.Stderr) + + mockHTTPClient := &utils.MockHTTPClient{} + rpcService, err := NewRPCService("http://test-url", mockHTTPClient) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 70*time.Second) defer cancel() - mockRPCService := &RPCServiceMock{} - mockRPCService.On("GetHealth").Return( - entities.RPCGetHealthResult{}, - errors.New("rpc error"), - ) - - mockAppTracker := &apptracker.MockAppTracker{} - mockAppTracker.On("CaptureMessage", "rpc service unhealthy for over 1m0s").Run(func(args mock.Arguments) { - cancel() - }) - heartbeat := make(chan entities.RPCGetHealthResult, 1) + // Mock error response for GetHealth with a valid http.Response + mockResponse := &http.Response{ + Body: io.NopCloser(bytes.NewBuffer([]byte(`{ + "jsonrpc": "2.0", + "id": 1, + "error": { + "code": -32601, + "message": "rpc error" + } + }`))), + } + mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything). + Return(mockResponse, nil) - go trackRPCServiceHealth(ctx, heartbeat, mockAppTracker, mockRPCService) + go rpcService.trackRPCServiceHealth(ctx) - // Wait long enough for both warnings to trigger + // Wait long enough for warning to trigger time.Sleep(65 * time.Second) - mockRPCService.AssertExpectations(t) - mockAppTracker.AssertExpectations(t) + logOutput := logBuffer.String() + assert.Contains(t, logOutput, "rpc service unhealthy for over 1m0s") + mockHTTPClient.AssertExpectations(t) } func TestTrackRPCService_ContextCancelled(t *testing.T) { - mockRPCService := &RPCServiceMock{} - mockAppTracker := &apptracker.MockAppTracker{} - heartbeat := make(chan entities.RPCGetHealthResult, 1) + mockHTTPClient := &utils.MockHTTPClient{} + rpcService, err := NewRPCService("http://test-url", mockHTTPClient) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - go trackRPCServiceHealth(ctx, heartbeat, mockAppTracker, mockRPCService) - mockRPCService.AssertNotCalled(t, "GetHealth") - mockAppTracker.AssertNotCalled(t, "CaptureMessage") + go rpcService.trackRPCServiceHealth(ctx) + + // Cancel context immediately + cancel() + + // Verify channel is closed after context cancellation + time.Sleep(100 * time.Millisecond) + _, ok := <-rpcService.GetHeartbeatChannel() + assert.False(t, ok, "channel should be closed") + + mockHTTPClient.AssertNotCalled(t, "Post") } From 16735469d133a3a5df2be296fb01dd400aa90ae9 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 10 Jan 2025 16:55:28 -0500 Subject: [PATCH 05/16] use rpcService.GetHeartbeatChannel in creating channel accounts --- internal/services/channel_account_service.go | 104 ++++++++---------- .../services/channel_account_service_test.go | 48 +++----- 2 files changed, 59 insertions(+), 93 deletions(-) diff --git a/internal/services/channel_account_service.go b/internal/services/channel_account_service.go index b3a8a93..c4632ff 100644 --- a/internal/services/channel_account_service.go +++ b/internal/services/channel_account_service.go @@ -90,75 +90,57 @@ func (s *channelAccountService) EnsureChannelAccounts(ctx context.Context, numbe } func (s *channelAccountService) submitCreateChannelAccountsOnChainTransaction(ctx context.Context, distributionAccountPublicKey string, ops []txnbuild.Operation) error { - err := waitForRPCServiceHealth(ctx, s.RPCService) - if err != nil { - return fmt.Errorf("rpc service did not become healthy: %w", err) - } - - accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey) - if err != nil { - return fmt.Errorf("getting ledger sequence for distribution account public key: %s: %w", distributionAccountPublicKey, err) - } + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err()) + case <-s.RPCService.GetHeartbeatChannel(): + accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey) + if err != nil { + return fmt.Errorf("getting ledger sequence for distribution account public key: %s: %w", distributionAccountPublicKey, err) + } - tx, err := txnbuild.NewTransaction( - txnbuild.TransactionParams{ - SourceAccount: &txnbuild.SimpleAccount{ - AccountID: distributionAccountPublicKey, - Sequence: accountSeq, + tx, err := txnbuild.NewTransaction( + txnbuild.TransactionParams{ + SourceAccount: &txnbuild.SimpleAccount{ + AccountID: distributionAccountPublicKey, + Sequence: accountSeq, + }, + IncrementSequenceNum: true, + Operations: ops, + BaseFee: s.BaseFee, + Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)}, }, - IncrementSequenceNum: true, - Operations: ops, - BaseFee: s.BaseFee, - Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)}, - }, - ) - if err != nil { - return fmt.Errorf("building transaction: %w", err) - } - - signedTx, err := s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey) - if err != nil { - return fmt.Errorf("signing transaction: %w", err) - } - - hash, err := signedTx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase()) - if err != nil { - return fmt.Errorf("getting transaction hash: %w", err) - } - - signedTxXDR, err := signedTx.Base64() - if err != nil { - return fmt.Errorf("getting transaction envelope: %w", err) - } - - err = s.submitTransaction(ctx, hash, signedTxXDR) - if err != nil { - return fmt.Errorf("submitting channel account transaction to rpc service: %w", err) - } + ) + if err != nil { + return fmt.Errorf("building transaction: %w", err) + } - err = s.waitForTransactionConfirmation(ctx, hash) - if err != nil { - return fmt.Errorf("getting transaction status: %w", err) - } + signedTx, err := s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey) + if err != nil { + return fmt.Errorf("signing transaction: %w", err) + } - return nil -} + hash, err := signedTx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase()) + if err != nil { + return fmt.Errorf("getting transaction hash: %w", err) + } -func waitForRPCServiceHealth(ctx context.Context, rpcService RPCService) error { - // Create a cancellable context for the heartbeat goroutine, once rpc returns healthy status. - heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx) - heartbeat := make(chan entities.RPCGetHealthResult, 1) - defer cancelHeartbeat() + signedTxXDR, err := signedTx.Base64() + if err != nil { + return fmt.Errorf("getting transaction envelope: %w", err) + } - go trackRPCServiceHealth(heartbeatCtx, heartbeat, nil, rpcService) + err = s.submitTransaction(ctx, hash, signedTxXDR) + if err != nil { + return fmt.Errorf("submitting channel account transaction to rpc service: %w", err) + } - for { - select { - case <-heartbeat: - return nil - case <-ctx.Done(): - return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err()) + err = s.waitForTransactionConfirmation(ctx, hash) + if err != nil { + return fmt.Errorf("getting transaction status: %w", err) } + + return nil } } diff --git a/internal/services/channel_account_service_test.go b/internal/services/channel_account_service_test.go index 8991074..19d4212 100644 --- a/internal/services/channel_account_service_test.go +++ b/internal/services/channel_account_service_test.go @@ -2,9 +2,7 @@ package services import ( "context" - "fmt" "testing" - "time" "github.com/stellar/go/keypair" "github.com/stellar/go/network" @@ -30,6 +28,7 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) { defer dbConnectionPool.Close() ctx := context.Background() + heartbeatChan := make(chan entities.RPCGetHealthResult, 1) mockRPCService := RPCServiceMock{} signatureClient := signing.SignatureClientMock{} channelAccountStore := store.ChannelAccountStoreMock{} @@ -103,6 +102,11 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) { mockRPCService. On("GetHealth"). Return(entities.RPCGetHealthResult{Status: "healthy"}, nil) + + // Create and set up the heartbeat channel + health, _ := mockRPCService.GetHealth() + heartbeatChan <- health + mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) mockRPCService. On("GetAccountLedgerSequence", distributionAccount.Address()). @@ -174,6 +178,11 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) { mockRPCService. On("GetHealth"). Return(entities.RPCGetHealthResult{Status: "healthy"}, nil) + + // Create and set up the heartbeat channel + health, _ := mockRPCService.GetHealth() + heartbeatChan <- health + mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) mockRPCService. On("GetAccountLedgerSequence", distributionAccount.Address()). @@ -227,6 +236,11 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) { mockRPCService. On("GetHealth"). Return(entities.RPCGetHealthResult{Status: "healthy"}, nil) + + // Create and set up the heartbeat channel + health, _ := mockRPCService.GetHealth() + heartbeatChan <- health + mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) mockRPCService. On("GetAccountLedgerSequence", distributionAccount.Address()). @@ -253,36 +267,6 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) { }) } -func TestWaitForRPCServiceHealth(t *testing.T) { - mockRPCService := RPCServiceMock{} - ctx := context.Background() - - t.Run("successful", func(t *testing.T) { - mockRPCService. - On("GetHealth"). - Return(entities.RPCGetHealthResult{Status: "healthy"}, nil). - Once() - defer mockRPCService.AssertExpectations(t) - - err := waitForRPCServiceHealth(ctx, &mockRPCService) - require.NoError(t, err) - }) - - t.Run("context_cancelled", func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - mockRPCService. - On("GetHealth"). - Return(entities.RPCGetHealthResult{}, fmt.Errorf("connection failed")) - defer mockRPCService.AssertExpectations(t) - - err := waitForRPCServiceHealth(ctx, &mockRPCService) - require.Error(t, err) - assert.Contains(t, err.Error(), "context cancelled while waiting for rpc service to become healthy") - }) -} - func TestSubmitTransaction(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() From f3b4098e693751fcbe1835058bb7792cd05e9637 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 10 Jan 2025 17:17:07 -0500 Subject: [PATCH 06/16] move tests from ingest to rpc service --- internal/services/ingest_test.go | 104 -------------------------- internal/services/rpc_service_test.go | 99 ++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 104 deletions(-) diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 0433bcb..a77ae77 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -3,8 +3,6 @@ package services import ( "bytes" "context" - "io" - "net/http" "os" "testing" "time" @@ -26,7 +24,6 @@ import ( "github.com/stellar/wallet-backend/internal/tss" tssrouter "github.com/stellar/wallet-backend/internal/tss/router" tssstore "github.com/stellar/wallet-backend/internal/tss/store" - "github.com/stellar/wallet-backend/internal/utils" ) func TestGetLedgerTransactions(t *testing.T) { @@ -541,104 +538,3 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { mockRPCService.AssertExpectations(t) } - -func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { - mockHTTPClient := &utils.MockHTTPClient{} - rpcService, err := NewRPCService("http://test-url", mockHTTPClient) - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - healthResult := entities.RPCGetHealthResult{ - Status: "healthy", - LatestLedger: 100, - OldestLedger: 1, - LedgerRetentionWindow: 0, - } - - // Mock the HTTP response for GetHealth - mockResponse := &http.Response{ - Body: io.NopCloser(bytes.NewBuffer([]byte(`{ - "jsonrpc": "2.0", - "id": 1, - "result": { - "status": "healthy", - "latestLedger": 100, - "oldestLedger": 1, - "ledgerRetentionWindow": 0 - } - }`))), - } - mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything).Return(mockResponse, nil).Once() - - // Start tracking health in background - go rpcService.trackRPCServiceHealth(ctx) - - // Get result from heartbeat channel - select { - case result := <-rpcService.GetHeartbeatChannel(): - assert.Equal(t, healthResult, result) - case <-time.After(10 * time.Second): - t.Fatal("timeout waiting for heartbeat") - } - - mockHTTPClient.AssertExpectations(t) -} - -func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { - var logBuffer bytes.Buffer - log.DefaultLogger.SetOutput(&logBuffer) - defer log.DefaultLogger.SetOutput(os.Stderr) - - mockHTTPClient := &utils.MockHTTPClient{} - rpcService, err := NewRPCService("http://test-url", mockHTTPClient) - require.NoError(t, err) - - ctx, cancel := context.WithTimeout(context.Background(), 70*time.Second) - defer cancel() - - // Mock error response for GetHealth with a valid http.Response - mockResponse := &http.Response{ - Body: io.NopCloser(bytes.NewBuffer([]byte(`{ - "jsonrpc": "2.0", - "id": 1, - "error": { - "code": -32601, - "message": "rpc error" - } - }`))), - } - mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything). - Return(mockResponse, nil) - - go rpcService.trackRPCServiceHealth(ctx) - - // Wait long enough for warning to trigger - time.Sleep(65 * time.Second) - - logOutput := logBuffer.String() - assert.Contains(t, logOutput, "rpc service unhealthy for over 1m0s") - mockHTTPClient.AssertExpectations(t) -} - -func TestTrackRPCService_ContextCancelled(t *testing.T) { - mockHTTPClient := &utils.MockHTTPClient{} - rpcService, err := NewRPCService("http://test-url", mockHTTPClient) - require.NoError(t, err) - - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - go rpcService.trackRPCServiceHealth(ctx) - - // Cancel context immediately - cancel() - - // Verify channel is closed after context cancellation - time.Sleep(100 * time.Millisecond) - _, ok := <-rpcService.GetHeartbeatChannel() - assert.False(t, ok, "channel should be closed") - - mockHTTPClient.AssertNotCalled(t, "Post") -} diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index fc4456c..b154803 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -2,18 +2,22 @@ package services import ( "bytes" + "context" "encoding/json" "errors" "fmt" "io" "net/http" + "os" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/stellar/go/support/log" "github.com/stellar/wallet-backend/internal/entities" "github.com/stellar/wallet-backend/internal/utils" ) @@ -369,3 +373,98 @@ func TestSendGetHealth(t *testing.T) { assert.Equal(t, "sending getHealth request: sending POST request to RPC: connection failed", err.Error()) }) } + +func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { + mockHTTPClient := &utils.MockHTTPClient{} + rpcService, err := NewRPCService("http://test-url", mockHTTPClient) + require.NoError(t, err) + + healthResult := entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 1, + LedgerRetentionWindow: 0, + } + + // Mock the HTTP response for GetHealth + mockResponse := &http.Response{ + Body: io.NopCloser(bytes.NewBuffer([]byte(`{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "status": "healthy", + "latestLedger": 100, + "oldestLedger": 1, + "ledgerRetentionWindow": 0 + } + }`))), + } + mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything).Return(mockResponse, nil).Once() + + // Get result from heartbeat channel + select { + case result := <-rpcService.GetHeartbeatChannel(): + assert.Equal(t, healthResult, result) + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for heartbeat") + } + + mockHTTPClient.AssertExpectations(t) +} + +func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { + var logBuffer bytes.Buffer + log.DefaultLogger.SetOutput(&logBuffer) + defer log.DefaultLogger.SetOutput(os.Stderr) + + mockHTTPClient := &utils.MockHTTPClient{} + rpcService, err := NewRPCService("http://test-url", mockHTTPClient) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 70*time.Second) + defer cancel() + + // Mock error response for GetHealth with a valid http.Response + mockResponse := &http.Response{ + Body: io.NopCloser(bytes.NewBuffer([]byte(`{ + "jsonrpc": "2.0", + "id": 1, + "error": { + "code": -32601, + "message": "rpc error" + } + }`))), + } + mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything). + Return(mockResponse, nil) + + go rpcService.trackRPCServiceHealth(ctx) + + // Wait long enough for warning to trigger + time.Sleep(65 * time.Second) + + logOutput := logBuffer.String() + assert.Contains(t, logOutput, "rpc service unhealthy for over 1m0s") + mockHTTPClient.AssertExpectations(t) +} + +func TestTrackRPCService_ContextCancelled(t *testing.T) { + mockHTTPClient := &utils.MockHTTPClient{} + rpcService, err := NewRPCService("http://test-url", mockHTTPClient) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go rpcService.trackRPCServiceHealth(ctx) + + // Cancel context immediately + cancel() + + // Verify channel is closed after context cancellation + time.Sleep(100 * time.Millisecond) + _, ok := <-rpcService.GetHeartbeatChannel() + assert.False(t, ok, "channel should be closed") + + mockHTTPClient.AssertNotCalled(t, "Post") +} From 3769556fa4149f80d0f290806a320c5531b8c4fc Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 11:26:21 -0500 Subject: [PATCH 07/16] Remove Once() from mock --- internal/services/rpc_service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index b154803..3249e40 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -399,7 +399,7 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { } }`))), } - mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything).Return(mockResponse, nil).Once() + mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything).Return(mockResponse, nil) // Get result from heartbeat channel select { From fd7d6d66d744ca949049172f5460758004bd61cc Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 12:19:43 -0500 Subject: [PATCH 08/16] fix failing tests --- internal/services/rpc_service_test.go | 37 +++++++++++++++++---------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index 3249e40..21481e7 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -34,8 +34,10 @@ func (e *errorReader) Close() error { func TestSendRPCRequest(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://api.vibrantapp.com/soroban/rpc" - rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) + rpcURL := "http://test-url" + rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) + require.NoError(t, err) + defer close(rpcService.heartbeatChannel) t.Run("successful", func(t *testing.T) { httpResponse := http.Response{ @@ -129,8 +131,10 @@ func TestSendRPCRequest(t *testing.T) { func TestSendTransaction(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://api.vibrantapp.com/soroban/rpc" - rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) + rpcURL := "http://test-url" + rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) + require.NoError(t, err) + defer close(rpcService.heartbeatChannel) t.Run("successful", func(t *testing.T) { transactionXDR := "AAAAAgAAAABYJgX6SmA2tGVDv3GXfOWbkeL869ahE0e5DG9HnXQw/QAAAGQAAjpnAAAAAQAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAQAAAACxaDFEbbssZfrbRgFxTYIygITSQxsUpDmneN2gAZBEFQAAAAAAAAAABfXhAAAAAAAAAAAA" @@ -190,8 +194,10 @@ func TestSendTransaction(t *testing.T) { func TestGetTransaction(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://api.vibrantapp.com/soroban/rpc" - rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) + rpcURL := "http://test-url" + rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) + require.NoError(t, err) + defer close(rpcService.heartbeatChannel) t.Run("successful", func(t *testing.T) { transactionHash := "6bc97bddc21811c626839baf4ab574f4f9f7ddbebb44d286ae504396d4e752da" @@ -266,8 +272,10 @@ func TestGetTransaction(t *testing.T) { func TestGetTransactions(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://api.vibrantapp.com/soroban/rpc" - rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) + rpcURL := "http://test-url" + rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) + require.NoError(t, err) + defer close(rpcService.heartbeatChannel) t.Run("rpc_request_fails", func(t *testing.T) { mockHTTPClient. @@ -329,9 +337,11 @@ func TestGetTransactions(t *testing.T) { func TestSendGetHealth(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://api.vibrantapp.com/soroban/rpc" - rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) - + rpcURL := "http://test-url" + rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) + require.NoError(t, err) + defer close(rpcService.heartbeatChannel) + t.Run("successful", func(t *testing.T) { payload := map[string]interface{}{ "jsonrpc": "2.0", @@ -378,6 +388,7 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { mockHTTPClient := &utils.MockHTTPClient{} rpcService, err := NewRPCService("http://test-url", mockHTTPClient) require.NoError(t, err) + defer close(rpcService.heartbeatChannel) healthResult := entities.RPCGetHealthResult{ Status: "healthy", @@ -420,7 +431,7 @@ func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { mockHTTPClient := &utils.MockHTTPClient{} rpcService, err := NewRPCService("http://test-url", mockHTTPClient) require.NoError(t, err) - + defer close(rpcService.heartbeatChannel) ctx, cancel := context.WithTimeout(context.Background(), 70*time.Second) defer cancel() @@ -452,7 +463,7 @@ func TestTrackRPCService_ContextCancelled(t *testing.T) { mockHTTPClient := &utils.MockHTTPClient{} rpcService, err := NewRPCService("http://test-url", mockHTTPClient) require.NoError(t, err) - + defer close(rpcService.heartbeatChannel) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() From c7774c293ada912c0242fc37b57f40ed5cdcba6c Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 13:19:19 -0500 Subject: [PATCH 09/16] Add comment --- internal/services/channel_account_service.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/services/channel_account_service.go b/internal/services/channel_account_service.go index c4632ff..c3f2b76 100644 --- a/internal/services/channel_account_service.go +++ b/internal/services/channel_account_service.go @@ -93,6 +93,8 @@ func (s *channelAccountService) submitCreateChannelAccountsOnChainTransaction(ct select { case <-ctx.Done(): return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err()) + // The channel account creation goroutine will wait in the background for the rpc service to become healthy on startup. + // This lets the API server startup so that users can start interacting with the API which does not depend on RPC, instead of waiting till it becomes healthy. case <-s.RPCService.GetHeartbeatChannel(): accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey) if err != nil { From f82619d8a1f7b35c798e78122785de32aaca2992 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 13:20:14 -0500 Subject: [PATCH 10/16] Start rpc tracking goroutine from outside --- internal/ingest/ingest.go | 1 + internal/serve/serve.go | 1 + internal/services/mocks.go | 6 +++ internal/services/rpc_service.go | 23 +++++----- internal/services/rpc_service_test.go | 64 +++++++++++++++++---------- 5 files changed, 59 insertions(+), 36 deletions(-) diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index 69bf778..fc5357d 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -63,6 +63,7 @@ func setupDeps(cfg Configs) (services.IngestService, error) { if err != nil { return nil, fmt.Errorf("instantiating rpc service: %w", err) } + go rpcService.TrackRPCServiceHealth(context.Background()) tssStore, err := tssstore.NewStore(dbConnectionPool) if err != nil { return nil, fmt.Errorf("instantiating tss store: %w", err) diff --git a/internal/serve/serve.go b/internal/serve/serve.go index 4da8b41..438993f 100644 --- a/internal/serve/serve.go +++ b/internal/serve/serve.go @@ -154,6 +154,7 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) { if err != nil { return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err) } + go rpcService.TrackRPCServiceHealth(context.Background()) accountService, err := services.NewAccountService(models) if err != nil { diff --git a/internal/services/mocks.go b/internal/services/mocks.go index e9ecdf1..e17bdb5 100644 --- a/internal/services/mocks.go +++ b/internal/services/mocks.go @@ -1,6 +1,8 @@ package services import ( + "context" + "github.com/stretchr/testify/mock" "github.com/stellar/wallet-backend/internal/entities" @@ -12,6 +14,10 @@ type RPCServiceMock struct { var _ RPCService = (*RPCServiceMock)(nil) +func (r *RPCServiceMock) TrackRPCServiceHealth(ctx context.Context) { + r.Called(ctx) +} + func (r *RPCServiceMock) GetHeartbeatChannel() chan entities.RPCGetHealthResult { args := r.Called() return args.Get(0).(chan entities.RPCGetHealthResult) diff --git a/internal/services/rpc_service.go b/internal/services/rpc_service.go index 39886b5..160cf6e 100644 --- a/internal/services/rpc_service.go +++ b/internal/services/rpc_service.go @@ -15,9 +15,9 @@ import ( ) const ( - rpcHealthCheckSleepTime = 5 * time.Second - rpcHealthCheckMaxWaitTime = 60 * time.Second - getHealthMethodName = "getHealth" + rpcHealthCheckSleepTime = 5 * time.Second + rpcHealthCheckMaxWaitTime = 60 * time.Second + getHealthMethodName = "getHealth" ) type RPCService interface { @@ -28,11 +28,12 @@ type RPCService interface { GetLedgerEntries(keys []string) (entities.RPCGetLedgerEntriesResult, error) GetAccountLedgerSequence(address string) (int64, error) GetHeartbeatChannel() chan entities.RPCGetHealthResult + TrackRPCServiceHealth(ctx context.Context) } type rpcService struct { - rpcURL string - httpClient utils.HTTPClient + rpcURL string + httpClient utils.HTTPClient heartbeatChannel chan entities.RPCGetHealthResult } @@ -49,13 +50,11 @@ func NewRPCService(rpcURL string, httpClient utils.HTTPClient) (*rpcService, err } heartbeatChannel := make(chan entities.RPCGetHealthResult, 1) - rpcService := &rpcService{ - rpcURL: rpcURL, - httpClient: httpClient, + return &rpcService{ + rpcURL: rpcURL, + httpClient: httpClient, heartbeatChannel: heartbeatChannel, - } - go rpcService.trackRPCServiceHealth(context.Background()) - return rpcService, nil + }, nil } func (r *rpcService) GetHeartbeatChannel() chan entities.RPCGetHealthResult { @@ -168,7 +167,7 @@ func (r *rpcService) GetAccountLedgerSequence(address string) (int64, error) { return int64(accountEntry.SeqNum), nil } -func (r *rpcService) trackRPCServiceHealth(ctx context.Context) { +func (r *rpcService) TrackRPCServiceHealth(ctx context.Context) { healthCheckTicker := time.NewTicker(rpcHealthCheckSleepTime) warningTicker := time.NewTicker(rpcHealthCheckMaxWaitTime) defer func() { diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index 21481e7..7508e26 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -33,11 +33,14 @@ func (e *errorReader) Close() error { } func TestSendRPCRequest(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url" + rpcURL := "http://test-url-send-rpc-request" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - defer close(rpcService.heartbeatChannel) + go rpcService.TrackRPCServiceHealth(ctx) t.Run("successful", func(t *testing.T) { httpResponse := http.Response{ @@ -130,11 +133,14 @@ func TestSendRPCRequest(t *testing.T) { } func TestSendTransaction(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url" + rpcURL := "http://test-url-send-transaction" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - defer close(rpcService.heartbeatChannel) + go rpcService.TrackRPCServiceHealth(ctx) t.Run("successful", func(t *testing.T) { transactionXDR := "AAAAAgAAAABYJgX6SmA2tGVDv3GXfOWbkeL869ahE0e5DG9HnXQw/QAAAGQAAjpnAAAAAQAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAQAAAACxaDFEbbssZfrbRgFxTYIygITSQxsUpDmneN2gAZBEFQAAAAAAAAAABfXhAAAAAAAAAAAA" @@ -193,12 +199,14 @@ func TestSendTransaction(t *testing.T) { } func TestGetTransaction(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url" + rpcURL := "http://test-url-get-transaction" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - defer close(rpcService.heartbeatChannel) - + go rpcService.TrackRPCServiceHealth(ctx) t.Run("successful", func(t *testing.T) { transactionHash := "6bc97bddc21811c626839baf4ab574f4f9f7ddbebb44d286ae504396d4e752da" params := entities.RPCParams{Hash: transactionHash} @@ -271,12 +279,14 @@ func TestGetTransaction(t *testing.T) { } func TestGetTransactions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url" + rpcURL := "http://test-url-get-transactions" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - defer close(rpcService.heartbeatChannel) - + go rpcService.TrackRPCServiceHealth(ctx) t.Run("rpc_request_fails", func(t *testing.T) { mockHTTPClient. On("Post", rpcURL, "application/json", mock.Anything). @@ -336,12 +346,15 @@ func TestGetTransactions(t *testing.T) { } func TestSendGetHealth(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url" + rpcURL := "http://test-url-send-get-health" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - defer close(rpcService.heartbeatChannel) - + go rpcService.TrackRPCServiceHealth(ctx) + t.Run("successful", func(t *testing.T) { payload := map[string]interface{}{ "jsonrpc": "2.0", @@ -385,11 +398,14 @@ func TestSendGetHealth(t *testing.T) { } func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mockHTTPClient := &utils.MockHTTPClient{} - rpcService, err := NewRPCService("http://test-url", mockHTTPClient) + rpcURL := "http://test-url-track-rpc-service-health" + rpcService, err := NewRPCService(rpcURL, mockHTTPClient) require.NoError(t, err) - defer close(rpcService.heartbeatChannel) - + go rpcService.TrackRPCServiceHealth(ctx) healthResult := entities.RPCGetHealthResult{ Status: "healthy", LatestLedger: 100, @@ -410,7 +426,7 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { } }`))), } - mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything).Return(mockResponse, nil) + mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(mockResponse, nil) // Get result from heartbeat channel select { @@ -429,9 +445,9 @@ func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { defer log.DefaultLogger.SetOutput(os.Stderr) mockHTTPClient := &utils.MockHTTPClient{} - rpcService, err := NewRPCService("http://test-url", mockHTTPClient) + rpcURL := "http://test-url-track-rpc-service-health" + rpcService, err := NewRPCService(rpcURL, mockHTTPClient) require.NoError(t, err) - defer close(rpcService.heartbeatChannel) ctx, cancel := context.WithTimeout(context.Background(), 70*time.Second) defer cancel() @@ -446,10 +462,10 @@ func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { } }`))), } - mockHTTPClient.On("Post", "http://test-url", "application/json", mock.Anything). + mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything). Return(mockResponse, nil) - go rpcService.trackRPCServiceHealth(ctx) + go rpcService.TrackRPCServiceHealth(ctx) // Wait long enough for warning to trigger time.Sleep(65 * time.Second) @@ -461,13 +477,13 @@ func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { func TestTrackRPCService_ContextCancelled(t *testing.T) { mockHTTPClient := &utils.MockHTTPClient{} - rpcService, err := NewRPCService("http://test-url", mockHTTPClient) + rpcURL := "http://test-url-track-rpc-service-health" + rpcService, err := NewRPCService(rpcURL, mockHTTPClient) require.NoError(t, err) - defer close(rpcService.heartbeatChannel) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - go rpcService.trackRPCServiceHealth(ctx) + go rpcService.TrackRPCServiceHealth(ctx) // Cancel context immediately cancel() From 68049af2964b03dd985cfcc88569417fc755ba47 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 14:41:35 -0500 Subject: [PATCH 11/16] Remove goroutines from other tests --- internal/services/rpc_service_test.go | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index 7508e26..68f6f25 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -33,14 +33,10 @@ func (e *errorReader) Close() error { } func TestSendRPCRequest(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mockHTTPClient := utils.MockHTTPClient{} rpcURL := "http://test-url-send-rpc-request" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - go rpcService.TrackRPCServiceHealth(ctx) t.Run("successful", func(t *testing.T) { httpResponse := http.Response{ @@ -133,14 +129,10 @@ func TestSendRPCRequest(t *testing.T) { } func TestSendTransaction(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mockHTTPClient := utils.MockHTTPClient{} rpcURL := "http://test-url-send-transaction" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - go rpcService.TrackRPCServiceHealth(ctx) t.Run("successful", func(t *testing.T) { transactionXDR := "AAAAAgAAAABYJgX6SmA2tGVDv3GXfOWbkeL869ahE0e5DG9HnXQw/QAAAGQAAjpnAAAAAQAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAQAAAACxaDFEbbssZfrbRgFxTYIygITSQxsUpDmneN2gAZBEFQAAAAAAAAAABfXhAAAAAAAAAAAA" @@ -199,14 +191,10 @@ func TestSendTransaction(t *testing.T) { } func TestGetTransaction(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mockHTTPClient := utils.MockHTTPClient{} rpcURL := "http://test-url-get-transaction" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - go rpcService.TrackRPCServiceHealth(ctx) t.Run("successful", func(t *testing.T) { transactionHash := "6bc97bddc21811c626839baf4ab574f4f9f7ddbebb44d286ae504396d4e752da" params := entities.RPCParams{Hash: transactionHash} @@ -279,14 +267,10 @@ func TestGetTransaction(t *testing.T) { } func TestGetTransactions(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mockHTTPClient := utils.MockHTTPClient{} rpcURL := "http://test-url-get-transactions" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - go rpcService.TrackRPCServiceHealth(ctx) t.Run("rpc_request_fails", func(t *testing.T) { mockHTTPClient. On("Post", rpcURL, "application/json", mock.Anything). @@ -346,14 +330,10 @@ func TestGetTransactions(t *testing.T) { } func TestSendGetHealth(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mockHTTPClient := utils.MockHTTPClient{} rpcURL := "http://test-url-send-get-health" rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) require.NoError(t, err) - go rpcService.TrackRPCServiceHealth(ctx) t.Run("successful", func(t *testing.T) { payload := map[string]interface{}{ @@ -398,7 +378,7 @@ func TestSendGetHealth(t *testing.T) { } func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() mockHTTPClient := &utils.MockHTTPClient{} From 5eb04ac5ef62f81826f5e645204b96929c38f708 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 14:43:40 -0500 Subject: [PATCH 12/16] Revert some changes --- internal/services/rpc_service_test.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index 68f6f25..9135f3b 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -34,9 +34,8 @@ func (e *errorReader) Close() error { func TestSendRPCRequest(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url-send-rpc-request" - rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) - require.NoError(t, err) + rpcURL := "http://api.vibrantapp.com/soroban/rpc" + rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) t.Run("successful", func(t *testing.T) { httpResponse := http.Response{ @@ -130,9 +129,8 @@ func TestSendRPCRequest(t *testing.T) { func TestSendTransaction(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url-send-transaction" - rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) - require.NoError(t, err) + rpcURL := "http://api.vibrantapp.com/soroban/rpc" + rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) t.Run("successful", func(t *testing.T) { transactionXDR := "AAAAAgAAAABYJgX6SmA2tGVDv3GXfOWbkeL869ahE0e5DG9HnXQw/QAAAGQAAjpnAAAAAQAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAEAAAAAAAAAAQAAAACxaDFEbbssZfrbRgFxTYIygITSQxsUpDmneN2gAZBEFQAAAAAAAAAABfXhAAAAAAAAAAAA" @@ -192,9 +190,9 @@ func TestSendTransaction(t *testing.T) { func TestGetTransaction(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url-get-transaction" - rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) - require.NoError(t, err) + rpcURL := "http://api.vibrantapp.com/soroban/rpc" + rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) + t.Run("successful", func(t *testing.T) { transactionHash := "6bc97bddc21811c626839baf4ab574f4f9f7ddbebb44d286ae504396d4e752da" params := entities.RPCParams{Hash: transactionHash} @@ -268,9 +266,9 @@ func TestGetTransaction(t *testing.T) { func TestGetTransactions(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url-get-transactions" - rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) - require.NoError(t, err) + rpcURL := "http://api.vibrantapp.com/soroban/rpc" + rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) + t.Run("rpc_request_fails", func(t *testing.T) { mockHTTPClient. On("Post", rpcURL, "application/json", mock.Anything). @@ -331,9 +329,8 @@ func TestGetTransactions(t *testing.T) { func TestSendGetHealth(t *testing.T) { mockHTTPClient := utils.MockHTTPClient{} - rpcURL := "http://test-url-send-get-health" - rpcService, err := NewRPCService(rpcURL, &mockHTTPClient) - require.NoError(t, err) + rpcURL := "http://api.vibrantapp.com/soroban/rpc" + rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) t.Run("successful", func(t *testing.T) { payload := map[string]interface{}{ From 1cbfb72f213f6c9599a1d48d7aaa96984076a153 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 15:06:43 -0500 Subject: [PATCH 13/16] Fix failing test - 1 --- internal/services/ingest_test.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index a77ae77..eca20e5 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -466,18 +466,6 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { // Create and set up the heartbeat channel heartbeatChan := make(chan entities.RPCGetHealthResult, 1) - go func() { - for { - select { - case <-ctx.Done(): - return - default: - health, _ := mockRPCService.GetHealth() - heartbeatChan <- health - time.Sleep(10 * time.Second) - } - } - }() mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ Status: "healthy", @@ -488,7 +476,7 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { Status: "healthy", LatestLedger: 100, OldestLedger: 50, - }, nil).Once() + }, nil) // Capture debug logs to verify waiting message var logBuffer bytes.Buffer @@ -522,6 +510,20 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { mockRPCService.On("GetTransactions", int64(100), "", 50).Return(mockResult, nil).Once() mockAppTracker.On("CaptureMessage", mock.Anything).Maybe().Return(nil) + // Start the heartbeat goroutine + go func() { + for { + select { + case <-ctx.Done(): + return + default: + time.Sleep(10 * time.Second) + health, _ := mockRPCService.GetHealth() + heartbeatChan <- health + } + } + }() + // Start ingestion at ledger 100 (ahead of RPC's initial position at 50) err = ingestService.Run(ctx, uint32(100), uint32(100)) require.NoError(t, err) From acbc8a0767aed2540f9c5fce856fd1e17911d791 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 15:44:28 -0500 Subject: [PATCH 14/16] fix data race in tests --- internal/services/ingest_test.go | 61 +++++++++------------------ internal/services/rpc_service_test.go | 7 ++- 2 files changed, 25 insertions(+), 43 deletions(-) diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index eca20e5..33d2ebe 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -363,26 +363,6 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { tssStore, err := tssstore.NewStore(dbConnectionPool) require.NoError(t, err) - // Create and set up the heartbeat channel - heartbeatChan := make(chan entities.RPCGetHealthResult, 1) - go func() { - for { - select { - case <-ctx.Done(): - return - default: - health, _ := mockRPCService.GetHealth() - heartbeatChan <- health - time.Sleep(10 * time.Second) - } - } - }() - mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ - Status: "healthy", - LatestLedger: 100, - OldestLedger: 50, - }, nil).Once() - ingestService, err := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) require.NoError(t, err) @@ -427,6 +407,12 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { OldestLedgerCloseTime: int64(1), } mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once() + heartbeatChan := make(chan entities.RPCGetHealthResult, 1) + heartbeatChan <- entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 50, + } mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) err = ingestService.Run(ctx, uint32(49), uint32(50)) @@ -467,16 +453,23 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { // Create and set up the heartbeat channel heartbeatChan := make(chan entities.RPCGetHealthResult, 1) mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) - mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ + + // Send first heartbeat showing RPC is behind + heartbeatChan <- entities.RPCGetHealthResult{ Status: "healthy", LatestLedger: 50, OldestLedger: 1, - }, nil).Once() - mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ - Status: "healthy", - LatestLedger: 100, - OldestLedger: 50, - }, nil) + } + + // After a delay, send second heartbeat showing RPC has caught up + go func() { + time.Sleep(6 * time.Second) // Sleep longer than the service's 5 second wait + heartbeatChan <- entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 50, + } + }() // Capture debug logs to verify waiting message var logBuffer bytes.Buffer @@ -510,20 +503,6 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { mockRPCService.On("GetTransactions", int64(100), "", 50).Return(mockResult, nil).Once() mockAppTracker.On("CaptureMessage", mock.Anything).Maybe().Return(nil) - // Start the heartbeat goroutine - go func() { - for { - select { - case <-ctx.Done(): - return - default: - time.Sleep(10 * time.Second) - health, _ := mockRPCService.GetHealth() - heartbeatChan <- health - } - } - }() - // Start ingestion at ledger 100 (ahead of RPC's initial position at 50) err = ingestService.Run(ctx, uint32(100), uint32(100)) require.NoError(t, err) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index 9135f3b..584f138 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -382,7 +382,7 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { rpcURL := "http://test-url-track-rpc-service-health" rpcService, err := NewRPCService(rpcURL, mockHTTPClient) require.NoError(t, err) - go rpcService.TrackRPCServiceHealth(ctx) + healthResult := entities.RPCGetHealthResult{ Status: "healthy", LatestLedger: 100, @@ -403,7 +403,10 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { } }`))), } - mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(mockResponse, nil) + mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(mockResponse, nil).Run(func(args mock.Arguments) { + cancel() + }) + rpcService.TrackRPCServiceHealth(ctx) // Get result from heartbeat channel select { From 738c1772fa3f32017812c0f1219d420b9d8db4a1 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 16:49:48 -0500 Subject: [PATCH 15/16] fix data race - 2 --- internal/services/rpc_service_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index 584f138..5e36e09 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "net/http" - "os" "strings" "testing" "time" @@ -420,9 +419,7 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { } func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { - var logBuffer bytes.Buffer - log.DefaultLogger.SetOutput(&logBuffer) - defer log.DefaultLogger.SetOutput(os.Stderr) + getLogs := log.DefaultLogger.StartTest(log.WarnLevel) mockHTTPClient := &utils.MockHTTPClient{} rpcURL := "http://test-url-track-rpc-service-health" @@ -450,8 +447,14 @@ func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { // Wait long enough for warning to trigger time.Sleep(65 * time.Second) - logOutput := logBuffer.String() - assert.Contains(t, logOutput, "rpc service unhealthy for over 1m0s") + entries := getLogs() + testFailed := true + for _, entry := range entries { + if strings.Contains(entry.Message, "rpc service unhealthy for over 1m0s") { + testFailed = false + } + } + assert.False(t, testFailed) mockHTTPClient.AssertExpectations(t) } @@ -465,9 +468,6 @@ func TestTrackRPCService_ContextCancelled(t *testing.T) { go rpcService.TrackRPCServiceHealth(ctx) - // Cancel context immediately - cancel() - // Verify channel is closed after context cancellation time.Sleep(100 * time.Millisecond) _, ok := <-rpcService.GetHeartbeatChannel() From 2d540604254f150155e8eb9d9cbe564f5a88b8b2 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 15 Jan 2025 11:53:26 -0500 Subject: [PATCH 16/16] Add test for case where channel accounts are not created and ctx is cancelled --- internal/services/channel_account_service.go | 3 +- .../services/channel_account_service_test.go | 34 +++++++++++++++---- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/internal/services/channel_account_service.go b/internal/services/channel_account_service.go index c3f2b76..4e515bb 100644 --- a/internal/services/channel_account_service.go +++ b/internal/services/channel_account_service.go @@ -90,12 +90,13 @@ func (s *channelAccountService) EnsureChannelAccounts(ctx context.Context, numbe } func (s *channelAccountService) submitCreateChannelAccountsOnChainTransaction(ctx context.Context, distributionAccountPublicKey string, ops []txnbuild.Operation) error { + rpcHeartbeatChannel := s.RPCService.GetHeartbeatChannel() select { case <-ctx.Done(): return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err()) // The channel account creation goroutine will wait in the background for the rpc service to become healthy on startup. // This lets the API server startup so that users can start interacting with the API which does not depend on RPC, instead of waiting till it becomes healthy. - case <-s.RPCService.GetHeartbeatChannel(): + case <-rpcHeartbeatChannel: accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey) if err != nil { return fmt.Errorf("getting ledger sequence for distribution account public key: %s: %w", distributionAccountPublicKey, err) diff --git a/internal/services/channel_account_service_test.go b/internal/services/channel_account_service_test.go index 19d4212..39eb151 100644 --- a/internal/services/channel_account_service_test.go +++ b/internal/services/channel_account_service_test.go @@ -3,6 +3,7 @@ package services import ( "context" "testing" + "time" "github.com/stellar/go/keypair" "github.com/stellar/go/network" @@ -232,14 +233,9 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) { Return(network.TestNetworkPassphrase). Once() defer signatureClient.AssertExpectations(t) - - mockRPCService. - On("GetHealth"). - Return(entities.RPCGetHealthResult{Status: "healthy"}, nil) // Create and set up the heartbeat channel - health, _ := mockRPCService.GetHealth() - heartbeatChan <- health + heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"} mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) mockRPCService. @@ -265,6 +261,32 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) { require.Error(t, err) assert.Contains(t, err.Error(), "transaction failed") }) + + t.Run("fails if rpc service is not healthy", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + channelAccountStore. + On("Count", ctx). + Return(2, nil). + Once() + defer channelAccountStore.AssertExpectations(t) + + distributionAccount := keypair.MustRandom() + signatureClient. + On("GetAccountPublicKey", ctx). + Return(distributionAccount.Address(), nil). + Once() + defer signatureClient.AssertExpectations(t) + + heartbeatChan := make(chan entities.RPCGetHealthResult, 1) + mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) + defer mockRPCService.AssertExpectations(t) + + err := s.EnsureChannelAccounts(ctx, 5) + require.Error(t, err) + assert.Contains(t, err.Error(), "context cancelled while waiting for rpc service to become healthy") + }) } func TestSubmitTransaction(t *testing.T) {