Skip to content

Commit 0cdf9e4

Browse files
committed
update metrics synchronously
Signed-off-by: Ben Ye <[email protected]>
1 parent 24a62ae commit 0cdf9e4

File tree

4 files changed

+49
-90
lines changed

4 files changed

+49
-90
lines changed

pkg/distributor/distributor.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ type Distributor struct {
129129

130130
asyncExecutor util.AsyncExecutor
131131

132-
// Counter to track metrics per label set.
133-
labelSetCounter *labelSetCounter
132+
// Map to track label sets from user.
133+
labelSetTracker *labelSetTracker
134134
}
135135

136136
// Config contains the configuration required to
@@ -296,7 +296,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
296296
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
297297
HATracker: haTracker,
298298
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
299-
labelSetCounter: newLabelSetCounter(),
300299

301300
queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
302301
Namespace: "cortex",
@@ -389,6 +388,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
389388
asyncExecutor: util.NewNoOpExecutor(),
390389
}
391390

391+
d.labelSetTracker = newLabelSetTracker(d.receivedSamplesPerLabelSet)
392+
392393
if cfg.NumPushWorkers > 0 {
393394
util_log.WarnExperimentalUse("Distributor: using goroutine worker pool")
394395
d.asyncExecutor = util.NewWorkerPool("distributor", cfg.NumPushWorkers, reg)
@@ -809,7 +810,7 @@ func (d *Distributor) updateLabelSetMetrics() {
809810
}
810811
}
811812

812-
d.labelSetCounter.updateMetrics(activeUserSet, d.receivedSamplesPerLabelSet)
813+
d.labelSetTracker.updateMetrics(activeUserSet)
813814
}
814815

815816
func (d *Distributor) cleanStaleIngesterMetrics() {
@@ -1062,7 +1063,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
10621063
validatedExemplars += len(ts.Exemplars)
10631064
}
10641065
for h, counter := range labelSetCounters {
1065-
d.labelSetCounter.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples)
1066+
d.labelSetTracker.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples)
10661067
}
10671068
return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
10681069
}

pkg/distributor/distributor_test.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4135,16 +4135,12 @@ func TestDistributor_PushLabelSetMetrics(t *testing.T) {
41354135
_, err = ds[0].Push(ctx, req)
41364136
require.NoError(t, err)
41374137

4138-
ds[0].updateLabelSetMetrics()
41394138
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
41404139
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
41414140
# TYPE cortex_distributor_received_samples_per_labelset_total counter
41424141
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2
4143-
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="histogram",user="user"} 0
41444142
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 1
4145-
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user"} 0
41464143
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 1
4147-
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0
41484144
`), "cortex_distributor_received_samples_per_labelset_total"))
41494145

41504146
// Push more series.
@@ -4166,20 +4162,14 @@ func TestDistributor_PushLabelSetMetrics(t *testing.T) {
41664162
req = mockWriteRequest(inputSeries, 1, 1, false)
41674163
_, err = ds[0].Push(ctx2, req)
41684164
require.NoError(t, err)
4169-
ds[0].updateLabelSetMetrics()
41704165
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
41714166
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
41724167
# TYPE cortex_distributor_received_samples_per_labelset_total counter
41734168
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2
4174-
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="histogram",user="user"} 0
41754169
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 2
41764170
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user2"} 1
4177-
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user"} 0
4178-
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user2"} 0
41794171
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2
41804172
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
4181-
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0
4182-
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0
41834173
`), "cortex_distributor_received_samples_per_labelset_total"))
41844174

41854175
// Remove existing limits and add new limits
@@ -4198,8 +4188,6 @@ func TestDistributor_PushLabelSetMetrics(t *testing.T) {
41984188
# TYPE cortex_distributor_received_samples_per_labelset_total counter
41994189
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2
42004190
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
4201-
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0
4202-
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0
42034191
`), "cortex_distributor_received_samples_per_labelset_total"))
42044192

42054193
// Metrics from `user` got removed but `user2` metric should remain.
@@ -4208,7 +4196,6 @@ func TestDistributor_PushLabelSetMetrics(t *testing.T) {
42084196
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
42094197
# TYPE cortex_distributor_received_samples_per_labelset_total counter
42104198
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
4211-
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0
42124199
`), "cortex_distributor_received_samples_per_labelset_total"))
42134200
}
42144201

pkg/distributor/metrics.go

Lines changed: 35 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,29 @@ import (
1111
)
1212

1313
const (
14-
numMetricCounterShards = 128
14+
numMetricShards = 128
1515
)
1616

