diff --git a/miner/events.go b/miner/events.go index 393fd5b20..cc4aee02e 100644 --- a/miner/events.go +++ b/miner/events.go @@ -7,7 +7,3 @@ import ( type LastFinalizedBlock struct { Number *big.Int } - -type CurrentFork struct { - Number *big.Int -} diff --git a/miner/miner.go b/miner/miner.go index bb948e8e3..0d27f8da5 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -60,14 +60,30 @@ type Miner struct { } type EpochEnvironment struct { - IsRequest bool - IsUserActivated bool - IsRebase bool - NumBlockMined *big.Int - EpochLength *big.Int - Completed bool - - Lock sync.Mutex + IsRequest bool + UserActivated bool + Rebase bool + Completed bool + NumBlockMined *big.Int + EpochLength *big.Int + CurrentFork *big.Int + LastFinalizedBlock *big.Int + + lock sync.Mutex +} + +// TODO (aiden): it's only for test +var NRE = rootchain.RootChainEpochPrepared{ + ForkNumber: big.NewInt(0), + EpochNumber: big.NewInt(1), + StartBlockNumber: big.NewInt(1), + EndBlockNumber: big.NewInt(2), + RequestStart: new(big.Int), + RequestEnd: new(big.Int), + EpochIsEmpty: false, + IsRequest: false, + UserActivated: false, + Rebase: false, } func New(pls Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, env *EpochEnvironment, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(block *types.Block) bool) *Miner { @@ -81,18 +97,20 @@ func New(pls Backend, config *params.ChainConfig, mux *event.TypeMux, engine con canStart: 2, } go miner.update() - go miner.operate() return miner } func NewEpochEnvironment() *EpochEnvironment { return &EpochEnvironment{ - IsRequest: false, - IsUserActivated: false, - NumBlockMined: big.NewInt(0), - EpochLength: big.NewInt(0), - Completed: false, + IsRequest: false, + UserActivated: false, + Rebase: false, + Completed: false, + NumBlockMined: big.NewInt(0), + EpochLength: big.NewInt(0), + CurrentFork: big.NewInt(0), + LastFinalizedBlock: big.NewInt(0), } } @@ -124,7 +142,8 @@ func (self *Miner) update() { atomic.StoreInt32(&self.canStart, 1) atomic.StoreInt32(&self.shouldStart, 0) if shouldStart { - self.Start(self.coinbase) + // TODO (aiden): there's no need to start miner in here, it starts when rcm connect to root chain contract by reading 1st NRE. + //self.Start(self.coinbase, &NRE) } // stop immediately and ignore all further pending events return @@ -135,38 +154,12 @@ func (self *Miner) update() { } } -// operate manages finality and fork number according to the events sent by rootchain manager. -func (self *Miner) operate() { - events := self.mux.Subscribe(LastFinalizedBlock{}, CurrentFork{}) - defer events.Unsubscribe() - - for { - select { - case ev := <-events.Chan(): - if ev == nil { - return - } - switch ev.Data.(type) { - case LastFinalizedBlock: - self.worker.mu.Lock() - defer self.worker.mu.Unlock() - - self.worker.lastFinalizedBlock = ev.Data.(LastFinalizedBlock).Number - case CurrentFork: - self.worker.mu.Lock() - defer self.worker.mu.Unlock() +func (self *Miner) Start(coinbase common.Address, epoch *rootchain.RootChainEpochPrepared) { - self.worker.currentFork = ev.Data.(CurrentFork).Number - } - case <-self.exitCh: - return - } + if epoch.EpochIsEmpty { + log.Info("ORB epoch is empty, NRB epoch will be started") + return } -} - -func (self *Miner) Start(coinbase common.Address, epoch *rootchain.RootChainEpochPrepared) { - self.env.Lock.Lock() - defer self.env.Lock.Unlock() atomic.StoreInt32(&self.shouldStart, 1) self.SetEtherbase(coinbase) @@ -176,17 +169,18 @@ func (self *Miner) Start(coinbase common.Address, epoch *rootchain.RootChainEpoc return } - self.env.IsRequest = epoch.IsRequest - self.env.IsUserActivated = epoch.UserActivated - self.env.IsRebase = epoch.Rebase - self.env.NumBlockMined = new(big.Int) - self.env.EpochLength = new(big.Int).Add(new(big.Int).Sub(epoch.EndBlockNumber, epoch.StartBlockNumber), big.NewInt(1)) - self.env.Completed = false + self.env.setIsRequest(epoch.IsRequest) + self.env.setUserActivated(epoch.UserActivated) + self.env.setRebase(epoch.Rebase) + self.env.setCompleted(false) + self.env.setNumBlockMined(big.NewInt(0)) + self.env.setEpochLength(new(big.Int).Add(new(big.Int).Sub(epoch.EndBlockNumber, epoch.StartBlockNumber), big.NewInt(1))) + self.env.setCurrentFork(epoch.ForkNumber) if epoch.IsRequest && !epoch.UserActivated { - log.Info("NRB epoch is prepared, ORB epoch is started", "ORBepochLength", self.env.EpochLength) + log.Info("ORB epoch is prepared, ORB epoch is started", "ORBepochLength", self.env.EpochLength) } else if epoch.IsRequest && epoch.UserActivated { - log.Info("NRB epoch is prepared, URB epoch is started", "URBepochLength", self.env.EpochLength) + log.Info("URB epoch is prepared, URB epoch is started", "URBepochLength", self.env.EpochLength) } else if !epoch.IsRequest { log.Info("NRB epoch is prepared, NRB epoch is started", "NRBepochLength", self.env.EpochLength) } @@ -248,16 +242,64 @@ func (self *Miner) SetEtherbase(addr common.Address) { } func (self *Miner) SetNRBepochLength(length *big.Int) { - self.env.Lock.Lock() - defer self.env.Lock.Unlock() + self.env.lock.Lock() + defer self.env.lock.Unlock() self.env.EpochLength = length } -func (env *EpochEnvironment) setCompleted() { - env.Completed = true +func (env *EpochEnvironment) setIsRequest(b bool) { + env.lock.Lock() + defer env.lock.Unlock() + + env.IsRequest = b +} + +func (env *EpochEnvironment) setUserActivated(b bool) { + env.lock.Lock() + defer env.lock.Unlock() + + env.UserActivated = b +} + +func (env *EpochEnvironment) setRebase(b bool) { + env.lock.Lock() + defer env.lock.Unlock() + + env.Rebase = b +} + +func (env *EpochEnvironment) setCompleted(b bool) { + env.lock.Lock() + defer env.lock.Unlock() + + env.Completed = b } func (env *EpochEnvironment) setNumBlockMined(n *big.Int) { + env.lock.Lock() + defer env.lock.Unlock() + env.NumBlockMined = n } + +func (env *EpochEnvironment) setEpochLength(l *big.Int) { + env.lock.Lock() + defer env.lock.Unlock() + + env.EpochLength = l +} + +func (env *EpochEnvironment) setCurrentFork(f *big.Int) { + env.lock.Lock() + defer env.lock.Unlock() + + env.CurrentFork = f +} + +func (env *EpochEnvironment) SetLastFinalizedBlock(n *big.Int) { + env.lock.Lock() + defer env.lock.Unlock() + + env.LastFinalizedBlock = n +} diff --git a/miner/worker.go b/miner/worker.go index 762248132..96582dae4 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -178,10 +178,6 @@ type worker struct { skipSealHook func(*task) bool // Method to decide whether skipping the sealing. fullTaskHook func() // Method to call before pushing the full sealing task. resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. - - // pls chain - currentFork *big.Int - lastFinalizedBlock *big.Int } func newWorker(config *params.ChainConfig, engine consensus.Engine, pls Backend, env *EpochEnvironment, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(*types.Block) bool) *worker { @@ -228,7 +224,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, pls Backend, go worker.taskLoop() // Submit first work to initialize pending state. - worker.startCh <- struct{}{} + //worker.startCh <- struct{}{} return worker } @@ -306,13 +302,15 @@ func (w *worker) newWorkLoop(recommit time.Duration) { // commit aborts in-flight transaction execution with given signal and resubmits a new one. commit := func(noempty bool, s int32) { - if interrupt != nil { - atomic.StoreInt32(interrupt, s) + if w.isRunning() && !w.env.Completed { + if interrupt != nil { + atomic.StoreInt32(interrupt, s) + } + interrupt = new(int32) + w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp} + timer.Reset(recommit) + atomic.StoreInt32(&w.newTxs, 0) } - interrupt = new(int32) - w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp} - timer.Reset(recommit) - atomic.StoreInt32(&w.newTxs, 0) } // recalcRecommit recalculates the resubmitting interval upon feedback. recalcRecommit := func(target float64, inc bool) { @@ -355,7 +353,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { case head := <-w.chainHeadCh: // commit new mining work again only when the epoch is not completed. - if w.isRunning() { + if w.isRunning() && !w.env.Completed { clearPending(head.Block.NumberU64()) timestamp = time.Now().Unix() commit(true, commitInterruptNewHead) @@ -422,7 +420,7 @@ func (w *worker) mainLoop() { // ORB, Rebased ORB, URB w.commitNewWorkForRB(req.interrupt, req.noempty, req.timestamp) case false: - if w.env.IsRebase { + if w.env.Rebase { // Rebase NRB w.commitNewWorkForRebasedNRB(req.interrupt, req.noempty, req.timestamp) } else { @@ -597,6 +595,12 @@ func (w *worker) resultLoop() { } logs = append(logs, receipt.Logs...) } + + // if fork of the block is not same as current fork, throw it. + if w.env.CurrentFork.Cmp(block.Difficulty()) != 0 { + continue + } + // Commit block and state to database. stat, err := w.chain.WriteBlockWithState(block, receipts, task.state) if err != nil { @@ -611,19 +615,15 @@ func (w *worker) resultLoop() { // check if the epoch is completed if w.env.NumBlockMined.Cmp(w.env.EpochLength) == 0 { - w.env.setCompleted() + w.env.setCompleted(true) + log.Info("Current Epoch is completed") } // Insert the block into the set of pending ones to resultLoop for confirmations w.unconfirmed.Insert(block.NumberU64(), block.Hash()) - isURB := w.env.IsUserActivated - - // unlock mutex of w.env so that rcm can manipulate the env data - w.env.Lock.Unlock() - // Broadcast the block and announce chain insertion event - w.mux.Post(core.NewMinedBlockEvent{Block: block, IsURB: isURB}) + w.mux.Post(core.NewMinedBlockEvent{Block: block}) var events []interface{} switch stat { @@ -1071,8 +1071,8 @@ func (w *worker) commitNewWorkForRB(interrupt *int32, noempty bool, timestamp in var parent *types.Block // if the block to be mined is first block of URB epoch, then parent block is last finalized block - if w.env.IsUserActivated && w.env.NumBlockMined.Cmp(big.NewInt(0)) == 0 { - parent = w.chain.GetBlockByNumber(w.lastFinalizedBlock.Uint64()) + if w.env.UserActivated && w.env.NumBlockMined.Cmp(big.NewInt(0)) == 0 { + parent = w.chain.GetBlockByNumber(w.env.LastFinalizedBlock.Uint64()) } else { parent = w.chain.CurrentBlock() } @@ -1109,6 +1109,10 @@ func (w *worker) commitNewWorkForRB(interrupt *int32, noempty bool, timestamp in return } + if w.env.UserActivated && w.env.NumBlockMined.Cmp(big.NewInt(0)) == 0 { + header.Difficulty.Add(header.Difficulty, big.NewInt(1)) + } + // TODO: delete because pls block is frontier spec // If we are care about TheDAO hard-fork check whether to override the extra-data or not if daoBlock := w.config.DAOForkBlock; daoBlock != nil {