Skip to content

Commit

Permalink
trigger next round without waiting for the constant blockProposal dur…
Browse files Browse the repository at this point in the history
…ation if mempool has enough transactions to fill the block
  • Loading branch information
charithabandi committed Feb 25, 2025
1 parent 8b9f794 commit fafe03c
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 3 deletions.
33 changes: 33 additions & 0 deletions node/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,31 @@ func (ce *ConsensusEngine) QueueTx(ctx context.Context, tx *ktypes.Transaction)
ce.mempool.Remove(txHash)
return err
}

// if the node is a leader, see if mempool has enough txs to fill the block
// and send a trigger to the CE if it's in the waiting state to start the new round.
if ce.role.Load() == types.RoleLeader {
ce.stateInfo.mtx.RLock()
status := ce.stateInfo.status
ce.stateInfo.mtx.RUnlock()

if status != Committed {
// send the mempoolReady trigger only during the
// newRound and waiting for blkProposal Timeout to elapse.
return nil
}

sz, _ := ce.mempool.Size()
if int64(sz) >= ce.ConsensusParams().MaxBlockSize {
full := ce.mempoolReady.Swap(true)
if !full {
ce.log.Info("Mempool has enough txs to fill the block, sending trigger to the CE", "Txsize", sz)
// only signal leader's CE once when the mempool has enough txs to fill the block
ce.mempoolReadyChan <- struct{}{}
}
}
}

return nil
}

