From 006c7af64e64d6253752800f93c2eec3e238a2ef Mon Sep 17 00:00:00 2001 From: niuxiaojie81 <85773309@qq.com> Date: Wed, 3 Sep 2025 15:50:00 +0800 Subject: [PATCH] consensus: make sure the message processing has exited before close --- consensus/cbft/cbft.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/consensus/cbft/cbft.go b/consensus/cbft/cbft.go index 1a6d37ff52..0490003526 100644 --- a/consensus/cbft/cbft.go +++ b/consensus/cbft/cbft.go @@ -117,6 +117,7 @@ type Cbft struct { eventMux *event.TypeMux closeOnce sync.Once exitCh chan struct{} + cancelWg sync.WaitGroup // Make sure all msg handler goroutines have exited. txPool consensus.TxPoolReset blockChain consensus.ChainReader blockCacheWriter consensus.BlockCacheWriter @@ -296,6 +297,7 @@ func (cbft *Cbft) Start(chain consensus.ChainReader, blockCacheWriter consensus. } utils.SetFalse(&cbft.loading) + cbft.cancelWg.Add(1) go cbft.receiveLoop() cbft.fetcher.Start() @@ -345,8 +347,6 @@ func (cbft *Cbft) ReceiveMessage(msg *ctypes.MsgInfo) error { select { case cbft.peerMsgCh <- msg: cbft.log.Debug("Received message from peer", "peer", msg.PeerID, "type", fmt.Sprintf("%T", msg.Msg), "msgHash", msg.Msg.MsgHash(), "BHash", msg.Msg.BHash(), "msg", msg.String(), "peerMsgCh", len(cbft.peerMsgCh)) - case <-cbft.exitCh: - cbft.log.Warn("Cbft exit") default: cbft.log.Debug("peerMsgCh is full, discard", "peerMsgCh", len(cbft.peerMsgCh)) } @@ -448,8 +448,6 @@ func (cbft *Cbft) ReceiveSyncMsg(msg *ctypes.MsgInfo) error { select { case cbft.syncMsgCh <- msg: cbft.log.Debug("Receive synchronization related messages from peer", "peer", msg.PeerID, "type", fmt.Sprintf("%T", msg.Msg), "msgHash", msg.Msg.MsgHash(), "BHash", msg.Msg.BHash(), "msg", msg.Msg.String(), "syncMsgCh", len(cbft.syncMsgCh)) - case <-cbft.exitCh: - cbft.log.Warn("Cbft exit") default: cbft.log.Debug("syncMsgCh is full, discard", "syncMsgCh", len(cbft.syncMsgCh)) } @@ -485,6 +483,8 @@ func (cbft *Cbft) LoadWal() (err error) { // receiveLoop receives all consensus related messages, all processing logic in the same goroutine func (cbft *Cbft) receiveLoop() { + defer cbft.cancelWg.Done() + // Responsible for handling consensus message logic. consensusMessageHandler := func(msg *ctypes.MsgInfo) { if !cbft.network.ContainsHistoryMessageHash(msg.Msg.MsgHash()) { @@ -521,6 +521,7 @@ func (cbft *Cbft) receiveLoop() { case msg := <-cbft.peerMsgCh: // Forward the message before processing the message. consensusMessageHandler(msg) + case msg := <-cbft.syncMsgCh: if err := cbft.handleSyncMsg(msg); err != nil { if err, ok := err.(HandleError); ok { @@ -532,6 +533,7 @@ func (cbft *Cbft) receiveLoop() { } } cbft.forgetMessage(msg.PeerID) + case msg := <-cbft.asyncExecutor.ExecuteStatus(): cbft.onAsyncExecuteStatus(msg) if cbft.executeStatusHook != nil { @@ -543,8 +545,13 @@ func (cbft *Cbft) receiveLoop() { case <-cbft.state.ViewTimeout(): cbft.OnViewTimeout() + case err := <-cbft.commitErrCh: cbft.OnCommitError(err) + + case <-cbft.exitCh: + cbft.log.Info("Cbft close,exit msg handler") + return } } } @@ -1180,6 +1187,7 @@ func (cbft *Cbft) Close() error { } close(cbft.exitCh) }) + cbft.cancelWg.Wait() // Make sure all msg handler goroutines have exited cbft.bridge.Close() cbft.log.Info("Cbft consensus closed") return nil