Skip to content

keep track of p2p go routine metrics #569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 27, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 172 additions & 18 deletions common/monitoring/metricsMonitoring.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"fmt"
"math"
"reflect"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/zoobc/zoobc-core/common/model"
@@ -16,20 +17,71 @@ type lastblockMetrics struct {
}

var (
isMonitoringActive bool
nodePublicKey []byte
receiptCounter prometheus.Counter
unresolvedPeersCounter prometheus.Gauge
resolvedPeersCounter prometheus.Gauge
unresolvedPriorityPeersCounter prometheus.Gauge
resolvedPriorityPeersCounter prometheus.Gauge
isMonitoringActive bool
nodePublicKey []byte

receiptCounter prometheus.Counter
receiptCounterSync sync.Mutex

unresolvedPeersCounter prometheus.Gauge
unresolvedPeersCounterSync sync.Mutex

resolvedPeersCounter prometheus.Gauge
resolvedPeersCounterSync sync.Mutex

unresolvedPriorityPeersCounter prometheus.Gauge
unresolvedPriorityPeersCounterSync sync.Mutex

resolvedPriorityPeersCounter prometheus.Gauge
resolvedPriorityPeersCounterSync sync.Mutex

activeRegisteredNodesGauge prometheus.Gauge
nodeScore prometheus.Gauge
blockerCounter = make(map[string]prometheus.Counter)
statusLockCounter = make(map[int]prometheus.Gauge)
blockchainStatus = make(map[int32]prometheus.Gauge)
blockchainSmithTime = make(map[int32]prometheus.Gauge)
blockchainHeight = make(map[int32]*lastblockMetrics)
activeRegisteredNodesGaugeSync sync.Mutex

nodeScore prometheus.Gauge
nodeScoreSync sync.Mutex

blockerCounter = make(map[string]prometheus.Counter)
blockerCounterSync sync.Mutex

statusLockCounter = make(map[int]prometheus.Gauge)
statusLockCounterSync sync.Mutex

blockchainStatus = make(map[int32]prometheus.Gauge)
blockchainStatusSync sync.Mutex

blockchainSmithTime = make(map[int32]prometheus.Gauge)
blockchainSmithTimeSync sync.Mutex

blockchainHeight = make(map[int32]*lastblockMetrics)
blockchainHeightSync sync.Mutex

goRoutineActivityCounters = make(map[string]prometheus.Gauge)
goRoutineActivityCountersSync sync.Mutex
)

// Activity names for the server side of each P2P call. They are passed to
// IncrementGoRoutineActivity / DecrementGoRoutineActivity and end up as the
// suffix of the corresponding gauge name.
const (
	P2pGetPeerInfoServer                = "P2pGetPeerInfoServer"
	P2pGetMorePeersServer               = "P2pGetMorePeersServer"
	P2pSendPeersServer                  = "P2pSendPeersServer"
	P2pSendBlockServer                  = "P2pSendBlockServer"
	P2pSendTransactionServer            = "P2pSendTransactionServer"
	P2pRequestBlockTransactionsServer   = "P2pRequestBlockTransactionsServer"
	P2pGetCumulativeDifficultyServer    = "P2pGetCumulativeDifficultyServer"
	P2pGetCommonMilestoneBlockIDsServer = "P2pGetCommonMilestoneBlockIDsServer"
	P2pGetNextBlockIDsServer            = "P2pGetNextBlockIDsServer"
	P2pGetNextBlocksServer              = "P2pGetNextBlocksServer"
)

