Skip to content

Commit 9257ead

Browse files
committed
add distributor per labelset ingestion rate metric
Signed-off-by: Ben Ye <[email protected]>
1 parent 132ccde commit 9257ead

File tree

6 files changed

+453
-0
lines changed

6 files changed

+453
-0
lines changed

pkg/distributor/distributor.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ const (
6464

6565
clearStaleIngesterMetricsInterval = time.Minute
6666

67+
labelSetMetricsTickInterval = 30 * time.Second
68+
6769
// mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and
6870
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
6971
mergeSlicesParallelism = 8
@@ -107,6 +109,7 @@ type Distributor struct {
107109
// Metrics
108110
queryDuration *instrument.HistogramCollector
109111
receivedSamples *prometheus.CounterVec
112+
receivedSamplesPerLabelSet *prometheus.CounterVec
110113
receivedExemplars *prometheus.CounterVec
111114
receivedMetadata *prometheus.CounterVec
112115
incomingSamples *prometheus.CounterVec
@@ -125,6 +128,9 @@ type Distributor struct {
125128
validateMetrics *validation.ValidateMetrics
126129

127130
asyncExecutor util.AsyncExecutor
131+
132+
// Counter to track metrics per label set.
133+
labelSetCounter *labelSetCounter
128134
}
129135

130136
// Config contains the configuration required to
@@ -290,6 +296,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
290296
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
291297
HATracker: haTracker,
292298
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
299+
labelSetCounter: newLabelSetCounter(),
293300

294301
queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
295302
Namespace: "cortex",
@@ -302,6 +309,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
302309
Name: "distributor_received_samples_total",
303310
Help: "The total number of received samples, excluding rejected and deduped samples.",
304311
}, []string{"user", "type"}),
312+
receivedSamplesPerLabelSet: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
313+
Namespace: "cortex",
314+
Name: "distributor_received_samples_per_labelset_total",
315+
Help: "The total number of received samples per label set, excluding rejected and deduped samples.",
316+
}, []string{"user", "type", "labelset"}),
305317
receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
306318
Namespace: "cortex",
307319
Name: "distributor_received_exemplars_total",
@@ -449,6 +461,9 @@ func (d *Distributor) running(ctx context.Context) error {
449461
staleIngesterMetricTicker := time.NewTicker(clearStaleIngesterMetricsInterval)
450462
defer staleIngesterMetricTicker.Stop()
451463

464+
labelSetMetricsTicker := time.NewTicker(labelSetMetricsTickInterval)
465+
defer labelSetMetricsTicker.Stop()
466+
452467
for {
453468
select {
454469
case <-ctx.Done():
@@ -460,6 +475,9 @@ func (d *Distributor) running(ctx context.Context) error {
460475
case <-staleIngesterMetricTicker.C:
461476
d.cleanStaleIngesterMetrics()
462477

478+
case <-labelSetMetricsTicker.C:
479+
d.updateLabelSetMetrics()
480+
463481
case err := <-d.subservicesWatcher.Chan():
464482
return errors.Wrap(err, "distributor subservice failed")
465483
}
@@ -486,6 +504,10 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
486504
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
487505
}
488506

507+
if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, map[string]string{"user": userID}); err != nil {
508+
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user", "user", userID, "err", err)
509+
}
510+
489511
validation.DeletePerUserValidationMetrics(d.validateMetrics, userID, d.log)
490512
}
491513

@@ -777,6 +799,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
777799
return &cortexpb.WriteResponse{}, firstPartialErr
778800
}
779801

