From 45ccce05a3db74de7b979302bc4ea232907e2a96 Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 11:48:42 -0800 Subject: [PATCH 1/4] Implement matcher cache Signed-off-by: alanprot --- docs/configuration/config-file-reference.md | 4 ++ integration/query_fuzz_test.go | 5 +- pkg/distributor/distributor_test.go | 9 +-- pkg/ingester/client/compat.go | 60 +++++++++--------- pkg/ingester/client/compat_test.go | 3 +- pkg/ingester/ingester.go | 24 ++++++-- pkg/ingester/ingester_test.go | 56 +++++++++++++++++ pkg/ingester/lifecycle_test.go | 1 + pkg/ingester/metrics.go | 68 +++++++++++++++++++++ pkg/querier/remote_read.go | 3 +- 10 files changed, 192 insertions(+), 41 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 6a36afefb0..d10a027131 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3181,6 +3181,10 @@ instance_limits: # change by changing this option. # CLI flag: -ingester.disable-chunk-trimming [disable_chunk_trimming: | default = false] + +# Maximum number of entries in the matchers cache. 0 to disable. +# CLI flag: -ingester.matchers-cache-max-items +[matchers_cache_max_items: | default = 0] ``` ### `ingester_client_config` diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index 85fe9fca2e..cb2e851425 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -378,8 +378,9 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) { "-blocks-storage.expanded_postings_cache.head.enabled": "true", "-blocks-storage.expanded_postings_cache.block.enabled": "true", // Ingester. - "-ring.store": "consul", - "-consul.hostname": consul2.NetworkHTTPEndpoint(), + "-ring.store": "consul", + "-consul.hostname": consul2.NetworkHTTPEndpoint(), + "-ingester.matchers-cache-max-items": "10000", // Distributor. "-distributor.replication-factor": "1", // Store-gateway. diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 4ac187ffd7..b6d1cee24c 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -26,6 +26,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "go.uber.org/atomic" @@ -3374,7 +3375,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts return nil, errFail } - _, _, matchers, err := client.FromQueryRequest(req) + _, _, matchers, err := client.FromQueryRequest(storecache.NewNoopMatcherCache(), req) if err != nil { return nil, err } @@ -3400,7 +3401,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest return nil, errFail } - _, _, matchers, err := client.FromQueryRequest(req) + _, _, matchers, err := client.FromQueryRequest(storecache.NewNoopMatcherCache(), req) if err != nil { return nil, err } @@ -3459,7 +3460,7 @@ func (i *mockIngester) MetricsForLabelMatchersStream(ctx context.Context, req *c return nil, errFail } - _, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req) + _, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(storecache.NewNoopMatcherCache(), req) if err != nil { return nil, err } @@ -3491,7 +3492,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client. 
return nil, errFail } - _, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req) + _, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(storecache.NewNoopMatcherCache(), req) if err != nil { return nil, err } diff --git a/pkg/ingester/client/compat.go b/pkg/ingester/client/compat.go index 1a8e178641..4e655b54bd 100644 --- a/pkg/ingester/client/compat.go +++ b/pkg/ingester/client/compat.go @@ -3,12 +3,12 @@ package client import ( "fmt" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" - - "github.com/cortexproject/cortex/pkg/cortexpb" + storecache "github.com/thanos-io/thanos/pkg/store/cache" ) // ToQueryRequest builds a QueryRequest proto. @@ -26,8 +26,8 @@ func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequ } // FromQueryRequest unpacks a QueryRequest proto. -func FromQueryRequest(req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) { - matchers, err := FromLabelMatchers(req.Matchers) +func FromQueryRequest(cache storecache.MatchersCache, req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) { + matchers, err := FromLabelMatchers(cache, req.Matchers) if err != nil { return 0, 0, nil, err } @@ -55,10 +55,10 @@ func ToExemplarQueryRequest(from, to model.Time, matchers ...[]*labels.Matcher) } // FromExemplarQueryRequest unpacks a ExemplarQueryRequest proto. -func FromExemplarQueryRequest(req *ExemplarQueryRequest) (int64, int64, [][]*labels.Matcher, error) { +func FromExemplarQueryRequest(cache storecache.MatchersCache, req *ExemplarQueryRequest) (int64, int64, [][]*labels.Matcher, error) { var result [][]*labels.Matcher for _, m := range req.Matchers { - matchers, err := FromLabelMatchers(m.Matchers) + matchers, err := FromLabelMatchers(cache, m.Matchers) if err != nil { return 0, 0, nil, err } @@ -175,10 +175,10 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) { } // FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto -func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) { +func FromMetricsForLabelMatchersRequest(cache storecache.MatchersCache, req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) { matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet)) for _, matchers := range req.MatchersSet { - matchers, err := FromLabelMatchers(matchers.Matchers) + matchers, err := FromLabelMatchers(cache, matchers.Matchers) if err != nil { return 0, 0, 0, nil, err } @@ -206,12 +206,12 @@ func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, limit } // FromLabelValuesRequest unpacks a LabelValuesRequest proto -func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) { +func FromLabelValuesRequest(cache storecache.MatchersCache, req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) { var err error var matchers []*labels.Matcher if req.Matchers != nil { - matchers, err = FromLabelMatchers(req.Matchers.Matchers) + matchers, err = FromLabelMatchers(cache, req.Matchers.Matchers) if err != nil { return "", 0, 0, 0, nil, err } @@ -236,12 +236,12 @@ func ToLabelNamesRequest(from, to model.Time, limit int, matchers []*labels.Matc } // 
FromLabelNamesRequest unpacks a LabelNamesRequest proto -func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) { +func FromLabelNamesRequest(cache storecache.MatchersCache, req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) { var err error var matchers []*labels.Matcher if req.Matchers != nil { - matchers, err = FromLabelMatchers(req.Matchers.Matchers) + matchers, err = FromLabelMatchers(cache, req.Matchers.Matchers) if err != nil { return 0, 0, 0, nil, err } @@ -275,27 +275,31 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) { return result, nil } -func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) { +func FromLabelMatchers(cache storecache.MatchersCache, matchers []*LabelMatcher) ([]*labels.Matcher, error) { result := make([]*labels.Matcher, 0, len(matchers)) for _, matcher := range matchers { - var mtype labels.MatchType - switch matcher.Type { - case EQUAL: - mtype = labels.MatchEqual - case NOT_EQUAL: - mtype = labels.MatchNotEqual - case REGEX_MATCH: - mtype = labels.MatchRegexp - case REGEX_NO_MATCH: - mtype = labels.MatchNotRegexp - default: - return nil, fmt.Errorf("invalid matcher type") - } - matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value) + m, err := cache.GetOrSet(matcher.String(), func() (*labels.Matcher, error) { + var mtype labels.MatchType + switch matcher.Type { + case EQUAL: + mtype = labels.MatchEqual + case NOT_EQUAL: + mtype = labels.MatchNotEqual + case REGEX_MATCH: + mtype = labels.MatchRegexp + case REGEX_NO_MATCH: + mtype = labels.MatchNotRegexp + default: + return nil, fmt.Errorf("invalid matcher type") + } + return labels.NewMatcher(mtype, matcher.GetName(), matcher.GetValue()) + }) + if err != nil { return nil, err } - result = append(result, matcher) + + result = append(result, m) } return result, nil } diff --git a/pkg/ingester/client/compat_test.go b/pkg/ingester/client/compat_test.go index c9467abd0a..43e1ac3c7e 100644 --- a/pkg/ingester/client/compat_test.go +++ b/pkg/ingester/client/compat_test.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + storecache "github.com/thanos-io/thanos/pkg/store/cache" ) func TestQueryRequest(t *testing.T) { @@ -41,7 +42,7 @@ func TestQueryRequest(t *testing.T) { t.Fatal(err) } - haveFrom, haveTo, haveMatchers, err := FromQueryRequest(req) + haveFrom, haveTo, haveMatchers, err := FromQueryRequest(storecache.NewNoopMatcherCache(), req) if err != nil { t.Fatal(err) } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 1d29b8b95d..4225709e6f 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "html" "io" "math" @@ -145,6 +146,9 @@ type Config struct { // When disabled, the result may contain samples outside the queried time range but Select() performances // may be improved. DisableChunkTrimming bool `yaml:"disable_chunk_trimming"` + + // Maximum number of entries in the matchers cache. 0 to disable. 
+ MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"` } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -173,6 +177,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.") f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.") + f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the matchers cache. 0 to disable.") } func (cfg *Config) Validate() error { @@ -243,6 +248,7 @@ type Ingester struct { inflightQueryRequests atomic.Int64 maxInflightQueryRequests util_math.MaxTracker + matchersCache storecache.MatchersCache expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory } @@ -708,7 +714,15 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe logger: logger, ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), expandedPostingsCacheFactory: cortex_tsdb.NewExpandedPostingsCacheFactory(cfg.BlocksStorageConfig.TSDB.PostingsCache), + matchersCache: storecache.NewNoopMatcherCache(), + } + + if cfg.MatchersCacheMaxItems > 0 { + r := prometheus.NewRegistry() + registerer.MustRegister(newMatchCacheMetrics(r)) + i.matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(cfg.MatchersCacheMaxItems), storecache.WithPromRegistry(r)) } + i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, @@ -1474,7 +1488,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery return nil, err } - from, through, matchers, err := client.FromExemplarQueryRequest(req) + from, through, matchers, err := client.FromExemplarQueryRequest(i.matchersCache, req) if err != nil { return nil, err } @@ -1564,7 +1578,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu return nil, cleanup, err } - labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(req) + labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(i.matchersCache, req) if err != nil { return nil, cleanup, err } @@ -1654,7 +1668,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR return nil, cleanup, err } - startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(req) + startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(i.matchersCache, req) if err != nil { return nil, cleanup, err } @@ -1768,7 +1782,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien } // Parse the request - _, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req) + _, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(i.matchersCache, req) if err != nil { return cleanup, err } @@ -1982,7 +1996,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ return err } - from, through, matchers, err := client.FromQueryRequest(req) + from, through, matchers, err := client.FromQueryRequest(i.matchersCache, req) if 
err != nil { return err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index b4852d4076..41442ae59c 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -119,6 +119,62 @@ func seriesSetFromResponseStream(s *mockQueryStreamServer) (storage.SeriesSet, e return set, nil } +func TestMatcherCache(t *testing.T) { + limits := defaultLimitsTestConfig() + userID := "1" + tenantLimits := newMockTenantLimits(map[string]*validation.Limits{userID: &limits}) + registry := prometheus.NewRegistry() + + dir := t.TempDir() + chunksDir := filepath.Join(dir, "chunks") + blocksDir := filepath.Join(dir, "blocks") + require.NoError(t, os.Mkdir(chunksDir, os.ModePerm)) + require.NoError(t, os.Mkdir(blocksDir, os.ModePerm)) + + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry, true) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) + + defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, time.Second, ring.ACTIVE, func() interface{} { + return ing.lifecycler.GetState() + }) + ctx := user.InjectOrgID(context.Background(), userID) + numberOfDifferentMatchers := 50 + callPerMatcher := 10 + for j := 0; j < numberOfDifferentMatchers; j++ { + for i := 0; i < callPerMatcher; i++ { + s := &mockQueryStreamServer{ctx: ctx} + err = ing.QueryStream(&client.QueryRequest{ + StartTimestampMs: math.MinInt64, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: fmt.Sprintf("%d", j)}}, + }, s) + require.NoError(t, err) + } + } + + require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(fmt.Sprintf(` + # HELP ingester_matchers_cache_evicted_total Total number of items evicted from the cache + # TYPE ingester_matchers_cache_evicted_total counter + ingester_matchers_cache_evicted_total 0 + # HELP ingester_matchers_cache_hits_total Total number of cache hits for series matchers + # TYPE ingester_matchers_cache_hits_total counter + ingester_matchers_cache_hits_total %v + # HELP ingester_matchers_cache_items Total number of cached items + # TYPE ingester_matchers_cache_items gauge + ingester_matchers_cache_items %v + # HELP ingester_matchers_cache_max_items Maximum number of items that can be cached + # TYPE ingester_matchers_cache_max_items gauge + ingester_matchers_cache_max_items 0 + # HELP ingester_matchers_cache_requests_total Total number of cache requests for series matchers + # TYPE ingester_matchers_cache_requests_total counter + ingester_matchers_cache_requests_total %v + `, callPerMatcher*numberOfDifferentMatchers-numberOfDifferentMatchers, numberOfDifferentMatchers, callPerMatcher*numberOfDifferentMatchers)), "ingester_matchers_cache_requests_total", "ingester_matchers_cache_hits_total", "ingester_matchers_cache_items", "ingester_matchers_cache_max_items", "ingester_matchers_cache_evicted_total")) +} + func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { limits := defaultLimitsTestConfig() userID := "1" diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index efa739b426..4fab7d716e 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -43,6 +43,7 @@ func defaultIngesterTestConfig(t testing.TB) Config { cfg.LifecyclerConfig.FinalSleep = 0 cfg.ActiveSeriesMetricsEnabled = true cfg.LabelsStringInterningEnabled = true + 
cfg.MatchersCacheMaxItems = 1024 return cfg } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 6cfe49dc1a..ec99ef9974 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -1,11 +1,13 @@ package ingester import ( + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" + util_log "github.com/cortexproject/cortex/pkg/util/log" util_math "github.com/cortexproject/cortex/pkg/util/math" ) @@ -676,3 +678,69 @@ func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Re func (sm *tsdbMetrics) removeRegistryForUser(userID string) { sm.regs.RemoveUserRegistry(userID, false) } + +type matcherCacheMetrics struct { + r *prometheus.Registry + + requestsTotal *prometheus.Desc + hitsTotal *prometheus.Desc + numItems *prometheus.Desc + maxItems *prometheus.Desc + evicted *prometheus.Desc +} + +func newMatchCacheMetrics(r *prometheus.Registry) *matcherCacheMetrics { + m := &matcherCacheMetrics{ + r: r, + requestsTotal: prometheus.NewDesc( + "ingester_matchers_cache_requests_total", + "Total number of cache requests for series matchers", + nil, nil), + hitsTotal: prometheus.NewDesc( + "ingester_matchers_cache_hits_total", + "Total number of cache hits for series matchers", + nil, nil), + numItems: prometheus.NewDesc( + "ingester_matchers_cache_items", + "Total number of cached items", + nil, nil), + maxItems: prometheus.NewDesc( + "ingester_matchers_cache_max_items", + "Maximum number of items that can be cached", + nil, nil), + evicted: prometheus.NewDesc( + "ingester_matchers_cache_evicted_total", + "Total number of items evicted from the cache", + nil, nil), + } + return m +} + +func (m *matcherCacheMetrics) Describe(out chan<- *prometheus.Desc) { + out <- m.requestsTotal + out <- m.hitsTotal + out <- m.numItems + out <- m.maxItems + out <- m.evicted +} + +func (m *matcherCacheMetrics) Collect(out chan<- prometheus.Metric) { + gm, err := m.r.Gather() + if err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to gather metrics from registry", "err", err) + return + } + + mfm, err := util.NewMetricFamilyMap(gm) + + if err != nil { + level.Warn(util_log.Logger).Log("msg", "failed to create metric family map", "err", err) + return + } + + out <- prometheus.MustNewConstMetric(m.requestsTotal, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_requests_total")) + out <- prometheus.MustNewConstMetric(m.hitsTotal, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_hits_total")) + out <- prometheus.MustNewConstMetric(m.numItems, prometheus.GaugeValue, mfm.SumGauges("thanos_matchers_cache_items")) + out <- prometheus.MustNewConstMetric(m.maxItems, prometheus.GaugeValue, mfm.SumGauges("thanos_matchers_cache_max_items")) + out <- prometheus.MustNewConstMetric(m.evicted, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_max_items")) +} diff --git a/pkg/querier/remote_read.go b/pkg/querier/remote_read.go index a7e86b96e5..f68cf683e9 100644 --- a/pkg/querier/remote_read.go +++ b/pkg/querier/remote_read.go @@ -1,6 +1,7 @@ package querier import ( + storecache "github.com/thanos-io/thanos/pkg/store/cache" "net/http" "github.com/go-kit/log" @@ -34,7 +35,7 @@ func RemoteReadHandler(q storage.Queryable, logger log.Logger) http.Handler { errors := make(chan error) for i, qr := range req.Queries { go func(i int, qr *client.QueryRequest) { - from, 
to, matchers, err := client.FromQueryRequest(qr) + from, to, matchers, err := client.FromQueryRequest(storecache.NewNoopMatcherCache(), qr) if err != nil { errors <- err return From ccf1f598756717599f25db1368c16858f4a366ee Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 12:05:05 -0800 Subject: [PATCH 2/4] changelog Signed-off-by: alanprot --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cccce2dde..da0adc15c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256 * [FEATURE] Ruler: Add support for per-user external labels #6340 +* [FEATURE] Ingester: Add support for cache query matchers via `-ingester.matchers-cache-max-items. #6477 * [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449 * [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423 * [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388 From 8b57e7bba8fdd4bbeb59fbf694e6477aae295e3e Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 13:38:26 -0800 Subject: [PATCH 3/4] lint Signed-off-by: alanprot --- pkg/ingester/client/compat.go | 3 ++- pkg/ingester/ingester.go | 5 ++++- pkg/querier/remote_read.go | 2 +- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/ingester/client/compat.go b/pkg/ingester/client/compat.go index 4e655b54bd..3a81ca6ea6 100644 --- a/pkg/ingester/client/compat.go +++ b/pkg/ingester/client/compat.go @@ -3,12 +3,13 @@ package client import ( "fmt" - "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" storecache "github.com/thanos-io/thanos/pkg/store/cache" + + "github.com/cortexproject/cortex/pkg/cortexpb" ) // ToQueryRequest builds a QueryRequest proto. 
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4225709e6f..2bdadf0215 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - storecache "github.com/thanos-io/thanos/pkg/store/cache" "html" "io" "math" @@ -38,6 +37,7 @@ import ( "github.com/thanos-io/objstore" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/shipper" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/httpgrpc" "go.uber.org/atomic" @@ -721,6 +721,9 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe r := prometheus.NewRegistry() registerer.MustRegister(newMatchCacheMetrics(r)) i.matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(cfg.MatchersCacheMaxItems), storecache.WithPromRegistry(r)) + if err != nil { + return nil, err + } } i.metrics = newIngesterMetrics(registerer, diff --git a/pkg/querier/remote_read.go b/pkg/querier/remote_read.go index f68cf683e9..a40c0b2e3a 100644 --- a/pkg/querier/remote_read.go +++ b/pkg/querier/remote_read.go @@ -1,12 +1,12 @@ package querier import ( - storecache "github.com/thanos-io/thanos/pkg/store/cache" "net/http" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/storage" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" From 5e0d93118eeb7211c8af688c11134a09187bcd6d Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 6 Jan 2025 14:39:09 -0800 Subject: [PATCH 4/4] fix evict metric Signed-off-by: alanprot --- pkg/ingester/ingester.go | 2 +- pkg/ingester/ingester_test.go | 12 +++++++----- pkg/ingester/metrics.go | 16 +++++++++------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 2bdadf0215..3d01c10199 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -719,7 +719,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe if cfg.MatchersCacheMaxItems > 0 { r := prometheus.NewRegistry() - registerer.MustRegister(newMatchCacheMetrics(r)) + registerer.MustRegister(newMatchCacheMetrics(r, logger)) i.matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(cfg.MatchersCacheMaxItems), storecache.WithPromRegistry(r)) if err != nil { return nil, err diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 41442ae59c..44c7742a9b 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -130,8 +130,9 @@ func TestMatcherCache(t *testing.T) { blocksDir := filepath.Join(dir, "blocks") require.NoError(t, os.Mkdir(chunksDir, os.ModePerm)) require.NoError(t, os.Mkdir(blocksDir, os.ModePerm)) - - ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry, true) + cfg := defaultIngesterTestConfig(t) + cfg.MatchersCacheMaxItems = 50 + ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, tenantLimits, blocksDir, registry, true) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) @@ -142,7 +143,8 @@ func TestMatcherCache(t *testing.T) { return ing.lifecycler.GetState() }) ctx := user.InjectOrgID(context.Background(), userID) - numberOfDifferentMatchers := 50 + // Lets have 1 key evicted + 
numberOfDifferentMatchers := cfg.MatchersCacheMaxItems + 1 callPerMatcher := 10 for j := 0; j < numberOfDifferentMatchers; j++ { for i := 0; i < callPerMatcher; i++ { @@ -159,7 +161,7 @@ func TestMatcherCache(t *testing.T) { require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(fmt.Sprintf(` # HELP ingester_matchers_cache_evicted_total Total number of items evicted from the cache # TYPE ingester_matchers_cache_evicted_total counter - ingester_matchers_cache_evicted_total 0 + ingester_matchers_cache_evicted_total 1 # HELP ingester_matchers_cache_hits_total Total number of cache hits for series matchers # TYPE ingester_matchers_cache_hits_total counter ingester_matchers_cache_hits_total %v @@ -172,7 +174,7 @@ func TestMatcherCache(t *testing.T) { # HELP ingester_matchers_cache_requests_total Total number of cache requests for series matchers # TYPE ingester_matchers_cache_requests_total counter ingester_matchers_cache_requests_total %v - `, callPerMatcher*numberOfDifferentMatchers-numberOfDifferentMatchers, numberOfDifferentMatchers, callPerMatcher*numberOfDifferentMatchers)), "ingester_matchers_cache_requests_total", "ingester_matchers_cache_hits_total", "ingester_matchers_cache_items", "ingester_matchers_cache_max_items", "ingester_matchers_cache_evicted_total")) + `, callPerMatcher*numberOfDifferentMatchers-numberOfDifferentMatchers, cfg.MatchersCacheMaxItems, callPerMatcher*numberOfDifferentMatchers)), "ingester_matchers_cache_requests_total", "ingester_matchers_cache_hits_total", "ingester_matchers_cache_items", "ingester_matchers_cache_max_items", "ingester_matchers_cache_evicted_total")) } func TestIngesterPerLabelsetLimitExceeded(t *testing.T) { diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index ec99ef9974..fdbe5cd4fa 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -1,13 +1,13 @@ package ingester import ( + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" - util_log "github.com/cortexproject/cortex/pkg/util/log" util_math "github.com/cortexproject/cortex/pkg/util/math" ) @@ -680,7 +680,8 @@ func (sm *tsdbMetrics) removeRegistryForUser(userID string) { } type matcherCacheMetrics struct { - r *prometheus.Registry + r *prometheus.Registry + logger log.Logger requestsTotal *prometheus.Desc hitsTotal *prometheus.Desc @@ -689,9 +690,10 @@ type matcherCacheMetrics struct { evicted *prometheus.Desc } -func newMatchCacheMetrics(r *prometheus.Registry) *matcherCacheMetrics { +func newMatchCacheMetrics(r *prometheus.Registry, l log.Logger) *matcherCacheMetrics { m := &matcherCacheMetrics{ - r: r, + r: r, + logger: l, requestsTotal: prometheus.NewDesc( "ingester_matchers_cache_requests_total", "Total number of cache requests for series matchers", @@ -727,14 +729,14 @@ func (m *matcherCacheMetrics) Describe(out chan<- *prometheus.Desc) { func (m *matcherCacheMetrics) Collect(out chan<- prometheus.Metric) { gm, err := m.r.Gather() if err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to gather metrics from registry", "err", err) + level.Warn(m.logger).Log("msg", "failed to gather metrics from registry", "err", err) return } mfm, err := util.NewMetricFamilyMap(gm) if err != nil { - level.Warn(util_log.Logger).Log("msg", "failed to create metric family map", "err", err) + level.Warn(m.logger).Log("msg", "failed to create 
metric family map", "err", err) return } @@ -742,5 +744,5 @@ func (m *matcherCacheMetrics) Collect(out chan<- prometheus.Metric) { out <- prometheus.MustNewConstMetric(m.hitsTotal, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_hits_total")) out <- prometheus.MustNewConstMetric(m.numItems, prometheus.GaugeValue, mfm.SumGauges("thanos_matchers_cache_items")) out <- prometheus.MustNewConstMetric(m.maxItems, prometheus.GaugeValue, mfm.SumGauges("thanos_matchers_cache_max_items")) - out <- prometheus.MustNewConstMetric(m.evicted, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_max_items")) + out <- prometheus.MustNewConstMetric(m.evicted, prometheus.CounterValue, mfm.SumCounters("thanos_matchers_cache_evicted_total")) }
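
For context on the new FromLabelMatchers(cache, ...) call path introduced in this series: the Thanos storecache.MatchersCache wired up here memoizes converted matchers keyed by their string form via GetOrSet, so repeated requests carrying identical matchers skip re-parsing (and, for regex matchers, regex recompilation). Below is a minimal, non-concurrent sketch of that GetOrSet pattern; it is an illustration only, not the Thanos implementation. Only labels.NewMatcher and Matcher.String come from the real Prometheus API; the matcherCache type and its size handling are hypothetical.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// matcherCache is a toy, non-concurrent stand-in for the GetOrSet pattern used in
// this series: identical matcher strings are converted once and reused afterwards.
// A real cache (like the bounded Thanos one used here) evicts old entries; this
// sketch simply stops inserting once full.
type matcherCache struct {
	items    map[string]*labels.Matcher
	maxItems int
}

func newMatcherCache(maxItems int) *matcherCache {
	return &matcherCache{items: make(map[string]*labels.Matcher), maxItems: maxItems}
}

// GetOrSet returns the cached matcher for key, or builds it with newItem and caches it.
func (c *matcherCache) GetOrSet(key string, newItem func() (*labels.Matcher, error)) (*labels.Matcher, error) {
	if m, ok := c.items[key]; ok {
		return m, nil
	}
	m, err := newItem()
	if err != nil {
		return nil, err
	}
	if len(c.items) < c.maxItems {
		c.items[key] = m
	}
	return m, nil
}

func main() {
	cache := newMatcherCache(1024)
	for i := 0; i < 3; i++ {
		// Only the first iteration pays the conversion (and regex compilation) cost.
		m, err := cache.GetOrSet(`__name__=~"up|cortex_.*"`, func() (*labels.Matcher, error) {
			fmt.Println("cache miss: building matcher")
			return labels.NewMatcher(labels.MatchRegexp, "__name__", "up|cortex_.*")
		})
		if err != nil {
			panic(err)
		}
		fmt.Println("got matcher:", m.String())
	}
}

As the docs change and integration test in this series show, the cache is opt-in: it is enabled by setting -ingester.matchers-cache-max-items (YAML: matchers_cache_max_items) to a value greater than 0, for example 10000; the default of 0 keeps the no-op cache.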