From 717bb4762ff640b60901c3f2c5d3e8e5c80007f2 Mon Sep 17 00:00:00 2001 From: Gui Iribarren Date: Wed, 21 Aug 2024 10:21:49 +0200 Subject: [PATCH] indexer: reindex blocks and transactions after migrations indexer: new method ReindexBlocks indexerdb: make CreateBlock and CreateTransaction an UPSERT --- vochain/indexer/block.go | 14 ++++ vochain/indexer/db/blocks.sql.go | 6 ++ vochain/indexer/db/transactions.sql.go | 8 ++ vochain/indexer/indexer.go | 102 +++++++++++++++++++++++ vochain/indexer/queries/blocks.sql | 8 +- vochain/indexer/queries/transactions.sql | 11 ++- vochain/indexer/transaction.go | 8 +- 7 files changed, 153 insertions(+), 4 deletions(-) diff --git a/vochain/indexer/block.go b/vochain/indexer/block.go index 5d2118ef4..c6021e530 100644 --- a/vochain/indexer/block.go +++ b/vochain/indexer/block.go @@ -76,3 +76,17 @@ func (idx *Indexer) BlockList(limit, offset int, chainID, hash, proposerAddress } return list, uint64(results[0].TotalCount), nil } + +// CountBlocks returns how many blocks are indexed. +func (idx *Indexer) CountBlocks() (uint64, error) { + results, err := idx.readOnlyQuery.SearchBlocks(context.TODO(), indexerdb.SearchBlocksParams{ + Limit: 1, + }) + if err != nil { + return 0, err + } + if len(results) == 0 { + return 0, nil + } + return uint64(results[0].TotalCount), nil +} diff --git a/vochain/indexer/db/blocks.sql.go b/vochain/indexer/db/blocks.sql.go index e634a10b4..669f59602 100644 --- a/vochain/indexer/db/blocks.sql.go +++ b/vochain/indexer/db/blocks.sql.go @@ -17,6 +17,12 @@ INSERT INTO blocks( ) VALUES ( ?, ?, ?, ?, ?, ? ) +ON CONFLICT(height) DO UPDATE +SET chain_id = excluded.chain_id, + time = excluded.time, + hash = excluded.hash, + proposer_address = excluded.proposer_address, + last_block_hash = excluded.last_block_hash ` type CreateBlockParams struct { diff --git a/vochain/indexer/db/transactions.sql.go b/vochain/indexer/db/transactions.sql.go index dffac5013..7fabaa528 100644 --- a/vochain/indexer/db/transactions.sql.go +++ b/vochain/indexer/db/transactions.sql.go @@ -41,6 +41,14 @@ INSERT INTO transactions ( ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ? ) +ON CONFLICT(hash) DO UPDATE +SET block_height = excluded.block_height, + block_index = excluded.block_index, + type = excluded.type, + subtype = excluded.subtype, + raw_tx = excluded.raw_tx, + signature = excluded.signature, + signer = excluded.signer ` type CreateTransactionParams struct { diff --git a/vochain/indexer/indexer.go b/vochain/indexer/indexer.go index d66ce1f07..b764c8812 100644 --- a/vochain/indexer/indexer.go +++ b/vochain/indexer/indexer.go @@ -1,6 +1,7 @@ package indexer import ( + "bytes" "context" "database/sql" "embed" @@ -172,6 +173,12 @@ func (idx *Indexer) startDB() error { } goose.SetLogger(log.GooseLogger()) goose.SetBaseFS(embedMigrations) + + if gooseMigrationsPending(idx.readWriteDB, "migrations") { + log.Info("indexer db needs migration, scheduling a reindex after sync") + go idx.ReindexBlocks(false) + } + if err := goose.Up(idx.readWriteDB, "migrations"); err != nil { return fmt.Errorf("goose up: %w", err) } @@ -249,6 +256,27 @@ func (idx *Indexer) RestoreBackup(path string) error { return nil } +func gooseMigrationsPending(db *sql.DB, dir string) bool { + // Get the latest applied migration version + currentVersion, err := goose.GetDBVersion(db) + if err != nil { + log.Errorf("failed to get current database version: %v", err) + return false + } + + // Collect migrations after the current version + migrations, err := goose.CollectMigrations(dir, currentVersion, goose.MaxVersion) + if err != nil { + if errors.Is(err, goose.ErrNoMigrationFiles) { + return false + } + log.Errorf("failed to collect migrations: %v", err) + return false + } + + return len(migrations) > 0 +} + // SaveBackup backs up the database to a file on disk. // Note that writes to the database may be blocked until the backup finishes, // and an error may occur if a file at path already exists. @@ -402,6 +430,80 @@ func (idx *Indexer) AfterSyncBootstrap(inTest bool) { log.Infof("live results recovery computation finished, took %s", time.Since(startTime)) } +// ReindexBlocks reindexes all blocks found in blockstore +func (idx *Indexer) ReindexBlocks(inTest bool) { + if !inTest { + <-idx.App.WaitUntilSynced() + } + + // Note that holding blockMu means new votes aren't added until the reindex finishes. + idx.blockMu.Lock() + defer idx.blockMu.Unlock() + + if idx.App.Node == nil || idx.App.Node.BlockStore() == nil { + return + } + + idxBlockCount, err := idx.CountBlocks() + if err != nil { + log.Warnf("indexer CountBlocks returned error: %s", err) + } + log.Infow("start reindexing", + "blockStoreBase", idx.App.Node.BlockStore().Base(), + "blockStoreHeight", idx.App.Node.BlockStore().Height(), + "indexerBlockCount", idxBlockCount, + ) + queries := idx.blockTxQueries() + for height := idx.App.Node.BlockStore().Base(); height <= idx.App.Node.BlockStore().Height(); height++ { + if b := idx.App.GetBlockByHeight(int64(height)); b != nil { + // Blocks + func() { + idxBlock, err := idx.readOnlyQuery.GetBlockByHeight(context.TODO(), b.Height) + if err == nil && idxBlock.Time != b.Time { + log.Errorf("while reindexing blocks, block %d timestamp in db (%s) differs from blockstore (%s), leaving untouched", height, idxBlock.Time, b.Time) + return + } + if _, err := queries.CreateBlock(context.TODO(), indexerdb.CreateBlockParams{ + ChainID: b.ChainID, + Height: b.Height, + Time: b.Time, + Hash: nonNullBytes(b.Hash()), + ProposerAddress: nonNullBytes(b.ProposerAddress), + LastBlockHash: nonNullBytes(b.LastBlockID.Hash), + }); err != nil { + log.Errorw(err, "cannot index new block") + } + }() + + // Transactions + func() { + for index, tx := range b.Data.Txs { + idxTx, err := idx.readOnlyQuery.GetTransactionByHeightAndIndex(context.TODO(), indexerdb.GetTransactionByHeightAndIndexParams{ + BlockHeight: b.Height, + BlockIndex: int64(index), + }) + if err == nil && !bytes.Equal(idxTx.Hash, tx.Hash()) { + log.Errorf("while reindexing txs, tx %d/%d hash in db (%x) differs from blockstore (%x), leaving untouched", b.Height, index, idxTx.Hash, tx.Hash()) + return + } + vtx := new(vochaintx.Tx) + if err := vtx.Unmarshal(tx, b.ChainID); err != nil { + log.Errorw(err, fmt.Sprintf("cannot unmarshal tx %d/%d", b.Height, index)) + continue + } + idx.indexTx(vtx, uint32(b.Height), int32(index)) + } + }() + } + } + + log.Infow("finished reindexing", + "blockStoreBase", idx.App.Node.BlockStore().Base(), + "blockStoreHeight", idx.App.Node.BlockStore().Height(), + "indexerBlockCount", idxBlockCount, + ) +} + // Commit is called by the APP when a block is confirmed and included into the chain func (idx *Indexer) Commit(height uint32) error { idx.blockMu.Lock() diff --git a/vochain/indexer/queries/blocks.sql b/vochain/indexer/queries/blocks.sql index 656de4e21..d15332cdf 100644 --- a/vochain/indexer/queries/blocks.sql +++ b/vochain/indexer/queries/blocks.sql @@ -3,7 +3,13 @@ INSERT INTO blocks( chain_id, height, time, hash, proposer_address, last_block_hash ) VALUES ( ?, ?, ?, ?, ?, ? -); +) +ON CONFLICT(height) DO UPDATE +SET chain_id = excluded.chain_id, + time = excluded.time, + hash = excluded.hash, + proposer_address = excluded.proposer_address, + last_block_hash = excluded.last_block_hash; -- name: GetBlockByHeight :one SELECT * FROM blocks diff --git a/vochain/indexer/queries/transactions.sql b/vochain/indexer/queries/transactions.sql index 4384cd32d..c9c152482 100644 --- a/vochain/indexer/queries/transactions.sql +++ b/vochain/indexer/queries/transactions.sql @@ -3,7 +3,16 @@ INSERT INTO transactions ( hash, block_height, block_index, type, subtype, raw_tx, signature, signer ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ? -); +) +ON CONFLICT(hash) DO UPDATE +SET block_height = excluded.block_height, + block_index = excluded.block_index, + type = excluded.type, + subtype = excluded.subtype, + raw_tx = excluded.raw_tx, + signature = excluded.signature, + signer = excluded.signer; + -- name: GetTransactionByHash :one SELECT * FROM transactions diff --git a/vochain/indexer/transaction.go b/vochain/indexer/transaction.go index c965aa412..31beb4465 100644 --- a/vochain/indexer/transaction.go +++ b/vochain/indexer/transaction.go @@ -94,9 +94,13 @@ func (idx *Indexer) OnNewTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32) idx.blockMu.Lock() defer idx.blockMu.Unlock() + idx.indexTx(tx, blockHeight, txIndex) +} + +func (idx *Indexer) indexTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32) { rawtx, err := proto.Marshal(tx.Tx) if err != nil { - log.Errorw(err, "indexer cannot marshal new transaction") + log.Errorw(err, "indexer cannot marshal transaction") return } @@ -121,6 +125,6 @@ func (idx *Indexer) OnNewTx(tx *vochaintx.Tx, blockHeight uint32, txIndex int32) Signature: nonNullBytes(tx.Signature), Signer: nonNullBytes(signer), }); err != nil { - log.Errorw(err, "cannot index new transaction") + log.Errorw(err, "cannot index transaction") } }