Skip to content

Commit

Permalink
miner: change mining operation logic and epochEnvironment
Browse files Browse the repository at this point in the history
  • Loading branch information
dCanyon committed Jan 16, 2019
1 parent e1628d9 commit 0408fc0
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 83 deletions.
4 changes: 0 additions & 4 deletions miner/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,3 @@ import (
type LastFinalizedBlock struct {
Number *big.Int
}

type CurrentFork struct {
Number *big.Int
}
156 changes: 99 additions & 57 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,30 @@ type Miner struct {
}

type EpochEnvironment struct {
IsRequest bool
IsUserActivated bool
IsRebase bool
NumBlockMined *big.Int
EpochLength *big.Int
Completed bool

Lock sync.Mutex
IsRequest bool
UserActivated bool
Rebase bool
Completed bool
NumBlockMined *big.Int
EpochLength *big.Int
CurrentFork *big.Int
LastFinalizedBlock *big.Int

lock sync.Mutex
}

// TODO (aiden): it's only for test
var NRE = rootchain.RootChainEpochPrepared{
ForkNumber: big.NewInt(0),
EpochNumber: big.NewInt(1),
StartBlockNumber: big.NewInt(1),
EndBlockNumber: big.NewInt(2),
RequestStart: new(big.Int),
RequestEnd: new(big.Int),
EpochIsEmpty: false,
IsRequest: false,
UserActivated: false,
Rebase: false,
}

func New(pls Backend, config *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, env *EpochEnvironment, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(block *types.Block) bool) *Miner {
Expand All @@ -81,18 +97,20 @@ func New(pls Backend, config *params.ChainConfig, mux *event.TypeMux, engine con
canStart: 2,
}
go miner.update()
go miner.operate()

return miner
}

func NewEpochEnvironment() *EpochEnvironment {
return &EpochEnvironment{
IsRequest: false,
IsUserActivated: false,
NumBlockMined: big.NewInt(0),
EpochLength: big.NewInt(0),
Completed: false,
IsRequest: false,
UserActivated: false,
Rebase: false,
Completed: false,
NumBlockMined: big.NewInt(0),
EpochLength: big.NewInt(0),
CurrentFork: big.NewInt(0),
LastFinalizedBlock: big.NewInt(0),
}
}

Expand Down Expand Up @@ -124,7 +142,8 @@ func (self *Miner) update() {
atomic.StoreInt32(&self.canStart, 1)
atomic.StoreInt32(&self.shouldStart, 0)
if shouldStart {
self.Start(self.coinbase)
// TODO (aiden): there's no need to start miner in here, it starts when rcm connect to root chain contract by reading 1st NRE.
//self.Start(self.coinbase, &NRE)
}
// stop immediately and ignore all further pending events
return
Expand All @@ -135,38 +154,12 @@ func (self *Miner) update() {
}
}

// operate manages finality and fork number according to the events sent by rootchain manager.
func (self *Miner) operate() {
events := self.mux.Subscribe(LastFinalizedBlock{}, CurrentFork{})
defer events.Unsubscribe()

for {
select {
case ev := <-events.Chan():
if ev == nil {
return
}
switch ev.Data.(type) {
case LastFinalizedBlock:
self.worker.mu.Lock()
defer self.worker.mu.Unlock()

self.worker.lastFinalizedBlock = ev.Data.(LastFinalizedBlock).Number
case CurrentFork:
self.worker.mu.Lock()
defer self.worker.mu.Unlock()
func (self *Miner) Start(coinbase common.Address, epoch *rootchain.RootChainEpochPrepared) {

self.worker.currentFork = ev.Data.(CurrentFork).Number
}
case <-self.exitCh:
return
}
if epoch.EpochIsEmpty {
log.Info("ORB epoch is empty, NRB epoch will be started")
return
}
}

func (self *Miner) Start(coinbase common.Address, epoch *rootchain.RootChainEpochPrepared) {
self.env.Lock.Lock()
defer self.env.Lock.Unlock()

atomic.StoreInt32(&self.shouldStart, 1)
self.SetEtherbase(coinbase)
Expand All @@ -176,17 +169,18 @@ func (self *Miner) Start(coinbase common.Address, epoch *rootchain.RootChainEpoc
return
}

self.env.IsRequest = epoch.IsRequest
self.env.IsUserActivated = epoch.UserActivated
self.env.IsRebase = epoch.Rebase
self.env.NumBlockMined = new(big.Int)
self.env.EpochLength = new(big.Int).Add(new(big.Int).Sub(epoch.EndBlockNumber, epoch.StartBlockNumber), big.NewInt(1))
self.env.Completed = false
self.env.setIsRequest(epoch.IsRequest)
self.env.setUserActivated(epoch.UserActivated)
self.env.setRebase(epoch.Rebase)
self.env.setCompleted(false)
self.env.setNumBlockMined(big.NewInt(0))
self.env.setEpochLength(new(big.Int).Add(new(big.Int).Sub(epoch.EndBlockNumber, epoch.StartBlockNumber), big.NewInt(1)))
self.env.setCurrentFork(epoch.ForkNumber)

if epoch.IsRequest && !epoch.UserActivated {
log.Info("NRB epoch is prepared, ORB epoch is started", "ORBepochLength", self.env.EpochLength)
log.Info("ORB epoch is prepared, ORB epoch is started", "ORBepochLength", self.env.EpochLength)
} else if epoch.IsRequest && epoch.UserActivated {
log.Info("NRB epoch is prepared, URB epoch is started", "URBepochLength", self.env.EpochLength)
log.Info("URB epoch is prepared, URB epoch is started", "URBepochLength", self.env.EpochLength)
} else if !epoch.IsRequest {
log.Info("NRB epoch is prepared, NRB epoch is started", "NRBepochLength", self.env.EpochLength)
}
Expand Down Expand Up @@ -248,16 +242,64 @@ func (self *Miner) SetEtherbase(addr common.Address) {
}

func (self *Miner) SetNRBepochLength(length *big.Int) {
self.env.Lock.Lock()
defer self.env.Lock.Unlock()
self.env.lock.Lock()
defer self.env.lock.Unlock()

self.env.EpochLength = length
}

func (env *EpochEnvironment) setCompleted() {
env.Completed = true
func (env *EpochEnvironment) setIsRequest(b bool) {
env.lock.Lock()
defer env.lock.Unlock()

env.IsRequest = b
}

func (env *EpochEnvironment) setUserActivated(b bool) {
env.lock.Lock()
defer env.lock.Unlock()

env.UserActivated = b
}

func (env *EpochEnvironment) setRebase(b bool) {
env.lock.Lock()
defer env.lock.Unlock()

env.Rebase = b
}

func (env *EpochEnvironment) setCompleted(b bool) {
env.lock.Lock()
defer env.lock.Unlock()

env.Completed = b
}

func (env *EpochEnvironment) setNumBlockMined(n *big.Int) {
env.lock.Lock()
defer env.lock.Unlock()

env.NumBlockMined = n
}

func (env *EpochEnvironment) setEpochLength(l *big.Int) {
env.lock.Lock()
defer env.lock.Unlock()

env.EpochLength = l
}

func (env *EpochEnvironment) setCurrentFork(f *big.Int) {
env.lock.Lock()
defer env.lock.Unlock()

env.CurrentFork = f
}

func (env *EpochEnvironment) SetLastFinalizedBlock(n *big.Int) {
env.lock.Lock()
defer env.lock.Unlock()

env.LastFinalizedBlock = n
}
48 changes: 26 additions & 22 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,6 @@ type worker struct {
skipSealHook func(*task) bool // Method to decide whether skipping the sealing.
fullTaskHook func() // Method to call before pushing the full sealing task.
resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.

// pls chain
currentFork *big.Int
lastFinalizedBlock *big.Int
}

func newWorker(config *params.ChainConfig, engine consensus.Engine, pls Backend, env *EpochEnvironment, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(*types.Block) bool) *worker {
Expand Down Expand Up @@ -228,7 +224,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, pls Backend,
go worker.taskLoop()

// Submit first work to initialize pending state.
worker.startCh <- struct{}{}
//worker.startCh <- struct{}{}

return worker
}
Expand Down Expand Up @@ -306,13 +302,15 @@ func (w *worker) newWorkLoop(recommit time.Duration) {

// commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
if w.isRunning() && !w.env.Completed {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
}
interrupt = new(int32)
w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
interrupt = new(int32)
w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}
timer.Reset(recommit)
atomic.StoreInt32(&w.newTxs, 0)
}
// recalcRecommit recalculates the resubmitting interval upon feedback.
recalcRecommit := func(target float64, inc bool) {
Expand Down Expand Up @@ -355,7 +353,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) {

case head := <-w.chainHeadCh:
// commit new mining work again only when the epoch is not completed.
if w.isRunning() {
if w.isRunning() && !w.env.Completed {
clearPending(head.Block.NumberU64())
timestamp = time.Now().Unix()
commit(true, commitInterruptNewHead)
Expand Down Expand Up @@ -422,7 +420,7 @@ func (w *worker) mainLoop() {
// ORB, Rebased ORB, URB
w.commitNewWorkForRB(req.interrupt, req.noempty, req.timestamp)
case false:
if w.env.IsRebase {
if w.env.Rebase {
// Rebase NRB
w.commitNewWorkForRebasedNRB(req.interrupt, req.noempty, req.timestamp)
} else {
Expand Down Expand Up @@ -597,6 +595,12 @@ func (w *worker) resultLoop() {
}
logs = append(logs, receipt.Logs...)
}

// if fork of the block is not same as current fork, throw it.
if w.env.CurrentFork.Cmp(block.Difficulty()) != 0 {
continue
}

// Commit block and state to database.
stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
if err != nil {
Expand All @@ -611,19 +615,15 @@ func (w *worker) resultLoop() {

// check if the epoch is completed
if w.env.NumBlockMined.Cmp(w.env.EpochLength) == 0 {
w.env.setCompleted()
w.env.setCompleted(true)
log.Info("Current Epoch is completed")
}

// Insert the block into the set of pending ones to resultLoop for confirmations
w.unconfirmed.Insert(block.NumberU64(), block.Hash())

isURB := w.env.IsUserActivated

// unlock mutex of w.env so that rcm can manipulate the env data
w.env.Lock.Unlock()

// Broadcast the block and announce chain insertion event
w.mux.Post(core.NewMinedBlockEvent{Block: block, IsURB: isURB})
w.mux.Post(core.NewMinedBlockEvent{Block: block})

var events []interface{}
switch stat {
Expand Down Expand Up @@ -1071,8 +1071,8 @@ func (w *worker) commitNewWorkForRB(interrupt *int32, noempty bool, timestamp in
var parent *types.Block

// if the block to be mined is first block of URB epoch, then parent block is last finalized block
if w.env.IsUserActivated && w.env.NumBlockMined.Cmp(big.NewInt(0)) == 0 {
parent = w.chain.GetBlockByNumber(w.lastFinalizedBlock.Uint64())
if w.env.UserActivated && w.env.NumBlockMined.Cmp(big.NewInt(0)) == 0 {
parent = w.chain.GetBlockByNumber(w.env.LastFinalizedBlock.Uint64())
} else {
parent = w.chain.CurrentBlock()
}
Expand Down Expand Up @@ -1109,6 +1109,10 @@ func (w *worker) commitNewWorkForRB(interrupt *int32, noempty bool, timestamp in
return
}

if w.env.UserActivated && w.env.NumBlockMined.Cmp(big.NewInt(0)) == 0 {
header.Difficulty.Add(header.Difficulty, big.NewInt(1))
}

// TODO: delete because pls block is frontier spec
// If we are care about TheDAO hard-fork check whether to override the extra-data or not
if daoBlock := w.config.DAOForkBlock; daoBlock != nil {
Expand Down

0 comments on commit 0408fc0

Please sign in to comment.