Skip to content

Commit

Permalink
fix data race in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya1702 committed Jan 13, 2025
1 parent 1cbfb72 commit acbc8a0
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 43 deletions.
61 changes: 20 additions & 41 deletions internal/services/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,26 +363,6 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) {
tssStore, err := tssstore.NewStore(dbConnectionPool)
require.NoError(t, err)

// Create and set up the heartbeat channel
heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
go func() {
for {
select {
case <-ctx.Done():
return
default:
health, _ := mockRPCService.GetHealth()
heartbeatChan <- health
time.Sleep(10 * time.Second)
}
}
}()
mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{
Status: "healthy",
LatestLedger: 100,
OldestLedger: 50,
}, nil).Once()

ingestService, err := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore)
require.NoError(t, err)

Expand Down Expand Up @@ -427,6 +407,12 @@ func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) {
OldestLedgerCloseTime: int64(1),
}
mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once()
heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
heartbeatChan <- entities.RPCGetHealthResult{
Status: "healthy",
LatestLedger: 100,
OldestLedger: 50,
}
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)

err = ingestService.Run(ctx, uint32(49), uint32(50))
Expand Down Expand Up @@ -467,16 +453,23 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) {
// Create and set up the heartbeat channel
heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)
mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{

// Send first heartbeat showing RPC is behind
heartbeatChan <- entities.RPCGetHealthResult{
Status: "healthy",
LatestLedger: 50,
OldestLedger: 1,
}, nil).Once()
mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{
Status: "healthy",
LatestLedger: 100,
OldestLedger: 50,
}, nil)
}

// After a delay, send second heartbeat showing RPC has caught up
go func() {
time.Sleep(6 * time.Second) // Sleep longer than the service's 5 second wait
heartbeatChan <- entities.RPCGetHealthResult{
Status: "healthy",
LatestLedger: 100,
OldestLedger: 50,
}
}()

// Capture debug logs to verify waiting message
var logBuffer bytes.Buffer
Expand Down Expand Up @@ -510,20 +503,6 @@ func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) {
mockRPCService.On("GetTransactions", int64(100), "", 50).Return(mockResult, nil).Once()
mockAppTracker.On("CaptureMessage", mock.Anything).Maybe().Return(nil)

// Start the heartbeat goroutine
go func() {
for {
select {
case <-ctx.Done():
return
default:
time.Sleep(10 * time.Second)
health, _ := mockRPCService.GetHealth()
heartbeatChan <- health
}
}
}()

// Start ingestion at ledger 100 (ahead of RPC's initial position at 50)
err = ingestService.Run(ctx, uint32(100), uint32(100))
require.NoError(t, err)
Expand Down
7 changes: 5 additions & 2 deletions internal/services/rpc_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) {
rpcURL := "http://test-url-track-rpc-service-health"
rpcService, err := NewRPCService(rpcURL, mockHTTPClient)
require.NoError(t, err)
go rpcService.TrackRPCServiceHealth(ctx)

healthResult := entities.RPCGetHealthResult{
Status: "healthy",
LatestLedger: 100,
Expand All @@ -403,7 +403,10 @@ func TestTrackRPCServiceHealth_HealthyService(t *testing.T) {
}
}`))),
}
mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(mockResponse, nil)
mockHTTPClient.On("Post", rpcURL, "application/json", mock.Anything).Return(mockResponse, nil).Run(func(args mock.Arguments) {
cancel()
})
rpcService.TrackRPCServiceHealth(ctx)

// Get result from heartbeat channel
select {
Expand Down

0 comments on commit acbc8a0

Please sign in to comment.