802+
func (d *Distributor) updateLabelSetMetrics() {
803+
activeUserSet := make(map[string]map[uint64]struct{})
804+
for _, user := range d.activeUsers.ActiveUsers() {
805+
limits := d.limits.LimitsPerLabelSet(user)
806+
activeUserSet[user] = make(map[uint64]struct{}, len(limits))
807+
for _, l := range limits {
808+
activeUserSet[user][l.Hash] = struct{}{}
809+
}
810+
}
811+
812+
d.labelSetCounter.updateMetrics(activeUserSet, d.receivedSamplesPerLabelSet)
813+
}
814+
780815
func (d *Distributor) cleanStaleIngesterMetrics() {
781816
healthy, unhealthy, err := d.ingestersRing.GetAllInstanceDescs(ring.WriteNoExtend)
782817
if err != nil {
@@ -888,7 +923,9 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
888923
validatedFloatSamples := 0
889924
validatedHistogramSamples := 0
890925
validatedExemplars := 0
926+
limitsPerLabelSet := d.limits.LimitsPerLabelSet(userID)
891927

928+
labelSetCounters := make(map[uint64]*samplesLabelSetEntry)
892929
var firstPartialErr error
893930

894931
latestSampleTimestampMs := int64(0)
@@ -1005,12 +1042,28 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
10051042
continue
10061043
}
10071044

1045+
for _, l := range validation.LimitsPerLabelSetsForSeries(limitsPerLabelSet, cortexpb.FromLabelAdaptersToLabels(validatedSeries.Labels)) {
1046+
if c, exists := labelSetCounters[l.Hash]; exists {
1047+
c.floatSamples += int64(len(ts.Samples))
1048+
c.histogramSamples += int64(len(ts.Histograms))
1049+
} else {
1050+
labelSetCounters[l.Hash] = &samplesLabelSetEntry{
1051+
floatSamples: int64(len(ts.Samples)),
1052+
histogramSamples: int64(len(ts.Histograms)),
1053+
labels: l.LabelSet,
1054+
}
1055+
}
1056+
}
1057+
10081058
seriesKeys = append(seriesKeys, key)
10091059
validatedTimeseries = append(validatedTimeseries, validatedSeries)
10101060
validatedFloatSamples += len(ts.Samples)
10111061
validatedHistogramSamples += len(ts.Histograms)
10121062
validatedExemplars += len(ts.Exemplars)
10131063
}
1064+
for h, counter := range labelSetCounters {
1065+
d.labelSetCounter.increaseSamplesLabelSet(userID, h, counter.labels, counter.floatSamples, counter.histogramSamples)
1066+
}
10141067
return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
10151068
}
10161069

pkg/distributor/distributor_test.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
420420
"cortex_distributor_metadata_in_total",
421421
"cortex_distributor_non_ha_samples_received_total",
422422
"cortex_distributor_latest_seen_sample_timestamp_seconds",
423+
"cortex_distributor_received_samples_per_labelset_total",
423424
}
424425

425426
allMetrics := append(removedMetrics, permanentMetrics...)
@@ -438,6 +439,8 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
438439
d.nonHASamples.WithLabelValues("userA").Add(5)
439440
d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
440441
d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)
442+
d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeFloat, "{}").Add(5)
443+
d.receivedSamplesPerLabelSet.WithLabelValues("userA", sampleMetricTypeHistogram, "{}").Add(10)
441444

442445
h, _, _ := r.GetAllInstanceDescs(ring.WriteNoExtend)
443446
ingId0, _ := r.GetInstanceIdByAddr(h[0].Addr)
@@ -473,6 +476,11 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
473476
cortex_distributor_received_metadata_total{user="userA"} 5
474477
cortex_distributor_received_metadata_total{user="userB"} 10
475478
479+
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
480+
# TYPE cortex_distributor_received_samples_per_labelset_total counter
481+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="userA"} 5
482+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="userA"} 10
483+
476484
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
477485
# TYPE cortex_distributor_received_samples_total counter
478486
cortex_distributor_received_samples_total{type="float",user="userA"} 5
@@ -4081,6 +4089,129 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
40814089
}
40824090
}
40834091

