From 3d7fd3fe08946eefb8f0d29c3b9c8088870dbb39 Mon Sep 17 00:00:00 2001 From: capt4ce Date: Mon, 27 Jan 2020 15:30:00 +0800 Subject: [PATCH] keep track of p2p go routine metrics --- common/monitoring/metricsMonitoring.go | 190 ++++++++++++++++++++++--- p2p/client/peerServiceClient.go | 27 ++++ p2p/handler/p2pServerHandler.go | 28 ++++ 3 files changed, 227 insertions(+), 18 deletions(-) diff --git a/common/monitoring/metricsMonitoring.go b/common/monitoring/metricsMonitoring.go index b4ae43878..274c48f66 100644 --- a/common/monitoring/metricsMonitoring.go +++ b/common/monitoring/metricsMonitoring.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "reflect" + "sync" "github.com/prometheus/client_golang/prometheus" "github.com/zoobc/zoobc-core/common/model" @@ -16,20 +17,71 @@ type lastblockMetrics struct { } var ( - isMonitoringActive bool - nodePublicKey []byte - receiptCounter prometheus.Counter - unresolvedPeersCounter prometheus.Gauge - resolvedPeersCounter prometheus.Gauge - unresolvedPriorityPeersCounter prometheus.Gauge - resolvedPriorityPeersCounter prometheus.Gauge + isMonitoringActive bool + nodePublicKey []byte + + receiptCounter prometheus.Counter + receiptCounterSync sync.Mutex + + unresolvedPeersCounter prometheus.Gauge + unresolvedPeersCounterSync sync.Mutex + + resolvedPeersCounter prometheus.Gauge + resolvedPeersCounterSync sync.Mutex + + unresolvedPriorityPeersCounter prometheus.Gauge + unresolvedPriorityPeersCounterSync sync.Mutex + + resolvedPriorityPeersCounter prometheus.Gauge + resolvedPriorityPeersCounterSync sync.Mutex + activeRegisteredNodesGauge prometheus.Gauge - nodeScore prometheus.Gauge - blockerCounter = make(map[string]prometheus.Counter) - statusLockCounter = make(map[int]prometheus.Gauge) - blockchainStatus = make(map[int32]prometheus.Gauge) - blockchainSmithTime = make(map[int32]prometheus.Gauge) - blockchainHeight = make(map[int32]*lastblockMetrics) + activeRegisteredNodesGaugeSync sync.Mutex + + nodeScore prometheus.Gauge + nodeScoreSync sync.Mutex + + blockerCounter = make(map[string]prometheus.Counter) + blockerCounterSync sync.Mutex + + statusLockCounter = make(map[int]prometheus.Gauge) + statusLockCounterSync sync.Mutex + + blockchainStatus = make(map[int32]prometheus.Gauge) + blockchainStatusSync sync.Mutex + + blockchainSmithTime = make(map[int32]prometheus.Gauge) + blockchainSmithTimeSync sync.Mutex + + blockchainHeight = make(map[int32]*lastblockMetrics) + blockchainHeightSync sync.Mutex + + goRoutineActivityCounters = make(map[string]prometheus.Gauge) + goRoutineActivityCountersSync sync.Mutex +) + +const ( + P2pGetPeerInfoServer = "P2pGetPeerInfoServer" + P2pGetMorePeersServer = "P2pGetMorePeersServer" + P2pSendPeersServer = "P2pSendPeersServer" + P2pSendBlockServer = "P2pSendBlockServer" + P2pSendTransactionServer = "P2pSendTransactionServer" + P2pRequestBlockTransactionsServer = "P2pRequestBlockTransactionsServer" + P2pGetCumulativeDifficultyServer = "P2pGetCumulativeDifficultyServer" + P2pGetCommonMilestoneBlockIDsServer = "P2pGetCommonMilestoneBlockIDsServer" + P2pGetNextBlockIDsServer = "P2pGetNextBlockIDsServer" + P2pGetNextBlocksServer = "P2pGetNextBlocksServer" + + P2pGetPeerInfoClient = "P2pGetPeerInfoClient" + P2pGetMorePeersClient = "P2pGetMorePeersClient" + P2pSendPeersClient = "P2pSendPeersClient" + P2pSendBlockClient = "P2pSendBlockClient" + P2pSendTransactionClient = "P2pSendTransactionClient" + P2pRequestBlockTransactionsClient = "P2pRequestBlockTransactionsClient" + P2pGetCumulativeDifficultyClient = "P2pGetCumulativeDifficultyClient" + P2pGetCommonMilestoneBlockIDsClient = "P2pGetCommonMilestoneBlockIDsClient" + P2pGetNextBlockIDsClient = "P2pGetNextBlockIDsClient" + P2pGetNextBlocksClient = "P2pGetNextBlocksClient" ) func SetMonitoringActive(isActive bool) { @@ -45,6 +97,12 @@ func IsMonitoringActive() bool { } func IncrementReceiptCounter() { + if !isMonitoringActive { + return + } + + receiptCounterSync.Lock() + defer receiptCounterSync.Unlock() if receiptCounter == nil { receiptCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: fmt.Sprintf("zoobc_receipts"), @@ -57,6 +115,13 @@ func IncrementReceiptCounter() { } func SetUnresolvedPeersCount(count int) { + if !isMonitoringActive { + return + } + + unresolvedPeersCounterSync.Lock() + defer unresolvedPeersCounterSync.Unlock() + if unresolvedPeersCounter == nil { unresolvedPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{ Name: fmt.Sprintf("zoobc_unresolved_peers"), @@ -69,6 +134,13 @@ func SetUnresolvedPeersCount(count int) { } func SetResolvedPeersCount(count int) { + if !isMonitoringActive { + return + } + + resolvedPeersCounterSync.Lock() + defer resolvedPeersCounterSync.Unlock() + if resolvedPeersCounter == nil { resolvedPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{ Name: fmt.Sprintf("zoobc_resolved_peers"), @@ -81,6 +153,13 @@ func SetResolvedPeersCount(count int) { } func SetResolvedPriorityPeersCount(count int) { + if !isMonitoringActive { + return + } + + resolvedPriorityPeersCounterSync.Lock() + defer resolvedPriorityPeersCounterSync.Unlock() + if resolvedPriorityPeersCounter == nil { resolvedPriorityPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{ Name: fmt.Sprintf("zoobc_resolved_priority_peers"), @@ -93,6 +172,13 @@ func SetResolvedPriorityPeersCount(count int) { } func SetUnresolvedPriorityPeersCount(count int) { + if !isMonitoringActive { + return + } + + unresolvedPriorityPeersCounterSync.Lock() + defer unresolvedPriorityPeersCounterSync.Unlock() + if unresolvedPriorityPeersCounter == nil { unresolvedPriorityPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{ Name: fmt.Sprintf("zoobc_unresolved_priority_peers"), @@ -105,6 +191,13 @@ func SetUnresolvedPriorityPeersCount(count int) { } func SetActiveRegisteredNodesCount(count int) { + if !isMonitoringActive { + return + } + + activeRegisteredNodesGaugeSync.Lock() + defer activeRegisteredNodesGaugeSync.Unlock() + if activeRegisteredNodesGauge == nil { activeRegisteredNodesGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Name: fmt.Sprintf("zoobc_active_registered_nodes"), @@ -121,6 +214,9 @@ func IncrementBlockerMetrics(typeBlocker string) { return } + blockerCounterSync.Lock() + defer blockerCounterSync.Unlock() + if blockerCounter[typeBlocker] == nil { blockerCounter[typeBlocker] = prometheus.NewCounter(prometheus.CounterOpts{ Name: fmt.Sprintf("zoobc_err_%s", typeBlocker), @@ -136,6 +232,9 @@ func IncrementStatusLockCounter(typeStatusLock int) { return } + statusLockCounterSync.Lock() + defer statusLockCounterSync.Unlock() + if statusLockCounter[typeStatusLock] == nil { statusLockCounter[typeStatusLock] = prometheus.NewGauge(prometheus.GaugeOpts{ Name: fmt.Sprintf("zoobc_status_lock_%d", typeStatusLock), @@ -154,9 +253,8 @@ func DecrementStatusLockCounter(typeStatusLock int) { return } - if !isMonitoringActive { - return - } + statusLockCounterSync.Lock() + defer statusLockCounterSync.Unlock() if statusLockCounter[typeStatusLock] == nil { statusLockCounter[typeStatusLock] = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -164,15 +262,21 @@ func DecrementStatusLockCounter(typeStatusLock int) { Help: fmt.Sprintf("Status lock %d counter", typeStatusLock), }) prometheus.MustRegister(statusLockCounter[typeStatusLock]) - } else { - statusLockCounter[typeStatusLock].Dec() + + // to avoid below as the initial value, on creation on decrement, we exit + return } + statusLockCounter[typeStatusLock].Dec() } func SetBlockchainStatus(chainType int32, newStatus int) { if !isMonitoringActive { return } + + blockchainStatusSync.Lock() + defer blockchainStatusSync.Unlock() + if blockchainStatus[chainType] == nil { blockchainStatus[chainType] = prometheus.NewGauge(prometheus.GaugeOpts{ Name: fmt.Sprintf("zoobc_blockchain_status_%d", chainType), @@ -187,6 +291,10 @@ func SetBlockchainSmithTime(chainType int32, newTime int64) { if !isMonitoringActive { return } + + blockchainSmithTimeSync.Lock() + defer blockchainSmithTimeSync.Unlock() + if blockchainSmithTime[chainType] == nil { blockchainSmithTime[chainType] = prometheus.NewGauge(prometheus.GaugeOpts{ Name: fmt.Sprintf("zoobc_blockchain_%d_smith_time", chainType), @@ -201,6 +309,10 @@ func SetNodeScore(activeBlocksmiths []*model.Blocksmith) { if !isMonitoringActive { return } + + nodeScoreSync.Lock() + defer nodeScoreSync.Unlock() + if nodeScore == nil { nodeScore = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "zoobc_node_score", @@ -225,6 +337,9 @@ func SetLastBlock(chainType int32, block *model.Block) { return } + blockchainHeightSync.Lock() + defer blockchainHeightSync.Unlock() + if blockchainHeight[chainType] == nil { idMsbMetrics := prometheus.NewGauge(prometheus.GaugeOpts{ Name: fmt.Sprintf("zoobc_blockchain_id_%d_msb", chainType), @@ -252,3 +367,42 @@ func SetLastBlock(chainType int32, block *model.Block) { blockchainHeight[chainType].IDLsb.Set(math.Abs(float64(block.GetID() % int64(1000000000)))) blockchainHeight[chainType].Height.Set(float64(block.GetHeight())) } + +func IncrementGoRoutineActivity(activityName string) { + if !isMonitoringActive { + return + } + + goRoutineActivityCountersSync.Lock() + defer goRoutineActivityCountersSync.Unlock() + + if goRoutineActivityCounters[activityName] == nil { + goRoutineActivityCounters[activityName] = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: fmt.Sprintf("zoobc_routines_counter_%s", activityName), + Help: fmt.Sprintf("Go routine counter for %s", activityName), + }) + prometheus.MustRegister(goRoutineActivityCounters[activityName]) + } + goRoutineActivityCounters[activityName].Inc() +} + +func DecrementGoRoutineActivity(activityName string) { + if !isMonitoringActive { + return + } + + goRoutineActivityCountersSync.Lock() + defer goRoutineActivityCountersSync.Unlock() + + if goRoutineActivityCounters[activityName] == nil { + goRoutineActivityCounters[activityName] = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: fmt.Sprintf("zoobc_routines_counter_%s", activityName), + Help: fmt.Sprintf("Go routine counter for %s", activityName), + }) + prometheus.MustRegister(goRoutineActivityCounters[activityName]) + + // to avoid below as the initial value, on creation on decrement, we exit + return + } + goRoutineActivityCounters[activityName].Dec() +} diff --git a/p2p/client/peerServiceClient.go b/p2p/client/peerServiceClient.go index 111de3d23..62560ae5e 100644 --- a/p2p/client/peerServiceClient.go +++ b/p2p/client/peerServiceClient.go @@ -174,6 +174,9 @@ func (psc *PeerServiceClient) getDefaultContext(requestTimeOut time.Duration) (c // GetPeerInfo to get Peer info func (psc *PeerServiceClient) GetPeerInfo(destPeer *model.Peer) (*model.Node, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetPeerInfoClient) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetPeerInfoClient) + // add a copy to avoid pointer delete connection, err := psc.GetConnection(destPeer) if err != nil { @@ -202,6 +205,9 @@ func (psc *PeerServiceClient) GetPeerInfo(destPeer *model.Peer) (*model.Node, er // GetMorePeers to collect more peers available func (psc *PeerServiceClient) GetMorePeers(destPeer *model.Peer) (*model.GetMorePeersResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetMorePeersClient) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetMorePeersClient) + connection, err := psc.GetConnection(destPeer) if err != nil { return nil, err @@ -224,6 +230,9 @@ func (psc *PeerServiceClient) GetMorePeers(destPeer *model.Peer) (*model.GetMore // SendPeers sends set of peers to other node (to populate the network) func (psc *PeerServiceClient) SendPeers(destPeer *model.Peer, peersInfo []*model.Node) (*model.Empty, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pSendPeersClient) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendPeersClient) + connection, err := psc.GetConnection(destPeer) if err != nil { return nil, err @@ -250,6 +259,9 @@ func (psc *PeerServiceClient) SendBlock( block *model.Block, chainType chaintype.ChainType, ) error { + monitoring.IncrementGoRoutineActivity(monitoring.P2pSendBlockClient) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendBlockClient) + connection, err := psc.GetConnection(destPeer) if err != nil { return err @@ -288,6 +300,9 @@ func (psc *PeerServiceClient) SendTransaction( transactionBytes []byte, chainType chaintype.ChainType, ) error { + monitoring.IncrementGoRoutineActivity(monitoring.P2pSendTransactionClient) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendTransactionClient) + connection, err := psc.GetConnection(destPeer) if err != nil { return err @@ -325,6 +340,9 @@ func (psc *PeerServiceClient) GetCumulativeDifficulty( destPeer *model.Peer, chaintype chaintype.ChainType, ) (*model.GetCumulativeDifficultyResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetCumulativeDifficultyClient) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetCumulativeDifficultyClient) + connection, err := psc.GetConnection(destPeer) if err != nil { return nil, err @@ -353,6 +371,9 @@ func (psc *PeerServiceClient) GetCommonMilestoneBlockIDs( chaintype chaintype.ChainType, lastBlockID, lastMilestoneBlockID int64, ) (*model.GetCommonMilestoneBlockIdsResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsClient) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsClient) + connection, err := psc.GetConnection(destPeer) if err != nil { return nil, err @@ -384,6 +405,9 @@ func (psc *PeerServiceClient) GetNextBlockIDs( blockID int64, limit uint32, ) (*model.BlockIdsResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsClient) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsClient) + connection, err := psc.GetConnection(destPeer) if err != nil { return nil, err @@ -415,6 +439,9 @@ func (psc *PeerServiceClient) GetNextBlocks( blockIds []int64, blockID int64, ) (*model.BlocksData, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlocksClient) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlocksClient) + connection, err := psc.GetConnection(destPeer) if err != nil { return nil, err diff --git a/p2p/handler/p2pServerHandler.go b/p2p/handler/p2pServerHandler.go index ddb2e9db8..676e24351 100644 --- a/p2p/handler/p2pServerHandler.go +++ b/p2p/handler/p2pServerHandler.go @@ -6,6 +6,7 @@ import ( "github.com/zoobc/zoobc-core/common/blocker" "github.com/zoobc/zoobc-core/common/chaintype" "github.com/zoobc/zoobc-core/common/model" + "github.com/zoobc/zoobc-core/common/monitoring" service2 "github.com/zoobc/zoobc-core/p2p/service" ) @@ -24,11 +25,17 @@ func NewP2PServerHandler( // GetPeerInfo to return info of this host func (ss *P2PServerHandler) GetPeerInfo(ctx context.Context, req *model.GetPeerInfoRequest) (*model.Node, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetPeerInfoServer) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetPeerInfoServer) + return ss.Service.GetPeerInfo(ctx, req) } // GetMorePeers contains info other peers func (ss *P2PServerHandler) GetMorePeers(ctx context.Context, req *model.Empty) (*model.GetMorePeersResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetMorePeersServer) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetMorePeersServer) + var nodes []*model.Node nodes, err := ss.Service.GetMorePeers(ctx, req) if err != nil { @@ -41,6 +48,9 @@ func (ss *P2PServerHandler) GetMorePeers(ctx context.Context, req *model.Empty) // SendPeers receives set of peers info from other node and put them into the unresolved peers func (ss *P2PServerHandler) SendPeers(ctx context.Context, req *model.SendPeersRequest) (*model.Empty, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pSendPeersServer) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendPeersServer) + // TODO: only accept nodes that are already registered in the node registration if req.Peers == nil { return nil, blocker.NewBlocker( @@ -55,11 +65,17 @@ func (ss *P2PServerHandler) SendPeers(ctx context.Context, req *model.SendPeersR func (ss *P2PServerHandler) GetCumulativeDifficulty(ctx context.Context, req *model.GetCumulativeDifficultyRequest, ) (*model.GetCumulativeDifficultyResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetCumulativeDifficultyServer) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetCumulativeDifficultyServer) + return ss.Service.GetCumulativeDifficulty(ctx, chaintype.GetChainType(req.ChainType)) } func (ss *P2PServerHandler) GetCommonMilestoneBlockIDs(ctx context.Context, req *model.GetCommonMilestoneBlockIdsRequest) (*model.GetCommonMilestoneBlockIdsResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsServer) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsServer) + // if `lastBlockID` is supplied // check it the last `lastBlockID` got matches with the host's lastBlock then return the response as is chainType := chaintype.GetChainType(req.ChainType) @@ -75,6 +91,9 @@ func (ss *P2PServerHandler) GetCommonMilestoneBlockIDs(ctx context.Context, } func (ss *P2PServerHandler) GetNextBlockIDs(ctx context.Context, req *model.GetNextBlockIdsRequest) (*model.BlockIdsResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsServer) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsServer) + chainType := chaintype.GetChainType(req.ChainType) blockIds, err := ss.Service.GetNextBlockIDs(ctx, chainType, req.Limit, req.BlockId) if err != nil { @@ -86,6 +105,9 @@ func (ss *P2PServerHandler) GetNextBlockIDs(ctx context.Context, req *model.GetN } func (ss *P2PServerHandler) GetNextBlocks(ctx context.Context, req *model.GetNextBlocksRequest) (*model.BlocksData, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlocksServer) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlocksServer) + // TODO: getting data from cache chainType := chaintype.GetChainType(req.ChainType) return ss.Service.GetNextBlocks( @@ -98,6 +120,9 @@ func (ss *P2PServerHandler) GetNextBlocks(ctx context.Context, req *model.GetNex // SendBlock receive block from other node and calling BlockReceived Event func (ss *P2PServerHandler) SendBlock(ctx context.Context, req *model.SendBlockRequest) (*model.SendBlockResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pSendBlockServer) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendBlockServer) + // todo: validate request return ss.Service.SendBlock( ctx, @@ -112,6 +137,9 @@ func (ss *P2PServerHandler) SendTransaction( ctx context.Context, req *model.SendTransactionRequest, ) (*model.SendTransactionResponse, error) { + monitoring.IncrementGoRoutineActivity(monitoring.P2pSendTransactionServer) + defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendTransactionServer) + return ss.Service.SendTransaction( ctx, chaintype.GetChainType(req.ChainType),