From cf482702844a28ea24907354539fc2ce111d8246 Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 1 Nov 2024 12:20:27 -0700 Subject: [PATCH 01/10] Implementing Expanded Postings Cache Signed-off-by: alanprot --- docs/blocks-storage/querier.md | 35 ++ docs/blocks-storage/store-gateway.md | 35 ++ docs/configuration/config-file-reference.md | 35 ++ integration/query_fuzz_test.go | 197 ++++++++++ pkg/ingester/ingester.go | 26 +- pkg/ingester/ingester_test.go | 270 ++++++++++++- pkg/ingester/metrics.go | 9 + pkg/ingester/metrics_test.go | 3 +- pkg/storage/tsdb/cached_chunks_querier.go | 128 ++++++ pkg/storage/tsdb/config.go | 5 + pkg/storage/tsdb/expanded_postings_cache.go | 372 ++++++++++++++++++ .../tsdb/expanded_postings_cache_test.go | 103 +++++ 12 files changed, 1209 insertions(+), 9 deletions(-) create mode 100644 pkg/storage/tsdb/cached_chunks_querier.go create mode 100644 pkg/storage/tsdb/expanded_postings_cache.go create mode 100644 pkg/storage/tsdb/expanded_postings_cache_test.go diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 6596d1cadb..2387d61ac0 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1544,4 +1544,39 @@ blocks_storage: # [EXPERIMENTAL] True to enable native histogram. # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] + + postings_cache: + head: + # Max bytes for postings cache + # CLI flag: -blocks-storage.postings-cache.head.max-bytes + [max_bytes: | default = 10485760] + + # Max items for postings cache + # CLI flag: -blocks-storage.postings-cache.head.max-items + [max_items: | default = 10000] + + # TTL for postings cache + # CLI flag: -blocks-storage.postings-cache.head.ttl + [ttl: | default = 10m] + + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.postings-cache.head.enabled + [enabled: | default = false] + + blocks: + # Max bytes for postings cache + # CLI flag: -blocks-storage.postings-cache.block.max-bytes + [max_bytes: | default = 10485760] + + # Max items for postings cache + # CLI flag: -blocks-storage.postings-cache.block.max-items + [max_items: | default = 10000] + + # TTL for postings cache + # CLI flag: -blocks-storage.postings-cache.block.ttl + [ttl: | default = 10m] + + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.postings-cache.block.enabled + [enabled: | default = false] ``` diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 08ef369a85..135341d6e2 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1635,4 +1635,39 @@ blocks_storage: # [EXPERIMENTAL] True to enable native histogram. 
# CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] + + postings_cache: + head: + # Max bytes for postings cache + # CLI flag: -blocks-storage.postings-cache.head.max-bytes + [max_bytes: | default = 10485760] + + # Max items for postings cache + # CLI flag: -blocks-storage.postings-cache.head.max-items + [max_items: | default = 10000] + + # TTL for postings cache + # CLI flag: -blocks-storage.postings-cache.head.ttl + [ttl: | default = 10m] + + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.postings-cache.head.enabled + [enabled: | default = false] + + blocks: + # Max bytes for postings cache + # CLI flag: -blocks-storage.postings-cache.block.max-bytes + [max_bytes: | default = 10485760] + + # Max items for postings cache + # CLI flag: -blocks-storage.postings-cache.block.max-items + [max_items: | default = 10000] + + # TTL for postings cache + # CLI flag: -blocks-storage.postings-cache.block.ttl + [ttl: | default = 10m] + + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.postings-cache.block.enabled + [enabled: | default = false] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 16b9616df1..49a67f78b0 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2081,6 +2081,41 @@ tsdb: # [EXPERIMENTAL] True to enable native histogram. # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] + + postings_cache: + head: + # Max bytes for postings cache + # CLI flag: -blocks-storage.postings-cache.head.max-bytes + [max_bytes: | default = 10485760] + + # Max items for postings cache + # CLI flag: -blocks-storage.postings-cache.head.max-items + [max_items: | default = 10000] + + # TTL for postings cache + # CLI flag: -blocks-storage.postings-cache.head.ttl + [ttl: | default = 10m] + + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.postings-cache.head.enabled + [enabled: | default = false] + + blocks: + # Max bytes for postings cache + # CLI flag: -blocks-storage.postings-cache.block.max-bytes + [max_bytes: | default = 10485760] + + # Max items for postings cache + # CLI flag: -blocks-storage.postings-cache.block.max-items + [max_items: | default = 10000] + + # TTL for postings cache + # CLI flag: -blocks-storage.postings-cache.block.ttl + [ttl: | default = 10m] + + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.postings-cache.block.enabled + [enabled: | default = false] ``` ### `compactor_config` diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index a9d15a7ea9..1c9c5cfe8e 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -205,6 +205,203 @@ func TestDisableChunkTrimmingFuzz(t *testing.T) { } } +func TestExpandedPostingsCacheFuzz(t *testing.T) { + stableCortexImage := "quay.io/cortexproject/cortex:v1.18.0" + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. 
+ consul1 := e2edb.NewConsulWithName("consul1") + consul2 := e2edb.NewConsulWithName("consul2") + require.NoError(t, s.StartAndWaitReady(consul1, consul2)) + + flags1 := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.tsdb.block-ranges-period": "2h", + "-blocks-storage.tsdb.ship-interval": "1h", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.tsdb.retention-period": "2h", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul1.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + flags2 := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.tsdb.block-ranges-period": "2h", + "-blocks-storage.tsdb.ship-interval": "1h", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.tsdb.retention-period": "2h", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.postings-cache.head.enabled": "true", + "-blocks-storage.postings-cache.block.enabled": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul2.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path1 := path.Join(s.SharedDir(), "cortex-1") + path2 := path.Join(s.SharedDir(), "cortex-2") + + flags1 = mergeFlags(flags1, map[string]string{"-blocks-storage.filesystem.dir": path1}) + flags2 = mergeFlags(flags2, map[string]string{"-blocks-storage.filesystem.dir": path2}) + // Start Cortex replicas. + cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, stableCortexImage) + cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, "") + require.NoError(t, s.StartAndWaitReady(cortex1, cortex2)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + var clients []*e2ecortex.Client + c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + clients = append(clients, c1, c2) + + now := time.Now() + // Push some series to Cortex. 
+ start := now.Add(-24 * time.Hour) + scrapeInterval := 30 * time.Second + + numSeries := 10 + numberOfLabelsPerSeries := 5 + numSamples := 10 + ss := make([]prompb.TimeSeries, numSeries*numberOfLabelsPerSeries) + lbls := make([]labels.Labels, numSeries*numberOfLabelsPerSeries) + + for i := 0; i < numSeries; i++ { + for j := 0; j < numberOfLabelsPerSeries; j++ { + series := e2e.GenerateSeriesWithSamples( + fmt.Sprintf("test_series_%d", i), + start, + scrapeInterval, + i*numSamples, + numSamples, + prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)}, + ) + ss[i*numberOfLabelsPerSeries+j] = series + + builder := labels.NewBuilder(labels.EmptyLabels()) + for _, lbl := range series.Labels { + builder.Set(lbl.Name, lbl.Value) + } + lbls[i*numberOfLabelsPerSeries+j] = builder.Labels() + } + } + + rnd := rand.New(rand.NewSource(now.Unix())) + opts := []promqlsmith.Option{ + promqlsmith.WithEnableOffset(true), + promqlsmith.WithEnableAtModifier(true), + } + ps := promqlsmith.New(rnd, lbls, opts...) + + // Create the queries with the original labels + testRun := 100 + queries := make([]string, testRun) + for i := 0; i < testRun; i++ { + expr := ps.WalkRangeQuery() + queries[i] = expr.Pretty(0) + } + + // Lets run multiples iterations and create new series every iteration + for k := 0; k < 5; k++ { + + nss := make([]prompb.TimeSeries, numSeries*numberOfLabelsPerSeries) + for i := 0; i < numSeries; i++ { + for j := 0; j < numberOfLabelsPerSeries; j++ { + nss[i*numberOfLabelsPerSeries+j] = e2e.GenerateSeriesWithSamples( + fmt.Sprintf("test_series_%d", i), + start.Add(scrapeInterval*time.Duration(numSamples*j)), + scrapeInterval, + i*numSamples, + numSamples, + prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)}, + prompb.Label{Name: "k", Value: fmt.Sprintf("%d", k)}, + ) + } + } + + for _, client := range clients { + res, err := client.Push(nss) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + type testCase struct { + query string + res1, res2 model.Value + err1, err2 error + } + + queryStart := time.Now().Add(-time.Hour * 24) + queryEnd := time.Now() + cases := make([]*testCase, 0, 200) + + for _, query := range queries { + res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval) + res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval) + cases = append(cases, &testCase{ + query: query, + res1: res1, + res2: res2, + err1: err1, + err2: err2, + }) + } + + failures := 0 + for i, tc := range cases { + qt := "range query" + if tc.err1 != nil || tc.err2 != nil { + if !cmp.Equal(tc.err1, tc.err2) { + t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2) + failures++ + } + } else if !cmp.Equal(tc.res1, tc.res2, comparer) { + t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String()) + failures++ + } + } + if failures > 0 { + require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures) + } + } +} + func TestVerticalShardingFuzz(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dc98d4f4e3..00dd1337ce 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -318,6 +318,8 @@ type userTSDB struct { interner util.Interner blockRetentionPeriod int64 + + postingCache cortex_tsdb.ExpandedPostingsCache } // Explicitly wrapping the tsdb.DB functions that we use. 
@@ -453,6 +455,10 @@ func (u *userTSDB) PostCreation(metric labels.Labels) { if u.labelsStringInterningEnabled { metric.InternStrings(u.interner.Intern) } + + if u.postingCache != nil { + u.postingCache.ExpireSeries(metric) + } } // PostDeletion implements SeriesLifecycleCallback interface. @@ -470,6 +476,9 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels) if u.labelsStringInterningEnabled { metric.ReleaseStrings(u.interner.Release) } + if u.postingCache != nil { + u.postingCache.ExpireSeries(metric) + } } } @@ -701,7 +710,8 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, - &i.maxInflightQueryRequests) + &i.maxInflightQueryRequests, + cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled) i.validateMetrics = validation.NewValidateMetrics(registerer) // Replace specific metrics which we can't directly track but we need to read @@ -783,6 +793,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe nil, &i.inflightPushRequests, &i.maxInflightQueryRequests, + cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled, ) i.TSDBState.shipperIngesterID = "flusher" @@ -2162,6 +2173,12 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { blockRanges := i.cfg.BlocksStorageConfig.TSDB.BlockRanges.ToMilliseconds() + var postingCache cortex_tsdb.ExpandedPostingsCache + if i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled || i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled { + logutil.WarnExperimentalUse("expanded postings cache") + postingCache = cortex_tsdb.NewBlocksPostingsForMatchersCache(i.cfg.BlocksStorageConfig.TSDB.PostingsCache, i.metrics.expandedPostingsCacheMetrics) + } + userDB := &userTSDB{ userID: userID, activeSeries: NewActiveSeries(), @@ -2176,6 +2193,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { labelsStringInterningEnabled: i.cfg.LabelsStringInterningEnabled, blockRetentionPeriod: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(), + postingCache: postingCache, } enableExemplars := false @@ -2211,6 +2229,12 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax, EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks. 
EnableNativeHistograms: i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms, + BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + if postingCache != nil { + return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt) + } + return tsdb.NewBlockChunkQuerier(b, mint, maxt) + }, }, nil) if err != nil { return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 1ee6b41b1a..a922678f9c 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -41,6 +42,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -2060,7 +2062,7 @@ func Benchmark_Ingester_PushOnError(b *testing.B) { }, beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { // Send a lot of samples - _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000)) + _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000, 1)) require.NoError(b, err) ingester.ingestionRate.Tick() @@ -2084,7 +2086,7 @@ func Benchmark_Ingester_PushOnError(b *testing.B) { beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { // Send some samples for one tenant (not the same that is used during the test) ctx := user.InjectOrgID(context.Background(), "different_tenant") - _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000)) + _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000, 1)) require.NoError(b, err) }, runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { @@ -2104,7 +2106,7 @@ func Benchmark_Ingester_PushOnError(b *testing.B) { return true }, beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { - _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000)) + _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000, 1)) require.NoError(b, err) }, runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { @@ -5078,6 +5080,260 @@ func TestIngester_instanceLimitsMetrics(t *testing.T) { `), "cortex_ingester_instance_limits")) } +func TestExpendedPostingsCache(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} + + runQuery := func(t *testing.T, ctx context.Context, i *Ingester, matchers []*client.LabelMatcher) []client.TimeSeriesChunk { + s := &mockQueryStreamServer{ctx: ctx} + + err := i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: matchers, + }, s) + require.NoError(t, err) + return s.series + } + + tc := map[string]struct { + cacheConfig cortex_tsdb.TSDBPostingsCacheConfig + expectedBlockPostingCall 
int + expectedHeadPostingCall int + }{ + "cacheDisabled": { + expectedBlockPostingCall: 0, + expectedHeadPostingCall: 0, + cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ + Head: cortex_tsdb.PostingsCacheConfig{ + Enabled: false, + }, + Blocks: cortex_tsdb.PostingsCacheConfig{ + Enabled: false, + }, + }, + }, + "enabled cache blocks": { + expectedBlockPostingCall: 1, + expectedHeadPostingCall: 0, + cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ + Blocks: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxItems: 1000, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + }, + }, + "enabled cache blocks and head": { + expectedBlockPostingCall: 1, + expectedHeadPostingCall: 1, + cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ + Blocks: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxItems: 1000, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + Head: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxItems: 1000, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + }, + }, + } + + for name, c := range tc { + t.Run(name, func(t *testing.T) { + postingsForMatchersCalls := atomic.Int64{} + cfg.BlocksStorageConfig.TSDB.PostingsCache = c.cacheConfig + + cfg.BlocksStorageConfig.TSDB.PostingsCache.PostingsForMatchers = func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { + postingsForMatchersCalls.Add(1) + return tsdb.PostingsForMatchers(ctx, ix, ms...) + } + cfg.LifecyclerConfig.JoinAfter = 0 + + ctx := user.InjectOrgID(context.Background(), "test") + + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + metricNames := []string{"metric1", "metric2"} + + // Generate 4 hours of data so we have 1 block + head + totalSamples := 4 * 60 + var samples = make([]cortexpb.Sample, 0, totalSamples) + + for i := 0; i < totalSamples; i++ { + samples = append(samples, cortexpb.Sample{ + Value: float64(i), + TimestampMs: int64(i * 60 * 1000), + }) + } + + lbls := make([]labels.Labels, 0, len(samples)) + for j := 0; j < 10; j++ { + for i := 0; i < len(samples); i++ { + lbls = append(lbls, labels.FromStrings(labels.MetricName, metricNames[i%len(metricNames)], "a", fmt.Sprintf("aaa%v", j))) + } + } + + for i := len(samples); i < len(lbls); i++ { + samples = append(samples, samples[i%len(samples)]) + } + + req := cortexpb.ToWriteRequest(lbls, samples, nil, nil, cortexpb.API) + _, err = i.Push(ctx, req) + require.NoError(t, err) + + i.compactBlocks(ctx, false, nil) + + extraMatcher := []struct { + matchers []*client.LabelMatcher + expectedLenght int + }{ + { + expectedLenght: 10, + matchers: []*client.LabelMatcher{ + { + Type: client.REGEX_MATCH, + Name: "a", + Value: "aaa.*", + }, + }, + }, + { + expectedLenght: 1, + matchers: []*client.LabelMatcher{ + { + Type: client.EQUAL, + Name: "a", + Value: "aaa1", + }, + }, + }, + } + + // Run queries with no cache + for _, name := range metricNames { + for _, m := range extraMatcher { + postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, append(m.matchers, &client.LabelMatcher{Type: client.EQUAL, Name: labels.MetricName, Value: name})), m.expectedLenght) + // Query block and Head + require.Equal(t, 
int64(c.expectedBlockPostingCall+c.expectedHeadPostingCall), postingsForMatchersCalls.Load()) + } + } + + if c.expectedHeadPostingCall > 0 || c.expectedBlockPostingCall > 0 { + metric := ` + # HELP cortex_ingester_expanded_postings_cache_requests Count of cache adds in the ingester postings cache. + # TYPE cortex_ingester_expanded_postings_cache_requests counter +` + if c.expectedBlockPostingCall > 0 { + metric += ` + cortex_ingester_expanded_postings_cache_requests{cache="block"} 4 +` + } + + if c.expectedHeadPostingCall > 0 { + metric += ` + cortex_ingester_expanded_postings_cache_requests{cache="head"} 4 +` + } + + err = testutil.GatherAndCompare(r, bytes.NewBufferString(metric), "cortex_ingester_expanded_postings_cache_requests") + require.NoError(t, err) + } + + // Calling again and it should hit the cache + for _, name := range metricNames { + for _, m := range extraMatcher { + postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, append(m.matchers, &client.LabelMatcher{Type: client.EQUAL, Name: labels.MetricName, Value: name})), m.expectedLenght) + require.Equal(t, int64(0), postingsForMatchersCalls.Load()) + } + } + + if c.expectedHeadPostingCall > 0 || c.expectedBlockPostingCall > 0 { + metric := ` + # HELP cortex_ingester_expanded_postings_cache_hits Count of cache hits in the ingester postings cache. + # TYPE cortex_ingester_expanded_postings_cache_hits counter +` + if c.expectedBlockPostingCall > 0 { + metric += ` + cortex_ingester_expanded_postings_cache_hits{cache="block"} 4 +` + } + + if c.expectedHeadPostingCall > 0 { + metric += ` + cortex_ingester_expanded_postings_cache_hits{cache="head"} 4 +` + } + + err = testutil.GatherAndCompare(r, bytes.NewBufferString(metric), "cortex_ingester_expanded_postings_cache_hits") + require.NoError(t, err) + } + + // Check the number total of series with the first metric name + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricNames[0]}}), 10) + // Query block and head + require.Equal(t, postingsForMatchersCalls.Load(), int64(c.expectedBlockPostingCall+c.expectedHeadPostingCall)) + + // Adding a metric for the first metric name so we expire all caches for that metric name + _, err = i.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, metricNames[0], "extra", "1")}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API)) + require.NoError(t, err) + + for in, name := range metricNames { + for _, m := range extraMatcher { + postingsForMatchersCalls.Store(0) + + require.Len(t, runQuery(t, ctx, i, append(m.matchers, &client.LabelMatcher{Type: client.EQUAL, Name: labels.MetricName, Value: name})), m.expectedLenght) + + // first metric name should be expired + if in == 0 { + // Query only head as the block is already cached and the head was expired + require.Equal(t, postingsForMatchersCalls.Load(), int64(c.expectedHeadPostingCall)) + } else { + require.Equal(t, postingsForMatchersCalls.Load(), int64(0)) + } + } + } + + // Check if the new metric name was added + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricNames[0]}}), 11) + // Query only head as the block is already cached and the head was expired + require.Equal(t, postingsForMatchersCalls.Load(), int64(c.expectedHeadPostingCall)) + postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: 
labels.MetricName, Value: metricNames[0]}}), 11) + // Return all from the caches + require.Equal(t, postingsForMatchersCalls.Load(), int64(0)) + + // Should never cache head postings there is not matcher for the metric name + postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: "extra", Value: "1"}}), 1) + // Query block and head but bypass head + require.Equal(t, postingsForMatchersCalls.Load(), int64(c.expectedBlockPostingCall)) + postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: "extra", Value: "1"}}), 1) + // Return cached value from block and bypass head + require.Equal(t, int64(0), postingsForMatchersCalls.Load()) + }) + } +} + func TestIngester_inflightPushRequests(t *testing.T) { limits := InstanceLimits{MaxInflightPushRequests: 1} @@ -5103,7 +5359,7 @@ func TestIngester_inflightPushRequests(t *testing.T) { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { count := 3500000 - req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, fmt.Sprintf("real-%d", count)), count) + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, fmt.Sprintf("real-%d", count)), count, 1) // Signal that we're going to do the real push now. close(startCh) @@ -5120,7 +5376,7 @@ func TestIngester_inflightPushRequests(t *testing.T) { } time.Sleep(10 * time.Millisecond) // Give first goroutine a chance to start pushing... - req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1024) + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1024, 1) _, err := i.Push(ctx, req) require.Equal(t, errTooManyInflightPushRequests, err) @@ -5182,14 +5438,14 @@ func Test_Ingester_QueryExemplar_MaxInflightQueryRequest(t *testing.T) { require.Equal(t, err, errTooManyInflightQueryRequests) } -func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest { +func generateSamplesForLabel(l labels.Labels, count int, sampleIntervalInMs int) *cortexpb.WriteRequest { var lbls = make([]labels.Labels, 0, count) var samples = make([]cortexpb.Sample, 0, count) for i := 0; i < count; i++ { samples = append(samples, cortexpb.Sample{ Value: float64(i), - TimestampMs: int64(i), + TimestampMs: int64(i * sampleIntervalInMs), }) lbls = append(lbls, l) } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 3e2d98fc4b..1e5fc1b0c7 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" + "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" util_math "github.com/cortexproject/cortex/pkg/util/math" ) @@ -54,6 +55,9 @@ type ingesterMetrics struct { maxInflightPushRequests prometheus.GaugeFunc inflightRequests prometheus.GaugeFunc inflightQueryRequests prometheus.GaugeFunc + + // Posting Cache Metrics + expandedPostingsCacheMetrics *tsdb.ExpandedPostingsCacheMetrics } func newIngesterMetrics(r prometheus.Registerer, @@ -63,6 +67,7 @@ func newIngesterMetrics(r prometheus.Registerer, ingestionRate *util_math.EwmaRate, inflightPushRequests *atomic.Int64, maxInflightQueryRequests *util_math.MaxTracker, + postingsCacheEnabled bool, ) *ingesterMetrics { const ( instanceLimits = "cortex_ingester_instance_limits" @@ -235,6 +240,10 @@ func newIngesterMetrics(r prometheus.Registerer, }, []string{"user"}), } + if postingsCacheEnabled && r != nil { + 
m.expandedPostingsCacheMetrics = tsdb.NewPostingCacheMetrics(r) + } + if activeSeriesEnabled && r != nil { r.MustRegister(m.activeSeriesPerUser) } diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index df992686d3..5c2e0e15d5 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -34,7 +34,8 @@ func TestIngesterMetrics(t *testing.T) { }, ingestionRate, inflightPushRequests, - &maxInflightQueryRequests) + &maxInflightQueryRequests, + false) require.NotNil(t, m) diff --git a/pkg/storage/tsdb/cached_chunks_querier.go b/pkg/storage/tsdb/cached_chunks_querier.go new file mode 100644 index 0000000000..3dcb19ab4c --- /dev/null +++ b/pkg/storage/tsdb/cached_chunks_querier.go @@ -0,0 +1,128 @@ +package tsdb + +import ( + "context" + "errors" + "fmt" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + prom_tsdb "github.com/prometheus/prometheus/tsdb" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" +) + +/* + This file is basically a copy from https://github.com/prometheus/prometheus/blob/e2e01c1cffbfc4f26f5e9fe6138af87d7ff16122/tsdb/querier.go + with the difference that the PostingsForMatchers function is called from the Postings Cache +*/ + +type blockBaseQuerier struct { + blockID ulid.ULID + index prom_tsdb.IndexReader + chunks prom_tsdb.ChunkReader + tombstones tombstones.Reader + + closed bool + + mint, maxt int64 +} + +func newBlockBaseQuerier(b prom_tsdb.BlockReader, mint, maxt int64) (*blockBaseQuerier, error) { + indexr, err := b.Index() + if err != nil { + return nil, fmt.Errorf("open index reader: %w", err) + } + chunkr, err := b.Chunks() + if err != nil { + indexr.Close() + return nil, fmt.Errorf("open chunk reader: %w", err) + } + tombsr, err := b.Tombstones() + if err != nil { + indexr.Close() + chunkr.Close() + return nil, fmt.Errorf("open tombstone reader: %w", err) + } + + if tombsr == nil { + tombsr = tombstones.NewMemTombstones() + } + return &blockBaseQuerier{ + blockID: b.Meta().ULID, + mint: mint, + maxt: maxt, + index: indexr, + chunks: chunkr, + tombstones: tombsr, + }, nil +} + +func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.SortedLabelValues(ctx, name, matchers...) + return res, nil, err +} + +func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.LabelNames(ctx, matchers...) 
+ return res, nil, err +} + +func (q *blockBaseQuerier) Close() error { + if q.closed { + return errors.New("block querier already closed") + } + + errs := tsdb_errors.NewMulti( + q.index.Close(), + q.chunks.Close(), + q.tombstones.Close(), + ) + q.closed = true + return errs.Err() +} + +type cachedBlockChunkQuerier struct { + *blockBaseQuerier + + cache ExpandedPostingsCache +} + +func NewCachedBlockChunkQuerier(cache ExpandedPostingsCache, b prom_tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + q, err := newBlockBaseQuerier(b, mint, maxt) + if err != nil { + return nil, err + } + return &cachedBlockChunkQuerier{blockBaseQuerier: q, cache: cache}, nil +} + +func (q *cachedBlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { + return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt, q.cache) +} + +func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, + blockID ulid.ULID, index prom_tsdb.IndexReader, chunks prom_tsdb.ChunkReader, tombstones tombstones.Reader, mint, maxt int64, + cache ExpandedPostingsCache, +) storage.ChunkSeriesSet { + disableTrimming := false + sharded := hints != nil && hints.ShardCount > 0 + + if hints != nil { + mint = hints.Start + maxt = hints.End + disableTrimming = hints.DisableTrimming + } + p, err := cache.PostingsForMatchers(ctx, blockID, index, ms...) + if err != nil { + return storage.ErrChunkSeriesSet(err) + } + if sharded { + p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + } + if sortSeries { + p = index.SortedPostings(p) + } + return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming) +} diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 5a33115182..323b8f8787 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -166,6 +166,9 @@ type TSDBConfig struct { // Enable native histogram ingestion. EnableNativeHistograms bool `yaml:"enable_native_histograms"` + + // Posting Cache Configuration for TSDB + PostingsCache TSDBPostingsCacheConfig `yaml:"postings_cache"` } // RegisterFlags registers the TSDBConfig flags. @@ -195,6 +198,8 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.") f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.") f.BoolVar(&cfg.EnableNativeHistograms, "blocks-storage.tsdb.enable-native-histograms", false, "[EXPERIMENTAL] True to enable native histogram.") + + cfg.PostingsCache.RegisterFlagsWithPrefix("blocks-storage.", f) } // Validate the config. 
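The cache introduced in the next diff (pkg/storage/tsdb/expanded_postings_cache.go) deduplicates concurrent lookups by storing a promise per cache key: the first caller for a key computes the expanded postings, and later callers with the same matchers wait on the same entry instead of repeating the work. The standalone sketch below illustrates only that promise pattern; the names (promise, promiseCache, getOrCreate, expensiveFetch) are hypothetical, it assumes Go 1.18+ generics, and it omits the max-bytes/max-items/TTL and FIFO-eviction bookkeeping that the real cache layers on top.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// promise holds a value that is computed exactly once; every interested
// caller waits on the same done channel.
type promise[V any] struct {
	done chan struct{}
	v    V
	err  error
}

func (p *promise[V]) wait(ctx context.Context) (V, error) {
	select {
	case <-ctx.Done():
		var zero V
		return zero, ctx.Err()
	case <-p.done:
		return p.v, p.err
	}
}

// promiseCache deduplicates concurrent fetches per key, similar in spirit to
// the cache in this patch but without size, TTL or eviction accounting.
type promiseCache[V any] struct {
	entries sync.Map // key -> *promise[V]
}

// getOrCreate returns the already-stored promise for key, or stores a new one
// and runs fetch once; the winner publishes the result by closing done.
func (c *promiseCache[V]) getOrCreate(key string, fetch func() (V, error)) (*promise[V], bool) {
	p := &promise[V]{done: make(chan struct{})}
	if existing, loaded := c.entries.LoadOrStore(key, p); loaded {
		return existing.(*promise[V]), true // hit: share the in-flight or finished fetch
	}
	defer close(p.done)
	p.v, p.err = fetch()
	return p, false
}

func main() {
	c := &promiseCache[[]uint64]{}
	expensiveFetch := func() ([]uint64, error) {
		time.Sleep(50 * time.Millisecond) // stand-in for an expensive postings lookup
		return []uint64{1, 2, 3}, nil
	}

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p, hit := c.getOrCreate(`{__name__="up"}`, expensiveFetch)
			ids, err := p.wait(context.Background())
			fmt.Println("hit:", hit, "ids:", ids, "err:", err)
		}()
	}
	wg.Wait()
}
```

In the patch itself this role is played by the cache's per-key promise entries, which additionally record creation time and size so the FIFO eviction logic can enforce the configured max-items, max-bytes and TTL limits.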
diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go new file mode 100644 index 0000000000..8139a79e6c --- /dev/null +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -0,0 +1,372 @@ +package tsdb + +import ( + "container/list" + "context" + "flag" + "slices" + "strconv" + "strings" + "sync" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/index" + + "github.com/cortexproject/cortex/pkg/util/extract" +) + +var ( + rangeHeadULID = ulid.MustParse("0000000000XXXXXXXRANGEHEAD") + headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD") +) + +type ExpandedPostingsCacheMetrics struct { + CacheRequests *prometheus.CounterVec + CacheHits *prometheus.CounterVec + CacheEvicts *prometheus.CounterVec +} + +func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetrics { + return &ExpandedPostingsCacheMetrics{ + CacheRequests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_expanded_postings_cache_requests", + Help: "Count of cache adds in the ingester postings cache.", + }, []string{"cache"}), + CacheHits: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_expanded_postings_cache_hits", + Help: "Count of cache hits in the ingester postings cache.", + }, []string{"cache"}), + CacheEvicts: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_expanded_postings_cache_evicts", + Help: "Count of cache evictions in the ingester postings cache, excluding items that got evicted due to TTL.", + }, []string{"cache"}), + } +} + +type TSDBPostingsCacheConfig struct { + Head PostingsCacheConfig `yaml:"head"` + Blocks PostingsCacheConfig `yaml:"blocks"` + + PostingsForMatchers func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) `yaml:"-"` + timeNow func() time.Time `yaml:"-"` +} + +type PostingsCacheConfig struct { + MaxBytes int64 `yaml:"max_bytes"` + MaxItems int `yaml:"max_items"` + Ttl time.Duration `yaml:"ttl"` + Enabled bool `yaml:"enabled"` +} + +func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.Head.RegisterFlagsWithPrefix(prefix, "head", f) + cfg.Blocks.RegisterFlagsWithPrefix(prefix, "block", f) +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f *flag.FlagSet) { + f.Int64Var(&cfg.MaxBytes, prefix+"postings-cache."+block+".max-bytes", 10*1024*1024, "Max bytes for postings cache") + f.IntVar(&cfg.MaxItems, prefix+"postings-cache."+block+".max-items", 10000, "Max items for postings cache") + f.DurationVar(&cfg.Ttl, prefix+"postings-cache."+block+".ttl", 10*time.Minute, "TTL for postings cache") + f.BoolVar(&cfg.Enabled, prefix+"postings-cache."+block+".enabled", false, "Whether the postings cache is enabled or not") +} + +type ExpandedPostingsCache interface { + PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) + ExpireSeries(metric labels.Labels) +} + +type BlocksPostingsForMatchersCache struct { + strippedLock []sync.RWMutex + + headCache 
*fifoCache[*postingsPromise] + blocksCache *fifoCache[*postingsPromise] + + headSeedByMetricName []int + postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) + timeNow func() time.Time + + metrics *ExpandedPostingsCacheMetrics +} + +func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { + if cfg.PostingsForMatchers == nil { + cfg.PostingsForMatchers = tsdb.PostingsForMatchers + } + + if cfg.timeNow == nil { + cfg.timeNow = time.Now + } + + return &BlocksPostingsForMatchersCache{ + headCache: newFifoCache[*postingsPromise](cfg.Head, "head", metrics, cfg.timeNow), + blocksCache: newFifoCache[*postingsPromise](cfg.Blocks, "block", metrics, cfg.timeNow), + headSeedByMetricName: make([]int, 10000), + strippedLock: make([]sync.RWMutex, 1000), + postingsForMatchersFunc: cfg.PostingsForMatchers, + timeNow: cfg.timeNow, + metrics: metrics, + } +} + +func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { + metricName, err := extract.MetricNameFromLabels(metric) + if err != nil { + return + } + + h := MemHashString(metricName) + i := h % uint64(len(c.headSeedByMetricName)) + l := h % uint64(len(c.strippedLock)) + c.strippedLock[l].Lock() + defer c.strippedLock[l].Unlock() + c.headSeedByMetricName[i]++ +} + +func (c *BlocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { + return c.fetchPostings(blockID, ix, ms...)(ctx) +} + +func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) { + var seed string + cache := c.blocksCache + + // If is a head block, lets add the seed on the cache key so we can + // invalidate the cache when new series are created for this metric name + if isHeadBlock(blockID) { + cache = c.headCache + metricName, ok := metricNameFromMatcher(ms) + // Lets not cache head if we don;t find an equal matcher for the label __name__ + if !ok { + return func(ctx context.Context) (index.Postings, error) { + return tsdb.PostingsForMatchers(ctx, ix, ms...) + } + } + + seed = c.getSeedForMetricName(metricName) + } + + // Let's bypass cache if not enabled + if !cache.cfg.Enabled { + return func(ctx context.Context) (index.Postings, error) { + return tsdb.PostingsForMatchers(ctx, ix, ms...) + } + } + + c.metrics.CacheRequests.WithLabelValues(cache.name).Inc() + + key := c.cacheKey(seed, blockID, ms...) + + promise := &postingsPromise{ + done: make(chan struct{}), + } + oldPromise, loaded := cache.getOrStore(key, promise) + if loaded { + c.metrics.CacheHits.WithLabelValues(cache.name).Inc() + close(promise.done) + return func(ctx context.Context) (index.Postings, error) { + return oldPromise.result(ctx) + } + } + defer close(promise.done) + // Use context.Background() as this promise is maybe shared across calls + postings, err := c.postingsForMatchersFunc(context.Background(), ix, ms...) 
+ + if err == nil { + promise.ids, promise.err = index.ExpandPostings(postings) + sizeBytes := int64(len(key)) + int64(len(promise.ids)*8) + cache.created(key, c.timeNow(), sizeBytes) + } + + return promise.result +} + +func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { + h := MemHashString(metricName) + i := h % uint64(len(c.headSeedByMetricName)) + l := h % uint64(len(c.strippedLock)) + c.strippedLock[l].RLock() + defer c.strippedLock[l].RUnlock() + return strconv.Itoa(c.headSeedByMetricName[i]) +} + +func (c *BlocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { + slices.SortFunc(ms, func(i, j *labels.Matcher) int { + if i.Type != j.Type { + return int(i.Type - j.Type) + } + if i.Name != j.Name { + return strings.Compare(i.Name, j.Name) + } + if i.Value != j.Value { + return strings.Compare(i.Value, j.Value) + } + return 0 + }) + + const ( + typeLen = 2 + sepLen = 1 + ) + + var size int + for _, m := range ms { + size += len(seed) + len(blockID.String()) + len(m.Name) + len(m.Value) + typeLen + 2*sepLen + } + sb := strings.Builder{} + sb.Grow(size) + sb.WriteString(seed) + sb.WriteByte('|') + sb.WriteString(blockID.String()) + for _, m := range ms { + sb.WriteString(m.Name) + sb.WriteString(m.Type.String()) + sb.WriteString(m.Value) + sb.WriteByte('|') + } + key := sb.String() + return key +} + +func isHeadBlock(blockID ulid.ULID) bool { + return blockID == rangeHeadULID || blockID == headULID +} + +func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) { + for _, m := range ms { + if m.Name == labels.MetricName && m.Type == labels.MatchEqual { + return m.Value, true + } + } + + return "", false +} + +type fifoCache[V any] struct { + cfg PostingsCacheConfig + cachedValues *sync.Map + timeNow func() time.Time + name string + metrics ExpandedPostingsCacheMetrics + + // Fields from here should be locked + cachedMtx sync.RWMutex + cached *list.List + cachedBytes int64 +} + +func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) *fifoCache[V] { + return &fifoCache[V]{ + cachedValues: new(sync.Map), + cached: list.New(), + cfg: cfg, + timeNow: timeNow, + name: name, + metrics: *metrics, + } +} + +func (c *fifoCache[V]) expire() { + if c.cfg.Ttl <= 0 { + return + } + c.cachedMtx.RLock() + if !c.shouldEvictHead() { + c.cachedMtx.RUnlock() + return + } + c.cachedMtx.RUnlock() + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + for c.shouldEvictHead() { + c.evictHead() + } +} + +func (c *fifoCache[V]) getOrStore(k string, v V) (V, bool) { + c.expire() + if !c.cfg.Enabled { + return v, false + } + + loaded, ok := c.cachedValues.LoadOrStore(k, v) + return loaded.(V), ok +} + +func (c *fifoCache[V]) contains(k string) bool { + _, ok := c.cachedValues.Load(k) + return ok +} + +func (c *fifoCache[V]) shouldEvictHead() bool { + if c.cached.Len() > c.cfg.MaxItems || c.cachedBytes > c.cfg.MaxBytes { + c.metrics.CacheEvicts.WithLabelValues(c.name).Inc() + return true + } + h := c.cached.Front() + if h == nil { + return false + } + ts := h.Value.(*cacheEntry).ts + r := c.timeNow().Sub(ts) + return r >= c.cfg.Ttl +} +func (c *fifoCache[V]) evictHead() { + front := c.cached.Front() + oldest := front.Value.(*cacheEntry) + c.cachedValues.Delete(oldest.key) + c.cached.Remove(front) + c.cachedBytes -= oldest.sizeBytes +} + +func (c *fifoCache[V]) created(key string, ts time.Time, sizeBytes int64) { + if c.cfg.Ttl <= 0 { + 
c.cachedValues.Delete(key) + return + } + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + c.cached.PushBack(&cacheEntry{ + key: key, + ts: ts, + sizeBytes: sizeBytes, + }) + c.cachedBytes += sizeBytes +} + +type cacheEntry struct { + key string + ts time.Time + sizeBytes int64 +} + +type postingsPromise struct { + done chan struct{} + + ids []storage.SeriesRef + err error +} + +func (p *postingsPromise) result(ctx context.Context) (index.Postings, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-p.done: + if ctx.Err() != nil { + return nil, ctx.Err() + } + + return index.NewListPostings(p.ids), p.err + } +} + +func MemHashString(str string) uint64 { + return xxhash.Sum64(yoloBuf(str)) +} diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go new file mode 100644 index 0000000000..555c9d5e9e --- /dev/null +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -0,0 +1,103 @@ +package tsdb + +import ( + "fmt" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func TestFifoCacheDisabled(t *testing.T) { + cfg := PostingsCacheConfig{} + cfg.Enabled = false + m := NewPostingCacheMetrics(prometheus.DefaultRegisterer) + timeNow := time.Now + cache := newFifoCache[int](cfg, "test", m, timeNow) + old, loaded := cache.getOrStore("key1", 1) + require.False(t, loaded) + require.Equal(t, 1, old) + require.False(t, cache.contains("key1")) +} + +func TestFifoCacheExpire(t *testing.T) { + tc := map[string]struct { + cfg PostingsCacheConfig + expectedFinalItems int + ttlExpire bool + }{ + "MaxItems": { + expectedFinalItems: 3, + cfg: PostingsCacheConfig{ + MaxItems: 3, + Enabled: true, + Ttl: time.Hour, + MaxBytes: 10 << 20, + }, + }, + "MaxBytes": { + expectedFinalItems: 10, + cfg: PostingsCacheConfig{ + MaxItems: 10 << 20, + Enabled: true, + Ttl: time.Hour, + MaxBytes: 80, // 10*8, + }, + }, + "TTL": { + expectedFinalItems: 10, + ttlExpire: true, + cfg: PostingsCacheConfig{ + MaxItems: 10 << 20, + Enabled: true, + Ttl: time.Hour, + MaxBytes: 80, // 10*8, + }, + }, + } + + for name, c := range tc { + t.Run(name, func(t *testing.T) { + m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) + timeNow := time.Now + cache := newFifoCache[int](c.cfg, "test", m, timeNow) + + numberOfKeys := 100 + + for i := 0; i < numberOfKeys; i++ { + key := fmt.Sprintf("key%d", i) + old, loaded := cache.getOrStore(key, 1) + require.False(t, loaded) + cache.created(key, time.Now(), 8) + require.Equal(t, 1, old) + require.True(t, cache.contains(key)) + old, loaded = cache.getOrStore(key, 1) + require.True(t, loaded) + require.Equal(t, 1, old) + } + + totalCacheSize := 0 + + for i := 0; i < numberOfKeys; i++ { + key := fmt.Sprintf("key%d", i) + if cache.contains(key) { + totalCacheSize++ + } + } + + require.Equal(t, c.expectedFinalItems, totalCacheSize) + + if c.ttlExpire { + for i := 0; i < numberOfKeys; i++ { + key := fmt.Sprintf("key%d", i) + cache.timeNow = func() time.Time { + return timeNow().Add(2 * c.cfg.Ttl) + } + _, loaded := cache.getOrStore(key, 1) + require.False(t, loaded) + } + } + }) + } +} From b88cf6b2b69748c22ceb446a06ee671cffe93f03 Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 1 Nov 2024 13:39:02 -0700 Subject: [PATCH 02/10] small nit Signed-off-by: alanprot --- pkg/storage/tsdb/expanded_postings_cache.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/storage/tsdb/expanded_postings_cache.go 
b/pkg/storage/tsdb/expanded_postings_cache.go index 8139a79e6c..9a578ffd88 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -27,6 +27,14 @@ var ( headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD") ) +const ( + // size of the seed array. Each seed is a 64bits int (8 bytes) + // totaling 8mb + seedArraySize = 1024 * 1024 + + numOfSeedsStripes = 512 +) + type ExpandedPostingsCacheMetrics struct { CacheRequests *prometheus.CounterVec CacheHits *prometheus.CounterVec @@ -108,8 +116,8 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp return &BlocksPostingsForMatchersCache{ headCache: newFifoCache[*postingsPromise](cfg.Head, "head", metrics, cfg.timeNow), blocksCache: newFifoCache[*postingsPromise](cfg.Blocks, "block", metrics, cfg.timeNow), - headSeedByMetricName: make([]int, 10000), - strippedLock: make([]sync.RWMutex, 1000), + headSeedByMetricName: make([]int, seedArraySize), + strippedLock: make([]sync.RWMutex, numOfSeedsStripes), postingsForMatchersFunc: cfg.PostingsForMatchers, timeNow: cfg.timeNow, metrics: metrics, From 132de4b862960a3f7c6c24a63cfe36b94095717a Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 1 Nov 2024 16:40:47 -0700 Subject: [PATCH 03/10] refactoring the cache so we dont need to call expire on every request Signed-off-by: alanprot --- pkg/storage/tsdb/expanded_postings_cache.go | 132 +++++++++++------- .../tsdb/expanded_postings_cache_test.go | 63 ++++++--- 2 files changed, 127 insertions(+), 68 deletions(-) diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 9a578ffd88..7f2f610971 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -94,8 +94,8 @@ type ExpandedPostingsCache interface { type BlocksPostingsForMatchersCache struct { strippedLock []sync.RWMutex - headCache *fifoCache[*postingsPromise] - blocksCache *fifoCache[*postingsPromise] + headCache *fifoCache[[]storage.SeriesRef] + blocksCache *fifoCache[[]storage.SeriesRef] headSeedByMetricName []int postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) @@ -114,8 +114,8 @@ func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *Exp } return &BlocksPostingsForMatchersCache{ - headCache: newFifoCache[*postingsPromise](cfg.Head, "head", metrics, cfg.timeNow), - blocksCache: newFifoCache[*postingsPromise](cfg.Blocks, "block", metrics, cfg.timeNow), + headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), + blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), headSeedByMetricName: make([]int, seedArraySize), strippedLock: make([]sync.RWMutex, numOfSeedsStripes), postingsForMatchersFunc: cfg.PostingsForMatchers, @@ -170,30 +170,32 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd c.metrics.CacheRequests.WithLabelValues(cache.name).Inc() - key := c.cacheKey(seed, blockID, ms...) + fetch := func() ([]storage.SeriesRef, int64, error) { + // Use context.Background() as this promise is maybe shared across calls + postings, err := c.postingsForMatchersFunc(context.Background(), ix, ms...) 
- promise := &postingsPromise{ - done: make(chan struct{}), + if err == nil { + ids, err := index.ExpandPostings(postings) + return ids, int64(len(ids) * 8), err + } + + return nil, 0, err } - oldPromise, loaded := cache.getOrStore(key, promise) + + key := c.cacheKey(seed, blockID, ms...) + promise, loaded := cache.getPromiseForKey(key, fetch) if loaded { c.metrics.CacheHits.WithLabelValues(cache.name).Inc() - close(promise.done) - return func(ctx context.Context) (index.Postings, error) { - return oldPromise.result(ctx) - } - } - defer close(promise.done) - // Use context.Background() as this promise is maybe shared across calls - postings, err := c.postingsForMatchersFunc(context.Background(), ix, ms...) - - if err == nil { - promise.ids, promise.err = index.ExpandPostings(postings) - sizeBytes := int64(len(key)) + int64(len(promise.ids)*8) - cache.created(key, c.timeNow(), sizeBytes) } - return promise.result + return c.result(promise) +} + +func (c *BlocksPostingsForMatchersCache) result(promise *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { + return func(ctx context.Context) (index.Postings, error) { + ids, err := promise.result(ctx) + return index.NewListPostings(ids), err + } } func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { @@ -298,14 +300,38 @@ func (c *fifoCache[V]) expire() { } } -func (c *fifoCache[V]) getOrStore(k string, v V) (V, bool) { - c.expire() +func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { + r := &cacheEntryPromise[V]{ + done: make(chan struct{}), + } + + defer close(r.done) + if !c.cfg.Enabled { - return v, false + r.v, _, r.err = fetch() + return r, false } - loaded, ok := c.cachedValues.LoadOrStore(k, v) - return loaded.(V), ok + loaded, ok := c.cachedValues.LoadOrStore(k, r) + + if !ok { + r.v, r.sizeBytes, r.err = fetch() + r.sizeBytes += int64(len(k)) + r.ts = c.timeNow() + c.created(k, r.sizeBytes) + c.expire() + } + + // If is cached but is expired, lets try to replace the cache value + if ok && loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) { + if c.cachedValues.CompareAndSwap(k, loaded, r) { + r.v, r.sizeBytes, r.err = fetch() + loaded = r + r.ts = c.timeNow() + } + } + + return loaded.(*cacheEntryPromise[V]), ok } func (c *fifoCache[V]) contains(k string) bool { @@ -322,59 +348,63 @@ func (c *fifoCache[V]) shouldEvictHead() bool { if h == nil { return false } - ts := h.Value.(*cacheEntry).ts - r := c.timeNow().Sub(ts) - return r >= c.cfg.Ttl + key := h.Value.(string) + + if l, ok := c.cachedValues.Load(key); ok { + return l.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) + } + + return false } + func (c *fifoCache[V]) evictHead() { front := c.cached.Front() - oldest := front.Value.(*cacheEntry) - c.cachedValues.Delete(oldest.key) c.cached.Remove(front) - c.cachedBytes -= oldest.sizeBytes + oldestKey := front.Value.(string) + if oldest, loaded := c.cachedValues.LoadAndDelete(oldestKey); loaded { + c.cachedBytes -= oldest.(*cacheEntryPromise[V]).sizeBytes + } } -func (c *fifoCache[V]) created(key string, ts time.Time, sizeBytes int64) { +func (c *fifoCache[V]) created(key string, sizeBytes int64) { if c.cfg.Ttl <= 0 { c.cachedValues.Delete(key) return } c.cachedMtx.Lock() defer c.cachedMtx.Unlock() - c.cached.PushBack(&cacheEntry{ - key: key, - ts: ts, - sizeBytes: sizeBytes, - }) + c.cached.PushBack(key) c.cachedBytes += sizeBytes } -type cacheEntry struct { - key string +type 
cacheEntryPromise[V any] struct { ts time.Time sizeBytes int64 -} -type postingsPromise struct { done chan struct{} - - ids []storage.SeriesRef - err error + v V + err error } -func (p *postingsPromise) result(ctx context.Context) (index.Postings, error) { +func (ce *cacheEntryPromise[V]) result(ctx context.Context) (V, error) { select { case <-ctx.Done(): - return nil, ctx.Err() - case <-p.done: + return ce.v, ctx.Err() + case <-ce.done: if ctx.Err() != nil { - return nil, ctx.Err() + return ce.v, ctx.Err() } - return index.NewListPostings(p.ids), p.err + return ce.v, ce.err } } +func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool { + ts := ce.ts + r := now.Sub(ts) + return r >= ttl +} + func MemHashString(str string) uint64 { return xxhash.Sum64(yoloBuf(str)) } diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index 555c9d5e9e..139861081a 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -1,7 +1,9 @@ package tsdb import ( + "context" "fmt" + "strings" "testing" "time" @@ -15,13 +17,21 @@ func TestFifoCacheDisabled(t *testing.T) { m := NewPostingCacheMetrics(prometheus.DefaultRegisterer) timeNow := time.Now cache := newFifoCache[int](cfg, "test", m, timeNow) - old, loaded := cache.getOrStore("key1", 1) + old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) { + return 1, 0, nil + }) require.False(t, loaded) - require.Equal(t, 1, old) + v, err := old.result(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, v) require.False(t, cache.contains("key1")) } func TestFifoCacheExpire(t *testing.T) { + + keySize := 20 + numberOfKeys := 100 + tc := map[string]struct { cfg PostingsCacheConfig expectedFinalItems int @@ -42,17 +52,17 @@ func TestFifoCacheExpire(t *testing.T) { MaxItems: 10 << 20, Enabled: true, Ttl: time.Hour, - MaxBytes: 80, // 10*8, + MaxBytes: int64(10 * (8 + keySize)), }, }, "TTL": { - expectedFinalItems: 10, + expectedFinalItems: numberOfKeys, ttlExpire: true, cfg: PostingsCacheConfig{ MaxItems: 10 << 20, Enabled: true, Ttl: time.Hour, - MaxBytes: 80, // 10*8, + MaxBytes: 10 << 20, }, }, } @@ -63,24 +73,29 @@ func TestFifoCacheExpire(t *testing.T) { timeNow := time.Now cache := newFifoCache[int](c.cfg, "test", m, timeNow) - numberOfKeys := 100 - for i := 0; i < numberOfKeys; i++ { - key := fmt.Sprintf("key%d", i) - old, loaded := cache.getOrStore(key, 1) + key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + return 1, 8, nil + }) require.False(t, loaded) - cache.created(key, time.Now(), 8) - require.Equal(t, 1, old) + v, err := p.result(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, v) require.True(t, cache.contains(key)) - old, loaded = cache.getOrStore(key, 1) + p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) { + return 1, 0, nil + }) require.True(t, loaded) - require.Equal(t, 1, old) + v, err = p.result(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, v) } totalCacheSize := 0 for i := 0; i < numberOfKeys; i++ { - key := fmt.Sprintf("key%d", i) + key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) if cache.contains(key) { totalCacheSize++ } @@ -90,14 +105,28 @@ func TestFifoCacheExpire(t *testing.T) { if c.ttlExpire { for i := 0; i < numberOfKeys; i++ { - key := fmt.Sprintf("key%d", i) + key := 
RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) cache.timeNow = func() time.Time { return timeNow().Add(2 * c.cfg.Ttl) } - _, loaded := cache.getOrStore(key, 1) - require.False(t, loaded) + p, _ := cache.getPromiseForKey(key, func() (int, int64, error) { + return 2, 8, nil + }) + //require.False(t, loaded) + v, err := p.result(context.Background()) + require.NoError(t, err) + // New value + require.Equal(t, 2, v) } } }) } } + +func RepeatStringIfNeeded(seed string, length int) string { + if len(seed) > length { + return seed + } + + return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))] +} From c153cb11daa9b0265c018a367fa3b60c65a920aa Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 1 Nov 2024 16:54:28 -0700 Subject: [PATCH 04/10] Update total cache size when updating the item Signed-off-by: alanprot --- pkg/storage/tsdb/expanded_postings_cache.go | 14 +++++++++++++- pkg/storage/tsdb/expanded_postings_cache_test.go | 16 ++++++++++------ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 7f2f610971..57137d4db2 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -304,7 +304,6 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error) r := &cacheEntryPromise[V]{ done: make(chan struct{}), } - defer close(r.done) if !c.cfg.Enabled { @@ -326,8 +325,11 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error) if ok && loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) { if c.cachedValues.CompareAndSwap(k, loaded, r) { r.v, r.sizeBytes, r.err = fetch() + r.sizeBytes += int64(len(k)) + c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes) loaded = r r.ts = c.timeNow() + ok = false } } @@ -377,6 +379,16 @@ func (c *fifoCache[V]) created(key string, sizeBytes int64) { c.cachedBytes += sizeBytes } +func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { + if oldSize == newSizeBytes { + return + } + + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + c.cachedBytes += newSizeBytes - oldSize +} + type cacheEntryPromise[V any] struct { ts time.Time sizeBytes int64 diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index 139861081a..380e49f728 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -104,19 +104,23 @@ func TestFifoCacheExpire(t *testing.T) { require.Equal(t, c.expectedFinalItems, totalCacheSize) if c.ttlExpire { + cache.timeNow = func() time.Time { + return timeNow().Add(2 * c.cfg.Ttl) + } + for i := 0; i < numberOfKeys; i++ { key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) - cache.timeNow = func() time.Time { - return timeNow().Add(2 * c.cfg.Ttl) - } - p, _ := cache.getPromiseForKey(key, func() (int, int64, error) { - return 2, 8, nil + originalSize := cache.cachedBytes + p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + return 2, 18, nil }) - //require.False(t, loaded) + require.False(t, loaded) v, err := p.result(context.Background()) require.NoError(t, err) // New value require.Equal(t, 2, v) + // Total Size Updated + require.Equal(t, originalSize+10, cache.cachedBytes) } } }) From be467a79007d6a0b8ba8efa2f1e3ca7859d52a47 Mon Sep 17 00:00:00 2001 From: alanprot Date: Fri, 1 Nov 2024 17:16:56 -0700 Subject: [PATCH 05/10] Fix fuzzy test after change the 
flag name Signed-off-by: alanprot --- CHANGELOG.md | 1 + docs/blocks-storage/querier.md | 21 ++++++++++-------- docs/blocks-storage/store-gateway.md | 21 ++++++++++-------- docs/configuration/config-file-reference.md | 21 ++++++++++-------- integration/query_fuzz_test.go | 24 ++++++++++----------- pkg/storage/tsdb/config.go | 2 +- pkg/storage/tsdb/expanded_postings_cache.go | 8 +++---- 7 files changed, 54 insertions(+), 44 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 879682273f..63cddfebb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209 * [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257 * [ENHANCEMENT] Upgrade build image and Go version to 1.23.2. #6261 #6262 +* [ENHANCEMENT] Ingester: Introduce a new experimental feature for caching expanded postings on the ingester. #6296 * [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276 * [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 2387d61ac0..a387d91519 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1545,38 +1545,41 @@ blocks_storage: # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] - postings_cache: + # [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when + # querying blocks. Caching can be configured separately for the head and + # compacted blocks. 
+ expanded_postings_cache: head: # Max bytes for postings cache - # CLI flag: -blocks-storage.postings-cache.head.max-bytes + # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes [max_bytes: | default = 10485760] # Max items for postings cache - # CLI flag: -blocks-storage.postings-cache.head.max-items + # CLI flag: -blocks-storage.expanded_postings_cache.head.max-items [max_items: | default = 10000] # TTL for postings cache - # CLI flag: -blocks-storage.postings-cache.head.ttl + # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.postings-cache.head.enabled + # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled [enabled: | default = false] blocks: # Max bytes for postings cache - # CLI flag: -blocks-storage.postings-cache.block.max-bytes + # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes [max_bytes: | default = 10485760] # Max items for postings cache - # CLI flag: -blocks-storage.postings-cache.block.max-items + # CLI flag: -blocks-storage.expanded_postings_cache.block.max-items [max_items: | default = 10000] # TTL for postings cache - # CLI flag: -blocks-storage.postings-cache.block.ttl + # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.postings-cache.block.enabled + # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled [enabled: | default = false] ``` diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 135341d6e2..dd7bc23890 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1636,38 +1636,41 @@ blocks_storage: # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] - postings_cache: + # [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when + # querying blocks. Caching can be configured separately for the head and + # compacted blocks. 
+ expanded_postings_cache: head: # Max bytes for postings cache - # CLI flag: -blocks-storage.postings-cache.head.max-bytes + # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes [max_bytes: | default = 10485760] # Max items for postings cache - # CLI flag: -blocks-storage.postings-cache.head.max-items + # CLI flag: -blocks-storage.expanded_postings_cache.head.max-items [max_items: | default = 10000] # TTL for postings cache - # CLI flag: -blocks-storage.postings-cache.head.ttl + # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.postings-cache.head.enabled + # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled [enabled: | default = false] blocks: # Max bytes for postings cache - # CLI flag: -blocks-storage.postings-cache.block.max-bytes + # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes [max_bytes: | default = 10485760] # Max items for postings cache - # CLI flag: -blocks-storage.postings-cache.block.max-items + # CLI flag: -blocks-storage.expanded_postings_cache.block.max-items [max_items: | default = 10000] # TTL for postings cache - # CLI flag: -blocks-storage.postings-cache.block.ttl + # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.postings-cache.block.enabled + # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled [enabled: | default = false] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 49a67f78b0..72b26a8cf4 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2082,39 +2082,42 @@ tsdb: # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] - postings_cache: + # [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when + # querying blocks. Caching can be configured separately for the head and + # compacted blocks. 
+ expanded_postings_cache: head: # Max bytes for postings cache - # CLI flag: -blocks-storage.postings-cache.head.max-bytes + # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes [max_bytes: | default = 10485760] # Max items for postings cache - # CLI flag: -blocks-storage.postings-cache.head.max-items + # CLI flag: -blocks-storage.expanded_postings_cache.head.max-items [max_items: | default = 10000] # TTL for postings cache - # CLI flag: -blocks-storage.postings-cache.head.ttl + # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.postings-cache.head.enabled + # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled [enabled: | default = false] blocks: # Max bytes for postings cache - # CLI flag: -blocks-storage.postings-cache.block.max-bytes + # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes [max_bytes: | default = 10485760] # Max items for postings cache - # CLI flag: -blocks-storage.postings-cache.block.max-items + # CLI flag: -blocks-storage.expanded_postings_cache.block.max-items [max_items: | default = 10000] # TTL for postings cache - # CLI flag: -blocks-storage.postings-cache.block.ttl + # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.postings-cache.block.enabled + # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled [enabled: | default = false] ``` diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index 1c9c5cfe8e..664786b4b3 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -243,18 +243,18 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) { flags2 := mergeFlags( AlertmanagerLocalFlags(), map[string]string{ - "-store.engine": blocksStorageEngine, - "-blocks-storage.backend": "filesystem", - "-blocks-storage.tsdb.head-compaction-interval": "4m", - "-blocks-storage.tsdb.block-ranges-period": "2h", - "-blocks-storage.tsdb.ship-interval": "1h", - "-blocks-storage.bucket-store.sync-interval": "15m", - "-blocks-storage.tsdb.retention-period": "2h", - "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, - "-blocks-storage.bucket-store.bucket-index.enabled": "true", - "-querier.query-store-for-labels-enabled": "true", - "-blocks-storage.postings-cache.head.enabled": "true", - "-blocks-storage.postings-cache.block.enabled": "true", + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.tsdb.block-ranges-period": "2h", + "-blocks-storage.tsdb.ship-interval": "1h", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.tsdb.retention-period": "2h", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.expanded_postings_cache.head.enabled": "true", + "-blocks-storage.expanded_postings_cache.block.enabled": "true", // Ingester. 
"-ring.store": "consul", "-consul.hostname": consul2.NetworkHTTPEndpoint(), diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 323b8f8787..612061676c 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -168,7 +168,7 @@ type TSDBConfig struct { EnableNativeHistograms bool `yaml:"enable_native_histograms"` // Posting Cache Configuration for TSDB - PostingsCache TSDBPostingsCacheConfig `yaml:"postings_cache"` + PostingsCache TSDBPostingsCacheConfig `yaml:"expanded_postings_cache" doc:"description=[EXPERIMENTAL] If enabled, ingesters will cache expanded postings when querying blocks. Caching can be configured separately for the head and compacted blocks."` } // RegisterFlags registers the TSDBConfig flags. diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 57137d4db2..e10b4980ae 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -80,10 +80,10 @@ func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *fl // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f *flag.FlagSet) { - f.Int64Var(&cfg.MaxBytes, prefix+"postings-cache."+block+".max-bytes", 10*1024*1024, "Max bytes for postings cache") - f.IntVar(&cfg.MaxItems, prefix+"postings-cache."+block+".max-items", 10000, "Max items for postings cache") - f.DurationVar(&cfg.Ttl, prefix+"postings-cache."+block+".ttl", 10*time.Minute, "TTL for postings cache") - f.BoolVar(&cfg.Enabled, prefix+"postings-cache."+block+".enabled", false, "Whether the postings cache is enabled or not") + f.Int64Var(&cfg.MaxBytes, prefix+"expanded_postings_cache."+block+".max-bytes", 10*1024*1024, "Max bytes for postings cache") + f.IntVar(&cfg.MaxItems, prefix+"expanded_postings_cache."+block+".max-items", 10000, "Max items for postings cache") + f.DurationVar(&cfg.Ttl, prefix+"expanded_postings_cache."+block+".ttl", 10*time.Minute, "TTL for postings cache") + f.BoolVar(&cfg.Enabled, prefix+"expanded_postings_cache."+block+".enabled", false, "Whether the postings cache is enabled or not") } type ExpandedPostingsCache interface { From d8d226daf8f8d280e2d2261ff9ef7883d2097c82 Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 4 Nov 2024 14:29:44 -0800 Subject: [PATCH 06/10] remove max item config + create a new test case with only head cache enabled Signed-off-by: alanprot --- docs/blocks-storage/querier.md | 8 -------- docs/blocks-storage/store-gateway.md | 8 -------- docs/configuration/config-file-reference.md | 8 -------- pkg/ingester/ingester_test.go | 18 +++++++++++++----- pkg/storage/tsdb/expanded_postings_cache.go | 4 +--- 5 files changed, 14 insertions(+), 32 deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index a387d91519..f71a4be780 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1554,10 +1554,6 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes [max_bytes: | default = 10485760] - # Max items for postings cache - # CLI flag: -blocks-storage.expanded_postings_cache.head.max-items - [max_items: | default = 10000] - # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] @@ -1571,10 +1567,6 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes [max_bytes: | default = 10485760] 
- # Max items for postings cache - # CLI flag: -blocks-storage.expanded_postings_cache.block.max-items - [max_items: | default = 10000] - # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index dd7bc23890..96e852ddbc 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1645,10 +1645,6 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes [max_bytes: | default = 10485760] - # Max items for postings cache - # CLI flag: -blocks-storage.expanded_postings_cache.head.max-items - [max_items: | default = 10000] - # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] @@ -1662,10 +1658,6 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes [max_bytes: | default = 10485760] - # Max items for postings cache - # CLI flag: -blocks-storage.expanded_postings_cache.block.max-items - [max_items: | default = 10000] - # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 72b26a8cf4..b7c0ef0674 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2091,10 +2091,6 @@ tsdb: # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes [max_bytes: | default = 10485760] - # Max items for postings cache - # CLI flag: -blocks-storage.expanded_postings_cache.head.max-items - [max_items: | default = 10000] - # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] @@ -2108,10 +2104,6 @@ tsdb: # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes [max_bytes: | default = 10485760] - # Max items for postings cache - # CLI flag: -blocks-storage.expanded_postings_cache.block.max-items - [max_items: | default = 10000] - # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index a922678f9c..81e5a00234 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5113,31 +5113,39 @@ func TestExpendedPostingsCache(t *testing.T) { }, }, }, - "enabled cache blocks": { + "enabled cache on compacted blocks": { expectedBlockPostingCall: 1, expectedHeadPostingCall: 0, cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ Blocks: cortex_tsdb.PostingsCacheConfig{ Ttl: time.Hour, - MaxItems: 1000, MaxBytes: 1024 * 1024 * 1024, Enabled: true, }, }, }, - "enabled cache blocks and head": { + "enabled cache on head": { + expectedBlockPostingCall: 0, + expectedHeadPostingCall: 1, + cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ + Head: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + }, + }, + "enabled cache on compacted blocks and head": { expectedBlockPostingCall: 1, expectedHeadPostingCall: 1, cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ Blocks: cortex_tsdb.PostingsCacheConfig{ Ttl: time.Hour, - MaxItems: 1000, MaxBytes: 1024 * 1024 * 1024, Enabled: true, }, Head: cortex_tsdb.PostingsCacheConfig{ Ttl: time.Hour, - MaxItems: 1000, MaxBytes: 1024 * 1024 * 1024, Enabled: true, }, diff --git 
a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index e10b4980ae..92a739a55c 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -68,7 +68,6 @@ type TSDBPostingsCacheConfig struct { type PostingsCacheConfig struct { MaxBytes int64 `yaml:"max_bytes"` - MaxItems int `yaml:"max_items"` Ttl time.Duration `yaml:"ttl"` Enabled bool `yaml:"enabled"` } @@ -81,7 +80,6 @@ func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *fl // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f *flag.FlagSet) { f.Int64Var(&cfg.MaxBytes, prefix+"expanded_postings_cache."+block+".max-bytes", 10*1024*1024, "Max bytes for postings cache") - f.IntVar(&cfg.MaxItems, prefix+"expanded_postings_cache."+block+".max-items", 10000, "Max items for postings cache") f.DurationVar(&cfg.Ttl, prefix+"expanded_postings_cache."+block+".ttl", 10*time.Minute, "TTL for postings cache") f.BoolVar(&cfg.Enabled, prefix+"expanded_postings_cache."+block+".enabled", false, "Whether the postings cache is enabled or not") } @@ -342,7 +340,7 @@ func (c *fifoCache[V]) contains(k string) bool { } func (c *fifoCache[V]) shouldEvictHead() bool { - if c.cached.Len() > c.cfg.MaxItems || c.cachedBytes > c.cfg.MaxBytes { + if c.cachedBytes > c.cfg.MaxBytes { c.metrics.CacheEvicts.WithLabelValues(c.name).Inc() return true } From 41aad718f1ccc8edcde3535a41590086eea37d2c Mon Sep 17 00:00:00 2001 From: alanprot Date: Mon, 4 Nov 2024 14:47:53 -0800 Subject: [PATCH 07/10] Documenting enabled as first field on the config Signed-off-by: alanprot --- docs/blocks-storage/querier.md | 12 ++++++------ docs/blocks-storage/store-gateway.md | 12 ++++++------ docs/configuration/config-file-reference.md | 12 ++++++------ pkg/storage/tsdb/expanded_postings_cache.go | 2 +- pkg/storage/tsdb/expanded_postings_cache_test.go | 11 ----------- 5 files changed, 19 insertions(+), 30 deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index f71a4be780..45b6617a18 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1550,6 +1550,10 @@ blocks_storage: # compacted blocks. 
expanded_postings_cache: head: + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled + [enabled: | default = false] + # Max bytes for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes [max_bytes: | default = 10485760] @@ -1558,11 +1562,11 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] + blocks: # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled + # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled [enabled: | default = false] - blocks: # Max bytes for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes [max_bytes: | default = 10485760] @@ -1570,8 +1574,4 @@ blocks_storage: # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] - - # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled - [enabled: | default = false] ``` diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 96e852ddbc..5654ecbde2 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1641,6 +1641,10 @@ blocks_storage: # compacted blocks. expanded_postings_cache: head: + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled + [enabled: | default = false] + # Max bytes for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes [max_bytes: | default = 10485760] @@ -1649,11 +1653,11 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] + blocks: # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled + # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled [enabled: | default = false] - blocks: # Max bytes for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes [max_bytes: | default = 10485760] @@ -1661,8 +1665,4 @@ blocks_storage: # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] - - # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled - [enabled: | default = false] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index b7c0ef0674..34a50549d4 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2087,6 +2087,10 @@ tsdb: # compacted blocks. 
expanded_postings_cache: head: + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled + [enabled: | default = false] + # Max bytes for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes [max_bytes: | default = 10485760] @@ -2095,11 +2099,11 @@ tsdb: # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] + blocks: # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled + # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled [enabled: | default = false] - blocks: # Max bytes for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes [max_bytes: | default = 10485760] @@ -2107,10 +2111,6 @@ tsdb: # TTL for postings cache # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl [ttl: | default = 10m] - - # Whether the postings cache is enabled or not - # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled - [enabled: | default = false] ``` ### `compactor_config` diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 92a739a55c..43eba098cb 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -67,9 +67,9 @@ type TSDBPostingsCacheConfig struct { } type PostingsCacheConfig struct { + Enabled bool `yaml:"enabled"` MaxBytes int64 `yaml:"max_bytes"` Ttl time.Duration `yaml:"ttl"` - Enabled bool `yaml:"enabled"` } func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go index 380e49f728..d4b80abcbd 100644 --- a/pkg/storage/tsdb/expanded_postings_cache_test.go +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -37,19 +37,9 @@ func TestFifoCacheExpire(t *testing.T) { expectedFinalItems int ttlExpire bool }{ - "MaxItems": { - expectedFinalItems: 3, - cfg: PostingsCacheConfig{ - MaxItems: 3, - Enabled: true, - Ttl: time.Hour, - MaxBytes: 10 << 20, - }, - }, "MaxBytes": { expectedFinalItems: 10, cfg: PostingsCacheConfig{ - MaxItems: 10 << 20, Enabled: true, Ttl: time.Hour, MaxBytes: int64(10 * (8 + keySize)), @@ -59,7 +49,6 @@ func TestFifoCacheExpire(t *testing.T) { expectedFinalItems: numberOfKeys, ttlExpire: true, cfg: PostingsCacheConfig{ - MaxItems: 10 << 20, Enabled: true, Ttl: time.Hour, MaxBytes: 10 << 20, From ba91d8907659bb8580b3f3149a60e5ef362884e1 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 4 Nov 2024 23:09:08 -0800 Subject: [PATCH 08/10] Fix race on chunks multilevel cache + Optimize to avoid refetching already found keys. 
(#6312) * Creating a test to show the race on the multilevel cache Signed-off-by: alanprot * fix the race problem * Only fetch keys that were not found on the previous cache Signed-off-by: alanprot --------- Signed-off-by: alanprot --- pkg/storage/tsdb/multilevel_chunk_cache.go | 17 +++++- .../tsdb/multilevel_chunk_cache_test.go | 60 +++++++++++++++++-- 2 files changed, 69 insertions(+), 8 deletions(-) diff --git a/pkg/storage/tsdb/multilevel_chunk_cache.go b/pkg/storage/tsdb/multilevel_chunk_cache.go index e1b0f5bc20..8daa2f56ce 100644 --- a/pkg/storage/tsdb/multilevel_chunk_cache.go +++ b/pkg/storage/tsdb/multilevel_chunk_cache.go @@ -98,6 +98,7 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[str timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues()) defer timer.ObserveDuration() + missingKeys := keys hits := map[string][]byte{} backfillItems := make([]map[string][]byte, len(m.caches)-1) @@ -108,13 +109,25 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[str if ctx.Err() != nil { return nil } - if data := c.Fetch(ctx, keys); len(data) > 0 { + if data := c.Fetch(ctx, missingKeys); len(data) > 0 { for k, d := range data { hits[k] = d } if i > 0 && len(hits) > 0 { - backfillItems[i-1] = hits + // lets fetch only the mising keys + m := missingKeys[:0] + for _, key := range missingKeys { + if _, ok := hits[key]; !ok { + m = append(m, key) + } + } + + missingKeys = m + + for k, b := range hits { + backfillItems[i-1][k] = b + } } if len(hits) == len(keys) { diff --git a/pkg/storage/tsdb/multilevel_chunk_cache_test.go b/pkg/storage/tsdb/multilevel_chunk_cache_test.go index c72c1f3a55..4b50b8fd02 100644 --- a/pkg/storage/tsdb/multilevel_chunk_cache_test.go +++ b/pkg/storage/tsdb/multilevel_chunk_cache_test.go @@ -6,8 +6,10 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/cache" ) func Test_MultiLevelChunkCacheStore(t *testing.T) { @@ -72,6 +74,43 @@ func Test_MultiLevelChunkCacheStore(t *testing.T) { } } +func Test_MultiLevelChunkCacheFetchRace(t *testing.T) { + cfg := MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 10, + MaxAsyncBufferSize: 100000, + MaxBackfillItems: 10000, + BackFillTTL: time.Hour * 24, + } + reg := prometheus.NewRegistry() + + m1 := newMockChunkCache("m1", map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + }) + + inMemory, err := cache.NewInMemoryCacheWithConfig("test", log.NewNopLogger(), reg, cache.InMemoryCacheConfig{MaxSize: 10 * 1024, MaxItemSize: 1024}) + require.NoError(t, err) + + inMemory.Store(map[string][]byte{ + "key2": []byte("value2"), + "key3": []byte("value3"), + }, time.Minute) + + c := newMultiLevelChunkCache("chunk-cache", cfg, reg, inMemory, m1) + + hits := c.Fetch(context.Background(), []string{"key1", "key2", "key3", "key4"}) + + require.Equal(t, 3, len(hits)) + + // We should be able to change the returned values without any race problem + delete(hits, "key1") + + mlc := c.(*multiLevelChunkCache) + //Wait until async operation finishes. 
+ mlc.backfillProcessor.Stop() + +} + func Test_MultiLevelChunkCacheFetch(t *testing.T) { cfg := MultiLevelChunkCacheConfig{ MaxAsyncConcurrency: 10, @@ -81,12 +120,14 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) { } testCases := map[string]struct { - m1ExistingData map[string][]byte - m2ExistingData map[string][]byte - expectedM1Data map[string][]byte - expectedM2Data map[string][]byte - expectedFetchedData map[string][]byte - fetchKeys []string + m1ExistingData map[string][]byte + m2ExistingData map[string][]byte + expectedM1Data map[string][]byte + expectedM2Data map[string][]byte + expectedFetchedData map[string][]byte + expectedM1FetchedKeys []string + expectedM2FetchedKeys []string + fetchKeys []string }{ "fetched data should be union of m1, m2 and 'key2' and `key3' should be backfilled to m1": { m1ExistingData: map[string][]byte{ @@ -96,6 +137,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) { "key2": []byte("value2"), "key3": []byte("value3"), }, + expectedM1FetchedKeys: []string{"key1", "key2", "key3"}, + expectedM2FetchedKeys: []string{"key2", "key3"}, expectedM1Data: map[string][]byte{ "key1": []byte("value1"), "key2": []byte("value2"), @@ -119,6 +162,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) { m2ExistingData: map[string][]byte{ "key2": []byte("value2"), }, + expectedM1FetchedKeys: []string{"key1", "key2", "key3"}, + expectedM2FetchedKeys: []string{"key2", "key3"}, expectedM1Data: map[string][]byte{ "key1": []byte("value1"), "key2": []byte("value2"), @@ -157,6 +202,8 @@ type mockChunkCache struct { mu sync.Mutex name string data map[string][]byte + + fetchedKeys []string } func newMockChunkCache(name string, data map[string][]byte) *mockChunkCache { @@ -180,6 +227,7 @@ func (m *mockChunkCache) Fetch(_ context.Context, keys []string) map[string][]by h := map[string][]byte{} for _, k := range keys { + m.fetchedKeys = append(m.fetchedKeys, k) if _, ok := m.data[k]; ok { h[k] = m.data[k] } From 37c7a2871cb48e7be80669588c129806d438e17e Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 5 Nov 2024 10:32:23 -0800 Subject: [PATCH 09/10] Improve Doc Signed-off-by: alanprot --- docs/blocks-storage/querier.md | 4 ++++ docs/blocks-storage/store-gateway.md | 4 ++++ docs/configuration/config-file-reference.md | 4 ++++ pkg/storage/tsdb/expanded_postings_cache.go | 4 ++-- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 45b6617a18..4fc1958ee0 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1549,6 +1549,8 @@ blocks_storage: # querying blocks. Caching can be configured separately for the head and # compacted blocks. expanded_postings_cache: + # If enabled, ingesters will cache expanded postings for the head block. + # Only queries with with an equal matcher for metric __name__ are cached. head: # Whether the postings cache is enabled or not # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled @@ -1562,6 +1564,8 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] + # If enabled, ingesters will cache expanded postings for the compacted + # blocks. The cache is shared between all blocks. 
blocks: # Whether the postings cache is enabled or not # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 5654ecbde2..95f90d6ed2 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1640,6 +1640,8 @@ blocks_storage: # querying blocks. Caching can be configured separately for the head and # compacted blocks. expanded_postings_cache: + # If enabled, ingesters will cache expanded postings for the head block. + # Only queries with with an equal matcher for metric __name__ are cached. head: # Whether the postings cache is enabled or not # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled @@ -1653,6 +1655,8 @@ blocks_storage: # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] + # If enabled, ingesters will cache expanded postings for the compacted + # blocks. The cache is shared between all blocks. blocks: # Whether the postings cache is enabled or not # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 34a50549d4..28b98a6d18 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2086,6 +2086,8 @@ tsdb: # querying blocks. Caching can be configured separately for the head and # compacted blocks. expanded_postings_cache: + # If enabled, ingesters will cache expanded postings for the head block. + # Only queries with with an equal matcher for metric __name__ are cached. head: # Whether the postings cache is enabled or not # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled @@ -2099,6 +2101,8 @@ tsdb: # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl [ttl: | default = 10m] + # If enabled, ingesters will cache expanded postings for the compacted + # blocks. The cache is shared between all blocks. blocks: # Whether the postings cache is enabled or not # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index 43eba098cb..ba4cf98c92 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -59,8 +59,8 @@ func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetri } type TSDBPostingsCacheConfig struct { - Head PostingsCacheConfig `yaml:"head"` - Blocks PostingsCacheConfig `yaml:"blocks"` + Head PostingsCacheConfig `yaml:"head" doc:"description=If enabled, ingesters will cache expanded postings for the head block. Only queries with with an equal matcher for metric __name__ are cached."` + Blocks PostingsCacheConfig `yaml:"blocks" doc:"description=If enabled, ingesters will cache expanded postings for the compacted blocks. 
The cache is shared between all blocks."` PostingsForMatchers func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) `yaml:"-"` timeNow func() time.Time `yaml:"-"` From 827e8fb621889fb482cecf05707456a9564f3079 Mon Sep 17 00:00:00 2001 From: alanprot Date: Tue, 5 Nov 2024 10:58:34 -0800 Subject: [PATCH 10/10] create new cortex_ingester_expanded_postings_non_cacheable_queries metric Signed-off-by: alanprot --- pkg/ingester/ingester_test.go | 13 ++++++-- pkg/storage/tsdb/expanded_postings_cache.go | 34 +++++++++++++-------- 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 81e5a00234..2f7a4c7cf6 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -5245,7 +5245,7 @@ func TestExpendedPostingsCache(t *testing.T) { if c.expectedHeadPostingCall > 0 || c.expectedBlockPostingCall > 0 { metric := ` - # HELP cortex_ingester_expanded_postings_cache_requests Count of cache adds in the ingester postings cache. + # HELP cortex_ingester_expanded_postings_cache_requests Total number of requests to the cache. # TYPE cortex_ingester_expanded_postings_cache_requests counter ` if c.expectedBlockPostingCall > 0 { @@ -5275,7 +5275,7 @@ func TestExpendedPostingsCache(t *testing.T) { if c.expectedHeadPostingCall > 0 || c.expectedBlockPostingCall > 0 { metric := ` - # HELP cortex_ingester_expanded_postings_cache_hits Count of cache hits in the ingester postings cache. + # HELP cortex_ingester_expanded_postings_cache_hits Total number of hit requests to the cache. # TYPE cortex_ingester_expanded_postings_cache_hits counter ` if c.expectedBlockPostingCall > 0 { @@ -5334,6 +5334,15 @@ func TestExpendedPostingsCache(t *testing.T) { require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: "extra", Value: "1"}}), 1) // Query block and head but bypass head require.Equal(t, postingsForMatchersCalls.Load(), int64(c.expectedBlockPostingCall)) + if c.cacheConfig.Head.Enabled { + err = testutil.GatherAndCompare(r, bytes.NewBufferString(` + # HELP cortex_ingester_expanded_postings_non_cacheable_queries Total number of non cacheable queries. 
+ # TYPE cortex_ingester_expanded_postings_non_cacheable_queries counter + cortex_ingester_expanded_postings_non_cacheable_queries{cache="head"} 1 +`), "cortex_ingester_expanded_postings_non_cacheable_queries") + require.NoError(t, err) + } + postingsForMatchersCalls.Store(0) require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: "extra", Value: "1"}}), 1) // Return cached value from block and bypass head diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go index ba4cf98c92..f3c1f674ee 100644 --- a/pkg/storage/tsdb/expanded_postings_cache.go +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -36,24 +36,29 @@ const ( ) type ExpandedPostingsCacheMetrics struct { - CacheRequests *prometheus.CounterVec - CacheHits *prometheus.CounterVec - CacheEvicts *prometheus.CounterVec + CacheRequests *prometheus.CounterVec + CacheHits *prometheus.CounterVec + CacheEvicts *prometheus.CounterVec + NonCacheableQueries *prometheus.CounterVec } func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetrics { return &ExpandedPostingsCacheMetrics{ CacheRequests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_expanded_postings_cache_requests", - Help: "Count of cache adds in the ingester postings cache.", + Help: "Total number of requests to the cache.", }, []string{"cache"}), CacheHits: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_expanded_postings_cache_hits", - Help: "Count of cache hits in the ingester postings cache.", + Help: "Total number of hit requests to the cache.", }, []string{"cache"}), CacheEvicts: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_expanded_postings_cache_evicts", - Help: "Count of cache evictions in the ingester postings cache, excluding items that got evicted due to TTL.", + Help: "Total number of evictions in the cache, excluding items that got evicted due to TTL.", + }, []string{"cache"}), + NonCacheableQueries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_expanded_postings_non_cacheable_queries", + Help: "Total number of non cacheable queries.", }, []string{"cache"}), } } @@ -148,15 +153,18 @@ func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd // invalidate the cache when new series are created for this metric name if isHeadBlock(blockID) { cache = c.headCache - metricName, ok := metricNameFromMatcher(ms) - // Lets not cache head if we don;t find an equal matcher for the label __name__ - if !ok { - return func(ctx context.Context) (index.Postings, error) { - return tsdb.PostingsForMatchers(ctx, ix, ms...) + if cache.cfg.Enabled { + metricName, ok := metricNameFromMatcher(ms) + // Lets not cache head if we don;t find an equal matcher for the label __name__ + if !ok { + c.metrics.NonCacheableQueries.WithLabelValues(cache.name).Inc() + return func(ctx context.Context) (index.Postings, error) { + return tsdb.PostingsForMatchers(ctx, ix, ms...) + } } - } - seed = c.getSeedForMetricName(metricName) + seed = c.getSeedForMetricName(metricName) + } } // Let's bypass cache if not enabled
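
For context on the caching changes above, the following is a minimal, self-contained sketch of the promise-style, single-flight pattern that `fifoCache.getPromiseForKey` builds on: concurrent callers for the same key share one promise, the fetch function runs once, and an expired entry is replaced with `CompareAndSwap` so only one caller refetches. This is a simplified illustration under stated assumptions (no byte accounting, FIFO eviction list, or metrics); `promiseCache` and `entryPromise` are illustrative names, not Cortex APIs.

```go
// Illustrative sketch only (requires Go 1.20+ for sync.Map.CompareAndSwap);
// it is not the Cortex implementation.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// entryPromise is a simplified stand-in for cacheEntryPromise: callers for
// the same key share one entry and wait on done until fetch has finished.
type entryPromise[V any] struct {
	done chan struct{}
	ts   time.Time
	v    V
	err  error
}

func (e *entryPromise[V]) result(ctx context.Context) (V, error) {
	select {
	case <-ctx.Done():
		return e.v, ctx.Err()
	case <-e.done:
		return e.v, e.err
	}
}

// promiseCache keeps only the TTL/single-flight behaviour; the real fifoCache
// also tracks cached bytes and evicts from a FIFO list.
type promiseCache[V any] struct {
	ttl     time.Duration
	entries sync.Map // key -> *entryPromise[V]
}

func (c *promiseCache[V]) getPromiseForKey(k string, fetch func() (V, error)) (*entryPromise[V], bool) {
	r := &entryPromise[V]{done: make(chan struct{})}
	defer close(r.done) // unblock waiters once this call returns

	loaded, ok := c.entries.LoadOrStore(k, r)
	if !ok {
		// First caller for this key: run fetch exactly once and publish the result.
		r.v, r.err = fetch()
		r.ts = time.Now()
	}

	// Expired entries are swapped atomically so only one caller refetches.
	if ok && time.Since(loaded.(*entryPromise[V]).ts) >= c.ttl {
		if c.entries.CompareAndSwap(k, loaded, r) {
			r.v, r.err = fetch()
			r.ts = time.Now()
			loaded, ok = r, false
		}
	}

	return loaded.(*entryPromise[V]), ok
}

func main() {
	c := &promiseCache[string]{ttl: time.Minute}

	p, hit := c.getPromiseForKey("up{}", func() (string, error) {
		return "expanded postings for up{}", nil // stands in for ExpandPostings
	})
	v, _ := p.result(context.Background())
	fmt.Println(hit, v) // false expanded postings for up{}

	p, hit = c.getPromiseForKey("up{}", func() (string, error) {
		return "should not run", nil
	})
	v, _ = p.result(context.Background())
	fmt.Println(hit, v) // true expanded postings for up{}
}
```

A cache hit returns an already-resolved promise immediately, while a concurrent miss blocks in `result` until the storing caller closes `done`; this is the property that lets identical expanded-postings lookups collapse into a single `PostingsForMatchers` call in the patches above.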