Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 172 additions & 18 deletions common/monitoring/metricsMonitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"
"reflect"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/zoobc/zoobc-core/common/model"
Expand All @@ -16,20 +17,71 @@ type lastblockMetrics struct {
}

var (
isMonitoringActive bool
nodePublicKey []byte
receiptCounter prometheus.Counter
unresolvedPeersCounter prometheus.Gauge
resolvedPeersCounter prometheus.Gauge
unresolvedPriorityPeersCounter prometheus.Gauge
resolvedPriorityPeersCounter prometheus.Gauge
isMonitoringActive bool
nodePublicKey []byte

receiptCounter prometheus.Counter
receiptCounterSync sync.Mutex

unresolvedPeersCounter prometheus.Gauge
unresolvedPeersCounterSync sync.Mutex

resolvedPeersCounter prometheus.Gauge
resolvedPeersCounterSync sync.Mutex

unresolvedPriorityPeersCounter prometheus.Gauge
unresolvedPriorityPeersCounterSync sync.Mutex

resolvedPriorityPeersCounter prometheus.Gauge
resolvedPriorityPeersCounterSync sync.Mutex

activeRegisteredNodesGauge prometheus.Gauge
nodeScore prometheus.Gauge
blockerCounter = make(map[string]prometheus.Counter)
statusLockCounter = make(map[int]prometheus.Gauge)
blockchainStatus = make(map[int32]prometheus.Gauge)
blockchainSmithTime = make(map[int32]prometheus.Gauge)
blockchainHeight = make(map[int32]*lastblockMetrics)
activeRegisteredNodesGaugeSync sync.Mutex

nodeScore prometheus.Gauge
nodeScoreSync sync.Mutex

blockerCounter = make(map[string]prometheus.Counter)
blockerCounterSync sync.Mutex

statusLockCounter = make(map[int]prometheus.Gauge)
statusLockCounterSync sync.Mutex

blockchainStatus = make(map[int32]prometheus.Gauge)
blockchainStatusSync sync.Mutex

blockchainSmithTime = make(map[int32]prometheus.Gauge)
blockchainSmithTimeSync sync.Mutex

blockchainHeight = make(map[int32]*lastblockMetrics)
blockchainHeightSync sync.Mutex

goRoutineActivityCounters = make(map[string]prometheus.Gauge)
goRoutineActivityCountersSync sync.Mutex
)

const (
P2pGetPeerInfoServer = "P2pGetPeerInfoServer"
P2pGetMorePeersServer = "P2pGetMorePeersServer"
P2pSendPeersServer = "P2pSendPeersServer"
P2pSendBlockServer = "P2pSendBlockServer"
P2pSendTransactionServer = "P2pSendTransactionServer"
P2pRequestBlockTransactionsServer = "P2pRequestBlockTransactionsServer"
P2pGetCumulativeDifficultyServer = "P2pGetCumulativeDifficultyServer"
P2pGetCommonMilestoneBlockIDsServer = "P2pGetCommonMilestoneBlockIDsServer"
P2pGetNextBlockIDsServer = "P2pGetNextBlockIDsServer"
P2pGetNextBlocksServer = "P2pGetNextBlocksServer"

P2pGetPeerInfoClient = "P2pGetPeerInfoClient"
P2pGetMorePeersClient = "P2pGetMorePeersClient"
P2pSendPeersClient = "P2pSendPeersClient"
P2pSendBlockClient = "P2pSendBlockClient"
P2pSendTransactionClient = "P2pSendTransactionClient"
P2pRequestBlockTransactionsClient = "P2pRequestBlockTransactionsClient"
P2pGetCumulativeDifficultyClient = "P2pGetCumulativeDifficultyClient"
P2pGetCommonMilestoneBlockIDsClient = "P2pGetCommonMilestoneBlockIDsClient"
P2pGetNextBlockIDsClient = "P2pGetNextBlockIDsClient"
P2pGetNextBlocksClient = "P2pGetNextBlocksClient"
)

