From acbc8a0767aed2540f9c5fce856fd1e17911d791 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 13 Jan 2025 15:44:28 -0500 Subject: [PATCH] fix data race in tests --- internal/services/ingest_test.go | 61 +++++++++------------------ internal/services/rpc_service_test.go | 7 ++- 2 files changed, 25 insertions(+), 43 deletions(-) diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index eca20e5..33d2ebe 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -363,26 +363,6 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { tssStore, err := tssstore.NewStore(dbConnectionPool) require.NoError(t, err) - // Create and set up the heartbeat channel - heartbeatChan := make(chan entities.RPCGetHealthResult, 1) - go func() { - for { - select { - case <-ctx.Done(): - return - default: - health, _ := mockRPCService.GetHealth() - heartbeatChan <- health - time.Sleep(10 * time.Second) - } - } - }() - mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ - Status: "healthy", - LatestLedger: 100, - OldestLedger: 50, - }, nil).Once() - ingestService, err := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) require.NoError(t, err) @@ -427,6 +407,12 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { OldestLedgerCloseTime: int64(1), } mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once() + heartbeatChan := make(chan entities.RPCGetHealthResult, 1) + heartbeatChan <- entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 50, + } mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) err = ingestService.Run(ctx, uint32(49), uint32(50)) @@ -467,16 +453,23 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { // Create and set up the heartbeat channel heartbeatChan := make(chan entities.RPCGetHealthResult, 1) mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan) - mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ + + // Send first heartbeat showing RPC is behind + heartbeatChan <- entities.RPCGetHealthResult{ Status: "healthy", LatestLedger: 50, OldestLedger: 1, - }, nil).Once() - mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ - Status: "healthy", - LatestLedger: 100, - OldestLedger: 50, - }, nil) + } + + // After a delay, send second heartbeat showing RPC has caught up + go func() { + time.Sleep(6 * time.Second) // Sleep longer than the service's 5 second wait + heartbeatChan <- entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 50, + } + }() // Capture debug logs to verify waiting message var logBuffer bytes.Buffer @@ -510,20 +503,6 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { mockRPCService.On("GetTransactions", int64(100), "", 50).Return(mockResult, nil).Once() mockAppTracker.On("CaptureMessage", mock.Anything).Maybe().Return(nil) - // Start the heartbeat goroutine - go func() { - for { - select { - case <-ctx.Done(): - return - default: - time.Sleep(10 * time.Second) - health, _ := mockRPCService.GetHealth() - heartbeatChan <- health - } - } - }() - // Start ingestion at ledger 100 (ahead of RPC's initial position at 50) err = ingestService.Run(ctx, uint32(100), uint32(100)) require.NoError(t, err) diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index 9135f3b..584f138 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -382,7 +382,7 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { rpcURL := "http://test-url-track-rpc-service-health" rpcService, err := NewRPCService(rpcURL, mockHTTPClient) require.NoError(t, err) - go rpcService.TrackRPCServiceHealth(ctx) + healthResult := entities.RPCGetHealthResult{ Status: "healthy", LatestLedger: 100, @@ -403,7 +403,10 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { } }`))), } - mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(mockResponse, nil) + mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(mockResponse, nil).Run(func(args mock.Arguments) { + cancel() + }) + rpcService.TrackRPCServiceHealth(ctx) // Get result from heartbeat channel select {