diff --git a/sequencer/batchbuilder/batchbuilder.go b/sequencer/batchbuilder/batchbuilder.go
index 536603b..ccccb49 100644
--- a/sequencer/batchbuilder/batchbuilder.go
+++ b/sequencer/batchbuilder/batchbuilder.go
@@ -63,3 +63,18 @@ func (bb *BatchBuilder) Reset(batchNum common.BatchNum, fromSynchronizer bool) e
 func (bb *BatchBuilder) LocalStateDB() *statedb.LocalStateDB {
     return bb.localStateDB
 }
+
+// BuildBatch takes the transactions and returns the common.ZKInputs of the next batch
+func (bb *BatchBuilder) BuildBatch(
+    configBatch *ConfigBatch,
+    l1usertxs []common.L1Tx,
+) (*common.ZKInputs, error) {
+    bbStateDB := bb.localStateDB.StateDB
+    tp := txprocessor.NewTxProcessor(bbStateDB, configBatch.TxProcessorConfig)
+
+    ptOut, err := tp.ProcessTxs(l1usertxs)
+    if err != nil {
+        return nil, common.Wrap(err)
+    }
+    return ptOut.ZKInputs, nil
+}
diff --git a/sequencer/coordinator/batch.go b/sequencer/coordinator/batch.go
index 9631369..4da49d9 100644
--- a/sequencer/coordinator/batch.go
+++ b/sequencer/coordinator/batch.go
@@ -76,24 +76,24 @@ type Debug struct {
 
 // BatchInfo contans the Batch information
 type BatchInfo struct {
-    PipelineNum           int
-    BatchNum              common.BatchNum
-    ServerProof           prover.Client
-    ProofStart            time.Time
-    ZKInputs              *common.ZKInputs
-    Proof                 *prover.Proof
-    PublicInputs          []*big.Int
-    L1Batch               bool
-    VerifierIdx           uint8
-    L1UserTxs             []common.L1Tx
-    L1CoordTxs            []common.L1Tx
+    PipelineNum  int
+    BatchNum     common.BatchNum
+    ServerProof  prover.Client
+    ProofStart   time.Time
+    ZKInputs     *common.ZKInputs
+    Proof        *prover.Proof
+    PublicInputs []*big.Int
+    // L1Batch bool
+    VerifierIdx uint8
+    L1UserTxs   []common.L1Tx
+    // L1CoordTxs []common.L1Tx
     L1CoordinatorTxsAuths [][]byte
-    L2Txs                 []common.L2Tx
-    CoordIdxs             []common.AccountIdx
-    ForgeBatchArgs        *eth.RollupForgeBatchArgs
-    Auth                  *bind.TransactOpts `json:"-"`
-    EthTxs                []*types.Transaction
-    EthTxsErrs            []error
+    // L2Txs []common.L2Tx
+    // CoordIdxs []common.AccountIdx
+    ForgeBatchArgs *eth.RollupForgeBatchArgs
+    Auth           *bind.TransactOpts `json:"-"`
+    EthTxs         []*types.Transaction
+    EthTxsErrs     []error
     // SendTimestamp the time of batch sent to ethereum
     SendTimestamp time.Time
     Receipt       *types.Receipt
diff --git a/sequencer/coordinator/pipeline.go b/sequencer/coordinator/pipeline.go
index cdaea1b..363877a 100644
--- a/sequencer/coordinator/pipeline.go
+++ b/sequencer/coordinator/pipeline.go
@@ -2,6 +2,7 @@ package coordinator
 
 import (
     "context"
+    "database/sql"
     "fmt"
     "sync"
     "time"
@@ -12,8 +13,6 @@ import (
     "tokamak-sybil-resistance/log"
     "tokamak-sybil-resistance/synchronizer"
     "tokamak-sybil-resistance/txselector"
-
-    "github.com/hermeznetwork/tracerr"
 )
 
 type statsVars struct {
@@ -25,7 +24,6 @@ type state struct {
     batchNum                     common.BatchNum
     lastScheduledL1BatchBlockNum int64
     lastForgeL1TxsNum            int64
-    // lastSlotForged int64
 }
 
 // Pipeline manages the forging of batches with parallel server proofs
@@ -68,7 +66,6 @@ func (p *Pipeline) reset(
         batchNum:                     batchNum,
         lastForgeL1TxsNum:            stats.Sync.LastForgeL1TxsNum,
         lastScheduledL1BatchBlockNum: 0,
-        // lastSlotForged: -1,
     }
     p.stats = *stats
     p.vars = *vars
@@ -98,43 +95,43 @@ func (p *Pipeline) reset(
         return common.Wrap(err)
     }
 
-    // TODO: implement
+    // TODO: discuss if it's necessary to check all the roots or just one is enough
     // After reset, check that if the batch exists in the historyDB, the
     // stateRoot matches with the local one, if not, force a reset from
     // synchronizer
-    // batch, err := p.historyDB.GetBatch(p.state.batchNum)
-    // if common.Unwrap(err) == sql.ErrNoRows {
-    //     // nothing to do
-    // } else if err != nil {
-    //     return common.Wrap(err)
-    // } else {
-    //     localStateAccountRoot := p.batchBuilder.LocalStateDB().AccountTree.Root().BigInt()
-    //     localStateScoreRoot := p.batchBuilder.LocalStateDB().ScoreTree.Root().BigInt()
-    //     localStateVouchRoot := p.batchBuilder.LocalStateDB().VouchTree.Root().BigInt()
-
-    //     if batch.AccountStateRoot.Cmp(localStateAccountRoot) != 0 ||
-    //         batch.ScoreStateRoot.Cmp(localStateScoreRoot) != 0 ||
-    //         batch.VouchStateRoot.Cmp(localStateVouchRoot) != 0 {
-    //         log.Debugw(
-    //             "local state roots didn't match the historydb state roots:\n"+
-    //                 "localStateAccountRoot: %v vs historydb stateAccountRoot: %v \n"+
-    //                 "localStateVouchRoot: %v vs historydb stateVouchRoot: %v \n"+
-    //                 "localStateScoreRoot: %v vs historydb stateScoreRoot: %v \n"+
-    //                 "Forcing reset from Synchronizer",
-    //             localStateAccountRoot, batch.AccountStateRoot,
-    //             localStateVouchRoot, batch.VouchStateRoot,
-    //             localStateScoreRoot, batch.ScoreStateRoot,
-    //         )
-    //         // StateRoots from synchronizer doesn't match StateRoots
-    //         // from batchBuilder, force a reset from synchronizer
-    //         if err := p.txSelector.Reset(p.state.batchNum, true); err != nil {
-    //             return common.Wrap(err)
-    //         }
-    //         if err := p.batchBuilder.Reset(p.state.batchNum, true); err != nil {
-    //             return common.Wrap(err)
-    //         }
-    //     }
-    // }
+    batch, err := p.historyDB.GetBatch(p.state.batchNum)
+    if common.Unwrap(err) == sql.ErrNoRows {
+        // nothing to do
+    } else if err != nil {
+        return common.Wrap(err)
+    } else {
+        localStateAccountRoot := p.batchBuilder.LocalStateDB().AccountTree.Root().BigInt()
+        localStateScoreRoot := p.batchBuilder.LocalStateDB().ScoreTree.Root().BigInt()
+        localStateVouchRoot := p.batchBuilder.LocalStateDB().VouchTree.Root().BigInt()
+
+        if batch.AccountRoot.Cmp(localStateAccountRoot) != 0 ||
+            batch.ScoreRoot.Cmp(localStateScoreRoot) != 0 ||
+            batch.VouchRoot.Cmp(localStateVouchRoot) != 0 {
+            log.Debugw(
+                "local state roots didn't match the historydb state roots:\n"+
+                    "localStateAccountRoot: %v vs historydb stateAccountRoot: %v \n"+
+                    "localStateVouchRoot: %v vs historydb stateVouchRoot: %v \n"+
+                    "localStateScoreRoot: %v vs historydb stateScoreRoot: %v \n"+
+                    "Forcing reset from Synchronizer",
+                localStateAccountRoot, batch.AccountRoot,
+                localStateVouchRoot, batch.VouchRoot,
+                localStateScoreRoot, batch.ScoreRoot,
+            )
+            // StateRoots from synchronizer doesn't match StateRoots
+            // from batchBuilder, force a reset from synchronizer
+            if err := p.txSelector.Reset(p.state.batchNum, true); err != nil {
+                return common.Wrap(err)
+            }
+            if err := p.batchBuilder.Reset(p.state.batchNum, true); err != nil {
+                return common.Wrap(err)
+            }
+        }
+    }
 
     return nil
 }
@@ -155,12 +152,11 @@ func (p *Pipeline) handleForgeBatch(
     ctx context.Context,
     batchNum common.BatchNum,
 ) (batchInfo *BatchInfo, err error) {
-    // 1. Wait for an available serverProof (blocking call)
     if ctx.Err() != nil {
         return nil, ctx.Err()
     }
 
-    // 2. Forge the batch internally (make a selection of txs and prepare
+    // Forge the batch internally (make a selection of txs and prepare
     // all the smart contract arguments)
     var skipReason *string
     batchInfo, skipReason, err = p.forgeBatch(batchNum)
@@ -174,7 +170,7 @@ func (p *Pipeline) handleForgeBatch(
         return nil, common.Wrap(errSkipBatchByPolicy)
     }
 
-    // 3. Send the ZKInputs to the proof server
+    // Send the ZKInputs to the proof server
     batchInfo.ServerProof = p.prover
     batchInfo.ProofStart = time.Now()
     if err := p.sendServerProof(ctx, batchInfo); ctx.Err() != nil {
@@ -188,12 +184,12 @@ func (p *Pipeline) handleForgeBatch(
 
 // sendServerProof sends the circuit inputs to the proof server
 func (p *Pipeline) sendServerProof(ctx context.Context, batchInfo *BatchInfo) error {
-    // p.cfg.debugBatchStore(batchInfo)
+    p.cfg.debugBatchStore(batchInfo)
 
-    // Call the selected idle server proof with BatchBuilder output,
+    // Call the server proof with BatchBuilder output,
     // save server proof info for batchNum
     if err := batchInfo.ServerProof.CalculateProof(ctx, batchInfo.ZKInputs); err != nil {
-        return tracerr.Wrap(err)
+        return common.Wrap(err)
     }
     return nil
 }
@@ -204,138 +200,101 @@ func (p *Pipeline) forgeBatch(batchNum common.BatchNum) (
     skipReason *string,
     err error,
 ) {
-    // TODO: investigate if we need this for L2Txs
-    // remove transactions from the pool that have been there for too long
-    // _, err = p.purger.InvalidateMaybe(
-    //     p.l2DB,
-    //     p.txSelector.LocalAccountsDB(),
-    //     p.stats.Sync.LastBlock.Num,
-    //     int64(batchNum),
-    // )
-    // if err != nil {
-    //     return nil, nil, tracerr.Wrap(err)
-    // }
-    // _, err = p.purger.PurgeMaybe(p.l2DB, p.stats.Sync.LastBlock.Num, int64(batchNum))
-    // if err != nil {
-    //     return nil, nil, tracerr.Wrap(err)
-    // }
-
     // Structure to accumulate data and metadata of the batch
     now := time.Now()
     batchInfo = &BatchInfo{PipelineNum: p.num, BatchNum: batchNum}
     batchInfo.Debug.StartTimestamp = now
     batchInfo.Debug.StartBlockNum = p.stats.Eth.LastBlock.Num + 1
 
-    // var poolL2Txs []common.PoolL2Tx
-    // var discardedL2Txs []common.PoolL2Tx
-    var l1UserTxs, l1CoordTxs []common.L1Tx
+    var l1UserTxs []common.L1Tx
     var auths [][]byte
-    var coordIdxs []common.AccountIdx
 
-    // 1. Decide if we forge L2Tx or L1+L2Tx
-    // if p.shouldL1L2Batch(batchInfo) {
-    batchInfo.L1Batch = true
-    // 2a: L1+L2 txs
     _l1UserTxs, err := p.historyDB.GetUnforgedL1UserTxs(p.state.lastForgeL1TxsNum + 1)
     if err != nil {
-        return nil, nil, tracerr.Wrap(err)
+        return nil, nil, common.Wrap(err)
     }
 
     // l1UserFutureTxs are the l1UserTxs that are not being forged
     // in the next batch, but that are also in the queue for the
     // future batches
     l1UserFutureTxs, err := p.historyDB.GetUnforgedL1UserFutureTxs(p.state.lastForgeL1TxsNum + 1)
     if err != nil {
-        return nil, nil, tracerr.Wrap(err)
+        return nil, nil, common.Wrap(err)
     }
 
-    coordIdxs, auths, l1UserTxs, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
-        p.txSelector.GetL1L2TxSelection(p.cfg.TxProcessorConfig, _l1UserTxs, l1UserFutureTxs)
+    // TODO: figure out what happens here and potentially remove txSelector
+    auths, l1UserTxs, err =
+        p.txSelector.GetL1TxSelection(p.cfg.TxProcessorConfig, _l1UserTxs, l1UserFutureTxs)
     if err != nil {
-        return nil, nil, tracerr.Wrap(err)
+        return nil, nil, common.Wrap(err)
     }
-    // }
-    // else {
-    //     // get l1UserFutureTxs which are all the l1 pending in all the
-    //     // queues
-    //     l1UserFutureTxs, err := p.historyDB.GetUnforgedL1UserFutureTxs(p.state.lastForgeL1TxsNum) //nolint:gomnd
-    //     if err != nil {
-    //         return nil, nil, tracerr.Wrap(err)
-    //     }
-
-    //     // 2b: only L2 txs
-    //     coordIdxs, auths, l1CoordTxs, poolL2Txs, discardedL2Txs, err =
-    //         p.txSelector.GetL2TxSelection(p.cfg.TxProcessorConfig, l1UserFutureTxs)
-    //     if err != nil {
-    //         return nil, nil, tracerr.Wrap(err)
-    //     }
-    //     l1UserTxs = nil
-    // }
 
-    if skip, reason, err := p.forgePolicySkipPostSelection(now,
-        l1UserTxs, l1CoordTxs, poolL2Txs, batchInfo); err != nil {
-        return nil, nil, tracerr.Wrap(err)
+    // TODO: depending on what's happening in txSelector, this might not be necessary as well
+    if skip, reason, err := p.forgePolicySkipPostSelection(now, l1UserTxs, batchInfo); err != nil {
+        return nil, nil, common.Wrap(err)
     } else if skip {
         if err := p.txSelector.Reset(batchInfo.BatchNum-1, false); err != nil {
-            return nil, nil, tracerr.Wrap(err)
+            return nil, nil, common.Wrap(err)
        }
-        return nil, &reason, tracerr.Wrap(err)
+        return nil, &reason, common.Wrap(err)
     }
 
-    if batchInfo.L1Batch {
-        p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
-        p.state.lastForgeL1TxsNum++
-    }
+    p.state.lastScheduledL1BatchBlockNum = p.stats.Eth.LastBlock.Num + 1
+    p.state.lastForgeL1TxsNum++
 
-    // 3. Save metadata from TxSelector output for BatchNum
+    // Save metadata from TxSelector output for BatchNum
     batchInfo.L1UserTxs = l1UserTxs
-    batchInfo.L1CoordTxs = l1CoordTxs
     batchInfo.L1CoordinatorTxsAuths = auths
-    batchInfo.CoordIdxs = coordIdxs
-    batchInfo.VerifierIdx = p.cfg.VerifierIdx
-    if err := p.l2DB.StartForging(common.TxIDsFromPoolL2Txs(poolL2Txs),
-        batchInfo.BatchNum); err != nil {
-        return nil, nil, tracerr.Wrap(err)
-    }
-    if err := p.l2DB.UpdateTxsInfo(discardedL2Txs, batchInfo.BatchNum); err != nil {
-        return nil, nil, tracerr.Wrap(err)
-    }
-
-    // Invalidate transactions that become invalid because of
-    // the poolL2Txs selected. Will mark as invalid the txs that have a
-    // (fromIdx, nonce) which already appears in the selected txs (includes
-    // all the nonces smaller than the current one)
-    err = p.l2DB.InvalidateOldNonces(idxsNonceFromPoolL2Txs(poolL2Txs), batchInfo.BatchNum)
-    if err != nil {
-        return nil, nil, tracerr.Wrap(err)
-    }
-
-    // 4. Call BatchBuilder with TxSelector output
+    // Call BatchBuilder with TxSelector output
     configBatch := &batchbuilder.ConfigBatch{
         TxProcessorConfig: p.cfg.TxProcessorConfig,
     }
-    zkInputs, err := p.batchBuilder.BuildBatch(coordIdxs, configBatch, l1UserTxs,
-        l1CoordTxs, poolL2Txs)
-    if err != nil {
-        return nil, nil, tracerr.Wrap(err)
-    }
-    l2Txs, err := common.PoolL2TxsToL2Txs(poolL2Txs) // NOTE: This is a big uggly, find a better way
+    zkInputs, err := p.batchBuilder.BuildBatch(configBatch, l1UserTxs)
     if err != nil {
-        return nil, nil, tracerr.Wrap(err)
+        return nil, nil, common.Wrap(err)
     }
-    batchInfo.L2Txs = l2Txs
 
-    // 5. Save metadata from BatchBuilder output for BatchNum
+    // Save metadata from BatchBuilder output for BatchNum
     batchInfo.ZKInputs = zkInputs
     batchInfo.Debug.Status = StatusForged
     p.cfg.debugBatchStore(batchInfo)
     log.Infow("Pipeline: batch forged internally", "batch", batchInfo.BatchNum)
 
-    p.state.lastSlotForged = p.stats.Sync.Auction.CurrentSlot.SlotNum
-
     return batchInfo, nil, nil
 }
 
+// forgePolicySkipPostSelection is called after doing a tx selection in a batch to
+// determine by policy if we should forge the batch or not. Returns true and
+// the reason when the forging of the batch must be skipped.
+func (p *Pipeline) forgePolicySkipPostSelection(
+    now time.Time,
+    l1UserTxsExtra []common.L1Tx,
+    batchInfo *BatchInfo,
+) (bool, string, error) {
+    pendingTxs := true
+    if len(l1UserTxsExtra) == 0 {
+        // Query the number of unforged L1UserTxs
+        // (either in a open queue or in a frozen
+        // not-yet-forged queue).
+        count, err := p.historyDB.GetUnforgedL1UserTxsCount()
+        if err != nil {
+            return false, "", err
+        }
+        // If there are future L1UserTxs, we forge a
+        // batch to advance the queues to be able to
+        // forge the L1UserTxs in the future.
+        // Otherwise, skip.
+        if count == 0 {
+            pendingTxs = false
+        }
+    }
+
+    if pendingTxs {
+        return false, "", nil
+    }
+    return true, "no pending txs", nil
+}
+
 func (p *Pipeline) setErrAtBatchNum(batchNum common.BatchNum) {
     p.rw.Lock()
     defer p.rw.Unlock()
@@ -345,6 +304,7 @@ func (p *Pipeline) setErrAtBatchNum(batchNum common.BatchNum) {
 
 // TODO: implement
 // waitServerProof gets the generated zkProof & sends it to the SmartContract
 func (p *Pipeline) waitServerProof(ctx context.Context, batchInfo *BatchInfo) error {
+    // TODO: implement prometheus metrics
     // defer metric.MeasureDuration(metric.WaitServerProof, batchInfo.ProofStart,
     //     batchInfo.BatchNum.BigInt().String(), strconv.Itoa(batchInfo.PipelineNum))
@@ -404,10 +364,8 @@ func (p *Pipeline) Start(
                 batchNum = p.state.batchNum + 1
                 batchInfo, err := p.handleForgeBatch(p.ctx, batchNum)
                 if p.ctx.Err() != nil {
-                    // p.revertPoolChanges(batchNum)
                     continue
                 } else if common.Unwrap(err) == errSkipBatchByPolicy {
-                    // p.revertPoolChanges(batchNum)
                     continue
                 } else if err != nil {
                     p.setErrAtBatchNum(batchNum)
@@ -416,7 +374,6 @@ func (p *Pipeline) Start(
                         "Pipeline.handleForgBatch: %v", err),
                         FailedBatchNum: batchNum,
                     })
-                    // p.revertPoolChanges(batchNum)
                     continue
                 }
                 p.lastForgeTime = time.Now()
@@ -448,12 +405,10 @@ func (p *Pipeline) Start(
                 // batches because there's been an error and we
                 // wait for the pipeline to be stopped.
                 if p.getErrAtBatchNum() != 0 {
-                    // p.revertPoolChanges(batchNum)
                     return
                 }
                 err := p.waitServerProof(p.ctx, batchInfo)
                 if p.ctx.Err() != nil {
-                    // p.revertPoolChanges(batchNum)
                     return
                 } else if err != nil {
                     log.Errorw("waitServerProof", "err", err)
@@ -463,11 +418,8 @@ func (p *Pipeline) Start(
                         "Pipeline.waitServerProof: %v", err),
                         FailedBatchNum: batchInfo.BatchNum,
                     })
-                    // p.revertPoolChanges(batchNum)
                     return
                 }
-                // We are done with this serverProof, add it back to the pool
-                // p.proversPool.Add(p.ctx, batchInfo.ServerProof)
                 p.txManager.AddBatch(p.ctx, batchInfo)
             }(p, batchInfo, batchNum)
         }
diff --git a/sequencer/coordinator/txmanager.go b/sequencer/coordinator/txmanager.go
index 9fccfe7..3a81283 100644
--- a/sequencer/coordinator/txmanager.go
+++ b/sequencer/coordinator/txmanager.go
@@ -320,12 +320,12 @@ func (t *TxManager) mustL1L2Batch(blockNum int64) bool {
 
 func (t *TxManager) shouldSendRollupForgeBatch(batchInfo *BatchInfo) error {
     nextBlock := t.stats.Eth.LastBlock.Num + 1
-    if t.mustL1L2Batch(nextBlock) && !batchInfo.L1Batch {
+    if t.mustL1L2Batch(nextBlock) /* && !batchInfo.L1Batch*/ {
         return fmt.Errorf("can't forge non-L1Batch in the next block: %v", nextBlock)
     }
     margin := t.cfg.SendBatchBlocksMarginCheck
     if margin != 0 {
-        if t.mustL1L2Batch(nextBlock+margin) && !batchInfo.L1Batch {
+        if t.mustL1L2Batch(nextBlock + margin) /* && !batchInfo.L1Batch */ {
             return fmt.Errorf("can't forge non-L1Batch after %v blocks: %v",
                 margin, nextBlock)
         }
@@ -411,9 +411,10 @@ func (t *TxManager) NewAuth(ctx context.Context, batchInfo *BatchInfo) (*bind.Tr
 
     auth.Value = big.NewInt(0) // in wei
     gasLimit := t.cfg.ForgeBatchGasCost.Fixed +
-        uint64(len(batchInfo.L1UserTxs))*t.cfg.ForgeBatchGasCost.L1UserTx +
-        uint64(len(batchInfo.L1CoordTxs))*t.cfg.ForgeBatchGasCost.L1CoordTx +
-        uint64(len(batchInfo.L2Txs))*t.cfg.ForgeBatchGasCost.L2Tx
+        uint64(len(batchInfo.L1UserTxs))*t.cfg.ForgeBatchGasCost.L1UserTx
+    // uint64(len(batchInfo.L1UserTxs))*t.cfg.ForgeBatchGasCost.L1UserTx +
+    // uint64(len(batchInfo.L1CoordTxs))*t.cfg.ForgeBatchGasCost.L1CoordTx +
+    // uint64(len(batchInfo.L2Txs))*t.cfg.ForgeBatchGasCost.L2Tx
     auth.GasLimit = gasLimit
     auth.GasPrice = gasPrice
     auth.Nonce = nil
@@ -516,9 +517,9 @@ func (t *TxManager) sendRollupForgeBatch(ctx context.Context, batchInfo *BatchIn
 
     t.cfg.debugBatchStore(batchInfo)
     if !resend {
-        if batchInfo.L1Batch {
-            t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1
-        }
+        // if batchInfo.L1Batch {
+        t.lastSentL1BatchBlockNum = t.stats.Eth.LastBlock.Num + 1
+        // }
     }
     // if err := t.l2DB.DoneForging(common.TxIDsFromL2Txs(batchInfo.L2Txs),
     //     batchInfo.BatchNum); err != nil {
diff --git a/sequencer/txprocessor/txprocessor.go b/sequencer/txprocessor/txprocessor.go
index bce1fbf..8ce356b 100644
--- a/sequencer/txprocessor/txprocessor.go
+++ b/sequencer/txprocessor/txprocessor.go
@@ -179,9 +179,7 @@ func (txProcessor *TxProcessor) resetZKInputs() {
 // the HistoryDB, and adds Nonce & TokenID to the L2Txs.
 // And if TypeSynchronizer returns an array of common.Account with all the
 // created accounts.
-func (txProcessor *TxProcessor) ProcessTxs(l1usertxs []common.L1Tx,
-
-/*l2txs []common.PoolL2Tx*/) (ptOut *ProcessTxOutput, err error) {
+func (txProcessor *TxProcessor) ProcessTxs(l1usertxs []common.L1Tx) (ptOut *ProcessTxOutput, err error) {
     defer func() {
         if err == nil {
             err = txProcessor.state.MakeCheckpoint()
diff --git a/sequencer/txselector/txselector.go b/sequencer/txselector/txselector.go
index 4709bd5..3326dd4 100644
--- a/sequencer/txselector/txselector.go
+++ b/sequencer/txselector/txselector.go
@@ -133,8 +133,11 @@ func (txsel *TxSelector) Reset(batchNum common.BatchNum, fromSynchronizer bool)
 // but there is a transactions to them and the authorization of account
 // creation exists. The L1UserTxs, L1CoordinatorTxs, PoolL2Txs that will be
 // included in the next batch.
-func (txsel *TxSelector) GetL1TxSelection(selectionConfig txprocessor.Config,
-    l1UserTxs, l1UserFutureTxs []common.L1Tx) ([][]byte, []common.L1Tx, error) {
+func (txsel *TxSelector) GetL1TxSelection(
+    selectionConfig txprocessor.Config,
+    l1UserTxs,
+    l1UserFutureTxs []common.L1Tx,
+) ([][]byte, []common.L1Tx, error) {
     accCreationAuths, l1UserTxs, err := txsel.getL1TxSelection(selectionConfig, l1UserTxs, l1UserFutureTxs)
     return accCreationAuths, l1UserTxs, err
 }
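
The consistency check re-enabled in `Pipeline.reset` reduces to comparing the three local Merkle roots (account, vouch, score) against the roots stored in the historyDB for the same batch, and forcing a reset from the synchronizer when any of them differ. A minimal standalone sketch of that comparison (illustrative names only, detached from the repo's types):

```go
package main

import (
	"fmt"
	"math/big"
)

// rootsMatch mirrors the check re-enabled in Pipeline.reset: every local
// tree root (account, vouch, score) must equal the root the synchronizer
// stored in the historyDB for the same batch; otherwise a reset from the
// synchronizer is forced.
func rootsMatch(local, synced []*big.Int) bool {
	if len(local) != len(synced) {
		return false
	}
	for i := range local {
		if local[i].Cmp(synced[i]) != 0 {
			return false
		}
	}
	return true
}

func main() {
	local := []*big.Int{big.NewInt(11), big.NewInt(22), big.NewInt(33)}
	synced := []*big.Int{big.NewInt(11), big.NewInt(22), big.NewInt(44)}
	fmt.Println(rootsMatch(local, local))  // true: keep the local state
	fmt.Println(rootsMatch(local, synced)) // false: force a reset from the synchronizer
}
```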
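The batch-skip policy in the new `forgePolicySkipPostSelection` boils down to: forge when the selection contains L1UserTxs, or when unforged L1UserTxs are still queued for future batches (forging advances the queues); otherwise skip with reason "no pending txs". A hedged, self-contained sketch of that decision:

```go
package main

import "fmt"

// shouldForge mirrors the decision in forgePolicySkipPostSelection:
// forge if the current selection contains L1UserTxs, or if unforged
// L1UserTxs are still queued for future batches; otherwise skip.
func shouldForge(selectedL1UserTxs, unforgedQueuedL1UserTxs int) (forge bool, skipReason string) {
	if selectedL1UserTxs > 0 {
		return true, ""
	}
	if unforgedQueuedL1UserTxs > 0 {
		// Forging a batch still advances the queues, so the queued
		// L1UserTxs can be included in a later batch.
		return true, ""
	}
	return false, "no pending txs"
}

func main() {
	fmt.Println(shouldForge(2, 0)) // true ""
	fmt.Println(shouldForge(0, 5)) // true ""
	fmt.Println(shouldForge(0, 0)) // false "no pending txs"
}
```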
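With `L1CoordTxs` and `L2Txs` dropped from `BatchInfo`, the gas limit computed in `TxManager.NewAuth` is now just the fixed forgeBatch cost plus a per-L1UserTx term. A small illustrative sketch of the simplified formula (the cost values below are made up, not the project's configured costs):

```go
package main

import "fmt"

// forgeBatchGasCost mirrors the shape of the coordinator's ForgeBatchGasCost
// config, keeping only the terms still used after this change.
type forgeBatchGasCost struct {
	Fixed    uint64
	L1UserTx uint64
}

// estimateForgeBatchGas reflects the simplified computation in
// TxManager.NewAuth: fixed cost plus a per-L1UserTx term, with the
// L1CoordTx and L2Tx terms removed.
func estimateForgeBatchGas(cost forgeBatchGasCost, numL1UserTxs int) uint64 {
	return cost.Fixed + uint64(numL1UserTxs)*cost.L1UserTx
}

func main() {
	cost := forgeBatchGasCost{Fixed: 300_000, L1UserTx: 10_000} // illustrative values only
	fmt.Println(estimateForgeBatchGas(cost, 4))                 // 340000
}
```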