func SetMonitoringActive(isActive bool) {
Expand All @@ -45,6 +97,12 @@ func IsMonitoringActive() bool {
}

func IncrementReceiptCounter() {
if !isMonitoringActive {
return
}

receiptCounterSync.Lock()
defer receiptCounterSync.Unlock()
if receiptCounter == nil {
receiptCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("zoobc_receipts"),
Expand All @@ -57,6 +115,13 @@ func IncrementReceiptCounter() {
}

func SetUnresolvedPeersCount(count int) {
if !isMonitoringActive {
return
}

unresolvedPeersCounterSync.Lock()
defer unresolvedPeersCounterSync.Unlock()

if unresolvedPeersCounter == nil {
unresolvedPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_unresolved_peers"),
Expand All @@ -69,6 +134,13 @@ func SetUnresolvedPeersCount(count int) {
}

func SetResolvedPeersCount(count int) {
if !isMonitoringActive {
return
}

resolvedPeersCounterSync.Lock()
defer resolvedPeersCounterSync.Unlock()

if resolvedPeersCounter == nil {
resolvedPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_resolved_peers"),
Expand All @@ -81,6 +153,13 @@ func SetResolvedPeersCount(count int) {
}

func SetResolvedPriorityPeersCount(count int) {
if !isMonitoringActive {
return
}

resolvedPriorityPeersCounterSync.Lock()
defer resolvedPriorityPeersCounterSync.Unlock()

if resolvedPriorityPeersCounter == nil {
resolvedPriorityPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_resolved_priority_peers"),
Expand All @@ -93,6 +172,13 @@ func SetResolvedPriorityPeersCount(count int) {
}

func SetUnresolvedPriorityPeersCount(count int) {
if !isMonitoringActive {
return
}

unresolvedPriorityPeersCounterSync.Lock()
defer unresolvedPriorityPeersCounterSync.Unlock()

if unresolvedPriorityPeersCounter == nil {
unresolvedPriorityPeersCounter = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_unresolved_priority_peers"),
Expand All @@ -105,6 +191,13 @@ func SetUnresolvedPriorityPeersCount(count int) {
}

func SetActiveRegisteredNodesCount(count int) {
if !isMonitoringActive {
return
}

activeRegisteredNodesGaugeSync.Lock()
defer activeRegisteredNodesGaugeSync.Unlock()

if activeRegisteredNodesGauge == nil {
activeRegisteredNodesGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_active_registered_nodes"),
Expand All @@ -121,6 +214,9 @@ func IncrementBlockerMetrics(typeBlocker string) {
return
}

blockerCounterSync.Lock()
defer blockerCounterSync.Unlock()

if blockerCounter[typeBlocker] == nil {
blockerCounter[typeBlocker] = prometheus.NewCounter(prometheus.CounterOpts{
Name: fmt.Sprintf("zoobc_err_%s", typeBlocker),
Expand All @@ -136,6 +232,9 @@ func IncrementStatusLockCounter(typeStatusLock int) {
return
}

statusLockCounterSync.Lock()
defer statusLockCounterSync.Unlock()

if statusLockCounter[typeStatusLock] == nil {
statusLockCounter[typeStatusLock] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_status_lock_%d", typeStatusLock),
Expand All @@ -154,25 +253,30 @@ func DecrementStatusLockCounter(typeStatusLock int) {
return
}

if !isMonitoringActive {
return
}
statusLockCounterSync.Lock()
defer statusLockCounterSync.Unlock()

if statusLockCounter[typeStatusLock] == nil {
statusLockCounter[typeStatusLock] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_status_lock_%d", typeStatusLock),
Help: fmt.Sprintf("Status lock %d counter", typeStatusLock),
})
prometheus.MustRegister(statusLockCounter[typeStatusLock])
} else {
statusLockCounter[typeStatusLock].Dec()

// to avoid below as the initial value, on creation on decrement, we exit
return
}
statusLockCounter[typeStatusLock].Dec()
}

func SetBlockchainStatus(chainType int32, newStatus int) {
if !isMonitoringActive {
return
}

blockchainStatusSync.Lock()
defer blockchainStatusSync.Unlock()

if blockchainStatus[chainType] == nil {
blockchainStatus[chainType] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_blockchain_status_%d", chainType),
Expand All @@ -187,6 +291,10 @@ func SetBlockchainSmithTime(chainType int32, newTime int64) {
if !isMonitoringActive {
return
}

blockchainSmithTimeSync.Lock()
defer blockchainSmithTimeSync.Unlock()

if blockchainSmithTime[chainType] == nil {
blockchainSmithTime[chainType] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_blockchain_%d_smith_time", chainType),
Expand All @@ -201,6 +309,10 @@ func SetNodeScore(activeBlocksmiths []*model.Blocksmith) {
if !isMonitoringActive {
return
}

nodeScoreSync.Lock()
defer nodeScoreSync.Unlock()

if nodeScore == nil {
nodeScore = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "zoobc_node_score",
Expand All @@ -225,6 +337,9 @@ func SetLastBlock(chainType int32, block *model.Block) {
return
}

blockchainHeightSync.Lock()
defer blockchainHeightSync.Unlock()

if blockchainHeight[chainType] == nil {
idMsbMetrics := prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_blockchain_id_%d_msb", chainType),
Expand Down Expand Up @@ -252,3 +367,42 @@ func SetLastBlock(chainType int32, block *model.Block) {
blockchainHeight[chainType].IDLsb.Set(math.Abs(float64(block.GetID() % int64(1000000000))))
blockchainHeight[chainType].Height.Set(float64(block.GetHeight()))
}

func IncrementGoRoutineActivity(activityName string) {
if !isMonitoringActive {
return
}

goRoutineActivityCountersSync.Lock()
defer goRoutineActivityCountersSync.Unlock()

if goRoutineActivityCounters[activityName] == nil {
goRoutineActivityCounters[activityName] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_routines_counter_%s", activityName),
Help: fmt.Sprintf("Go routine counter for %s", activityName),
})
prometheus.MustRegister(goRoutineActivityCounters[activityName])
}
goRoutineActivityCounters[activityName].Inc()
}

func DecrementGoRoutineActivity(activityName string) {
if !isMonitoringActive {
return
}

goRoutineActivityCountersSync.Lock()
defer goRoutineActivityCountersSync.Unlock()

if goRoutineActivityCounters[activityName] == nil {
goRoutineActivityCounters[activityName] = prometheus.NewGauge(prometheus.GaugeOpts{
Name: fmt.Sprintf("zoobc_routines_counter_%s", activityName),
Help: fmt.Sprintf("Go routine counter for %s", activityName),
})
prometheus.MustRegister(goRoutineActivityCounters[activityName])

// to avoid below as the initial value, on creation on decrement, we exit
return
}
goRoutineActivityCounters[activityName].Dec()
}
27 changes: 27 additions & 0 deletions p2p/client/peerServiceClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ func (psc *PeerServiceClient) getDefaultContext(requestTimeOut time.Duration) (c

// GetPeerInfo to get Peer info
func (psc *PeerServiceClient) GetPeerInfo(destPeer *model.Peer) (*model.Node, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetPeerInfoClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetPeerInfoClient)

// add a copy to avoid pointer delete
connection, err := psc.GetConnection(destPeer)
if err != nil {
Expand Down Expand Up @@ -202,6 +205,9 @@ func (psc *PeerServiceClient) GetPeerInfo(destPeer *model.Peer) (*model.Node, er

// GetMorePeers to collect more peers available
func (psc *PeerServiceClient) GetMorePeers(destPeer *model.Peer) (*model.GetMorePeersResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetMorePeersClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetMorePeersClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
Expand All @@ -224,6 +230,9 @@ func (psc *PeerServiceClient) GetMorePeers(destPeer *model.Peer) (*model.GetMore

// SendPeers sends set of peers to other node (to populate the network)
func (psc *PeerServiceClient) SendPeers(destPeer *model.Peer, peersInfo []*model.Node) (*model.Empty, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pSendPeersClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendPeersClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
Expand All @@ -250,6 +259,9 @@ func (psc *PeerServiceClient) SendBlock(
block *model.Block,
chainType chaintype.ChainType,
) error {
monitoring.IncrementGoRoutineActivity(monitoring.P2pSendBlockClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendBlockClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return err
Expand Down Expand Up @@ -288,6 +300,9 @@ func (psc *PeerServiceClient) SendTransaction(
transactionBytes []byte,
chainType chaintype.ChainType,
) error {
monitoring.IncrementGoRoutineActivity(monitoring.P2pSendTransactionClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pSendTransactionClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return err
Expand Down Expand Up @@ -325,6 +340,9 @@ func (psc *PeerServiceClient) GetCumulativeDifficulty(
destPeer *model.Peer,
chaintype chaintype.ChainType,
) (*model.GetCumulativeDifficultyResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetCumulativeDifficultyClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetCumulativeDifficultyClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
Expand Down Expand Up @@ -353,6 +371,9 @@ func (psc *PeerServiceClient) GetCommonMilestoneBlockIDs(
chaintype chaintype.ChainType,
lastBlockID, lastMilestoneBlockID int64,
) (*model.GetCommonMilestoneBlockIdsResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetCommonMilestoneBlockIDsClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
Expand Down Expand Up @@ -384,6 +405,9 @@ func (psc *PeerServiceClient) GetNextBlockIDs(
blockID int64,
limit uint32,
) (*model.BlockIdsResponse, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlockIDsClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
Expand Down Expand Up @@ -415,6 +439,9 @@ func (psc *PeerServiceClient) GetNextBlocks(
blockIds []int64,
blockID int64,
) (*model.BlocksData, error) {
monitoring.IncrementGoRoutineActivity(monitoring.P2pGetNextBlocksClient)
defer monitoring.DecrementGoRoutineActivity(monitoring.P2pGetNextBlocksClient)

connection, err := psc.GetConnection(destPeer)
if err != nil {
return nil, err
Expand Down
Loading