// Activity names for the client side of each P2P call; same usage as the
// server-side names above, one constant per outgoing request type.
const (
	P2pGetPeerInfoClient                = "P2pGetPeerInfoClient"
	P2pGetMorePeersClient               = "P2pGetMorePeersClient"
	P2pSendPeersClient                  = "P2pSendPeersClient"
	P2pSendBlockClient                  = "P2pSendBlockClient"
	P2pSendTransactionClient            = "P2pSendTransactionClient"
	P2pRequestBlockTransactionsClient   = "P2pRequestBlockTransactionsClient"
	P2pGetCumulativeDifficultyClient    = "P2pGetCumulativeDifficultyClient"
	P2pGetCommonMilestoneBlockIDsClient = "P2pGetCommonMilestoneBlockIDsClient"
	P2pGetNextBlockIDsClient            = "P2pGetNextBlockIDsClient"
	P2pGetNextBlocksClient              = "P2pGetNextBlocksClient"
)

func SetMonitoringActive(isActive bool) {
@@ -45,6 +97,12 @@ func IsMonitoringActive() bool {
}

func IncrementReceiptCounter() {
if !isMonitoringActive {
return
}

receiptCounterSync.Lock()
defer receiptCounterSync.Unlock()
if receiptCounter == nil {
receiptCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("zoobc_receipts"),
@@ -57,6 +115,13 @@ func IncrementReceiptCounter() {
}

func SetUnresolvedPeersCount(count int) {
if !isMonitoringActive {
return
}

unresolvedPeersCounterSync.Lock()
defer unresolvedPeersCounterSync.Unlock()

if unresolvedPeersCounter == nil {
unresolvedPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_unresolved_peers"),
@@ -69,6 +134,13 @@ func SetUnresolvedPeersCount(count int) {
}

func SetResolvedPeersCount(count int) {
if !isMonitoringActive {
return
}

resolvedPeersCounterSync.Lock()
defer resolvedPeersCounterSync.Unlock()

if resolvedPeersCounter == nil {
resolvedPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_resolved_peers"),
@@ -81,6 +153,13 @@ func SetResolvedPeersCount(count int) {
}

func SetResolvedPriorityPeersCount(count int) {
if !isMonitoringActive {
return
}

resolvedPriorityPeersCounterSync.Lock()
defer resolvedPriorityPeersCounterSync.Unlock()

if resolvedPriorityPeersCounter == nil {
resolvedPriorityPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_resolved_priority_peers"),
@@ -93,6 +172,13 @@ func SetResolvedPriorityPeersCount(count int) {
}

func SetUnresolvedPriorityPeersCount(count int) {
if !isMonitoringActive {
return
}

unresolvedPriorityPeersCounterSync.Lock()
defer unresolvedPriorityPeersCounterSync.Unlock()

if unresolvedPriorityPeersCounter == nil {
unresolvedPriorityPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_unresolved_priority_peers"),
@@ -105,6 +191,13 @@ func SetUnresolvedPriorityPeersCount(count int) {
}

func SetActiveRegisteredNodesCount(count int) {
if !isMonitoringActive {
return
}

activeRegisteredNodesGaugeSync.Lock()
defer activeRegisteredNodesGaugeSync.Unlock()

if activeRegisteredNodesGauge == nil {
activeRegisteredNodesGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_active_registered_nodes"),
@@ -121,6 +214,9 @@ func IncrementBlockerMetrics(typeBlocker string) {
return
}

blockerCounterSync.Lock()
defer blockerCounterSync.Unlock()

if blockerCounter[typeBlocker] == nil {
blockerCounter[typeBlocker] = prometheus.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("zoobc_err_%s", typeBlocker),
@@ -136,6 +232,9 @@ func IncrementStatusLockCounter(typeStatusLock int) {
return
}

statusLockCounterSync.Lock()
defer statusLockCounterSync.Unlock()

if statusLockCounter[typeStatusLock] == nil {
statusLockCounter[typeStatusLock] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_status_lock_%d", typeStatusLock),
@@ -154,25 +253,30 @@ func DecrementStatusLockCounter(typeStatusLock int) {
return
}

if !isMonitoringActive {
return
}
statusLockCounterSync.Lock()
defer statusLockCounterSync.Unlock()

if statusLockCounter[typeStatusLock] == nil {
statusLockCounter[typeStatusLock] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_status_lock_%d", typeStatusLock),
Help: fmt.Sprintf("Status lock %d counter", typeStatusLock),
})
prometheus.MustRegister(statusLockCounter[typeStatusLock])
} else {
statusLockCounter[typeStatusLock].Dec()

// the gauge was only just created by this decrement call: exit here so it is not decremented right after creation
return
}
statusLockCounter[typeStatusLock].Dec()
}

func SetBlockchainStatus(chainType int32, newStatus int) {
if !isMonitoringActive {
return
}

blockchainStatusSync.Lock()
defer blockchainStatusSync.Unlock()

if blockchainStatus[chainType] == nil {
blockchainStatus[chainType] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_blockchain_status_%d", chainType),
@@ -187,6 +291,10 @@ func SetBlockchainSmithTime(chainType int32, newTime int64) {
if !isMonitoringActive {
return
}

blockchainSmithTimeSync.Lock()
defer blockchainSmithTimeSync.Unlock()

if blockchainSmithTime[chainType] == nil {
blockchainSmithTime[chainType] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_blockchain_%d_smith_time", chainType),
@@ -201,6 +309,10 @@ func SetNodeScore(activeBlocksmiths []*model.Blocksmith) {
if !isMonitoringActive {
return
}

nodeScoreSync.Lock()
defer nodeScoreSync.Unlock()

if nodeScore == nil {
nodeScore = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "zoobc_node_score",
@@ -225,6 +337,9 @@ func SetLastBlock(chainType int32, block *model.Block) {
return
}

blockchainHeightSync.Lock()
defer blockchainHeightSync.Unlock()

if blockchainHeight[chainType] == nil {
idMsbMetrics := prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_blockchain_id_%d_msb", chainType),
@@ -252,3 +367,42 @@ func SetLastBlock(chainType int32, block *model.Block) {
blockchainHeight[chainType].IDLsb.Set(math.Abs(float64(block.GetID() % int64(1000000000))))
blockchainHeight[chainType].Height.Set(float64(block.GetHeight()))
}

// IncrementGoRoutineActivity adds one to the gauge tracking activityName.
//
// It does nothing while monitoring is inactive. The gauge for a given
// activityName is created, stored in goRoutineActivityCounters and registered
// with prometheus the first time that name is seen; the map is only touched
// while goRoutineActivityCountersSync is held.
func IncrementGoRoutineActivity(activityName string) {
	if !isMonitoringActive {
		return
	}

	goRoutineActivityCountersSync.Lock()
	defer goRoutineActivityCountersSync.Unlock()

	gauge := goRoutineActivityCounters[activityName]
	if gauge == nil {
		// First use of this activity name: build the gauge, remember it, then
		// register it (MustRegister panics if registration fails).
		gauge = prometheus.NewGauge(prometheus.GaugeOpts{
			Name: fmt.Sprintf("zoobc_routines_counter_%s", activityName),
			Help: fmt.Sprintf("Go routine counter for %s", activityName),
		})
		goRoutineActivityCounters[activityName] = gauge
		prometheus.MustRegister(gauge)
	}
	gauge.Inc()
}

// DecrementGoRoutineActivity subtracts one from the gauge tracking
// activityName.
//
// It does nothing while monitoring is inactive. If no gauge exists yet for
// activityName, one is created, stored and registered with prometheus, and the
// function returns without decrementing it. The map is only touched while
// goRoutineActivityCountersSync is held.
func DecrementGoRoutineActivity(activityName string) {
	if !isMonitoringActive {
		return
	}

	goRoutineActivityCountersSync.Lock()
	defer goRoutineActivityCountersSync.Unlock()

	// Common case: the gauge already exists, so just lower it.
	if existing := goRoutineActivityCounters[activityName]; existing != nil {
		existing.Dec()
		return
	}

	// No gauge yet for this name: create and register it, but skip the
	// decrement so the freshly created gauge is left untouched.
	created := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: fmt.Sprintf("zoobc_routines_counter_%s", activityName),
		Help: fmt.Sprintf("Go routine counter for %s", activityName),
	})
	goRoutineActivityCounters[activityName] = created
	prometheus.MustRegister(created)
}
27 changes: 27 additions & 0 deletions p2p/client/peerServiceClient.go
Original file line number Diff line number Diff line change
@@ -174,6 +174,9 @@ func (psc *PeerServiceClient) getDefaultContext(requestTimeOut time.Duration) (c

// GetPeerInfo to get Peer info
func (psc *PeerServiceClient) GetPeerInfo(destPeer *model.Peer) (*model.Node, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetPeerInfoClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetPeerInfoClient)

// add a copy to avoid pointer delete
connection, err := psc.GetConnection(destPeer)
if err != nil {
@@ -202,6 +205,9 @@ func (psc *PeerServiceClient) GetPeerInfo(destPeer *model.Peer) (*model.Node, er

// GetMorePeers to collect more peers available
func (psc *PeerServiceClient) GetMorePeers(destPeer *model.Peer) (*model.GetMorePeersResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetMorePeersClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetMorePeersClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
@@ -224,6 +230,9 @@ func (psc *PeerServiceClient) GetMorePeers(destPeer *model.Peer) (*model.GetMore

// SendPeers sends set of peers to other node (to populate the network)
func (psc *PeerServiceClient) SendPeers(destPeer *model.Peer, peersInfo []*model.Node) (*model.Empty, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pSendPeersClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendPeersClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
@@ -250,6 +259,9 @@ func (psc *PeerServiceClient) SendBlock(
block *model.Block,
chainType chaintype.ChainType,
) error {
monitoring.IncrementGoRoutineActivity(monitoring.P2pSendBlockClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendBlockClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return err
@@ -288,6 +300,9 @@ func (psc *PeerServiceClient) SendTransaction(
transactionBytes []byte,
chainType chaintype.ChainType,
) error {
monitoring.IncrementGoRoutineActivity(monitoring.P2pSendTransactionClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendTransactionClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return err
@@ -325,6 +340,9 @@ func (psc *PeerServiceClient) GetCumulativeDifficulty(
destPeer *model.Peer,
chaintype chaintype.ChainType,
) (*model.GetCumulativeDifficultyResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetCumulativeDifficultyClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetCumulativeDifficultyClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
@@ -353,6 +371,9 @@ func (psc *PeerServiceClient) GetCommonMilestoneBlockIDs(
chaintype chaintype.ChainType,
lastBlockID, lastMilestoneBlockID int64,
) (*model.GetCommonMilestoneBlockIdsResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
@@ -384,6 +405,9 @@ func (psc *PeerServiceClient) GetNextBlockIDs(
blockID int64,
limit uint32,
) (*model.BlockIdsResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
@@ -415,6 +439,9 @@ func (psc *PeerServiceClient) GetNextBlocks(
blockIds []int64,
blockID int64,
) (*model.BlocksData, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlocksClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlocksClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
28 changes: 28 additions & 0 deletions p2p/handler/p2pServerHandler.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"github.com/zoobc/zoobc-core/common/blocker"
"github.com/zoobc/zoobc-core/common/chaintype"
"github.com/zoobc/zoobc-core/common/model"
"github.com/zoobc/zoobc-core/common/monitoring"
service2 "github.com/zoobc/zoobc-core/p2p/service"
)

@@ -24,11 +25,17 @@ func NewP2PServerHandler(

// GetPeerInfo returns info of this host by delegating to the underlying
// service. The call is counted under the P2pGetPeerInfoServer activity for as
// long as it is running.
func (ss *P2PServerHandler) GetPeerInfo(ctx context.Context, req *model.GetPeerInfoRequest) (*model.Node, error) {
	const activity = monitoring.P2pGetPeerInfoServer

	monitoring.IncrementGoRoutineActivity(activity)
	defer monitoring.DecrementGoRoutineActivity(activity)

	return ss.Service.GetPeerInfo(ctx, req)
}

// GetMorePeers contains info other peers
func (ss *P2PServerHandler) GetMorePeers(ctx context.Context, req *model.Empty) (*model.GetMorePeersResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetMorePeersServer)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetMorePeersServer)

var nodes []*model.Node
nodes, err := ss.Service.GetMorePeers(ctx, req)
if err != nil {
@@ -41,6 +48,9 @@ func (ss *P2PServerHandler) GetMorePeers(ctx context.Context, req *model.Empty)

// SendPeers receives set of peers info from other node and put them into the unresolved peers
func (ss *P2PServerHandler) SendPeers(ctx context.Context, req *model.SendPeersRequest) (*model.Empty, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pSendPeersServer)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendPeersServer)

// TODO: only accept nodes that are already registered in the node registration
if req.Peers == nil {
return nil, blocker.NewBlocker(
@@ -55,11 +65,17 @@ func (ss *P2PServerHandler) SendPeers(ctx context.Context, req *model.SendPeersR
// GetCumulativeDifficulty resolves the chain type named in the request and
// forwards the query to the underlying service. The call is counted under the
// P2pGetCumulativeDifficultyServer activity for as long as it is running.
func (ss *P2PServerHandler) GetCumulativeDifficulty(
	ctx context.Context,
	req *model.GetCumulativeDifficultyRequest,
) (*model.GetCumulativeDifficultyResponse, error) {
	const activity = monitoring.P2pGetCumulativeDifficultyServer

	monitoring.IncrementGoRoutineActivity(activity)
	defer monitoring.DecrementGoRoutineActivity(activity)

	requestedChain := chaintype.GetChainType(req.ChainType)
	return ss.Service.GetCumulativeDifficulty(ctx, requestedChain)
}

func (ss *P2PServerHandler) GetCommonMilestoneBlockIDs(ctx context.Context,
req *model.GetCommonMilestoneBlockIdsRequest) (*model.GetCommonMilestoneBlockIdsResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsServer)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsServer)

// if `lastBlockID` is supplied,
// check whether it matches the host's lastBlock and, if so, return the response as is
chainType := chaintype.GetChainType(req.ChainType)
@@ -75,6 +91,9 @@ func (ss *P2PServerHandler) GetCommonMilestoneBlockIDs(ctx context.Context,
}

func (ss *P2PServerHandler) GetNextBlockIDs(ctx context.Context, req *model.GetNextBlockIdsRequest) (*model.BlockIdsResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsServer)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsServer)

chainType := chaintype.GetChainType(req.ChainType)
blockIds, err := ss.Service.GetNextBlockIDs(ctx, chainType, req.Limit, req.BlockId)
if err != nil {
@@ -86,6 +105,9 @@ func (ss *P2PServerHandler) GetNextBlockIDs(ctx context.Context, req *model.GetN
}

func (ss *P2PServerHandler) GetNextBlocks(ctx context.Context, req *model.GetNextBlocksRequest) (*model.BlocksData, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlocksServer)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlocksServer)

// TODO: getting data from cache
chainType := chaintype.GetChainType(req.ChainType)
return ss.Service.GetNextBlocks(
@@ -98,6 +120,9 @@ func (ss *P2PServerHandler) GetNextBlocks(ctx context.Context, req *model.GetNex

// SendBlock receive block from other node and calling BlockReceived Event
func (ss *P2PServerHandler) SendBlock(ctx context.Context, req *model.SendBlockRequest) (*model.SendBlockResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pSendBlockServer)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendBlockServer)

// todo: validate request
return ss.Service.SendBlock(
ctx,
@@ -112,6 +137,9 @@ func (ss *P2PServerHandler) SendTransaction(
ctx context.Context,
req *model.SendTransactionRequest,
) (*model.SendTransactionResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pSendTransactionServer)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendTransactionServer)

return ss.Service.SendTransaction(
ctx,
chaintype.GetChainType(req.ChainType),