Skip to content

Commit

Permalink
Centralize RPC health monitoring (#104)
Browse files Browse the repository at this point in the history
Move heartbeat monitoring to rpc_service
  • Loading branch information
aditya1702 authored Jan 15, 2025
1 parent 6726c10 commit 9a2358c
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 213 deletions.
1 change: 1 addition & 0 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func setupDeps(cfg Configs) (services.IngestService, error) {
if err != nil {
return nil, fmt.Errorf("instantiating rpc service: %w", err)
}
go rpcService.TrackRPCServiceHealth(context.Background())
tssStore, err := tssstore.NewStore(dbConnectionPool)
if err != nil {
return nil, fmt.Errorf("instantiating tss store: %w", err)
Expand Down
1 change: 1 addition & 0 deletions internal/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
if err != nil {
return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err)
}
go rpcService.TrackRPCServiceHealth(context.Background())

accountService, err := services.NewAccountService(models)
if err != nil {
Expand Down
107 changes: 46 additions & 61 deletions internal/services/channel_account_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,75 +90,60 @@ func (s *channelAccountService) EnsureChannelAccounts(ctx context.Context, numbe
}

func (s *channelAccountService) submitCreateChannelAccountsOnChainTransaction(ctx context.Context, distributionAccountPublicKey string, ops []txnbuild.Operation) error {
err := waitForRPCServiceHealth(ctx, s.RPCService)
if err != nil {
return fmt.Errorf("rpc service did not become healthy: %w", err)
}

accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("getting ledger sequence for distribution account public key: %s: %w", distributionAccountPublicKey, err)
}
rpcHeartbeatChannel := s.RPCService.GetHeartbeatChannel()
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err())
// The channel account creation goroutine will wait in the background for the rpc service to become healthy on startup.
// This lets the API server start up so that users can start interacting with the API which does not depend on RPC, instead of waiting until the rpc service becomes healthy.
case <-rpcHeartbeatChannel:
accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("getting ledger sequence for distribution account public key: %s: %w", distributionAccountPublicKey, err)
}

tx, err := txnbuild.NewTransaction(
txnbuild.TransactionParams{
SourceAccount: &txnbuild.SimpleAccount{
AccountID: distributionAccountPublicKey,
Sequence: accountSeq,
tx, err := txnbuild.NewTransaction(
txnbuild.TransactionParams{
SourceAccount: &txnbuild.SimpleAccount{
AccountID: distributionAccountPublicKey,
Sequence: accountSeq,
},
IncrementSequenceNum: true,
Operations: ops,
BaseFee: s.BaseFee,
Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)},
},
IncrementSequenceNum: true,
Operations: ops,
BaseFee: s.BaseFee,
Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)},
},
)
if err != nil {
return fmt.Errorf("building transaction: %w", err)
}

signedTx, err := s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("signing transaction: %w", err)
}

hash, err := signedTx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase())
if err != nil {
return fmt.Errorf("getting transaction hash: %w", err)
}

signedTxXDR, err := signedTx.Base64()
if err != nil {
return fmt.Errorf("getting transaction envelope: %w", err)
}

err = s.submitTransaction(ctx, hash, signedTxXDR)
if err != nil {
return fmt.Errorf("submitting channel account transaction to rpc service: %w", err)
}
)
if err != nil {
return fmt.Errorf("building transaction: %w", err)
}

err = s.waitForTransactionConfirmation(ctx, hash)
if err != nil {
return fmt.Errorf("getting transaction status: %w", err)
}
signedTx, err := s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("signing transaction: %w", err)
}

return nil
}
hash, err := signedTx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase())
if err != nil {
return fmt.Errorf("getting transaction hash: %w", err)
}

func waitForRPCServiceHealth(ctx context.Context, rpcService RPCService) error {
// Create a cancellable context for the heartbeat goroutine, once rpc returns healthy status.
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
heartbeat := make(chan entities.RPCGetHealthResult, 1)
defer cancelHeartbeat()
signedTxXDR, err := signedTx.Base64()
if err != nil {
return fmt.Errorf("getting transaction envelope: %w", err)
}

go trackRPCServiceHealth(heartbeatCtx, heartbeat, nil, rpcService)
err = s.submitTransaction(ctx, hash, signedTxXDR)
if err != nil {
return fmt.Errorf("submitting channel account transaction to rpc service: %w", err)
}

for {
select {
case <-heartbeat:
return nil
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err())
err = s.waitForTransactionConfirmation(ctx, hash)
if err != nil {
return fmt.Errorf("getting transaction status: %w", err)
}

return nil
}
}

Expand Down
56 changes: 31 additions & 25 deletions internal/services/channel_account_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package services

import (
"context"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -30,6 +29,7 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
defer dbConnectionPool.Close()

ctx := context.Background()
heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
mockRPCService := RPCServiceMock{}
signatureClient := signing.SignatureClientMock{}
channelAccountStore := store.ChannelAccountStoreMock{}
Expand Down Expand Up @@ -103,6 +103,11 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil)

// Create and set up the heartbeat channel
health, _ := mockRPCService.GetHealth()
heartbeatChan <- health
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)

mockRPCService.
On("GetAccountLedgerSequence", distributionAccount.Address()).
Expand Down Expand Up @@ -174,6 +179,11 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil)

// Create and set up the heartbeat channel
health, _ := mockRPCService.GetHealth()
heartbeatChan <- health
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)

mockRPCService.
On("GetAccountLedgerSequence", distributionAccount.Address()).
Expand Down Expand Up @@ -223,10 +233,10 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
Return(network.TestNetworkPassphrase).
Once()
defer signatureClient.AssertExpectations(t)

mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil)
// Create and set up the heartbeat channel
heartbeatChan <- entities.RPCGetHealthResult{Status: "healthy"}
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)

mockRPCService.
On("GetAccountLedgerSequence", distributionAccount.Address()).
Expand All @@ -251,33 +261,29 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
require.Error(t, err)
assert.Contains(t, err.Error(), "transaction failed")
})
}

func TestWaitForRPCServiceHealth(t *testing.T) {
mockRPCService := RPCServiceMock{}
ctx := context.Background()
t.Run("fails if rpc service is not healthy", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

t.Run("successful", func(t *testing.T) {
mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil).
channelAccountStore.
On("Count", ctx).
Return(2, nil).
Once()
defer mockRPCService.AssertExpectations(t)

err := waitForRPCServiceHealth(ctx, &mockRPCService)
require.NoError(t, err)
})
defer channelAccountStore.AssertExpectations(t)

t.Run("context_cancelled", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
distributionAccount := keypair.MustRandom()
signatureClient.
On("GetAccountPublicKey", ctx).
Return(distributionAccount.Address(), nil).
Once()
defer signatureClient.AssertExpectations(t)

mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{}, fmt.Errorf("connection failed"))
heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)
defer mockRPCService.AssertExpectations(t)

err := waitForRPCServiceHealth(ctx, &mockRPCService)
err := s.EnsureChannelAccounts(ctx, 5)
require.Error(t, err)
assert.Contains(t, err.Error(), "context cancelled while waiting for rpc service to become healthy")
})
Expand Down
49 changes: 5 additions & 44 deletions internal/services/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
)

const (
rpcHealthCheckSleepTime = 5 * time.Second
rpcHealthCheckMaxWaitTime = 60 * time.Second
ingestHealthCheckMaxWaitTime = 90 * time.Second
)

Expand Down Expand Up @@ -79,12 +77,9 @@ func NewIngestService(
}

func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger uint32) error {
rpcHeartbeat := make(chan entities.RPCGetHealthResult, 1)
ingestHeartbeat := make(chan any, 1)

// Start service health trackers
go trackRPCServiceHealth(ctx, rpcHeartbeat, m.appTracker, m.rpcService)
go trackIngestServiceHealth(ctx, ingestHeartbeat, m.appTracker)
ingestHeartbeatChannel := make(chan any, 1)
rpcHeartbeatChannel := m.rpcService.GetHeartbeatChannel()
go trackIngestServiceHealth(ctx, ingestHeartbeatChannel, m.appTracker)

if startLedger == 0 {
var err error
Expand All @@ -99,7 +94,7 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled: %w", ctx.Err())
case resp := <-rpcHeartbeat:
case resp := <-rpcHeartbeatChannel:
switch {
// Case-1: wallet-backend is running behind rpc's oldest ledger. In this case, we start
// ingestion from rpc's oldest ledger.
Expand All @@ -120,7 +115,7 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u
log.Error("getTransactions: %w", err)
continue
}
ingestHeartbeat <- true
ingestHeartbeatChannel <- true
err = m.ingestPayments(ctx, ledgerTransactions)
if err != nil {
return fmt.Errorf("error ingesting payments: %w", err)
Expand Down Expand Up @@ -278,40 +273,6 @@ func (m *ingestService) processTSSTransactions(ctx context.Context, ledgerTransa
return nil
}

// trackRPCServiceHealth polls rpcService.GetHealth every rpcHealthCheckSleepTime
// and forwards each successful result on heartbeat until ctx is cancelled.
//
// When rpcHealthCheckMaxWaitTime elapses without a successful health check, a
// warning is logged and, if tracker is non-nil, also reported through
// tracker.CaptureMessage. Failed health checks are logged and skipped.
//
// The function owns the sending side of heartbeat: it closes the channel when it
// returns, so it must be the only sender on that channel.
func trackRPCServiceHealth(ctx context.Context, heartbeat chan entities.RPCGetHealthResult, tracker apptracker.AppTracker, rpcService RPCService) {
	healthCheckTicker := time.NewTicker(rpcHealthCheckSleepTime)
	warningTicker := time.NewTicker(rpcHealthCheckMaxWaitTime)
	defer func() {
		healthCheckTicker.Stop()
		warningTicker.Stop()
		close(heartbeat)
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-warningTicker.C:
			warn := fmt.Sprintf("rpc service unhealthy for over %s", rpcHealthCheckMaxWaitTime)
			log.Warn(warn)
			if tracker != nil {
				tracker.CaptureMessage(warn)
			} else {
				log.Warn("App Tracker is nil")
			}
			warningTicker.Reset(rpcHealthCheckMaxWaitTime)
		case <-healthCheckTicker.C:
			result, err := rpcService.GetHealth()
			if err != nil {
				log.Warnf("rpc health check failed: %v", err)
				continue
			}
			// Guard the send with ctx.Done(): if the receiver has stopped draining
			// the channel, an unconditional send would block forever and the
			// goroutine would never observe cancellation or run its deferred cleanup.
			select {
			case heartbeat <- result:
			case <-ctx.Done():
				return
			}
			// A successful health check restarts the "unhealthy for too long" window.
			warningTicker.Reset(rpcHealthCheckMaxWaitTime)
		}
	}
}

func trackIngestServiceHealth(ctx context.Context, heartbeat chan any, tracker apptracker.AppTracker) {
ticker := time.NewTicker(ingestHealthCheckMaxWaitTime)
defer func() {
Expand Down
Loading

0 comments on commit 9a2358c

Please sign in to comment.