diff --git a/CHANGELOG.md b/CHANGELOG.md
index 470aefe5228..c84cab98155 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
 * [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686
 * [CHANGE] Enable Thanos series limiter in store-gateway. #4702
+* [CHANGE] Distributor: Apply `max_fetched_series_per_query` limit for `/series` API. #4683
 
 ## 1.12.0 in progress
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 032aa64054d..95e3650002f 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -7,6 +7,7 @@ import (
     "net/http"
     "sort"
     "strings"
+    "sync"
     "time"
 
     "github.com/go-kit/log"
@@ -945,6 +946,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
 // MetricsForLabelMatchers gets the metrics that match said matchers
 func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
     replicationSet, err := d.GetIngestersForMetadata(ctx)
+    queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
     if err != nil {
         return nil, err
     }
@@ -953,20 +955,29 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
     if err != nil {
         return nil, err
     }
-
-    resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
-        return client.MetricsForLabelMatchers(ctx, req)
-    })
-    if err != nil {
-        return nil, err
-    }
-
+    mutex := sync.Mutex{}
     metrics := map[model.Fingerprint]model.Metric{}
-    for _, resp := range resps {
-        ms := ingester_client.FromMetricsForLabelMatchersResponse(resp.(*ingester_client.MetricsForLabelMatchersResponse))
+
+    _, err = d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
+        resp, err := client.MetricsForLabelMatchers(ctx, req)
+        if err != nil {
+            return nil, err
+        }
+        ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
         for _, m := range ms {
+            if err := queryLimiter.AddSeries(cortexpb.FromMetricsToLabelAdapters(m)); err != nil {
+                return nil, err
+            }
+            mutex.Lock()
             metrics[m.Fingerprint()] = m
+            mutex.Unlock()
         }
+
+        return resp, nil
+    })
+
+    if err != nil {
+        return nil, err
     }
 
     result := make([]metric.Metric, 0, len(metrics))
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 03abe9bfd4d..b01855a8180 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -1863,6 +1863,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
         matchers          []*labels.Matcher
         expectedResult    []metric.Metric
         expectedIngesters int
+        queryLimiter      *limiter.QueryLimiter
+        expectedErr       error
     }{
         "should return an empty response if no metric match": {
             matchers: []*labels.Matcher{
@@ -1870,6 +1872,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
             },
             expectedResult:    []metric.Metric{},
             expectedIngesters: numIngesters,
+            queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+            expectedErr:       nil,
         },
         "should filter metrics by single matcher": {
             matchers: []*labels.Matcher{
@@ -1880,6 +1884,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
             },
             expectedResult: []metric.Metric{
                 {Metric: util.LabelsToMetric(fixtures[0].lbls)},
                 {Metric: util.LabelsToMetric(fixtures[1].lbls)},
             },
             expectedIngesters: numIngesters,
+            queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+            expectedErr:       nil,
         },
         "should filter metrics by multiple matchers": {
             matchers: []*labels.Matcher{
@@ -1890,6 +1896,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
             },
             expectedResult: []metric.Metric{
                 {Metric: util.LabelsToMetric(fixtures[0].lbls)},
             },
             expectedIngesters: numIngesters,
+            queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+            expectedErr:       nil,
         },
         "should return all matching metrics even if their FastFingerprint collide": {
             matchers: []*labels.Matcher{
@@ -1900,6 +1908,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
             },
             expectedResult: []metric.Metric{
                 {Metric: util.LabelsToMetric(fixtures[3].lbls)},
                 {Metric: util.LabelsToMetric(fixtures[4].lbls)},
             },
             expectedIngesters: numIngesters,
+            queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+            expectedErr:       nil,
         },
         "should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": {
             shuffleShardEnabled: true,
             shuffleShardSize:    3,
             matchers: []*labels.Matcher{
@@ -1912,6 +1922,8 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
                 {Metric: util.LabelsToMetric(fixtures[1].lbls)},
             },
             expectedIngesters: 3,
+            queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+            expectedErr:       nil,
         },
         "should query all ingesters if shuffle sharding is enabled but shard size is 0": {
             shuffleShardEnabled: true,
             shuffleShardSize:    0,
             matchers: []*labels.Matcher{
@@ -1924,6 +1936,30 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
                 {Metric: util.LabelsToMetric(fixtures[1].lbls)},
             },
             expectedIngesters: numIngesters,
+            queryLimiter:      limiter.NewQueryLimiter(0, 0, 0),
+            expectedErr:       nil,
+        },
+        "should return err if series limit is exhausted": {
+            shuffleShardEnabled: true,
+            shuffleShardSize:    0,
+            matchers: []*labels.Matcher{
+                mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
+            },
+            expectedResult:    nil,
+            expectedIngesters: numIngesters,
+            queryLimiter:      limiter.NewQueryLimiter(1, 0, 0),
+            expectedErr:       validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
+        },
+        "should not exhaust series limit when only one series is fetched": {
+            matchers: []*labels.Matcher{
+                mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2"),
+            },
+            expectedResult: []metric.Metric{
+                {Metric: util.LabelsToMetric(fixtures[2].lbls)},
+            },
+            expectedIngesters: numIngesters,
+            queryLimiter:      limiter.NewQueryLimiter(1, 0, 0),
+            expectedErr:       nil,
         },
     }
 
@@ -1943,6 +1979,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 
             // Push fixtures
             ctx := user.InjectOrgID(context.Background(), "test")
+            ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter)
 
             for _, series := range fixtures {
                 req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp)
@@ -1951,6 +1988,12 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
             }
 
             metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
+
+            if testData.expectedErr != nil {
+                assert.EqualError(t, err, testData.expectedErr.Error())
+                return
+            }
+
             require.NoError(t, err)
             assert.ElementsMatch(t, testData.expectedResult, metrics)
 
@@ -1963,6 +2006,97 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
     }
 }
 
+func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
+    const (
+        numIngesters        = 100
+        numSeriesPerRequest = 100
+    )
+
+    tests := map[string]struct {
+        prepareConfig func(limits *validation.Limits)
+        prepareSeries func() ([]labels.Labels, []cortexpb.Sample)
+        matchers      []*labels.Matcher
+        queryLimiter  *limiter.QueryLimiter
+        expectedErr   error
+    }{
+        "get series within limits": {
+            prepareConfig: func(limits *validation.Limits) {},
+            prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
+                metrics := make([]labels.Labels, numSeriesPerRequest)
+                samples := make([]cortexpb.Sample, numSeriesPerRequest)
+
+                for i := 0; i < numSeriesPerRequest; i++ {
+                    lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: fmt.Sprintf("foo_%d", i)}})
+                    for i := 0; i < 10; i++ {
+                        lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
+                    }
+
+                    metrics[i] = lbls.Labels()
+                    samples[i] = cortexpb.Sample{
+                        Value:       float64(i),
+                        TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
+                    }
+                }
+
+                return metrics, samples
+            },
+            matchers: []*labels.Matcher{
+                mustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "foo.+"),
+            },
+            queryLimiter: limiter.NewQueryLimiter(100, 0, 0),
+            expectedErr:  nil,
+        },
+    }
+
+    for testName, testData := range tests {
+        b.Run(testName, func(b *testing.B) {
+            // Create distributor
+            ds, ingesters, _, _ := prepare(b, prepConfig{
+                numIngesters:        numIngesters,
+                happyIngesters:      numIngesters,
+                numDistributors:     1,
+                shardByAllLabels:    true,
+                shuffleShardEnabled: false,
+                shuffleShardSize:    0,
+            })
+
+            // Push fixtures
+            ctx := user.InjectOrgID(context.Background(), "test")
+            ctx = limiter.AddQueryLimiterToContext(ctx, testData.queryLimiter)
+
+            // Prepare the series to remote write before starting the benchmark.
+            metrics, samples := testData.prepareSeries()
+
+            if _, err := ds[0].Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API)); err != nil {
+                b.Fatalf("error pushing to distributor %v", err)
+            }
+
+            // Run the benchmark.
+            b.ReportAllocs()
+            b.ResetTimer()
+
+            for n := 0; n < b.N; n++ {
+                now := model.Now()
+                metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
+
+                if testData.expectedErr != nil {
+                    assert.EqualError(b, err, testData.expectedErr.Error())
+                    return
+                }
+
+                require.NoError(b, err)
+
+                // Check how many ingesters have been queried.
+                // Due to the quorum, the distributor could cancel the last request towards ingesters
+                // if all the other ones are successful, so it's fine if either X or X-1 ingesters
+                // have been queried.
+                assert.Contains(b, []int{numIngesters, numIngesters - 1}, countMockIngestersCalls(ingesters, "MetricsForLabelMatchers"))
+                assert.Equal(b, numSeriesPerRequest, len(metrics))
+            }
+        })
+    }
+}
+
 func TestDistributor_MetricsMetadata(t *testing.T) {
     const numIngesters = 5
 
@@ -2058,7 +2192,7 @@ type prepConfig struct {
     errFail error
 }
 
-func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
+func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry, *ring.Ring) {
     ingesters := []mockIngester{}
     for i := 0; i < cfg.happyIngesters; i++ {
         ingesters = append(ingesters, mockIngester{
@@ -2095,7 +2229,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
     }
 
     kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
-    t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+    tb.Cleanup(func() { assert.NoError(tb, closer.Close()) })
 
     err := kvStore.CAS(context.Background(), ingester.RingKey,
         func(_ interface{}) (interface{}, bool, error) {
@@ -2104,7 +2238,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
             }, true, nil
         },
     )
-    require.NoError(t, err)
+    require.NoError(tb, err)
 
     // Use a default replication factor of 3 if there isn't a provided replication factor.
     rf := cfg.replicationFactor
@@ -2119,10 +2253,10 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
         HeartbeatTimeout:  60 * time.Minute,
         ReplicationFactor: rf,
     }, ingester.RingKey, ingester.RingKey, nil, nil)
-    require.NoError(t, err)
-    require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing))
+    require.NoError(tb, err)
+    require.NoError(tb, services.StartAndAwaitRunning(context.Background(), ingestersRing))
 
-    test.Poll(t, time.Second, cfg.numIngesters, func() interface{} {
+    test.Poll(tb, time.Second, cfg.numIngesters, func() interface{} {
         return ingestersRing.InstancesCount()
     })
 
@@ -2163,7 +2297,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
         if cfg.enableTracker {
             codec := GetReplicaDescCodec()
             ringStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
-            t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+            tb.Cleanup(func() { assert.NoError(tb, closer.Close()) })
             mock := kv.PrefixClient(ringStore, "prefix")
             distributorCfg.HATrackerConfig = HATrackerConfig{
                 EnableHATracker: true,
@@ -2175,12 +2309,12 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
         }
 
         overrides, err := validation.NewOverrides(*cfg.limits, nil)
-        require.NoError(t, err)
+        require.NoError(tb, err)
 
         reg := prometheus.NewPedanticRegistry()
         d, err := New(distributorCfg, clientConfig, overrides, ingestersRing, true, reg, log.NewNopLogger())
-        require.NoError(t, err)
-        require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))
+        require.NoError(tb, err)
+        require.NoError(tb, services.StartAndAwaitRunning(context.Background(), d))
 
         distributors = append(distributors, d)
         registries = append(registries, reg)
@@ -2189,12 +2323,12 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p
     // If the distributors ring is setup, wait until the first distributor
    // updates to the expected size
     if distributors[0].distributorsRing != nil {
-        test.Poll(t, time.Second, cfg.numDistributors, func() interface{} {
+        test.Poll(tb, time.Second, cfg.numDistributors, func() interface{} {
             return distributors[0].distributorsLifeCycler.HealthyInstancesCount()
         })
     }
 
-    t.Cleanup(func() { stopAll(distributors, ingestersRing) })
+    tb.Cleanup(func() { stopAll(distributors, ingestersRing) })
 
     return distributors, ingesters, registries, ingestersRing
 }
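
Note on the pattern in `pkg/distributor/distributor.go`: the rewritten `MetricsForLabelMatchers` no longer gathers all ingester responses and merges them afterwards. Instead, the `ForReplicationSet` callback feeds every returned series into the per-query `QueryLimiter` carried in the request context, so `max_fetched_series_per_query` is enforced for `/series` while responses are still arriving. The Go sketch below is a minimal, self-contained model of that pattern — unique-series counting behind a mutex, plus the context plumbing with an unlimited fallback. It is an illustration only: the lowercase names, the error text, and the single-argument constructor are stand-ins, not the real `pkg/util/limiter` API (whose `NewQueryLimiter` takes additional per-query caps, hence the `NewQueryLimiter(n, 0, 0)` calls in the tests).

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// queryLimiter is a simplified stand-in for Cortex's limiter.QueryLimiter:
// it tracks *unique* series per query and fails once the cap is exceeded.
type queryLimiter struct {
	mtx       sync.Mutex
	series    map[string]struct{}
	maxSeries int // 0 means "no limit", like NewQueryLimiter(0, 0, 0) in the tests
}

func newQueryLimiter(maxSeries int) *queryLimiter {
	return &queryLimiter{series: map[string]struct{}{}, maxSeries: maxSeries}
}

// addSeries registers one series, keyed by its rendered label set. Duplicates
// (e.g. the same series returned by several replicas) are counted only once.
func (l *queryLimiter) addSeries(labelSet string) error {
	if l.maxSeries == 0 {
		return nil
	}
	l.mtx.Lock()
	defer l.mtx.Unlock()
	l.series[labelSet] = struct{}{}
	if len(l.series) > l.maxSeries {
		// Illustrative wording; the real message comes from limiter.ErrMaxSeriesHit.
		return fmt.Errorf("the query hit the max number of series limit (limit: %d)", l.maxSeries)
	}
	return nil
}

type ctxKey int

const limiterCtxKey ctxKey = 0

// Context plumbing analogous to AddQueryLimiterToContext and
// QueryLimiterFromContextWithFallback: the fallback hands back an unlimited
// limiter so callers never need a nil check.
func addQueryLimiterToContext(ctx context.Context, l *queryLimiter) context.Context {
	return context.WithValue(ctx, limiterCtxKey, l)
}

func queryLimiterFromContextWithFallback(ctx context.Context) *queryLimiter {
	if l, ok := ctx.Value(limiterCtxKey).(*queryLimiter); ok {
		return l
	}
	return newQueryLimiter(0)
}

func main() {
	ctx := addQueryLimiterToContext(context.Background(), newQueryLimiter(1))
	l := queryLimiterFromContextWithFallback(ctx)

	fmt.Println(l.addSeries(`{__name__="test_1"}`)) // <nil>: first unique series
	fmt.Println(l.addSeries(`{__name__="test_1"}`)) // <nil>: duplicate, not recounted
	fmt.Println(l.addSeries(`{__name__="test_2"}`)) // error: second unique series exceeds the limit of 1
}
```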
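One design point the new tests pin down: because the limiter counts unique series, responses replicated across ingesters do not inflate the count. The "should not exhaust series limit when only one series is fetched" case holds a limit of 1 (`NewQueryLimiter(1, 0, 0)`) even though every ingester in the replication set returns the same series, while the exhaustion case trips `ErrMaxSeriesHit` as soon as a second unique series is added. The mutex added around the `metrics` map is needed for the same reason the limiter is internally synchronized: the `ForReplicationSet` callback may run concurrently for many ingesters.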