17-
type labelSetCounter struct {
17+
type labelSetTracker struct {
18+
receivedSamplesPerLabelSet *prometheus.CounterVec
19+
1820
shards []*labelSetCounterShard
1921
}
2022

21-
func newLabelSetCounter() *labelSetCounter {
22-
shards := make([]*labelSetCounterShard, 0, numMetricCounterShards)
23-
for i := 0; i < numMetricCounterShards; i++ {
23+
func newLabelSetTracker(receivedSamplesPerLabelSet *prometheus.CounterVec) *labelSetTracker {
24+
shards := make([]*labelSetCounterShard, 0, numMetricShards)
25+
for i := 0; i < numMetricShards; i++ {
2426
shards = append(shards, &labelSetCounterShard{
2527
RWMutex: &sync.RWMutex{},
26-
valuesCounter: map[string]map[uint64]*samplesLabelSetEntry{},
28+
userLabelSets: map[string]map[uint64]labels.Labels{},
2729
})
2830
}
29-
return &labelSetCounter{shards: shards}
31+
return &labelSetTracker{shards: shards, receivedSamplesPerLabelSet: receivedSamplesPerLabelSet}
3032
}
3133

3234
type labelSetCounterShard struct {
3335
*sync.RWMutex
34-
valuesCounter map[string]map[uint64]*samplesLabelSetEntry
36+
userLabelSets map[string]map[uint64]labels.Labels
3537
}
3638

3739
type samplesLabelSetEntry struct {
@@ -40,63 +42,51 @@ type samplesLabelSetEntry struct {
4042
labels labels.Labels
4143
}
4244

43-
func (s *samplesLabelSetEntry) reset() {
44-
s.floatSamples = 0
45-
s.histogramSamples = 0
46-
}
47-
48-
func (m *labelSetCounter) increaseSamplesLabelSet(userId string, hash uint64, labelSet labels.Labels, floatSamples, histogramSamples int64) {
49-
s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricCounterShards]
45+
func (m *labelSetTracker) increaseSamplesLabelSet(userId string, hash uint64, labelSet labels.Labels, floatSamples, histogramSamples int64) {
46+
s := m.shards[util.HashFP(model.Fingerprint(hash))%numMetricShards]
5047
s.Lock()
51-
defer s.Unlock()
52-
if userEntry, ok := s.valuesCounter[userId]; ok {
53-
if e, ok2 := userEntry[hash]; ok2 {
54-
e.floatSamples += floatSamples
55-
e.histogramSamples += histogramSamples
56-
} else {
57-
userEntry[hash] = &samplesLabelSetEntry{
58-
floatSamples: floatSamples,
59-
histogramSamples: histogramSamples,
60-
labels: labelSet,
61-
}
48+
if userEntry, ok := s.userLabelSets[userId]; ok {
49+
if _, ok2 := userEntry[hash]; !ok2 {
50+
userEntry[hash] = labelSet
6251
}
6352
} else {
64-
s.valuesCounter[userId] = map[uint64]*samplesLabelSetEntry{
65-
hash: {
66-
floatSamples: floatSamples,
67-
histogramSamples: histogramSamples,
68-
labels: labelSet,
69-
},
70-
}
53+
s.userLabelSets[userId] = map[uint64]labels.Labels{hash: labelSet}
54+
}
55+
// Unlock before we update metrics.
56+
s.Unlock()
57+
58+
labelSetStr := labelSet.String()
59+
if floatSamples > 0 {
60+
m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeFloat, labelSetStr).Add(float64(floatSamples))
61+
}
62+
if histogramSamples > 0 {
63+
m.receivedSamplesPerLabelSet.WithLabelValues(userId, sampleMetricTypeHistogram, labelSetStr).Add(float64(histogramSamples))
7164
}
7265
}
7366

74-
func (m *labelSetCounter) updateMetrics(userSet map[string]map[uint64]struct{}, receivedSamplesPerLabelSet *prometheus.CounterVec) {
75-
for i := 0; i < numMetricCounterShards; i++ {
67+
// Clean up dangling user and label set from the tracker as well as metrics.
68+
func (m *labelSetTracker) updateMetrics(userSet map[string]map[uint64]struct{}) {
69+
for i := 0; i < numMetricShards; i++ {
7670
shard := m.shards[i]
7771
shard.Lock()
7872

79-
for user, userEntry := range shard.valuesCounter {
73+
for user, userEntry := range shard.userLabelSets {
8074
limits, ok := userSet[user]
8175
if !ok {
8276
// If user is removed, we will delete user metrics in cleanupInactiveUser loop
8377
// so skip deleting metrics here.
84-
delete(shard.valuesCounter, user)
78+
delete(shard.userLabelSets, user)
8579
continue
8680
}
87-
for h, entry := range userEntry {
88-
labelSetStr := entry.labels.String()
81+
for h, lbls := range userEntry {
8982
// This limit no longer exists.
9083
if _, ok := limits[h]; !ok {
9184
delete(userEntry, h)
92-
receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr)
93-
receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr)
85+
labelSetStr := lbls.String()
86+
m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeFloat, labelSetStr)
87+
m.receivedSamplesPerLabelSet.DeleteLabelValues(user, sampleMetricTypeHistogram, labelSetStr)
9488
continue
9589
}
96-
receivedSamplesPerLabelSet.WithLabelValues(user, sampleMetricTypeFloat, labelSetStr).Add(float64(entry.floatSamples))
97-
receivedSamplesPerLabelSet.WithLabelValues(user, sampleMetricTypeHistogram, labelSetStr).Add(float64(entry.histogramSamples))
98-
// Reset entry counter to 0. Delete it only if it is removed from the limit.
99-
entry.reset()
10090
}
10191
}
10292

pkg/distributor/metrics_test.go

Lines changed: 8 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@ import (
1111
)
1212

1313
func TestLabelSetCounter(t *testing.T) {
14-
counter := newLabelSetCounter()
15-
1614
metricName := "cortex_distributor_received_samples_per_labelset_total"
1715
reg := prometheus.NewPedanticRegistry()
1816
dummyCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
1917
Name: metricName,
2018
Help: "",
2119
}, []string{"user", "type", "labelset"})
20+
counter := newLabelSetTracker(dummyCounter)
2221
reg.MustRegister(dummyCounter)
2322

2423
userID := "1"
@@ -31,22 +30,12 @@ func TestLabelSetCounter(t *testing.T) {
3130
counter.increaseSamplesLabelSet(userID2, 0, labels.FromStrings("foo", "bar"), 100, 5)
3231
counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100)
3332

34-
userSet := map[string]map[uint64]struct {
35-
}{
36-
userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}},
37-
userID2: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}},
38-
}
39-
counter.updateMetrics(userSet, dummyCounter)
40-
4133
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
4234
# TYPE cortex_distributor_received_samples_per_labelset_total counter
43-
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 0
4435
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 100
4536
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10
4637
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100
47-
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0
4838
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5
49-
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="float",user="1"} 0
5039
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5
5140
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 20
5241
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 20
@@ -57,13 +46,6 @@ func TestLabelSetCounter(t *testing.T) {
5746
counter.increaseSamplesLabelSet(userID2, 2, labels.FromStrings("cluster", "us-west-2"), 0, 100)
5847
counter.increaseSamplesLabelSet(userID2, 4, labels.FromStrings("cluster", "us-west-2"), 10, 10)
5948
counter.increaseSamplesLabelSet(userID3, 4, labels.FromStrings("cluster", "us-east-1"), 30, 30)
60-
userSet = map[string]map[uint64]struct {
61-
}{
62-
userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}},
63-
userID2: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}},
64-
userID3: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}},
65-
}
66-
counter.updateMetrics(userSet, dummyCounter)
6749