Expand Down Expand Up @@ -289,6 +314,14 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {
// recheck the transactions in the mempool
ce.mempool.RecheckTxs(ctx, ce.recheckTxFn(ce.lastBlockInternal()))

// should there be smaller limit to accommodate for the block serialization?
sz, cnt := ce.mempool.Size()
ce.log.Info("Mempool size after commit rechecks", "size", sz, "count", cnt)
if int64(sz) >= ce.ConsensusParams().MaxBlockSize {
// mempool has enough txs to fill the block
ce.mempoolReady.Store(true)
}

ce.log.Info("Committed Block", "height", height, "hash", blkProp.blkHash.String(),
"appHash", appHash.String(), "numTxs", blkProp.blk.Header.NumTxns)

Expand Down
12 changes: 12 additions & 0 deletions node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ type ConsensusEngine struct {
// the mempool while the block is being committed i.e while the accounts are
// being updated.
mempoolMtx sync.Mutex
// mempoolReady indicates consensus engine that has enough txs to propose a block
// CE can adjust it's wait times based on this flag.
// This flag tracks if the mempool filled enough between the commit and
// the blkProposal Timeout and expediate leader getting into the next round.
// Applicable only for the leader
mempoolReady atomic.Bool // shld it be a bool or a channel?
// queueTxs can send a trigger on this channel to notify the consensus engine that the mempool
// has enough txs to propose a block. This is only sent once when the mempoolReady
// flag is updated from false to true by the QueueTx method.
// Applicable only for the leader
mempoolReadyChan chan struct{}

// Broadcasters
proposalBroadcaster ProposalBroadcaster
Expand Down Expand Up @@ -339,6 +350,7 @@ func New(cfg *Config) (*ConsensusEngine, error) {
bestHeightCh: make(chan *discoveryMsg, 1),
newRound: make(chan struct{}, 1),
newBlockProposal: make(chan struct{}, 1),
mempoolReadyChan: make(chan struct{}, 1),

// interfaces
mempool: cfg.Mempool,
Expand Down
12 changes: 10 additions & 2 deletions node/consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (ce *ConsensusEngine) AcceptProposal(height int64, blkID, prevBlockID types
// 1. If the node is a sentry node and doesn't have the block.
// 2. If the node is a validator and missed the block proposal message.
func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *ktypes.BlockHeader, ci *types.CommitInfo, leaderSig []byte) bool {
if ce.stateInfo.hasBlock.Load() == height {
if ce.stateInfo.hasBlock.Load() == height { // ce is notified about the blkAnn message already
return false
}

Expand All @@ -141,6 +141,13 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty

ce.log.Infof("Accept commit? height: %d, blockID: %s, appHash: %s, lastCommitHeight: %d", height, blkID, ci.AppHash, ce.stateInfo.height)

// check if we already downloaded the block through the block proposal message
if (ce.stateInfo.blkProp != nil && ce.stateInfo.blkProp.blkHash == blkID) && (ce.stateInfo.status == Proposed || ce.stateInfo.status == Executed) {
// block is already downloaded and/being processed, accept the commit, don't request the block again
go ce.NotifyBlockCommit(ce.stateInfo.blkProp.blk, ci, blkID, nil)
return false
}

if ce.stateInfo.height+1 != height {
return false
}
Expand Down Expand Up @@ -174,7 +181,6 @@ func (ce *ConsensusEngine) AcceptCommit(height int64, blkID types.Hash, hdr *kty
// In such cases, the old leader produces the block, but this node will not accept the blkAnn message
// from the old leader, as the node has a different leader now. So accept the committed block as
// long as the block is accepted by the majority of the validators.

return true
}

Expand Down Expand Up @@ -251,6 +257,8 @@ func (ce *ConsensusEngine) processBlockProposal(ctx context.Context, blkPropMsg
ce.stateInfo.mtx.Lock()
ce.stateInfo.status = Proposed
ce.stateInfo.blkProp = blkPropMsg
// record that the block is available with the consensus engine
ce.stateInfo.hasBlock.Store(blkPropMsg.height)
ce.stateInfo.mtx.Unlock()

// execCtx is applicable only for the duration of the block execution
Expand Down
1 change: 1 addition & 0 deletions node/consensus/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Mempool interface {
RecheckTxs(ctx context.Context, checkFn mempool.CheckFn)
Store(types.Hash, *ktypes.Transaction) (have, rejected bool)
TxsAvailable() bool
Size() (totalBytes, numTxns int)
}

// BlockStore includes both txns and blocks
Expand Down
25 changes: 24 additions & 1 deletion node/consensus/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ import (
// until one of these nodes gets majority of the validators to commit the block.

func (ce *ConsensusEngine) newBlockRound(ctx context.Context) {
ce.log.Info("Starting a new consensus round", "height", ce.lastCommitHeight()+1)

// Leader can optimize this idle timeout of proposeTimeout duration to propose a block
// if there are enough transactions available in the mempool to fill the block.
// One problem with this might be is that if the validators are slow,
// leader might end up more time waiting for the acks and flood the network with rebroadcasts.
// but probably better than leader doing nothing???

if ce.mempoolReady.Load() { // set by recheckTxs after commit or by the QueueTx method
// mempool has enough transactions to fill the block, trigger the next round
ce.log.Info("Mempool has enough transactions to fill the block, proposing a new block without waiting for the proposeTimeout")
ce.newBlockProposal <- struct{}{}
return
}

ticker := time.NewTicker(ce.proposeTimeout)
now := time.Now()

Expand All @@ -61,17 +76,22 @@ func (ce *ConsensusEngine) newBlockRound(ctx context.Context) {
// if EmptyBlockTimeout is not 0, leader will propose an empty block
// if no transactions or events are available for emptyBlockTimeout duration.
allowEmptyBlocks := ce.emptyBlockTimeout != 0
ce.log.Info("Starting a new consensus round", "height", ce.lastCommitHeight()+1)

for {
select {
case <-ctx.Done():
ce.log.Info("Context cancelled, stopping the new block round")
return
case <-ce.mempoolReadyChan:
ce.log.Info("received mempoolReady signal, proposing a new block")
// mempool has enough txs to fill the block, trigger the next round
ce.newBlockProposal <- struct{}{}
return
case <-ticker.C:
// check for the availability of transactions in the mempool or
// if the leader has any new events to broadcast a voteID transaction
if ce.mempool.TxsAvailable() || ce.blockProcessor.HasEvents() {
ce.log.Info("Mempool has atleast 1 transaction, proposing a new block")
ce.newBlockProposal <- struct{}{}
return
}
Expand Down Expand Up @@ -178,6 +198,9 @@ func (ce *ConsensusEngine) proposeBlock(ctx context.Context) error {
Signature: *sig,
}

// reset the mempool ready flag once the block is proposed
ce.mempoolReady.Store(false)

// We may be ready to commit if we're the only validator.
ce.processVotes(ctx)

Expand Down

0 comments on commit fafe03c

Please sign in to comment.