Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centralize RPC health monitoring #104

Merged
merged 16 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func setupDeps(cfg Configs) (services.IngestService, error) {
if err != nil {
return nil, fmt.Errorf("instantiating rpc service: %w", err)
}
go rpcService.TrackRPCServiceHealth(context.Background())
tssStore, err := tssstore.NewStore(dbConnectionPool)
if err != nil {
return nil, fmt.Errorf("instantiating tss store: %w", err)
Expand Down
1 change: 1 addition & 0 deletions internal/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) {
if err != nil {
return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err)
}
go rpcService.TrackRPCServiceHealth(context.Background())

accountService, err := services.NewAccountService(models)
if err != nil {
Expand Down
106 changes: 45 additions & 61 deletions internal/services/channel_account_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,75 +90,59 @@ func (s *channelAccountService) EnsureChannelAccounts(ctx context.Context, numbe
}

func (s *channelAccountService) submitCreateChannelAccountsOnChainTransaction(ctx context.Context, distributionAccountPublicKey string, ops []txnbuild.Operation) error {
err := waitForRPCServiceHealth(ctx, s.RPCService)
if err != nil {
return fmt.Errorf("rpc service did not become healthy: %w", err)
}

accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("getting ledger sequence for distribution account public key: %s: %w", distributionAccountPublicKey, err)
}
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err())
// The channel account creation goroutine will wait in the background for the rpc service to become healthy on startup.
// This lets the API server startup so that users can start interacting with the API which does not depend on RPC, instead of waiting till it becomes healthy.
case <-s.RPCService.GetHeartbeatChannel():
accountSeq, err := s.RPCService.GetAccountLedgerSequence(distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("getting ledger sequence for distribution account public key: %s: %w", distributionAccountPublicKey, err)
}

tx, err := txnbuild.NewTransaction(
txnbuild.TransactionParams{
SourceAccount: &txnbuild.SimpleAccount{
AccountID: distributionAccountPublicKey,
Sequence: accountSeq,
tx, err := txnbuild.NewTransaction(
txnbuild.TransactionParams{
SourceAccount: &txnbuild.SimpleAccount{
AccountID: distributionAccountPublicKey,
Sequence: accountSeq,
},
IncrementSequenceNum: true,
Operations: ops,
BaseFee: s.BaseFee,
Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)},
},
IncrementSequenceNum: true,
Operations: ops,
BaseFee: s.BaseFee,
Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(300)},
},
)
if err != nil {
return fmt.Errorf("building transaction: %w", err)
}

signedTx, err := s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("signing transaction: %w", err)
}

hash, err := signedTx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase())
if err != nil {
return fmt.Errorf("getting transaction hash: %w", err)
}

signedTxXDR, err := signedTx.Base64()
if err != nil {
return fmt.Errorf("getting transaction envelope: %w", err)
}

err = s.submitTransaction(ctx, hash, signedTxXDR)
if err != nil {
return fmt.Errorf("submitting channel account transaction to rpc service: %w", err)
}
)
if err != nil {
return fmt.Errorf("building transaction: %w", err)
}

err = s.waitForTransactionConfirmation(ctx, hash)
if err != nil {
return fmt.Errorf("getting transaction status: %w", err)
}
signedTx, err := s.DistributionAccountSignatureClient.SignStellarTransaction(ctx, tx, distributionAccountPublicKey)
if err != nil {
return fmt.Errorf("signing transaction: %w", err)
}

return nil
}
hash, err := signedTx.HashHex(s.DistributionAccountSignatureClient.NetworkPassphrase())
if err != nil {
return fmt.Errorf("getting transaction hash: %w", err)
}

func waitForRPCServiceHealth(ctx context.Context, rpcService RPCService) error {
// Create a cancellable context for the heartbeat goroutine, once rpc returns healthy status.
heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
heartbeat := make(chan entities.RPCGetHealthResult, 1)
defer cancelHeartbeat()
signedTxXDR, err := signedTx.Base64()
if err != nil {
return fmt.Errorf("getting transaction envelope: %w", err)
}

go trackRPCServiceHealth(heartbeatCtx, heartbeat, nil, rpcService)
err = s.submitTransaction(ctx, hash, signedTxXDR)
if err != nil {
return fmt.Errorf("submitting channel account transaction to rpc service: %w", err)
}

for {
select {
case <-heartbeat:
return nil
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for rpc service to become healthy: %w", ctx.Err())
err = s.waitForTransactionConfirmation(ctx, hash)
if err != nil {
return fmt.Errorf("getting transaction status: %w", err)
}

return nil
}
}

Expand Down
48 changes: 16 additions & 32 deletions internal/services/channel_account_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package services

import (
"context"
"fmt"
"testing"
"time"

"github.com/stellar/go/keypair"
"github.com/stellar/go/network"
Expand All @@ -30,6 +28,7 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
defer dbConnectionPool.Close()

ctx := context.Background()
heartbeatChan := make(chan entities.RPCGetHealthResult, 1)
mockRPCService := RPCServiceMock{}
signatureClient := signing.SignatureClientMock{}
channelAccountStore := store.ChannelAccountStoreMock{}
Expand Down Expand Up @@ -103,6 +102,11 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil)

// Create and set up the heartbeat channel
health, _ := mockRPCService.GetHealth()
heartbeatChan <- health
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)

