From f908e5dd6c3b0fa94a3d752fe4dcfdcace56ef8e Mon Sep 17 00:00:00 2001 From: colinlyguo Date: Mon, 12 May 2025 16:28:24 +0800 Subject: [PATCH] feat: add logs to track tx and block propagation delay --- eth/fetcher/tx_fetcher.go | 2 +- eth/handler.go | 11 +++++++---- eth/protocols/eth/handlers.go | 24 ++++++++++++++---------- eth/protocols/eth/peer.go | 2 +- params/version.go | 2 +- 5 files changed, 24 insertions(+), 17 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 24fec710208c..dba52701f9a0 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -794,7 +794,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, return true // continue in the for-each }) - log.Debug("Scheduling transaction retrieval", "peer", peer, "len(f.announces[peer])", len(f.announces[peer]), "len(hashes)", len(hashes)) + log.Trace("Scheduling transaction retrieval", "peer", peer, "len(f.announces[peer])", len(f.announces[peer]), "len(hashes)", len(hashes)) peerAnnounceTxsLenGauge.Update(int64(len(f.announces[peer]))) peerRetrievalTxsLenGauge.Update(int64(len(hashes))) diff --git a/eth/handler.go b/eth/handler.go index f77de69d0bc9..4755a0c7b704 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -455,6 +455,8 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { hash := block.Hash() peers := onlyShadowForkPeers(h.shadowForkPeerIDs, h.peers.peersWithoutBlock(hash)) + log.Debug("Broadcasting block", "hash", hash.Hex(), "number", block.NumberU64(), "size", block.Size()) + // If propagation is requested, send to a subset of the peer if propagate { // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) @@ -470,7 +472,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { for _, peer := range transfer { peer.AsyncSendNewBlock(block, td) } - log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + log.Trace("Propagated block", "hash", hash.Hex(), "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) return } // Otherwise if the block is indeed in out own chain, announce it @@ -478,7 +480,7 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { for _, peer := range peers { peer.AsyncSendNewBlockHash(block) } - log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + log.Trace("Announced block", "hash", hash.Hex(), "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) } } @@ -503,6 +505,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { if tx.IsL1MessageTx() { continue } + log.Debug("Broadcasting transaction", "hash", tx.Hash().Hex(), "size", tx.Size()) peers := onlyShadowForkPeers(h.shadowForkPeerIDs, h.peers.peersWithoutTransaction(tx.Hash())) // Send the tx unconditionally to a subset of our peers numDirect := int(math.Sqrt(float64(len(peers)))) @@ -518,13 +521,13 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { directPeers++ directCount += len(hashes) peer.AsyncSendTransactions(hashes) - log.Debug("Transactions being broadcasted to", "peer", peer.String(), "len", len(hashes)) + log.Trace("Transactions being broadcasted to", "peer", peer.String(), "len", len(hashes)) } for peer, hashes := range annos { annoPeers++ annoCount += len(hashes) peer.AsyncSendPooledTransactionHashes(hashes) - log.Debug("Transactions being announced to", "peer", peer.String(), "len", len(hashes)) + log.Trace("Transactions being announced to", "peer", peer.String(), "len", len(hashes)) } log.Debug("Transaction broadcast", "txs", len(txs), "announce packs", annoPeers, "announced hashes", annoCount, diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index fd1072596871..c0b8f2c9dc18 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -294,6 +294,7 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { // Mark the peer as owning the block peer.markBlock(ann.Block.Hash()) + log.Debug("Received new block via gossip", "blockHash", ann.Block.Hash().Hex(), "blockNumber", ann.Block.NumberU64(), "peer", peer.String()) return backend.Handle(peer, ann) } @@ -362,12 +363,12 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) } ann := new(NewPooledTransactionHashesPacket) if err := msg.Decode(ann); err != nil { - log.Debug("Failed to decode `NewPooledTransactionHashesPacket`", "peer", peer.String(), "err", err) + log.Trace("Failed to decode `NewPooledTransactionHashesPacket`", "peer", peer.String(), "err", err) newPooledTxHashesFailMeter.Mark(1) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } // Schedule all the unknown hashes for retrieval - log.Debug("handleNewPooledTransactionHashes", "peer", peer.String(), "len(ann)", len(*ann)) + log.Trace("handleNewPooledTransactionHashes", "peer", peer.String(), "len(ann)", len(*ann)) newPooledTxHashesLenGauge.Update(int64(len(*ann))) for _, hash := range *ann { peer.markTransaction(hash) @@ -379,12 +380,15 @@ func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) err // Decode the pooled transactions retrieval message var query GetPooledTransactionsPacket66 if err := msg.Decode(&query); err != nil { - log.Debug("Failed to decode `GetPooledTransactionsPacket66`", "peer", peer.String(), "err", err) + log.Trace("Failed to decode `GetPooledTransactionsPacket66`", "peer", peer.String(), "err", err) getPooledTxsFailMeter.Mark(1) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } hashes, txs := answerGetPooledTransactions(backend, query.GetPooledTransactionsPacket, peer) - log.Debug("handleGetPooledTransactions", "peer", peer.String(), "RequestId", query.RequestId, "len(query)", len(query.GetPooledTransactionsPacket), "retrieved", len(hashes)) + log.Trace("handleGetPooledTransactions", "peer", peer.String(), "RequestId", query.RequestId, "len(query)", len(query.GetPooledTransactionsPacket), "retrieved", len(hashes)) + for _, hash := range hashes { + log.Debug("Received new pooled transaction", "hash", hash.Hex(), "peer", peer.String()) + } getPooledTxsQueryLenGauge.Update(int64(len(query.GetPooledTransactionsPacket))) getPooledTxsRetrievedLenGauge.Update(int64(len(hashes))) return peer.ReplyPooledTransactionsRLP(query.RequestId, hashes, txs) @@ -427,16 +431,16 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error { var txs TransactionsPacket if err := msg.Decode(&txs); err != nil { handleTxsFailMeter.Mark(1) - log.Debug("Failed to decode `TransactionsPacket`", "peer", peer.String(), "err", err) + log.Trace("Failed to decode `TransactionsPacket`", "peer", peer.String(), "err", err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } - log.Debug("handleTransactions", "peer", peer.String(), "len(txs)", len(txs)) + log.Trace("handleTransactions", "peer", peer.String(), "len(txs)", len(txs)) handleTxsLenGauge.Update(int64(len(txs))) for i, tx := range txs { // Validate and mark the remote transaction if tx == nil { handleTxsNilMeter.Mark(1) - log.Debug("handleTransactions: transaction is nil", "peer", peer.String(), "i", i) + log.Trace("handleTransactions: transaction is nil", "peer", peer.String(), "i", i) return fmt.Errorf("%w: transaction %d is nil", errDecode, i) } peer.markTransaction(tx.Hash()) @@ -453,16 +457,16 @@ func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error var txs PooledTransactionsPacket66 if err := msg.Decode(&txs); err != nil { pooledTxs66FailMeter.Mark(1) - log.Debug("Failed to decode `PooledTransactionsPacket66`", "peer", peer.String(), "err", err) + log.Trace("Failed to decode `PooledTransactionsPacket66`", "peer", peer.String(), "err", err) return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) } - log.Debug("handlePooledTransactions66", "peer", peer.String(), "len(txs)", len(txs.PooledTransactionsPacket)) + log.Trace("handlePooledTransactions66", "peer", peer.String(), "len(txs)", len(txs.PooledTransactionsPacket)) pooledTxs66LenGauge.Update(int64(len(txs.PooledTransactionsPacket))) for i, tx := range txs.PooledTransactionsPacket { // Validate and mark the remote transaction if tx == nil { pooledTxs66NilMeter.Mark(1) - log.Debug("handlePooledTransactions: transaction is nil", "peer", peer.String(), "i", i) + log.Trace("handlePooledTransactions: transaction is nil", "peer", peer.String(), "i", i) return fmt.Errorf("%w: transaction %d is nil", errDecode, i) } peer.markTransaction(tx.Hash()) diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index d7ab78922723..838232051cbc 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -436,7 +436,7 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error { p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) id := rand.Uint64() - log.Debug("Requesting transactions", "RequestId", id, "Peer.id", p.id, "count", len(hashes)) + log.Trace("Requesting transactions", "RequestId", id, "Peer.id", p.id, "count", len(hashes)) peerRequestTxsCntGauge.Update(int64(len(hashes))) requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id) diff --git a/params/version.go b/params/version.go index 7105fe6f63be..07af53cefdfe 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 8 // Minor version component of the current release - VersionPatch = 44 // Patch version component of the current release + VersionPatch = 45 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string )