From 3af865aaeed6a42828db17a8bd9fcb762f1792c9 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Thu, 16 May 2024 14:11:42 -0700
Subject: [PATCH 1/2] Backfilling the new limits when updating the metrics

Signed-off-by: alanprot
---
 CHANGELOG.md                  |  1 +
 pkg/ingester/ingester.go      | 37 +++++++++------
 pkg/ingester/ingester_test.go | 52 +++++++++++++++------
 pkg/ingester/limiter.go       |  2 +
 pkg/ingester/user_state.go    | 88 +++++++++++++++++++++--------------
 5 files changed, 114 insertions(+), 66 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f642f24c51d..2dd96538468 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@
 * [ENHANCEMENT] Distributor/Querier: Clean stale per-ingester metrics after ingester restarts. #5930
 * [ENHANCEMENT] Distributor/Ring: Allow disabling detailed ring metrics by ring member. #5931
 * [ENHANCEMENT] KV: Etcd Added etcd.ping-without-stream-allowed parameter to disable/enable PermitWithoutStream #5933
+* [ENHANCEMENT] Ingester: Add a new `max_series_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allows users to define the maximum number of series per LabelSet. #5950
 * [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
 * [CHANGE] Query Frontend/Ruler: Omit empty data field in API response. #5953 #5954
 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920
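For anyone trying the new limit, here is a minimal sketch of how a per-labelset limit entry could be built, mirroring how the tests in this patch use it. Only the `LabelSet`, `Hash`, and `Id` fields are visible in this diff; the `Limit` field name and the hash derivation below are assumptions, not confirmed by the patch.

```go
// Hypothetical sketch of a validation.MaxSeriesPerLabelSet-style entry.
// Field names other than LabelSet/Hash/Id are assumed, not from the diff.
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

type MaxSeriesPerLabelSet struct {
	Limit    int           // assumed field name for the series cap
	LabelSet labels.Labels // the label set this limit applies to
	Id       string        // identifier surfaced in limit errors
	Hash     uint64        // cache key derived from the label set
}

func main() {
	set := MaxSeriesPerLabelSet{
		Limit:    3,
		LabelSet: labels.FromStrings("label1", "value1"),
		Id:       "limit-1",
	}
	set.Hash = set.LabelSet.Hash() // one plausible way to derive the key
	fmt.Println(set.LabelSet.String(), set.Limit)
}
```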
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 76fff45bdab..f739c3fbc41 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -877,7 +877,7 @@ func (i *Ingester) updateLoop(ctx context.Context) error {
 			i.stoppedMtx.RUnlock()
 
 		case <-activeSeriesTickerChan:
-			i.updateActiveSeries()
+			i.updateActiveSeries(ctx)
 		case <-maxInflightRequestResetTicker.C:
 			i.maxInflightQueryRequests.Tick()
 		case <-userTSDBConfigTicker.C:
@@ -929,7 +929,7 @@ func (i *Ingester) getMaxExemplars(userID string) int64 {
 	return int64(maxExemplarsFromLimits)
 }
 
-func (i *Ingester) updateActiveSeries() {
+func (i *Ingester) updateActiveSeries(ctx context.Context) {
 	purgeTime := time.Now().Add(-i.cfg.ActiveSeriesMetricsIdleTimeout)
 
 	for _, userID := range i.getTSDBUsers() {
@@ -940,7 +940,9 @@ func (i *Ingester) updateActiveSeries() {
 
 		userDB.activeSeries.Purge(purgeTime)
 		i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(userDB.activeSeries.Active()))
-		userDB.labelSetCounter.UpdateMetric(userDB, i.metrics.activeSeriesPerLabelSet)
+		if err := userDB.labelSetCounter.UpdateMetric(ctx, userDB, i.metrics.activeSeriesPerLabelSet); err != nil {
+			level.Warn(i.logger).Log("msg", "failed to update per labelSet metrics", "user", userID, "err", err)
+		}
 	}
 }
 
@@ -1054,18 +1056,19 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 	// Keep track of some stats which are tracked only if the samples will be
 	// successfully committed
 	var (
-		succeededSamplesCount     = 0
-		failedSamplesCount        = 0
-		succeededExemplarsCount   = 0
-		failedExemplarsCount      = 0
-		startAppend               = time.Now()
-		sampleOutOfBoundsCount    = 0
-		sampleOutOfOrderCount     = 0
-		sampleTooOldCount         = 0
-		newValueForTimestampCount = 0
-		perUserSeriesLimitCount   = 0
-		perMetricSeriesLimitCount = 0
-		nativeHistogramCount      = 0
+		succeededSamplesCount       = 0
+		failedSamplesCount          = 0
+		succeededExemplarsCount     = 0
+		failedExemplarsCount        = 0
+		startAppend                 = time.Now()
+		sampleOutOfBoundsCount      = 0
+		sampleOutOfOrderCount       = 0
+		sampleTooOldCount           = 0
+		newValueForTimestampCount   = 0
+		perUserSeriesLimitCount     = 0
+		perLabelSetSeriesLimitCount = 0
+		perMetricSeriesLimitCount   = 0
+		nativeHistogramCount        = 0
 
 		updateFirstPartial = func(errFn func() error) {
 			if firstPartialErr == nil {
@@ -1150,6 +1153,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 				})
 				continue
 			case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
+				perLabelSetSeriesLimitCount++
 				updateFirstPartial(func() error {
 					return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
 				})
@@ -1245,6 +1249,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
 	if perMetricSeriesLimitCount > 0 {
 		validation.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
 	}
+	if perLabelSetSeriesLimitCount > 0 {
+		validation.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID).Add(float64(perLabelSetSeriesLimitCount))
+	}
 
 	if nativeHistogramCount > 0 {
 		validation.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(nativeHistogramCount))
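The Push changes follow the file's existing discard-accounting pattern: bump a cheap local counter per rejected sample in the hot loop, then flush once per request into the shared counter vec. A minimal, self-contained sketch of that pattern; the metric name, help text, and the `per_labelset_series_limit` reason string match the diff, everything else is illustrative:

```go
// Sketch of per-reason discard accounting: local counters in the hot path,
// one Add per request on the shared Prometheus counter vec.
package main

import "github.com/prometheus/client_golang/prometheus"

var discardedSamples = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "cortex_discarded_samples_total",
		Help: "The total number of samples that were discarded.",
	},
	[]string{"reason", "user"},
)

func pushSamples(userID string, rejectedPerLabelSet int) {
	// ... per-sample append loop would live here, incrementing local counters ...
	if rejectedPerLabelSet > 0 {
		// Flushing once per request keeps label lookups off the hot path.
		discardedSamples.WithLabelValues("per_labelset_series_limit", userID).
			Add(float64(rejectedPerLabelSet))
	}
}

func main() {
	prometheus.MustRegister(discardedSamples)
	pushSamples("1", 2)
}
```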
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index c3723707565..56c048513e4 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -109,6 +109,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 	require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))
 
 	ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry)
+	registry.MustRegister(validation.DiscardedSamples)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
 	// Wait until it's ACTIVE
@@ -132,13 +133,13 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 		}
 	}
 
-	ing.updateActiveSeries()
+	ing.updateActiveSeries(ctx)
 	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
 		# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
 		# TYPE cortex_ingester_active_series_per_labelset gauge
 		cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
 		cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
-	`), "cortex_ingester_active_series_per_labelset"))
+	`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))
 
 	// Should impose limits
 	for _, set := range limits.MaxSeriesPerLabelSet {
@@ -154,13 +155,16 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 		require.ErrorContains(t, err, set.Id)
 	}
 
-	ing.updateActiveSeries()
+	ing.updateActiveSeries(ctx)
 	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
+		# HELP cortex_discarded_samples_total The total number of samples that were discarded.
+		# TYPE cortex_discarded_samples_total counter
+		cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 2
 		# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
 		# TYPE cortex_ingester_active_series_per_labelset gauge
 		cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
 		cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
-	`), "cortex_ingester_active_series_per_labelset"))
+	`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))
 
 	// Should apply composite limits
 	limits.MaxSeriesPerLabelSet = append(limits.MaxSeriesPerLabelSet,
@@ -187,6 +191,21 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 	require.NoError(t, limits.UnmarshalJSON(b))
 	tenantLimits.setLimits(userID, &limits)
 
+	// Should backfill
+	ing.updateActiveSeries(ctx)
+	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
+		# HELP cortex_discarded_samples_total The total number of samples that were discarded.
+		# TYPE cortex_discarded_samples_total counter
+		cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 2
+		# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
+		# TYPE cortex_ingester_active_series_per_labelset gauge
+		cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",user="1"} 0
+		cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\"}",user="1"} 0
+		cortex_ingester_active_series_per_labelset{labelset="{comp2=\"compValue2\"}",user="1"} 0
+		cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
+		cortex_ingester_active_series_per_labelset{labelset="{label2=\"value2\"}",user="1"} 2
+	`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))
+
 	// Adding 5 metrics with only 1 label
 	for i := 0; i < 5; i++ {
 		lbls := []string{labels.MetricName, "metric_name", "comp1", "compValue1"}
@@ -211,8 +230,11 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 	assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
 	require.ErrorContains(t, err, labels.FromStrings("comp1", "compValue1", "comp2", "compValue2").String())
 
-	ing.updateActiveSeries()
+	ing.updateActiveSeries(ctx)
 	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
+		# HELP cortex_discarded_samples_total The total number of samples that were discarded.
+		# TYPE cortex_discarded_samples_total counter
+		cortex_discarded_samples_total{reason="per_labelset_series_limit",user="1"} 3
 		# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
 		# TYPE cortex_ingester_active_series_per_labelset gauge
 		cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
@@ -220,7 +242,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 		cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\", comp2=\"compValue2\"}",user="1"} 2
 		cortex_ingester_active_series_per_labelset{labelset="{comp1=\"compValue1\"}",user="1"} 7
 		cortex_ingester_active_series_per_labelset{labelset="{comp2=\"compValue2\"}",user="1"} 2
-	`), "cortex_ingester_active_series_per_labelset"))
+	`), "cortex_ingester_active_series_per_labelset", "cortex_discarded_samples_total"))
 
 	// Should bootstrap and apply limits when configuration change
 	limits.MaxSeriesPerLabelSet = append(limits.MaxSeriesPerLabelSet,
@@ -249,7 +271,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 	assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
 	require.ErrorContains(t, err, labels.FromStrings(lbls...).String())
 
-	ing.updateActiveSeries()
+	ing.updateActiveSeries(ctx)
 	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
 		# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
 		# TYPE cortex_ingester_active_series_per_labelset gauge
@@ -267,7 +289,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, limits.UnmarshalJSON(b))
 	tenantLimits.setLimits(userID, &limits)
-	ing.updateActiveSeries()
+	ing.updateActiveSeries(ctx)
 	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
 		# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
 		# TYPE cortex_ingester_active_series_per_labelset gauge
@@ -281,7 +303,7 @@ func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
 	ing, err = prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
-	ing.updateActiveSeries()
+	ing.updateActiveSeries(ctx)
 	require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
 		# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
 		# TYPE cortex_ingester_active_series_per_labelset gauge
@@ -1207,7 +1229,7 @@ func TestIngester_Push(t *testing.T) {
 
 			// Update active series for metrics check.
 			if !testData.disableActiveSeries {
-				i.updateActiveSeries()
+				i.updateActiveSeries(ctx)
 			}
 
 			// Append additional metrics to assert on.
@@ -1274,7 +1296,7 @@ func TestIngester_Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testi
 	}
 
 	// Update active series for metrics check.
-	i.updateActiveSeries()
+	i.updateActiveSeries(context.Background())
 
 	// Check tracked Prometheus metrics
 	expectedMetrics := `
@@ -1361,7 +1383,7 @@ func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) {
 	time.Sleep(200 * time.Millisecond)
 
 	// Update active series for metrics check. This will remove inactive series.
-	i.updateActiveSeries()
+	i.updateActiveSeries(context.Background())
 
 	// Check tracked Prometheus metrics
 	expectedMetrics := `
@@ -3733,7 +3755,7 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) {
 	})
 
 	pushSingleSampleWithMetadata(t, i)
-	i.updateActiveSeries()
+	i.updateActiveSeries(context.Background())
 
 	require.Equal(t, int64(1), i.TSDBState.seriesCount.Load())
 
@@ -3774,7 +3796,7 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) {
 	})
 	require.Greater(t, testutil.ToFloat64(i.TSDBState.idleTsdbChecks.WithLabelValues(string(tsdbIdleClosed))), float64(0))
 
-	i.updateActiveSeries()
+	i.updateActiveSeries(context.Background())
 	require.Equal(t, int64(0), i.TSDBState.seriesCount.Load()) // Flushing removed all series from memory.
 
 	// Verify that user has disappeared from metrics.
@@ -3799,7 +3821,7 @@ func TestIngesterCompactAndCloseIdleTSDB(t *testing.T) {
 
 	// Pushing another sample will recreate TSDB.
 	pushSingleSampleWithMetadata(t, i)
-	i.updateActiveSeries()
+	i.updateActiveSeries(context.Background())
 
 	// User is back.
 	require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
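These assertions all rely on `testutil.GatherAndCompare`, which gathers a registry and diffs it against an expected text-format exposition, filtered to the named metrics. A minimal, self-contained example of that pattern outside the ingester test harness; the metric name and help text match the diff, the rest is illustrative:

```go
// Standalone demo of the testutil.GatherAndCompare pattern used above.
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewPedanticRegistry()
	g := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_ingester_active_series_per_labelset",
		Help: "Number of currently active series per user and labelset.",
	}, []string{"user", "labelset"})
	reg.MustRegister(g)
	g.WithLabelValues("1", `{label1="value1"}`).Set(3)

	// The expected text may be indented; the exposition parser tolerates
	// leading whitespace, which is why the tests can inline it like this.
	err := testutil.GatherAndCompare(reg, strings.NewReader(`
		# HELP cortex_ingester_active_series_per_labelset Number of currently active series per user and labelset.
		# TYPE cortex_ingester_active_series_per_labelset gauge
		cortex_ingester_active_series_per_labelset{labelset="{label1=\"value1\"}",user="1"} 3
	`), "cortex_ingester_active_series_per_labelset")
	fmt.Println("match:", err == nil)
}
```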
diff --git a/pkg/ingester/limiter.go b/pkg/ingester/limiter.go
index 75d33c3460b..f636a975e08 100644
--- a/pkg/ingester/limiter.go
+++ b/pkg/ingester/limiter.go
@@ -105,6 +105,8 @@ func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int
 	return errMaxMetadataPerUserLimitExceeded
 }
 
+// AssertMaxSeriesPerLabelSet checks, for each configured label set matching the
+// given metric, that the series count returned by f has not reached the limit, and returns an error if it has.
 func (l *Limiter) AssertMaxSeriesPerLabelSet(userID string, metric labels.Labels, f func(validation.MaxSeriesPerLabelSet) (int, error)) error {
 	m := l.maxSeriesPerLabelSet(userID, metric)
 	for _, limit := range m {
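The callback shape here is the interesting part: the limiter resolves which configured label sets match the incoming metric, then asks the caller for the current series count of each. A simplified stand-in under stated assumptions; only the callback signature is taken from the diff, all type and function names below are local to the sketch:

```go
// Simplified sketch of the callback-driven limit check.
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// maxSeriesPerLabelSet mirrors validation.MaxSeriesPerLabelSet for this sketch.
type maxSeriesPerLabelSet struct {
	Limit    int
	LabelSet labels.Labels
}

var errLimitExceeded = errors.New("per-labelset series limit exceeded")

// assertMaxSeriesPerLabelSet: for every matching limit, f reports the
// current series count and we compare it against the configured cap.
func assertMaxSeriesPerLabelSet(matching []maxSeriesPerLabelSet, f func(maxSeriesPerLabelSet) (int, error)) error {
	for _, limit := range matching {
		count, err := f(limit)
		if err != nil {
			return err
		}
		if count >= limit.Limit {
			return errLimitExceeded
		}
	}
	return nil
}

func main() {
	limits := []maxSeriesPerLabelSet{{Limit: 3, LabelSet: labels.FromStrings("label1", "value1")}}
	err := assertMaxSeriesPerLabelSet(limits, func(l maxSeriesPerLabelSet) (int, error) {
		return 3, nil // pretend the shard already tracks 3 series for this label set
	})
	fmt.Println(err) // per-labelset series limit exceeded
}
```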
diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go
index cb23f7901d4..dc9bfe54b1a 100644
--- a/pkg/ingester/user_state.go
+++ b/pkg/ingester/user_state.go
@@ -125,45 +125,49 @@ func (m *labelSetCounter) canAddSeriesForLabelSet(ctx context.Context, u *userTS
 		s.RUnlock()
 
 		// We still dont keep track of this label value so we need to backfill
-		ir, err := u.db.Head().Index()
-		if err != nil {
-			return 0, err
-		}
+		return m.backFillLimit(ctx, u, set, s)
+	})
+}
 
-		defer ir.Close()
+func (m *labelSetCounter) backFillLimit(ctx context.Context, u *userTSDB, limit validation.MaxSeriesPerLabelSet, s *labelSetCounterShard) (int, error) {
+	ir, err := u.db.Head().Index()
+	if err != nil {
+		return 0, err
+	}
 
-		s.Lock()
-		defer s.Unlock()
-		if r, ok := s.valuesCounter[set.Hash]; !ok {
-			postings := make([]index.Postings, 0, len(set.LabelSet))
-			for _, lbl := range set.LabelSet {
-				p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
-				if err != nil {
-					return 0, err
-				}
-				postings = append(postings, p)
+	defer ir.Close()
+
+	s.Lock()
+	defer s.Unlock()
+	if r, ok := s.valuesCounter[limit.Hash]; !ok {
+		postings := make([]index.Postings, 0, len(limit.LabelSet))
+		for _, lbl := range limit.LabelSet {
+			p, err := ir.Postings(ctx, lbl.Name, lbl.Value)
+			if err != nil {
+				return 0, err
 			}
+			postings = append(postings, p)
+		}
 
-			p := index.Intersect(postings...)
+		p := index.Intersect(postings...)
 
-			totalCount := 0
-			for p.Next() {
-				totalCount++
-			}
+		totalCount := 0
+		for p.Next() {
+			totalCount++
+		}
 
-			if p.Err() != nil {
-				return 0, p.Err()
-			}
+		if p.Err() != nil {
+			return 0, p.Err()
+		}
 
-			s.valuesCounter[set.Hash] = &labelSetCounterEntry{
-				count:  totalCount,
-				labels: set.LabelSet,
-			}
-			return totalCount, nil
-		} else {
-			return r.count, nil
+		s.valuesCounter[limit.Hash] = &labelSetCounterEntry{
+			count:  totalCount,
+			labels: limit.LabelSet,
 		}
-	})
+		return totalCount, nil
+	} else {
+		return r.count, nil
+	}
 }
 
 func (m *labelSetCounter) increaseSeriesLabelSet(u *userTSDB, metric labels.Labels) {
@@ -195,24 +199,36 @@ func (m *labelSetCounter) decreaseSeriesLabelSet(u *userTSDB, metric labels.Labe
 	}
 }
 
-func (m *labelSetCounter) UpdateMetric(u *userTSDB, vec *prometheus.GaugeVec) {
-	currentLbsLimitHash := map[uint64]struct{}{}
+func (m *labelSetCounter) UpdateMetric(ctx context.Context, u *userTSDB, vec *prometheus.GaugeVec) error {
+	currentLbsLimitHash := map[uint64]validation.MaxSeriesPerLabelSet{}
 	for _, l := range m.limiter.limits.MaxSeriesPerLabelSet(u.userID) {
-		currentLbsLimitHash[l.Hash] = struct{}{}
+		currentLbsLimitHash[l.Hash] = l
 	}
 
 	for i := 0; i < numMetricCounterShards; i++ {
 		s := m.shards[i]
 		s.RLock()
 		for h, entry := range s.valuesCounter {
-			// This limit no longer ecists
+			// This limit no longer exists
 			if _, ok := currentLbsLimitHash[h]; !ok {
 				vec.DeleteLabelValues(u.userID, entry.labels.String())
 				continue
 			}
-
+			delete(currentLbsLimitHash, h)
 			vec.WithLabelValues(u.userID, entry.labels.String()).Set(float64(entry.count))
 		}
 		s.RUnlock()
 	}
+
+	// Backfill all limits that are not being tracked yet
+	for _, l := range currentLbsLimitHash {
+		s := m.shards[util.HashFP(model.Fingerprint(l.Hash))%numMetricCounterShards]
+		count, err := m.backFillLimit(ctx, u, l, s)
+		if err != nil {
+			return err
+		}
+		vec.WithLabelValues(u.userID, l.LabelSet.String()).Set(float64(count))
+	}
+
+	return nil
 }
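The counting trick inside `backFillLimit` is worth spelling out: fetch one postings list per label in the label set, intersect them, and count the series that survive. A self-contained sketch of the same trick using hand-built postings lists in place of a real TSDB head index; the series IDs are invented for illustration:

```go
// Count series matching every label of a label set by intersecting postings.
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	// Series IDs carrying label1="value1" and label2="value2" respectively
	// (lists must be sorted, as postings from a real index would be).
	label1 := index.NewListPostings([]storage.SeriesRef{1, 2, 3, 7})
	label2 := index.NewListPostings([]storage.SeriesRef{2, 3, 9})

	// A series matches the label set only if it appears in every list.
	p := index.Intersect(label1, label2)

	totalCount := 0
	for p.Next() {
		totalCount++
	}
	if err := p.Err(); err != nil {
		panic(err)
	}
	fmt.Println(totalCount) // 2 (series 2 and 3)
}
```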
From 9c808bf8593664285d06513e48f04ae0e78a1a88 Mon Sep 17 00:00:00 2001
From: alanprot
Date: Thu, 16 May 2024 21:47:38 +0000
Subject: [PATCH 2/2] make clean-white-noise

Signed-off-by: alanprot
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2dd96538468..b441f99460f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,7 +8,7 @@
 * [ENHANCEMENT] Distributor/Querier: Clean stale per-ingester metrics after ingester restarts. #5930
 * [ENHANCEMENT] Distributor/Ring: Allow disabling detailed ring metrics by ring member. #5931
 * [ENHANCEMENT] KV: Etcd Added etcd.ping-without-stream-allowed parameter to disable/enable PermitWithoutStream #5933
-* [ENHANCEMENT] Ingester: Add a new `max_series_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allows users to define the maximum number of series per LabelSet. #5950 
+* [ENHANCEMENT] Ingester: Add a new `max_series_per_label_set` limit. This limit functions similarly to `max_series_per_metric`, but allows users to define the maximum number of series per LabelSet. #5950
 * [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
 * [CHANGE] Query Frontend/Ruler: Omit empty data field in API response. #5953 #5954
 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920
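Taken together, the reworked `UpdateMetric` is a reconcile-then-backfill pass: delete gauge children for limits that were removed, refresh the ones already tracked, and backfill any configured limit with no tracked entry yet so its gauge appears (possibly at 0) without waiting for the first push. A simplified sketch of that shape under stated assumptions; all types and names below are stand-ins for the real ones:

```go
// Reconcile-then-backfill sketch of the UpdateMetric behavior in this patch.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

type entry struct {
	labelSet string
	count    int
}

func updateMetric(configured map[uint64]string, tracked map[uint64]entry,
	vec *prometheus.GaugeVec, backfill func(labelSet string) int) {

	remaining := make(map[uint64]string, len(configured))
	for h, ls := range configured {
		remaining[h] = ls
	}

	for h, e := range tracked {
		if _, ok := remaining[h]; !ok {
			// Limit no longer exists: drop its gauge child.
			vec.DeleteLabelValues("1", e.labelSet)
			continue
		}
		delete(remaining, h) // already tracked, no backfill needed
		vec.WithLabelValues("1", e.labelSet).Set(float64(e.count))
	}

	// Whatever is left is configured but not tracked yet: backfill it.
	for _, ls := range remaining {
		vec.WithLabelValues("1", ls).Set(float64(backfill(ls)))
	}
}

func main() {
	vec := prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "active_series_per_labelset",
		Help: "Sketch gauge.",
	}, []string{"user", "labelset"})
	configured := map[uint64]string{1: `{label1="value1"}`, 2: `{comp1="compValue1"}`}
	tracked := map[uint64]entry{1: {labelSet: `{label1="value1"}`, count: 3}}
	updateMetric(configured, tracked, vec, func(string) int { return 0 })
	fmt.Println("gauges reconciled")
}
```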