Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions consensus/cbft/cbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type Cbft struct {
eventMux *event.TypeMux
closeOnce sync.Once
exitCh chan struct{}
cancelWg sync.WaitGroup // Make sure all msg handler goroutines have exited.
txPool consensus.TxPoolReset
blockChain consensus.ChainReader
blockCacheWriter consensus.BlockCacheWriter
Expand Down Expand Up @@ -296,6 +297,7 @@ func (cbft *Cbft) Start(chain consensus.ChainReader, blockCacheWriter consensus.
}
utils.SetFalse(&cbft.loading)

cbft.cancelWg.Add(1)
go cbft.receiveLoop()

cbft.fetcher.Start()
Expand Down Expand Up @@ -345,8 +347,6 @@ func (cbft *Cbft) ReceiveMessage(msg *ctypes.MsgInfo) error {
select {
case cbft.peerMsgCh <- msg:
cbft.log.Debug("Received message from peer", "peer", msg.PeerID, "type", fmt.Sprintf("%T", msg.Msg), "msgHash", msg.Msg.MsgHash(), "BHash", msg.Msg.BHash(), "msg", msg.String(), "peerMsgCh", len(cbft.peerMsgCh))
case <-cbft.exitCh:
cbft.log.Warn("Cbft exit")
default:
cbft.log.Debug("peerMsgCh is full, discard", "peerMsgCh", len(cbft.peerMsgCh))
}
Expand Down Expand Up @@ -448,8 +448,6 @@ func (cbft *Cbft) ReceiveSyncMsg(msg *ctypes.MsgInfo) error {
select {
case cbft.syncMsgCh <- msg:
cbft.log.Debug("Receive synchronization related messages from peer", "peer", msg.PeerID, "type", fmt.Sprintf("%T", msg.Msg), "msgHash", msg.Msg.MsgHash(), "BHash", msg.Msg.BHash(), "msg", msg.Msg.String(), "syncMsgCh", len(cbft.syncMsgCh))
case <-cbft.exitCh:
cbft.log.Warn("Cbft exit")
default:
cbft.log.Debug("syncMsgCh is full, discard", "syncMsgCh", len(cbft.syncMsgCh))
}
Expand Down Expand Up @@ -485,6 +483,8 @@ func (cbft *Cbft) LoadWal() (err error) {

// receiveLoop receives all consensus related messages, all processing logic in the same goroutine
func (cbft *Cbft) receiveLoop() {
defer cbft.cancelWg.Done()

// Responsible for handling consensus message logic.
consensusMessageHandler := func(msg *ctypes.MsgInfo) {
if !cbft.network.ContainsHistoryMessageHash(msg.Msg.MsgHash()) {
Expand Down Expand Up @@ -521,6 +521,7 @@ func (cbft *Cbft) receiveLoop() {
case msg := <-cbft.peerMsgCh:
// Forward the message before processing the message.
consensusMessageHandler(msg)

case msg := <-cbft.syncMsgCh:
if err := cbft.handleSyncMsg(msg); err != nil {
if err, ok := err.(HandleError); ok {
Expand All @@ -532,6 +533,7 @@ func (cbft *Cbft) receiveLoop() {
}
}
cbft.forgetMessage(msg.PeerID)

case msg := <-cbft.asyncExecutor.ExecuteStatus():
cbft.onAsyncExecuteStatus(msg)
if cbft.executeStatusHook != nil {
Expand All @@ -543,8 +545,13 @@ func (cbft *Cbft) receiveLoop() {

case <-cbft.state.ViewTimeout():
cbft.OnViewTimeout()

case err := <-cbft.commitErrCh:
cbft.OnCommitError(err)

case <-cbft.exitCh:
cbft.log.Info("Cbft close,exit msg handler")
return
}
}
}
Expand Down Expand Up @@ -1180,6 +1187,7 @@ func (cbft *Cbft) Close() error {
}
close(cbft.exitCh)
})
cbft.cancelWg.Wait() // Make sure all msg handler goroutines have exited
cbft.bridge.Close()
cbft.log.Info("Cbft consensus closed")
return nil
Expand Down
Loading