4092+
func TestDistributor_PushLabelSetMetrics(t *testing.T) {
4093+
t.Parallel()
4094+
inputSeries := []labels.Labels{
4095+
{
4096+
{Name: "__name__", Value: "foo"},
4097+
{Name: "cluster", Value: "one"},
4098+
},
4099+
{
4100+
{Name: "__name__", Value: "bar"},
4101+
{Name: "cluster", Value: "one"},
4102+
},
4103+
{
4104+
{Name: "__name__", Value: "bar"},
4105+
{Name: "cluster", Value: "two"},
4106+
},
4107+
{
4108+
{Name: "__name__", Value: "foo"},
4109+
{Name: "cluster", Value: "three"},
4110+
},
4111+
}
4112+
4113+
var err error
4114+
var limits validation.Limits
4115+
flagext.DefaultValues(&limits)
4116+
limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{
4117+
{Hash: 0, LabelSet: labels.FromStrings("cluster", "one")},
4118+
{Hash: 1, LabelSet: labels.FromStrings("cluster", "two")},
4119+
{Hash: 2, LabelSet: labels.EmptyLabels()},
4120+
}
4121+
4122+
ds, _, regs, _ := prepare(t, prepConfig{
4123+
numIngesters: 2,
4124+
happyIngesters: 2,
4125+
numDistributors: 1,
4126+
shardByAllLabels: true,
4127+
limits: &limits,
4128+
})
4129+
reg := regs[0]
4130+
4131+
// Push the series to the distributor
4132+
id := "user"
4133+
req := mockWriteRequest(inputSeries, 1, 1, false)
4134+
ctx := user.InjectOrgID(context.Background(), id)
4135+
_, err = ds[0].Push(ctx, req)
4136+
require.NoError(t, err)
4137+
4138+
ds[0].updateLabelSetMetrics()
4139+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
4140+
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
4141+
# TYPE cortex_distributor_received_samples_per_labelset_total counter
4142+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2
4143+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="histogram",user="user"} 0
4144+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 1
4145+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user"} 0
4146+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 1
4147+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0
4148+
`), "cortex_distributor_received_samples_per_labelset_total"))
4149+
4150+
// Push more series.
4151+
inputSeries = []labels.Labels{
4152+
{
4153+
{Name: "__name__", Value: "baz"},
4154+
{Name: "cluster", Value: "two"},
4155+
},
4156+
{
4157+
{Name: "__name__", Value: "foo"},
4158+
{Name: "cluster", Value: "four"},
4159+
},
4160+
}
4161+
// Write the same request twice for different users.
4162+
req = mockWriteRequest(inputSeries, 1, 1, false)
4163+
ctx2 := user.InjectOrgID(context.Background(), "user2")
4164+
_, err = ds[0].Push(ctx, req)
4165+
require.NoError(t, err)
4166+
req = mockWriteRequest(inputSeries, 1, 1, false)
4167+
_, err = ds[0].Push(ctx2, req)
4168+
require.NoError(t, err)
4169+
ds[0].updateLabelSetMetrics()
4170+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
4171+
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
4172+
# TYPE cortex_distributor_received_samples_per_labelset_total counter
4173+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="float",user="user"} 2
4174+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"one\"}",type="histogram",user="user"} 0
4175+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user"} 2
4176+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="float",user="user2"} 1
4177+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user"} 0
4178+
cortex_distributor_received_samples_per_labelset_total{labelset="{cluster=\"two\"}",type="histogram",user="user2"} 0
4179+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2
4180+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
4181+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0
4182+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0
4183+
`), "cortex_distributor_received_samples_per_labelset_total"))
4184+
4185+
// Remove existing limits and add new limits
4186+
limits.LimitsPerLabelSet = []validation.LimitsPerLabelSet{
4187+
{Hash: 3, LabelSet: labels.FromStrings("cluster", "three")},
4188+
{Hash: 4, LabelSet: labels.FromStrings("cluster", "four")},
4189+
{Hash: 2, LabelSet: labels.EmptyLabels()},
4190+
}
4191+
ds[0].limits, err = validation.NewOverrides(limits, nil)
4192+
require.NoError(t, err)
4193+
ds[0].updateLabelSetMetrics()
4194+
// Old label set metrics are removed. New label set metrics will be added when
4195+
// new requests come in.
4196+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
4197+
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
4198+
# TYPE cortex_distributor_received_samples_per_labelset_total counter
4199+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user"} 2
4200+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
4201+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user"} 0
4202+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0
4203+
`), "cortex_distributor_received_samples_per_labelset_total"))
4204+
4205+
// Metrics from `user` got removed but `user2` metric should remain.
4206+
ds[0].cleanupInactiveUser(id)
4207+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
4208+
# HELP cortex_distributor_received_samples_per_labelset_total The total number of received samples per label set, excluding rejected and deduped samples.
4209+
# TYPE cortex_distributor_received_samples_per_labelset_total counter
4210+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="float",user="user2"} 1
4211+
cortex_distributor_received_samples_per_labelset_total{labelset="{}",type="histogram",user="user2"} 0
4212+
`), "cortex_distributor_received_samples_per_labelset_total"))
4213+
}
4214+
40844215
func countMockIngestersCalls(ingesters []*mockIngester, name string) int {
40854216
count := 0
40864217
for i := 0; i < len(ingesters); i++ {

0 commit comments

Comments
 (0)