mockRPCService.
On("GetAccountLedgerSequence", distributionAccount.Address()).
Expand Down Expand Up @@ -174,6 +178,11 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil)

// Create and set up the heartbeat channel
health, _ := mockRPCService.GetHealth()
heartbeatChan <- health
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)

mockRPCService.
On("GetAccountLedgerSequence", distributionAccount.Address()).
Expand Down Expand Up @@ -227,6 +236,11 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil)

// Create and set up the heartbeat channel
health, _ := mockRPCService.GetHealth()
heartbeatChan <- health
mockRPCService.On("GetHeartbeatChannel").Return(heartbeatChan)

mockRPCService.
On("GetAccountLedgerSequence", distributionAccount.Address()).
Expand All @@ -253,36 +267,6 @@ func TestChannelAccountServiceEnsureChannelAccounts(t *testing.T) {
})
}

func TestWaitForRPCServiceHealth(t *testing.T) {
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
mockRPCService := RPCServiceMock{}
ctx := context.Background()

t.Run("successful", func(t *testing.T) {
mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{Status: "healthy"}, nil).
Once()
defer mockRPCService.AssertExpectations(t)

err := waitForRPCServiceHealth(ctx, &mockRPCService)
require.NoError(t, err)
})

t.Run("context_cancelled", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

mockRPCService.
On("GetHealth").
Return(entities.RPCGetHealthResult{}, fmt.Errorf("connection failed"))
defer mockRPCService.AssertExpectations(t)

err := waitForRPCServiceHealth(ctx, &mockRPCService)
require.Error(t, err)
assert.Contains(t, err.Error(), "context cancelled while waiting for rpc service to become healthy")
})
}

func TestSubmitTransaction(t *testing.T) {
dbt := dbtest.Open(t)
defer dbt.Close()
Expand Down
49 changes: 5 additions & 44 deletions internal/services/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
)

const (
rpcHealthCheckSleepTime = 5 * time.Second
rpcHealthCheckMaxWaitTime = 60 * time.Second
ingestHealthCheckMaxWaitTime = 90 * time.Second
)

Expand Down Expand Up @@ -79,12 +77,9 @@ func NewIngestService(
}

func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger uint32) error {
rpcHeartbeat := make(chan entities.RPCGetHealthResult, 1)
ingestHeartbeat := make(chan any, 1)

// Start service health trackers
go trackRPCServiceHealth(ctx, rpcHeartbeat, m.appTracker, m.rpcService)
go trackIngestServiceHealth(ctx, ingestHeartbeat, m.appTracker)
ingestHeartbeatChannel := make(chan any, 1)
rpcHeartbeatChannel := m.rpcService.GetHeartbeatChannel()
go trackIngestServiceHealth(ctx, ingestHeartbeatChannel, m.appTracker)

if startLedger == 0 {
var err error
Expand All @@ -99,7 +94,7 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled: %w", ctx.Err())
case resp := <-rpcHeartbeat:
case resp := <-rpcHeartbeatChannel:
switch {
// Case-1: wallet-backend is running behind rpc's oldest ledger. In this case, we start
// ingestion from rpc's oldest ledger.
Expand All @@ -120,7 +115,7 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u
log.Error("getTransactions: %w", err)
continue
}
ingestHeartbeat <- true
ingestHeartbeatChannel <- true
err = m.ingestPayments(ctx, ledgerTransactions)
if err != nil {
return fmt.Errorf("error ingesting payments: %w", err)
Expand Down Expand Up @@ -278,40 +273,6 @@ func (m *ingestService) processTSSTransactions(ctx context.Context, ledgerTransa
return nil
}

func trackRPCServiceHealth(ctx context.Context, heartbeat chan entities.RPCGetHealthResult, tracker apptracker.AppTracker, rpcService RPCService) {
healthCheckTicker := time.NewTicker(rpcHealthCheckSleepTime)
warningTicker := time.NewTicker(rpcHealthCheckMaxWaitTime)
defer func() {
healthCheckTicker.Stop()
warningTicker.Stop()
close(heartbeat)
}()

for {
select {
case <-ctx.Done():
return
case <-warningTicker.C:
warn := fmt.Sprintf("rpc service unhealthy for over %s", rpcHealthCheckMaxWaitTime)
log.Warn(warn)
if tracker != nil {
tracker.CaptureMessage(warn)
} else {
log.Warn("App Tracker is nil")
}
warningTicker.Reset(rpcHealthCheckMaxWaitTime)
case <-healthCheckTicker.C:
result, err := rpcService.GetHealth()
if err != nil {
log.Warnf("rpc health check failed: %v", err)
continue
}
heartbeat <- result
warningTicker.Reset(rpcHealthCheckMaxWaitTime)
}
}
}

func trackIngestServiceHealth(ctx context.Context, heartbeat chan any, tracker apptracker.AppTracker) {
ticker := time.NewTicker(ingestHealthCheckMaxWaitTime)
defer func() {
Expand Down
Loading
Loading