From 44d7d338415840d293f66e45d2e2ca19518aa334 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Fri, 16 May 2025 11:15:48 +0900 Subject: [PATCH] Add ingester.skip-metadata-limits flag Signed-off-by: SungJin1212 --- CHANGELOG.md | 2 +- docs/configuration/config-file-reference.md | 4 ++ integration/backward_compatibility_test.go | 77 +++++++++++++++++++++ pkg/ingester/ingester.go | 6 +- pkg/ingester/user_metrics_metadata.go | 44 ++++++++---- pkg/ingester/user_metrics_metadata_test.go | 65 ++++++++--------- 6 files changed, 151 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bdc480d88d..53b66db963b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ * [FEATURE] Ruler: Add support for group labels. #6665 * [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715 -* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681 +* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. Add a `-ingester.return-all-metadata` flag to make the metadata API run when the deployment. Please set this flag to `false` to use the metadata API with the limits later. #6681 #6744 * [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695 * [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607 * [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 93293edf82b..02a82e61198 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3267,6 +3267,10 @@ instance_limits: # Maximum number of entries in the regex matchers cache. 0 to disable. # CLI flag: -ingester.matchers-cache-max-items [matchers_cache_max_items: | default = 0] + +# If enabled, the metadata API returns all metadata regardless of the limits. +# CLI flag: -ingester.skip-metadata-limits +[skip_metadata_limits: | default = true] ``` ### `ingester_client_config` diff --git a/integration/backward_compatibility_test.go b/integration/backward_compatibility_test.go index 5cd65bd923c..75ae91edb57 100644 --- a/integration/backward_compatibility_test.go +++ b/integration/backward_compatibility_test.go @@ -123,6 +123,83 @@ func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) { } } +// Test for #6744. When the querier is running on an older version, while the ingester is running on a newer +// version, the ingester should return all metadata. +func TestMetadataAPIWhenDeployment(t *testing.T) { + oldImage := "quay.io/cortexproject/cortex:v1.19.0" + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) + + minio := e2edb.NewMinio(9000, baseFlags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + oldFlags := mergeFlags(baseFlags, map[string]string{ + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // consul + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + }) + + newFlags := mergeFlags(oldFlags, map[string]string{ + // ingester + "-ingester.skip-metadata-limits": "true", + }) + + // Start Cortex components + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), newFlags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), newFlags, "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), oldFlags, oldImage) + require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier)) + + // Wait until distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + // Wait until querier has updated the ring. + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + metadataMetricNum := 5 + metadataPerMetrics := 2 + metadata := make([]prompb.MetricMetadata, 0, metadataMetricNum) + for i := 0; i < metadataMetricNum; i++ { + for j := 0; j < metadataPerMetrics; j++ { + metadata = append(metadata, prompb.MetricMetadata{ + MetricFamilyName: fmt.Sprintf("metadata_name_%d", i), + Help: fmt.Sprintf("metadata_help_%d_%d", i, j), + Unit: fmt.Sprintf("metadata_unit_%d_%d", i, j), + }) + } + } + res, err := client.Push(nil, metadata...) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // should return all metadata regardless of the limit + maxLimit := metadataMetricNum * metadataPerMetrics + for i := -1; i <= maxLimit; i++ { + result, err := client.Metadata("", strconv.Itoa(i)) + require.NoError(t, err) + require.Equal(t, metadataMetricNum, len(result)) + for _, metadata := range result { + require.Equal(t, metadataPerMetrics, len(metadata)) + } + } +} + // Test cortex which uses Prometheus v3.x can support holt_winters function func TestCanSupportHoltWintersFunc(t *testing.T) { s, err := e2e.NewScenario(networkName) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index af618488efd..0d2b7d25175 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -155,6 +155,9 @@ type Config struct { // Maximum number of entries in the matchers cache. 0 to disable. MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` + + // If enabled, the metadata API returns all metadata regardless of the limits. + SkipMetadataLimits bool `yaml:"skip_metadata_limits"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -176,6 +179,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.") f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.") + f.BoolVar(&cfg.SkipMetadataLimits, "ingester.skip-metadata-limits", true, "If enabled, the metadata API returns all metadata regardless of the limits.") cfg.DefaultLimits.RegisterFlagsWithPrefix(f, "ingester.") } @@ -3057,7 +3061,7 @@ func (i *Ingester) getOrCreateUserMetadata(userID string) *userMetricsMetadata { // Ensure it was not created between switching locks. userMetadata, ok := i.usersMetadata[userID] if !ok { - userMetadata = newMetadataMap(i.limiter, i.metrics, i.validateMetrics, userID) + userMetadata = newMetadataMap(i.limiter, i.metrics, i.validateMetrics, userID, i.cfg.SkipMetadataLimits) i.usersMetadata[userID] = userMetadata } return userMetadata diff --git a/pkg/ingester/user_metrics_metadata.go b/pkg/ingester/user_metrics_metadata.go index 8f451c884b4..b6bd5421582 100644 --- a/pkg/ingester/user_metrics_metadata.go +++ b/pkg/ingester/user_metrics_metadata.go @@ -11,26 +11,33 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) +const ( + defaultLimit = -1 + defaultLimitPerMetric = -1 +) + // userMetricsMetadata allows metric metadata of a tenant to be held by the ingester. // Metadata is kept as a set as it can come from multiple targets that Prometheus scrapes // with the same metric name. type userMetricsMetadata struct { - limiter *Limiter - metrics *ingesterMetrics - validateMetrics *validation.ValidateMetrics - userID string + limiter *Limiter + metrics *ingesterMetrics + validateMetrics *validation.ValidateMetrics + userID string + skipMetadataLimits bool mtx sync.RWMutex metricToMetadata map[string]metricMetadataSet } -func newMetadataMap(l *Limiter, m *ingesterMetrics, v *validation.ValidateMetrics, userID string) *userMetricsMetadata { +func newMetadataMap(l *Limiter, m *ingesterMetrics, v *validation.ValidateMetrics, userID string, skipMetadataLimits bool) *userMetricsMetadata { return &userMetricsMetadata{ - metricToMetadata: map[string]metricMetadataSet{}, - limiter: l, - metrics: m, - validateMetrics: v, - userID: userID, + metricToMetadata: map[string]metricMetadataSet{}, + limiter: l, + metrics: m, + validateMetrics: v, + userID: userID, + skipMetadataLimits: skipMetadataLimits, } } @@ -88,8 +95,17 @@ func (mm *userMetricsMetadata) purge(deadline time.Time) { func (mm *userMetricsMetadata) toClientMetadata(req *client.MetricsMetadataRequest) []*cortexpb.MetricMetadata { mm.mtx.RLock() defer mm.mtx.RUnlock() + r := make([]*cortexpb.MetricMetadata, 0, len(mm.metricToMetadata)) - if req.Limit == 0 { + limit := req.Limit + limitPerMetric := req.LimitPerMetric + + if mm.skipMetadataLimits { + // set limit and limitPerMetric to default + limit = defaultLimit + limitPerMetric = defaultLimitPerMetric + } + if limit == 0 { return r } @@ -99,16 +115,16 @@ func (mm *userMetricsMetadata) toClientMetadata(req *client.MetricsMetadataReque return r } - metadataSet.add(req.LimitPerMetric, &r) + metadataSet.add(limitPerMetric, &r) return r } var metrics int64 for _, set := range mm.metricToMetadata { - if req.Limit > 0 && metrics >= req.Limit { + if limit > 0 && metrics >= limit { break } - set.add(req.LimitPerMetric, &r) + set.add(limitPerMetric, &r) metrics++ } return r diff --git a/pkg/ingester/user_metrics_metadata_test.go b/pkg/ingester/user_metrics_metadata_test.go index 2a28601ced3..69c2ad04163 100644 --- a/pkg/ingester/user_metrics_metadata_test.go +++ b/pkg/ingester/user_metrics_metadata_test.go @@ -14,11 +14,6 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -const ( - defaultLimit = -1 - defaultLimitPerMetric = -1 -) - func Test_UserMetricsMetadata(t *testing.T) { userId := "user-1" @@ -43,33 +38,13 @@ func Test_UserMetricsMetadata(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(overrides, nil, util.ShardingStrategyDefault, true, 1, false, "") - userMetricsMetadata := newMetadataMap(limiter, m, validation.NewValidateMetrics(reg), userId) - - addMetricMetadata := func(name string, i int) { - metadata := &cortexpb.MetricMetadata{ - MetricFamilyName: fmt.Sprintf("%s_%d", name, i), - Type: cortexpb.GAUGE, - Help: fmt.Sprintf("a help for %s", name), - Unit: fmt.Sprintf("a unit for %s", name), - } - - err := userMetricsMetadata.add(name, metadata) - require.NoError(t, err) - } - - metadataNumPerMetric := 3 - for _, m := range []string{"metric1", "metric2"} { - for i := range metadataNumPerMetric { - addMetricMetadata(m, i) - } - } - tests := []struct { - description string - limit int64 - limitPerMetric int64 - metric string - expectedLength int + description string + limit int64 + limitPerMetric int64 + metric string + expectedLength int + skipMetadataLimits bool }{ { description: "limit: 1", @@ -122,10 +97,38 @@ func Test_UserMetricsMetadata(t *testing.T) { metric: "dummy", expectedLength: 0, }, + { + description: "enable skipMetadataLimits", + limit: 1, + limitPerMetric: 2, + expectedLength: 2 * 3, // # of metric * metadataNumPerMetric + skipMetadataLimits: true, + }, } for _, test := range tests { t.Run(test.description, func(t *testing.T) { + userMetricsMetadata := newMetadataMap(limiter, m, validation.NewValidateMetrics(reg), userId, test.skipMetadataLimits) + + addMetricMetadata := func(name string, i int) { + metadata := &cortexpb.MetricMetadata{ + MetricFamilyName: fmt.Sprintf("%s_%d", name, i), + Type: cortexpb.GAUGE, + Help: fmt.Sprintf("a help for %s", name), + Unit: fmt.Sprintf("a unit for %s", name), + } + + err := userMetricsMetadata.add(name, metadata) + require.NoError(t, err) + } + + metadataNumPerMetric := 3 + for _, m := range []string{"metric1", "metric2"} { + for i := range metadataNumPerMetric { + addMetricMetadata(m, i) + } + } + req := &client.MetricsMetadataRequest{Limit: test.limit, LimitPerMetric: test.limitPerMetric, Metric: test.metric} r := userMetricsMetadata.toClientMetadata(req)