diff --git a/CHANGELOG.md b/CHANGELOG.md index 879682273f..63cddfebb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209 * [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257 * [ENHANCEMENT] Upgrade build image and Go version to 1.23.2. #6261 #6262 +* [ENHANCEMENT] Ingester: Introduce a new experimental feature for caching expanded postings on the ingester. #6296 * [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276 * [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310 * [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 6596d1cadb..4fc1958ee0 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -1544,4 +1544,38 @@ blocks_storage: # [EXPERIMENTAL] True to enable native histogram. # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] + + # [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when + # querying blocks. Caching can be configured separately for the head and + # compacted blocks. + expanded_postings_cache: + # If enabled, ingesters will cache expanded postings for the head block. + # Only queries with with an equal matcher for metric __name__ are cached. + head: + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled + [enabled: | default = false] + + # Max bytes for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes + [max_bytes: | default = 10485760] + + # TTL for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl + [ttl: | default = 10m] + + # If enabled, ingesters will cache expanded postings for the compacted + # blocks. The cache is shared between all blocks. + blocks: + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled + [enabled: | default = false] + + # Max bytes for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes + [max_bytes: | default = 10485760] + + # TTL for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl + [ttl: | default = 10m] ``` diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 08ef369a85..95f90d6ed2 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -1635,4 +1635,38 @@ blocks_storage: # [EXPERIMENTAL] True to enable native histogram. # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] + + # [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when + # querying blocks. Caching can be configured separately for the head and + # compacted blocks. + expanded_postings_cache: + # If enabled, ingesters will cache expanded postings for the head block. + # Only queries with with an equal matcher for metric __name__ are cached. 
+ head: + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled + [enabled: | default = false] + + # Max bytes for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes + [max_bytes: | default = 10485760] + + # TTL for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl + [ttl: | default = 10m] + + # If enabled, ingesters will cache expanded postings for the compacted + # blocks. The cache is shared between all blocks. + blocks: + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled + [enabled: | default = false] + + # Max bytes for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes + [max_bytes: | default = 10485760] + + # TTL for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl + [ttl: | default = 10m] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 16b9616df1..28b98a6d18 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2081,6 +2081,40 @@ tsdb: # [EXPERIMENTAL] True to enable native histogram. # CLI flag: -blocks-storage.tsdb.enable-native-histograms [enable_native_histograms: | default = false] + + # [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when + # querying blocks. Caching can be configured separately for the head and + # compacted blocks. + expanded_postings_cache: + # If enabled, ingesters will cache expanded postings for the head block. + # Only queries with with an equal matcher for metric __name__ are cached. + head: + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.expanded_postings_cache.head.enabled + [enabled: | default = false] + + # Max bytes for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes + [max_bytes: | default = 10485760] + + # TTL for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.head.ttl + [ttl: | default = 10m] + + # If enabled, ingesters will cache expanded postings for the compacted + # blocks. The cache is shared between all blocks. + blocks: + # Whether the postings cache is enabled or not + # CLI flag: -blocks-storage.expanded_postings_cache.block.enabled + [enabled: | default = false] + + # Max bytes for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes + [max_bytes: | default = 10485760] + + # TTL for postings cache + # CLI flag: -blocks-storage.expanded_postings_cache.block.ttl + [ttl: | default = 10m] ``` ### `compactor_config` diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index a9d15a7ea9..664786b4b3 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -205,6 +205,203 @@ func TestDisableChunkTrimmingFuzz(t *testing.T) { } } +func TestExpandedPostingsCacheFuzz(t *testing.T) { + stableCortexImage := "quay.io/cortexproject/cortex:v1.18.0" + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. 
+ consul1 := e2edb.NewConsulWithName("consul1") + consul2 := e2edb.NewConsulWithName("consul2") + require.NoError(t, s.StartAndWaitReady(consul1, consul2)) + + flags1 := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.tsdb.block-ranges-period": "2h", + "-blocks-storage.tsdb.ship-interval": "1h", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.tsdb.retention-period": "2h", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul1.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + flags2 := mergeFlags( + AlertmanagerLocalFlags(), + map[string]string{ + "-store.engine": blocksStorageEngine, + "-blocks-storage.backend": "filesystem", + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.tsdb.block-ranges-period": "2h", + "-blocks-storage.tsdb.ship-interval": "1h", + "-blocks-storage.bucket-store.sync-interval": "15m", + "-blocks-storage.tsdb.retention-period": "2h", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-querier.query-store-for-labels-enabled": "true", + "-blocks-storage.expanded_postings_cache.head.enabled": "true", + "-blocks-storage.expanded_postings_cache.block.enabled": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul2.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + }, + ) + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + path1 := path.Join(s.SharedDir(), "cortex-1") + path2 := path.Join(s.SharedDir(), "cortex-2") + + flags1 = mergeFlags(flags1, map[string]string{"-blocks-storage.filesystem.dir": path1}) + flags2 = mergeFlags(flags2, map[string]string{"-blocks-storage.filesystem.dir": path2}) + // Start Cortex replicas. + cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, stableCortexImage) + cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, "") + require.NoError(t, s.StartAndWaitReady(cortex1, cortex2)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + var clients []*e2ecortex.Client + c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + clients = append(clients, c1, c2) + + now := time.Now() + // Push some series to Cortex. 
+ start := now.Add(-24 * time.Hour) + scrapeInterval := 30 * time.Second + + numSeries := 10 + numberOfLabelsPerSeries := 5 + numSamples := 10 + ss := make([]prompb.TimeSeries, numSeries*numberOfLabelsPerSeries) + lbls := make([]labels.Labels, numSeries*numberOfLabelsPerSeries) + + for i := 0; i < numSeries; i++ { + for j := 0; j < numberOfLabelsPerSeries; j++ { + series := e2e.GenerateSeriesWithSamples( + fmt.Sprintf("test_series_%d", i), + start, + scrapeInterval, + i*numSamples, + numSamples, + prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)}, + ) + ss[i*numberOfLabelsPerSeries+j] = series + + builder := labels.NewBuilder(labels.EmptyLabels()) + for _, lbl := range series.Labels { + builder.Set(lbl.Name, lbl.Value) + } + lbls[i*numberOfLabelsPerSeries+j] = builder.Labels() + } + } + + rnd := rand.New(rand.NewSource(now.Unix())) + opts := []promqlsmith.Option{ + promqlsmith.WithEnableOffset(true), + promqlsmith.WithEnableAtModifier(true), + } + ps := promqlsmith.New(rnd, lbls, opts...) + + // Create the queries with the original labels + testRun := 100 + queries := make([]string, testRun) + for i := 0; i < testRun; i++ { + expr := ps.WalkRangeQuery() + queries[i] = expr.Pretty(0) + } + + // Lets run multiples iterations and create new series every iteration + for k := 0; k < 5; k++ { + + nss := make([]prompb.TimeSeries, numSeries*numberOfLabelsPerSeries) + for i := 0; i < numSeries; i++ { + for j := 0; j < numberOfLabelsPerSeries; j++ { + nss[i*numberOfLabelsPerSeries+j] = e2e.GenerateSeriesWithSamples( + fmt.Sprintf("test_series_%d", i), + start.Add(scrapeInterval*time.Duration(numSamples*j)), + scrapeInterval, + i*numSamples, + numSamples, + prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)}, + prompb.Label{Name: "k", Value: fmt.Sprintf("%d", k)}, + ) + } + } + + for _, client := range clients { + res, err := client.Push(nss) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + type testCase struct { + query string + res1, res2 model.Value + err1, err2 error + } + + queryStart := time.Now().Add(-time.Hour * 24) + queryEnd := time.Now() + cases := make([]*testCase, 0, 200) + + for _, query := range queries { + res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval) + res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval) + cases = append(cases, &testCase{ + query: query, + res1: res1, + res2: res2, + err1: err1, + err2: err2, + }) + } + + failures := 0 + for i, tc := range cases { + qt := "range query" + if tc.err1 != nil || tc.err2 != nil { + if !cmp.Equal(tc.err1, tc.err2) { + t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2) + failures++ + } + } else if !cmp.Equal(tc.res1, tc.res2, comparer) { + t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String()) + failures++ + } + } + if failures > 0 { + require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures) + } + } +} + func TestVerticalShardingFuzz(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index dc98d4f4e3..00dd1337ce 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -318,6 +318,8 @@ type userTSDB struct { interner util.Interner blockRetentionPeriod int64 + + postingCache cortex_tsdb.ExpandedPostingsCache } // Explicitly wrapping the tsdb.DB functions that we use. 
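For context on how the new `postingCache` field on `userTSDB` gets invalidated: the hunks below hook `ExpireSeries` into the series lifecycle callbacks (`PostCreation`/`PostDeletion`), and the cache added in `pkg/storage/tsdb/expanded_postings_cache.go` further down folds a per-metric-name seed into every head-block cache key, so bumping the seed lazily invalidates all cached postings for that metric name. A minimal standalone sketch of that idea (not code from this change; it uses `hash/fnv` instead of the xxhash dependency and a plain map instead of the striped seed array, and the `invalidatingCache` name is purely illustrative) might look like:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
	"sync"
)

// invalidatingCache sketches the seed-based invalidation used by the
// expanded postings cache: every cache key for a metric name includes a
// seed, and ExpireSeries bumps that seed so older entries become
// unreachable and eventually age out (via TTL/FIFO in the real cache).
type invalidatingCache struct {
	mtx   sync.RWMutex
	seeds map[uint64]int
}

func hashMetricName(name string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(name))
	return h.Sum64()
}

// ExpireSeries is the analogue of ExpandedPostingsCache.ExpireSeries: it only
// bumps the seed, it does not walk or delete existing entries.
func (c *invalidatingCache) ExpireSeries(metricName string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.seeds[hashMetricName(metricName)]++
}

// cacheKey mirrors the idea of prefixing the key with the current seed.
func (c *invalidatingCache) cacheKey(metricName, matchers string) string {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	return strconv.Itoa(c.seeds[hashMetricName(metricName)]) + "|" + metricName + "|" + matchers
}

func main() {
	c := &invalidatingCache{seeds: map[uint64]int{}}
	k1 := c.cacheKey("test_series_0", `a="aaa1"`)
	c.ExpireSeries("test_series_0") // e.g. called from PostCreation when a new series appears
	k2 := c.cacheKey("test_series_0", `a="aaa1"`)
	fmt.Println(k1 != k2) // true: the previously cached entry is no longer addressable
}
```

In the change itself the seed lives in a fixed-size array indexed by a hash of the metric name and guarded by striped locks, and it is only folded into head-block keys; postings for compacted blocks are immutable, so they never need this kind of invalidation.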
@@ -453,6 +455,10 @@ func (u *userTSDB) PostCreation(metric labels.Labels) { if u.labelsStringInterningEnabled { metric.InternStrings(u.interner.Intern) } + + if u.postingCache != nil { + u.postingCache.ExpireSeries(metric) + } } // PostDeletion implements SeriesLifecycleCallback interface. @@ -470,6 +476,9 @@ func (u *userTSDB) PostDeletion(metrics map[chunks.HeadSeriesRef]labels.Labels) if u.labelsStringInterningEnabled { metric.ReleaseStrings(u.interner.Release) } + if u.postingCache != nil { + u.postingCache.ExpireSeries(metric) + } } } @@ -701,7 +710,8 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, - &i.maxInflightQueryRequests) + &i.maxInflightQueryRequests, + cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled) i.validateMetrics = validation.NewValidateMetrics(registerer) // Replace specific metrics which we can't directly track but we need to read @@ -783,6 +793,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe nil, &i.inflightPushRequests, &i.maxInflightQueryRequests, + cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled || cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled, ) i.TSDBState.shipperIngesterID = "flusher" @@ -2162,6 +2173,12 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { blockRanges := i.cfg.BlocksStorageConfig.TSDB.BlockRanges.ToMilliseconds() + var postingCache cortex_tsdb.ExpandedPostingsCache + if i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Head.Enabled || i.cfg.BlocksStorageConfig.TSDB.PostingsCache.Blocks.Enabled { + logutil.WarnExperimentalUse("expanded postings cache") + postingCache = cortex_tsdb.NewBlocksPostingsForMatchersCache(i.cfg.BlocksStorageConfig.TSDB.PostingsCache, i.metrics.expandedPostingsCacheMetrics) + } + userDB := &userTSDB{ userID: userID, activeSeries: NewActiveSeries(), @@ -2176,6 +2193,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { labelsStringInterningEnabled: i.cfg.LabelsStringInterningEnabled, blockRetentionPeriod: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(), + postingCache: postingCache, } enableExemplars := false @@ -2211,6 +2229,12 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax, EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks. 
EnableNativeHistograms: i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms, + BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + if postingCache != nil { + return cortex_tsdb.NewCachedBlockChunkQuerier(postingCache, b, mint, maxt) + } + return tsdb.NewBlockChunkQuerier(b, mint, maxt) + }, }, nil) if err != nil { return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 1ee6b41b1a..2f7a4c7cf6 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -41,6 +42,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -2060,7 +2062,7 @@ func Benchmark_Ingester_PushOnError(b *testing.B) { }, beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { // Send a lot of samples - _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000)) + _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000, 1)) require.NoError(b, err) ingester.ingestionRate.Tick() @@ -2084,7 +2086,7 @@ func Benchmark_Ingester_PushOnError(b *testing.B) { beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { // Send some samples for one tenant (not the same that is used during the test) ctx := user.InjectOrgID(context.Background(), "different_tenant") - _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000)) + _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000, 1)) require.NoError(b, err) }, runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { @@ -2104,7 +2106,7 @@ func Benchmark_Ingester_PushOnError(b *testing.B) { return true }, beforeBenchmark: func(b *testing.B, ingester *Ingester, numSeriesPerRequest int) { - _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000)) + _, err := ingester.Push(ctx, generateSamplesForLabel(labels.FromStrings(labels.MetricName, "test"), 10000, 1)) require.NoError(b, err) }, runBenchmark: func(b *testing.B, ingester *Ingester, metrics []labels.Labels, samples []cortexpb.Sample) { @@ -5078,6 +5080,277 @@ func TestIngester_instanceLimitsMetrics(t *testing.T) { `), "cortex_ingester_instance_limits")) } +func TestExpendedPostingsCache(t *testing.T) { + cfg := defaultIngesterTestConfig(t) + cfg.BlocksStorageConfig.TSDB.BlockRanges = []time.Duration{2 * time.Hour} + + runQuery := func(t *testing.T, ctx context.Context, i *Ingester, matchers []*client.LabelMatcher) []client.TimeSeriesChunk { + s := &mockQueryStreamServer{ctx: ctx} + + err := i.QueryStream(&client.QueryRequest{ + StartTimestampMs: 0, + EndTimestampMs: math.MaxInt64, + Matchers: matchers, + }, s) + require.NoError(t, err) + return s.series + } + + tc := map[string]struct { + cacheConfig cortex_tsdb.TSDBPostingsCacheConfig + expectedBlockPostingCall 
int + expectedHeadPostingCall int + }{ + "cacheDisabled": { + expectedBlockPostingCall: 0, + expectedHeadPostingCall: 0, + cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ + Head: cortex_tsdb.PostingsCacheConfig{ + Enabled: false, + }, + Blocks: cortex_tsdb.PostingsCacheConfig{ + Enabled: false, + }, + }, + }, + "enabled cache on compacted blocks": { + expectedBlockPostingCall: 1, + expectedHeadPostingCall: 0, + cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ + Blocks: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + }, + }, + "enabled cache on head": { + expectedBlockPostingCall: 0, + expectedHeadPostingCall: 1, + cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ + Head: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + }, + }, + "enabled cache on compacted blocks and head": { + expectedBlockPostingCall: 1, + expectedHeadPostingCall: 1, + cacheConfig: cortex_tsdb.TSDBPostingsCacheConfig{ + Blocks: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + Head: cortex_tsdb.PostingsCacheConfig{ + Ttl: time.Hour, + MaxBytes: 1024 * 1024 * 1024, + Enabled: true, + }, + }, + }, + } + + for name, c := range tc { + t.Run(name, func(t *testing.T) { + postingsForMatchersCalls := atomic.Int64{} + cfg.BlocksStorageConfig.TSDB.PostingsCache = c.cacheConfig + + cfg.BlocksStorageConfig.TSDB.PostingsCache.PostingsForMatchers = func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { + postingsForMatchersCalls.Add(1) + return tsdb.PostingsForMatchers(ctx, ix, ms...) + } + cfg.LifecyclerConfig.JoinAfter = 0 + + ctx := user.InjectOrgID(context.Background(), "test") + + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until the ingester is ACTIVE + test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + metricNames := []string{"metric1", "metric2"} + + // Generate 4 hours of data so we have 1 block + head + totalSamples := 4 * 60 + var samples = make([]cortexpb.Sample, 0, totalSamples) + + for i := 0; i < totalSamples; i++ { + samples = append(samples, cortexpb.Sample{ + Value: float64(i), + TimestampMs: int64(i * 60 * 1000), + }) + } + + lbls := make([]labels.Labels, 0, len(samples)) + for j := 0; j < 10; j++ { + for i := 0; i < len(samples); i++ { + lbls = append(lbls, labels.FromStrings(labels.MetricName, metricNames[i%len(metricNames)], "a", fmt.Sprintf("aaa%v", j))) + } + } + + for i := len(samples); i < len(lbls); i++ { + samples = append(samples, samples[i%len(samples)]) + } + + req := cortexpb.ToWriteRequest(lbls, samples, nil, nil, cortexpb.API) + _, err = i.Push(ctx, req) + require.NoError(t, err) + + i.compactBlocks(ctx, false, nil) + + extraMatcher := []struct { + matchers []*client.LabelMatcher + expectedLenght int + }{ + { + expectedLenght: 10, + matchers: []*client.LabelMatcher{ + { + Type: client.REGEX_MATCH, + Name: "a", + Value: "aaa.*", + }, + }, + }, + { + expectedLenght: 1, + matchers: []*client.LabelMatcher{ + { + Type: client.EQUAL, + Name: "a", + Value: "aaa1", + }, + }, + }, + } + + // Run queries with no cache + for _, name := range metricNames { + for _, m := range extraMatcher { + 
postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, append(m.matchers, &client.LabelMatcher{Type: client.EQUAL, Name: labels.MetricName, Value: name})), m.expectedLenght) + // Query block and Head + require.Equal(t, int64(c.expectedBlockPostingCall+c.expectedHeadPostingCall), postingsForMatchersCalls.Load()) + } + } + + if c.expectedHeadPostingCall > 0 || c.expectedBlockPostingCall > 0 { + metric := ` + # HELP cortex_ingester_expanded_postings_cache_requests Total number of requests to the cache. + # TYPE cortex_ingester_expanded_postings_cache_requests counter +` + if c.expectedBlockPostingCall > 0 { + metric += ` + cortex_ingester_expanded_postings_cache_requests{cache="block"} 4 +` + } + + if c.expectedHeadPostingCall > 0 { + metric += ` + cortex_ingester_expanded_postings_cache_requests{cache="head"} 4 +` + } + + err = testutil.GatherAndCompare(r, bytes.NewBufferString(metric), "cortex_ingester_expanded_postings_cache_requests") + require.NoError(t, err) + } + + // Calling again and it should hit the cache + for _, name := range metricNames { + for _, m := range extraMatcher { + postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, append(m.matchers, &client.LabelMatcher{Type: client.EQUAL, Name: labels.MetricName, Value: name})), m.expectedLenght) + require.Equal(t, int64(0), postingsForMatchersCalls.Load()) + } + } + + if c.expectedHeadPostingCall > 0 || c.expectedBlockPostingCall > 0 { + metric := ` + # HELP cortex_ingester_expanded_postings_cache_hits Total number of hit requests to the cache. + # TYPE cortex_ingester_expanded_postings_cache_hits counter +` + if c.expectedBlockPostingCall > 0 { + metric += ` + cortex_ingester_expanded_postings_cache_hits{cache="block"} 4 +` + } + + if c.expectedHeadPostingCall > 0 { + metric += ` + cortex_ingester_expanded_postings_cache_hits{cache="head"} 4 +` + } + + err = testutil.GatherAndCompare(r, bytes.NewBufferString(metric), "cortex_ingester_expanded_postings_cache_hits") + require.NoError(t, err) + } + + // Check the number total of series with the first metric name + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricNames[0]}}), 10) + // Query block and head + require.Equal(t, postingsForMatchersCalls.Load(), int64(c.expectedBlockPostingCall+c.expectedHeadPostingCall)) + + // Adding a metric for the first metric name so we expire all caches for that metric name + _, err = i.Push(ctx, cortexpb.ToWriteRequest( + []labels.Labels{labels.FromStrings(labels.MetricName, metricNames[0], "extra", "1")}, []cortexpb.Sample{{Value: 2, TimestampMs: 4 * 60 * 60 * 1000}}, nil, nil, cortexpb.API)) + require.NoError(t, err) + + for in, name := range metricNames { + for _, m := range extraMatcher { + postingsForMatchersCalls.Store(0) + + require.Len(t, runQuery(t, ctx, i, append(m.matchers, &client.LabelMatcher{Type: client.EQUAL, Name: labels.MetricName, Value: name})), m.expectedLenght) + + // first metric name should be expired + if in == 0 { + // Query only head as the block is already cached and the head was expired + require.Equal(t, postingsForMatchersCalls.Load(), int64(c.expectedHeadPostingCall)) + } else { + require.Equal(t, postingsForMatchersCalls.Load(), int64(0)) + } + } + } + + // Check if the new metric name was added + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricNames[0]}}), 11) + // Query only head as the block is already cached and the head was expired + 
require.Equal(t, postingsForMatchersCalls.Load(), int64(c.expectedHeadPostingCall)) + postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: metricNames[0]}}), 11) + // Return all from the caches + require.Equal(t, postingsForMatchersCalls.Load(), int64(0)) + + // Should never cache head postings there is not matcher for the metric name + postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: "extra", Value: "1"}}), 1) + // Query block and head but bypass head + require.Equal(t, postingsForMatchersCalls.Load(), int64(c.expectedBlockPostingCall)) + if c.cacheConfig.Head.Enabled { + err = testutil.GatherAndCompare(r, bytes.NewBufferString(` + # HELP cortex_ingester_expanded_postings_non_cacheable_queries Total number of non cacheable queries. + # TYPE cortex_ingester_expanded_postings_non_cacheable_queries counter + cortex_ingester_expanded_postings_non_cacheable_queries{cache="head"} 1 +`), "cortex_ingester_expanded_postings_non_cacheable_queries") + require.NoError(t, err) + } + + postingsForMatchersCalls.Store(0) + require.Len(t, runQuery(t, ctx, i, []*client.LabelMatcher{{Type: client.EQUAL, Name: "extra", Value: "1"}}), 1) + // Return cached value from block and bypass head + require.Equal(t, int64(0), postingsForMatchersCalls.Load()) + }) + } +} + func TestIngester_inflightPushRequests(t *testing.T) { limits := InstanceLimits{MaxInflightPushRequests: 1} @@ -5103,7 +5376,7 @@ func TestIngester_inflightPushRequests(t *testing.T) { g, ctx := errgroup.WithContext(ctx) g.Go(func() error { count := 3500000 - req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, fmt.Sprintf("real-%d", count)), count) + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, fmt.Sprintf("real-%d", count)), count, 1) // Signal that we're going to do the real push now. close(startCh) @@ -5120,7 +5393,7 @@ func TestIngester_inflightPushRequests(t *testing.T) { } time.Sleep(10 * time.Millisecond) // Give first goroutine a chance to start pushing... 
- req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1024) + req := generateSamplesForLabel(labels.FromStrings(labels.MetricName, "testcase"), 1024, 1) _, err := i.Push(ctx, req) require.Equal(t, errTooManyInflightPushRequests, err) @@ -5182,14 +5455,14 @@ func Test_Ingester_QueryExemplar_MaxInflightQueryRequest(t *testing.T) { require.Equal(t, err, errTooManyInflightQueryRequests) } -func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest { +func generateSamplesForLabel(l labels.Labels, count int, sampleIntervalInMs int) *cortexpb.WriteRequest { var lbls = make([]labels.Labels, 0, count) var samples = make([]cortexpb.Sample, 0, count) for i := 0; i < count; i++ { samples = append(samples, cortexpb.Sample{ Value: float64(i), - TimestampMs: int64(i), + TimestampMs: int64(i * sampleIntervalInMs), }) lbls = append(lbls, l) } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 3e2d98fc4b..1e5fc1b0c7 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -5,6 +5,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "go.uber.org/atomic" + "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" util_math "github.com/cortexproject/cortex/pkg/util/math" ) @@ -54,6 +55,9 @@ type ingesterMetrics struct { maxInflightPushRequests prometheus.GaugeFunc inflightRequests prometheus.GaugeFunc inflightQueryRequests prometheus.GaugeFunc + + // Posting Cache Metrics + expandedPostingsCacheMetrics *tsdb.ExpandedPostingsCacheMetrics } func newIngesterMetrics(r prometheus.Registerer, @@ -63,6 +67,7 @@ func newIngesterMetrics(r prometheus.Registerer, ingestionRate *util_math.EwmaRate, inflightPushRequests *atomic.Int64, maxInflightQueryRequests *util_math.MaxTracker, + postingsCacheEnabled bool, ) *ingesterMetrics { const ( instanceLimits = "cortex_ingester_instance_limits" @@ -235,6 +240,10 @@ func newIngesterMetrics(r prometheus.Registerer, }, []string{"user"}), } + if postingsCacheEnabled && r != nil { + m.expandedPostingsCacheMetrics = tsdb.NewPostingCacheMetrics(r) + } + if activeSeriesEnabled && r != nil { r.MustRegister(m.activeSeriesPerUser) } diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index df992686d3..5c2e0e15d5 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -34,7 +34,8 @@ func TestIngesterMetrics(t *testing.T) { }, ingestionRate, inflightPushRequests, - &maxInflightQueryRequests) + &maxInflightQueryRequests, + false) require.NotNil(t, m) diff --git a/pkg/storage/tsdb/cached_chunks_querier.go b/pkg/storage/tsdb/cached_chunks_querier.go new file mode 100644 index 0000000000..3dcb19ab4c --- /dev/null +++ b/pkg/storage/tsdb/cached_chunks_querier.go @@ -0,0 +1,128 @@ +package tsdb + +import ( + "context" + "errors" + "fmt" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + prom_tsdb "github.com/prometheus/prometheus/tsdb" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" +) + +/* + This file is basically a copy from https://github.com/prometheus/prometheus/blob/e2e01c1cffbfc4f26f5e9fe6138af87d7ff16122/tsdb/querier.go + with the difference that the PostingsForMatchers function is called from the Postings Cache +*/ + +type blockBaseQuerier struct { + blockID ulid.ULID + index prom_tsdb.IndexReader + chunks 
prom_tsdb.ChunkReader + tombstones tombstones.Reader + + closed bool + + mint, maxt int64 +} + +func newBlockBaseQuerier(b prom_tsdb.BlockReader, mint, maxt int64) (*blockBaseQuerier, error) { + indexr, err := b.Index() + if err != nil { + return nil, fmt.Errorf("open index reader: %w", err) + } + chunkr, err := b.Chunks() + if err != nil { + indexr.Close() + return nil, fmt.Errorf("open chunk reader: %w", err) + } + tombsr, err := b.Tombstones() + if err != nil { + indexr.Close() + chunkr.Close() + return nil, fmt.Errorf("open tombstone reader: %w", err) + } + + if tombsr == nil { + tombsr = tombstones.NewMemTombstones() + } + return &blockBaseQuerier{ + blockID: b.Meta().ULID, + mint: mint, + maxt: maxt, + index: indexr, + chunks: chunkr, + tombstones: tombsr, + }, nil +} + +func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.SortedLabelValues(ctx, name, matchers...) + return res, nil, err +} + +func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.LabelNames(ctx, matchers...) + return res, nil, err +} + +func (q *blockBaseQuerier) Close() error { + if q.closed { + return errors.New("block querier already closed") + } + + errs := tsdb_errors.NewMulti( + q.index.Close(), + q.chunks.Close(), + q.tombstones.Close(), + ) + q.closed = true + return errs.Err() +} + +type cachedBlockChunkQuerier struct { + *blockBaseQuerier + + cache ExpandedPostingsCache +} + +func NewCachedBlockChunkQuerier(cache ExpandedPostingsCache, b prom_tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + q, err := newBlockBaseQuerier(b, mint, maxt) + if err != nil { + return nil, err + } + return &cachedBlockChunkQuerier{blockBaseQuerier: q, cache: cache}, nil +} + +func (q *cachedBlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { + return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt, q.cache) +} + +func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, + blockID ulid.ULID, index prom_tsdb.IndexReader, chunks prom_tsdb.ChunkReader, tombstones tombstones.Reader, mint, maxt int64, + cache ExpandedPostingsCache, +) storage.ChunkSeriesSet { + disableTrimming := false + sharded := hints != nil && hints.ShardCount > 0 + + if hints != nil { + mint = hints.Start + maxt = hints.End + disableTrimming = hints.DisableTrimming + } + p, err := cache.PostingsForMatchers(ctx, blockID, index, ms...) + if err != nil { + return storage.ErrChunkSeriesSet(err) + } + if sharded { + p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + } + if sortSeries { + p = index.SortedPostings(p) + } + return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming) +} diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 5a33115182..612061676c 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -166,6 +166,9 @@ type TSDBConfig struct { // Enable native histogram ingestion. 
EnableNativeHistograms bool `yaml:"enable_native_histograms"` + + // Posting Cache Configuration for TSDB + PostingsCache TSDBPostingsCacheConfig `yaml:"expanded_postings_cache" doc:"description=[EXPERIMENTAL] If enabled, ingesters will cache expanded postings when querying blocks. Caching can be configured separately for the head and compacted blocks."` } // RegisterFlags registers the TSDBConfig flags. @@ -195,6 +198,8 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.") f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.") f.BoolVar(&cfg.EnableNativeHistograms, "blocks-storage.tsdb.enable-native-histograms", false, "[EXPERIMENTAL] True to enable native histogram.") + + cfg.PostingsCache.RegisterFlagsWithPrefix("blocks-storage.", f) } // Validate the config. diff --git a/pkg/storage/tsdb/expanded_postings_cache.go b/pkg/storage/tsdb/expanded_postings_cache.go new file mode 100644 index 0000000000..f3c1f674ee --- /dev/null +++ b/pkg/storage/tsdb/expanded_postings_cache.go @@ -0,0 +1,428 @@ +package tsdb + +import ( + "container/list" + "context" + "flag" + "slices" + "strconv" + "strings" + "sync" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/index" + + "github.com/cortexproject/cortex/pkg/util/extract" +) + +var ( + rangeHeadULID = ulid.MustParse("0000000000XXXXXXXRANGEHEAD") + headULID = ulid.MustParse("0000000000XXXXXXXXXXXXHEAD") +) + +const ( + // size of the seed array. Each seed is a 64bits int (8 bytes) + // totaling 8mb + seedArraySize = 1024 * 1024 + + numOfSeedsStripes = 512 +) + +type ExpandedPostingsCacheMetrics struct { + CacheRequests *prometheus.CounterVec + CacheHits *prometheus.CounterVec + CacheEvicts *prometheus.CounterVec + NonCacheableQueries *prometheus.CounterVec +} + +func NewPostingCacheMetrics(r prometheus.Registerer) *ExpandedPostingsCacheMetrics { + return &ExpandedPostingsCacheMetrics{ + CacheRequests: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_expanded_postings_cache_requests", + Help: "Total number of requests to the cache.", + }, []string{"cache"}), + CacheHits: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_expanded_postings_cache_hits", + Help: "Total number of hit requests to the cache.", + }, []string{"cache"}), + CacheEvicts: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_expanded_postings_cache_evicts", + Help: "Total number of evictions in the cache, excluding items that got evicted due to TTL.", + }, []string{"cache"}), + NonCacheableQueries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_expanded_postings_non_cacheable_queries", + Help: "Total number of non cacheable queries.", + }, []string{"cache"}), + } +} + +type TSDBPostingsCacheConfig struct { + Head PostingsCacheConfig `yaml:"head" doc:"description=If enabled, ingesters will cache expanded postings for the head block. 
Only queries with with an equal matcher for metric __name__ are cached."` + Blocks PostingsCacheConfig `yaml:"blocks" doc:"description=If enabled, ingesters will cache expanded postings for the compacted blocks. The cache is shared between all blocks."` + + PostingsForMatchers func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) `yaml:"-"` + timeNow func() time.Time `yaml:"-"` +} + +type PostingsCacheConfig struct { + Enabled bool `yaml:"enabled"` + MaxBytes int64 `yaml:"max_bytes"` + Ttl time.Duration `yaml:"ttl"` +} + +func (cfg *TSDBPostingsCacheConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.Head.RegisterFlagsWithPrefix(prefix, "head", f) + cfg.Blocks.RegisterFlagsWithPrefix(prefix, "block", f) +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *PostingsCacheConfig) RegisterFlagsWithPrefix(prefix, block string, f *flag.FlagSet) { + f.Int64Var(&cfg.MaxBytes, prefix+"expanded_postings_cache."+block+".max-bytes", 10*1024*1024, "Max bytes for postings cache") + f.DurationVar(&cfg.Ttl, prefix+"expanded_postings_cache."+block+".ttl", 10*time.Minute, "TTL for postings cache") + f.BoolVar(&cfg.Enabled, prefix+"expanded_postings_cache."+block+".enabled", false, "Whether the postings cache is enabled or not") +} + +type ExpandedPostingsCache interface { + PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) + ExpireSeries(metric labels.Labels) +} + +type BlocksPostingsForMatchersCache struct { + strippedLock []sync.RWMutex + + headCache *fifoCache[[]storage.SeriesRef] + blocksCache *fifoCache[[]storage.SeriesRef] + + headSeedByMetricName []int + postingsForMatchersFunc func(ctx context.Context, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) + timeNow func() time.Time + + metrics *ExpandedPostingsCacheMetrics +} + +func NewBlocksPostingsForMatchersCache(cfg TSDBPostingsCacheConfig, metrics *ExpandedPostingsCacheMetrics) ExpandedPostingsCache { + if cfg.PostingsForMatchers == nil { + cfg.PostingsForMatchers = tsdb.PostingsForMatchers + } + + if cfg.timeNow == nil { + cfg.timeNow = time.Now + } + + return &BlocksPostingsForMatchersCache{ + headCache: newFifoCache[[]storage.SeriesRef](cfg.Head, "head", metrics, cfg.timeNow), + blocksCache: newFifoCache[[]storage.SeriesRef](cfg.Blocks, "block", metrics, cfg.timeNow), + headSeedByMetricName: make([]int, seedArraySize), + strippedLock: make([]sync.RWMutex, numOfSeedsStripes), + postingsForMatchersFunc: cfg.PostingsForMatchers, + timeNow: cfg.timeNow, + metrics: metrics, + } +} + +func (c *BlocksPostingsForMatchersCache) ExpireSeries(metric labels.Labels) { + metricName, err := extract.MetricNameFromLabels(metric) + if err != nil { + return + } + + h := MemHashString(metricName) + i := h % uint64(len(c.headSeedByMetricName)) + l := h % uint64(len(c.strippedLock)) + c.strippedLock[l].Lock() + defer c.strippedLock[l].Unlock() + c.headSeedByMetricName[i]++ +} + +func (c *BlocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context, blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) { + return c.fetchPostings(blockID, ix, ms...)(ctx) +} + +func (c *BlocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) { + var seed string + cache := c.blocksCache + + // If is a head block, lets add the seed on the cache 
key so we can + // invalidate the cache when new series are created for this metric name + if isHeadBlock(blockID) { + cache = c.headCache + if cache.cfg.Enabled { + metricName, ok := metricNameFromMatcher(ms) + // Lets not cache head if we don;t find an equal matcher for the label __name__ + if !ok { + c.metrics.NonCacheableQueries.WithLabelValues(cache.name).Inc() + return func(ctx context.Context) (index.Postings, error) { + return tsdb.PostingsForMatchers(ctx, ix, ms...) + } + } + + seed = c.getSeedForMetricName(metricName) + } + } + + // Let's bypass cache if not enabled + if !cache.cfg.Enabled { + return func(ctx context.Context) (index.Postings, error) { + return tsdb.PostingsForMatchers(ctx, ix, ms...) + } + } + + c.metrics.CacheRequests.WithLabelValues(cache.name).Inc() + + fetch := func() ([]storage.SeriesRef, int64, error) { + // Use context.Background() as this promise is maybe shared across calls + postings, err := c.postingsForMatchersFunc(context.Background(), ix, ms...) + + if err == nil { + ids, err := index.ExpandPostings(postings) + return ids, int64(len(ids) * 8), err + } + + return nil, 0, err + } + + key := c.cacheKey(seed, blockID, ms...) + promise, loaded := cache.getPromiseForKey(key, fetch) + if loaded { + c.metrics.CacheHits.WithLabelValues(cache.name).Inc() + } + + return c.result(promise) +} + +func (c *BlocksPostingsForMatchersCache) result(promise *cacheEntryPromise[[]storage.SeriesRef]) func(ctx context.Context) (index.Postings, error) { + return func(ctx context.Context) (index.Postings, error) { + ids, err := promise.result(ctx) + return index.NewListPostings(ids), err + } +} + +func (c *BlocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string { + h := MemHashString(metricName) + i := h % uint64(len(c.headSeedByMetricName)) + l := h % uint64(len(c.strippedLock)) + c.strippedLock[l].RLock() + defer c.strippedLock[l].RUnlock() + return strconv.Itoa(c.headSeedByMetricName[i]) +} + +func (c *BlocksPostingsForMatchersCache) cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string { + slices.SortFunc(ms, func(i, j *labels.Matcher) int { + if i.Type != j.Type { + return int(i.Type - j.Type) + } + if i.Name != j.Name { + return strings.Compare(i.Name, j.Name) + } + if i.Value != j.Value { + return strings.Compare(i.Value, j.Value) + } + return 0 + }) + + const ( + typeLen = 2 + sepLen = 1 + ) + + var size int + for _, m := range ms { + size += len(seed) + len(blockID.String()) + len(m.Name) + len(m.Value) + typeLen + 2*sepLen + } + sb := strings.Builder{} + sb.Grow(size) + sb.WriteString(seed) + sb.WriteByte('|') + sb.WriteString(blockID.String()) + for _, m := range ms { + sb.WriteString(m.Name) + sb.WriteString(m.Type.String()) + sb.WriteString(m.Value) + sb.WriteByte('|') + } + key := sb.String() + return key +} + +func isHeadBlock(blockID ulid.ULID) bool { + return blockID == rangeHeadULID || blockID == headULID +} + +func metricNameFromMatcher(ms []*labels.Matcher) (string, bool) { + for _, m := range ms { + if m.Name == labels.MetricName && m.Type == labels.MatchEqual { + return m.Value, true + } + } + + return "", false +} + +type fifoCache[V any] struct { + cfg PostingsCacheConfig + cachedValues *sync.Map + timeNow func() time.Time + name string + metrics ExpandedPostingsCacheMetrics + + // Fields from here should be locked + cachedMtx sync.RWMutex + cached *list.List + cachedBytes int64 +} + +func newFifoCache[V any](cfg PostingsCacheConfig, name string, metrics *ExpandedPostingsCacheMetrics, timeNow func() time.Time) 
*fifoCache[V] { + return &fifoCache[V]{ + cachedValues: new(sync.Map), + cached: list.New(), + cfg: cfg, + timeNow: timeNow, + name: name, + metrics: *metrics, + } +} + +func (c *fifoCache[V]) expire() { + if c.cfg.Ttl <= 0 { + return + } + c.cachedMtx.RLock() + if !c.shouldEvictHead() { + c.cachedMtx.RUnlock() + return + } + c.cachedMtx.RUnlock() + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + for c.shouldEvictHead() { + c.evictHead() + } +} + +func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) { + r := &cacheEntryPromise[V]{ + done: make(chan struct{}), + } + defer close(r.done) + + if !c.cfg.Enabled { + r.v, _, r.err = fetch() + return r, false + } + + loaded, ok := c.cachedValues.LoadOrStore(k, r) + + if !ok { + r.v, r.sizeBytes, r.err = fetch() + r.sizeBytes += int64(len(k)) + r.ts = c.timeNow() + c.created(k, r.sizeBytes) + c.expire() + } + + // If is cached but is expired, lets try to replace the cache value + if ok && loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) { + if c.cachedValues.CompareAndSwap(k, loaded, r) { + r.v, r.sizeBytes, r.err = fetch() + r.sizeBytes += int64(len(k)) + c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes) + loaded = r + r.ts = c.timeNow() + ok = false + } + } + + return loaded.(*cacheEntryPromise[V]), ok +} + +func (c *fifoCache[V]) contains(k string) bool { + _, ok := c.cachedValues.Load(k) + return ok +} + +func (c *fifoCache[V]) shouldEvictHead() bool { + if c.cachedBytes > c.cfg.MaxBytes { + c.metrics.CacheEvicts.WithLabelValues(c.name).Inc() + return true + } + h := c.cached.Front() + if h == nil { + return false + } + key := h.Value.(string) + + if l, ok := c.cachedValues.Load(key); ok { + return l.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) + } + + return false +} + +func (c *fifoCache[V]) evictHead() { + front := c.cached.Front() + c.cached.Remove(front) + oldestKey := front.Value.(string) + if oldest, loaded := c.cachedValues.LoadAndDelete(oldestKey); loaded { + c.cachedBytes -= oldest.(*cacheEntryPromise[V]).sizeBytes + } +} + +func (c *fifoCache[V]) created(key string, sizeBytes int64) { + if c.cfg.Ttl <= 0 { + c.cachedValues.Delete(key) + return + } + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + c.cached.PushBack(key) + c.cachedBytes += sizeBytes +} + +func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) { + if oldSize == newSizeBytes { + return + } + + c.cachedMtx.Lock() + defer c.cachedMtx.Unlock() + c.cachedBytes += newSizeBytes - oldSize +} + +type cacheEntryPromise[V any] struct { + ts time.Time + sizeBytes int64 + + done chan struct{} + v V + err error +} + +func (ce *cacheEntryPromise[V]) result(ctx context.Context) (V, error) { + select { + case <-ctx.Done(): + return ce.v, ctx.Err() + case <-ce.done: + if ctx.Err() != nil { + return ce.v, ctx.Err() + } + + return ce.v, ce.err + } +} + +func (ce *cacheEntryPromise[V]) isExpired(ttl time.Duration, now time.Time) bool { + ts := ce.ts + r := now.Sub(ts) + return r >= ttl +} + +func MemHashString(str string) uint64 { + return xxhash.Sum64(yoloBuf(str)) +} diff --git a/pkg/storage/tsdb/expanded_postings_cache_test.go b/pkg/storage/tsdb/expanded_postings_cache_test.go new file mode 100644 index 0000000000..d4b80abcbd --- /dev/null +++ b/pkg/storage/tsdb/expanded_postings_cache_test.go @@ -0,0 +1,125 @@ +package tsdb + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + 
"github.com/stretchr/testify/require" +) + +func TestFifoCacheDisabled(t *testing.T) { + cfg := PostingsCacheConfig{} + cfg.Enabled = false + m := NewPostingCacheMetrics(prometheus.DefaultRegisterer) + timeNow := time.Now + cache := newFifoCache[int](cfg, "test", m, timeNow) + old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) { + return 1, 0, nil + }) + require.False(t, loaded) + v, err := old.result(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, v) + require.False(t, cache.contains("key1")) +} + +func TestFifoCacheExpire(t *testing.T) { + + keySize := 20 + numberOfKeys := 100 + + tc := map[string]struct { + cfg PostingsCacheConfig + expectedFinalItems int + ttlExpire bool + }{ + "MaxBytes": { + expectedFinalItems: 10, + cfg: PostingsCacheConfig{ + Enabled: true, + Ttl: time.Hour, + MaxBytes: int64(10 * (8 + keySize)), + }, + }, + "TTL": { + expectedFinalItems: numberOfKeys, + ttlExpire: true, + cfg: PostingsCacheConfig{ + Enabled: true, + Ttl: time.Hour, + MaxBytes: 10 << 20, + }, + }, + } + + for name, c := range tc { + t.Run(name, func(t *testing.T) { + m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry()) + timeNow := time.Now + cache := newFifoCache[int](c.cfg, "test", m, timeNow) + + for i := 0; i < numberOfKeys; i++ { + key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + return 1, 8, nil + }) + require.False(t, loaded) + v, err := p.result(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, v) + require.True(t, cache.contains(key)) + p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) { + return 1, 0, nil + }) + require.True(t, loaded) + v, err = p.result(context.Background()) + require.NoError(t, err) + require.Equal(t, 1, v) + } + + totalCacheSize := 0 + + for i := 0; i < numberOfKeys; i++ { + key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + if cache.contains(key) { + totalCacheSize++ + } + } + + require.Equal(t, c.expectedFinalItems, totalCacheSize) + + if c.ttlExpire { + cache.timeNow = func() time.Time { + return timeNow().Add(2 * c.cfg.Ttl) + } + + for i := 0; i < numberOfKeys; i++ { + key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize) + originalSize := cache.cachedBytes + p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) { + return 2, 18, nil + }) + require.False(t, loaded) + v, err := p.result(context.Background()) + require.NoError(t, err) + // New value + require.Equal(t, 2, v) + // Total Size Updated + require.Equal(t, originalSize+10, cache.cachedBytes) + } + } + }) + } +} + +func RepeatStringIfNeeded(seed string, length int) string { + if len(seed) > length { + return seed + } + + return strings.Repeat(seed, 1+length/len(seed))[:max(length, len(seed))] +} diff --git a/pkg/storage/tsdb/multilevel_chunk_cache.go b/pkg/storage/tsdb/multilevel_chunk_cache.go index e1b0f5bc20..8daa2f56ce 100644 --- a/pkg/storage/tsdb/multilevel_chunk_cache.go +++ b/pkg/storage/tsdb/multilevel_chunk_cache.go @@ -98,6 +98,7 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[str timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues()) defer timer.ObserveDuration() + missingKeys := keys hits := map[string][]byte{} backfillItems := make([]map[string][]byte, len(m.caches)-1) @@ -108,13 +109,25 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[str if ctx.Err() != nil { return nil } - if data := c.Fetch(ctx, keys); len(data) 
> 0 { + if data := c.Fetch(ctx, missingKeys); len(data) > 0 { for k, d := range data { hits[k] = d } if i > 0 && len(hits) > 0 { - backfillItems[i-1] = hits + // lets fetch only the mising keys + m := missingKeys[:0] + for _, key := range missingKeys { + if _, ok := hits[key]; !ok { + m = append(m, key) + } + } + + missingKeys = m + + for k, b := range hits { + backfillItems[i-1][k] = b + } } if len(hits) == len(keys) { diff --git a/pkg/storage/tsdb/multilevel_chunk_cache_test.go b/pkg/storage/tsdb/multilevel_chunk_cache_test.go index c72c1f3a55..4b50b8fd02 100644 --- a/pkg/storage/tsdb/multilevel_chunk_cache_test.go +++ b/pkg/storage/tsdb/multilevel_chunk_cache_test.go @@ -6,8 +6,10 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/cache" ) func Test_MultiLevelChunkCacheStore(t *testing.T) { @@ -72,6 +74,43 @@ func Test_MultiLevelChunkCacheStore(t *testing.T) { } } +func Test_MultiLevelChunkCacheFetchRace(t *testing.T) { + cfg := MultiLevelChunkCacheConfig{ + MaxAsyncConcurrency: 10, + MaxAsyncBufferSize: 100000, + MaxBackfillItems: 10000, + BackFillTTL: time.Hour * 24, + } + reg := prometheus.NewRegistry() + + m1 := newMockChunkCache("m1", map[string][]byte{ + "key1": []byte("value1"), + "key2": []byte("value2"), + }) + + inMemory, err := cache.NewInMemoryCacheWithConfig("test", log.NewNopLogger(), reg, cache.InMemoryCacheConfig{MaxSize: 10 * 1024, MaxItemSize: 1024}) + require.NoError(t, err) + + inMemory.Store(map[string][]byte{ + "key2": []byte("value2"), + "key3": []byte("value3"), + }, time.Minute) + + c := newMultiLevelChunkCache("chunk-cache", cfg, reg, inMemory, m1) + + hits := c.Fetch(context.Background(), []string{"key1", "key2", "key3", "key4"}) + + require.Equal(t, 3, len(hits)) + + // We should be able to change the returned values without any race problem + delete(hits, "key1") + + mlc := c.(*multiLevelChunkCache) + //Wait until async operation finishes. 
+ mlc.backfillProcessor.Stop() + +} + func Test_MultiLevelChunkCacheFetch(t *testing.T) { cfg := MultiLevelChunkCacheConfig{ MaxAsyncConcurrency: 10, @@ -81,12 +120,14 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) { } testCases := map[string]struct { - m1ExistingData map[string][]byte - m2ExistingData map[string][]byte - expectedM1Data map[string][]byte - expectedM2Data map[string][]byte - expectedFetchedData map[string][]byte - fetchKeys []string + m1ExistingData map[string][]byte + m2ExistingData map[string][]byte + expectedM1Data map[string][]byte + expectedM2Data map[string][]byte + expectedFetchedData map[string][]byte + expectedM1FetchedKeys []string + expectedM2FetchedKeys []string + fetchKeys []string }{ "fetched data should be union of m1, m2 and 'key2' and `key3' should be backfilled to m1": { m1ExistingData: map[string][]byte{ @@ -96,6 +137,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) { "key2": []byte("value2"), "key3": []byte("value3"), }, + expectedM1FetchedKeys: []string{"key1", "key2", "key3"}, + expectedM2FetchedKeys: []string{"key2", "key3"}, expectedM1Data: map[string][]byte{ "key1": []byte("value1"), "key2": []byte("value2"), @@ -119,6 +162,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) { m2ExistingData: map[string][]byte{ "key2": []byte("value2"), }, + expectedM1FetchedKeys: []string{"key1", "key2", "key3"}, + expectedM2FetchedKeys: []string{"key2", "key3"}, expectedM1Data: map[string][]byte{ "key1": []byte("value1"), "key2": []byte("value2"), @@ -157,6 +202,8 @@ type mockChunkCache struct { mu sync.Mutex name string data map[string][]byte + + fetchedKeys []string } func newMockChunkCache(name string, data map[string][]byte) *mockChunkCache { @@ -180,6 +227,7 @@ func (m *mockChunkCache) Fetch(_ context.Context, keys []string) map[string][]by h := map[string][]byte{} for _, k := range keys { + m.fetchedKeys = append(m.fetchedKeys, k) if _, ok := m.data[k]; ok { h[k] = m.data[k] }
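The core of the new `fifoCache` above is the promise-per-key pattern in `getPromiseForKey`: concurrent queries that miss on the same cache key share a single `PostingsForMatchers` call via `sync.Map.LoadOrStore` and a `done` channel, while each caller can still honor its own context. A stripped-down, standalone illustration of just that deduplication mechanism (not the code from this change; it drops the TTL handling, byte accounting, and FIFO eviction, and the `singleFlightCache` name is purely illustrative) could look like:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// promise holds the result of a single shared fetch for one cache key.
type promise[V any] struct {
	done chan struct{}
	v    V
	err  error
}

// result waits for the shared fetch to finish, but respects the caller's context.
func (p *promise[V]) result(ctx context.Context) (V, error) {
	select {
	case <-ctx.Done():
		return p.v, ctx.Err()
	case <-p.done:
		return p.v, p.err
	}
}

type singleFlightCache[V any] struct {
	entries sync.Map // key -> *promise[V]
}

// get returns the existing promise for key, or runs fetch exactly once when
// several callers miss on the same key at the same time.
func (c *singleFlightCache[V]) get(key string, fetch func() (V, error)) (*promise[V], bool) {
	p := &promise[V]{done: make(chan struct{})}
	if actual, loaded := c.entries.LoadOrStore(key, p); loaded {
		return actual.(*promise[V]), true // someone else already owns the fetch
	}
	p.v, p.err = fetch()
	close(p.done)
	return p, false
}

func main() {
	c := &singleFlightCache[int]{}
	var fetches int
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			p, _ := c.get("0|block|__name__=up", func() (int, error) {
				fetches++ // only the goroutine that stored the promise runs this
				time.Sleep(10 * time.Millisecond)
				return 42, nil
			})
			v, err := p.result(context.Background())
			fmt.Println(v, err)
		}()
	}
	wg.Wait()
	fmt.Println("fetch calls:", fetches) // expected: 1
}
```

The real `getPromiseForKey` additionally runs the shared fetch with `context.Background()` so that a cancelled caller cannot poison the cached result for everyone else, and replaces an expired entry with a fresh promise via `CompareAndSwap` once its TTL has passed.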