6850
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
6951
# TYPE cortex_distributor_received_samples_per_labelset_total counter
@@ -73,18 +55,20 @@ func TestLabelSetCounter(t *testing.T) {
7355
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210
7456
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10
7557
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100
76-
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0
7758
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5
78-
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="float",user="1"} 0
7959
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5
8060
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40
8161
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40
8262
`), metricName))
8363

8464
// Remove user 2. But metrics for user 2 not cleaned up as it is expected to be cleaned up
8565
// in cleanupInactiveUser loop. It is expected to have 3 minutes delay in this case.
86-
delete(userSet, userID2)
87-
counter.updateMetrics(userSet, dummyCounter)
66+
userSet := map[string]map[uint64]struct {
67+
}{
68+
userID: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}},
69+
userID3: {0: struct{}{}, 1: struct{}{}, 2: struct{}{}, 3: struct{}{}, 4: struct{}{}},
70+
}
71+
counter.updateMetrics(userSet)
8872
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
8973
# TYPE cortex_distributor_received_samples_per_labelset_total counter
9074
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-east-1\"}",type="float",user="3"} 30
@@ -93,9 +77,7 @@ func TestLabelSetCounter(t *testing.T) {
9377
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210
9478
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10
9579
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100
96-
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0
9780
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5
98-
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="float",user="1"} 0
9981
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"baz\"}",type="histogram",user="1"} 5
10082
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="1"} 40
10183
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="1"} 40
@@ -108,15 +90,14 @@ func TestLabelSetCounter(t *testing.T) {
10890
userID2: {},
10991
userID3: {},
11092
}
111-
counter.updateMetrics(userSet, dummyCounter)
93+
counter.updateMetrics(userSet)
11294

11395
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
11496
# TYPE cortex_distributor_received_samples_per_labelset_total counter
11597
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="float",user="2"} 10
11698
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"us-west-2\"}",type="histogram",user="2"} 210
11799
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="1"} 10
118100
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="float",user="2"} 100
119-
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="1"} 0
120101
cortex_distributor_received_samples_per_labelset_total{labelset="{foo=\"bar\"}",type="histogram",user="2"} 5
121102
`), metricName))
122103
}

0 commit comments

Comments
 (0)