diff --git a/CHANGELOG.md b/CHANGELOG.md index ac47f0cea84..eabc0ef996a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ * [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088 * [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency is now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172 * [CHANGE] Remove fluentd-based billing infrastructure and flags such as `-distributor.enable-billing`. #1491 +* [CHANGE] Experimental TSDB: the querier in-memory index cache used by the experimental blocks storage shifted from per-tenant to per-querier. The `-experimental.tsdb.bucket-store.index-cache-size-bytes` now configures the per-querier index cache max size instead of a per-tenant cache and its default has been increased to 1GB. #2189 * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. 
#1947 * `--experimental.distributor.user-subring-size` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 5a5d7cbb55c..5d5ea6b1484 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2139,10 +2139,10 @@ bucket_store: # CLI flag: -experimental.tsdb.bucket-store.sync-interval [sync_interval: | default = 5m0s] - # Size - in bytes - of a per-tenant in-memory index cache used to speed up - # blocks index lookups. + # Size in bytes of in-memory index cache used to speed up blocks index lookups + # (shared between all tenants). # CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes - [index_cache_size_bytes: | default = 262144000] + [index_cache_size_bytes: | default = 1073741824] # Max size - in bytes - of a per-tenant chunk pool, used to reduce memory # allocations. diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index 5b36c001d0f..2ac5d6479b0 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -133,10 +133,10 @@ tsdb: # CLI flag: -experimental.tsdb.bucket-store.sync-interval [sync_interval: | default = 5m0s] - # Size - in bytes - of a per-tenant in-memory index cache used to speed up - # blocks index lookups. + # Size in bytes of in-memory index cache used to speed up blocks index + # lookups (shared between all tenants). # CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes - [index_cache_size_bytes: | default = 262144000] + [index_cache_size_bytes: | default = 1073741824] # Max size - in bytes - of a per-tenant chunk pool, used to reduce memory # allocations. 
diff --git a/integration/configs.go b/integration/configs.go index 5b8d17bf136..5265a9a6043 100644 --- a/integration/configs.go +++ b/integration/configs.go @@ -50,6 +50,7 @@ var ( "-experimental.tsdb.bucket-store.sync-interval": "5s", "-experimental.tsdb.retention-period": "5m", "-experimental.tsdb.ship-interval": "1m", + "-experimental.tsdb.head-compaction-interval": "1s", "-experimental.tsdb.s3.access-key-id": e2edb.MinioAccessKey, "-experimental.tsdb.s3.secret-access-key": e2edb.MinioSecretKey, "-experimental.tsdb.s3.bucket-name": "cortex", diff --git a/integration/querier_test.go b/integration/querier_test.go new file mode 100644 index 00000000000..11dd2eb0951 --- /dev/null +++ b/integration/querier_test.go @@ -0,0 +1,139 @@ +// +build integration + +package main + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" +) + +func TestQuerierWithBlocksStorage(t *testing.T) { + tests := map[string]struct { + flags map[string]string + }{ + "querier running with ingester gRPC streaming disabled": { + flags: mergeFlags(BlocksStorageFlags, map[string]string{ + "-querier.ingester-streaming": "false", + }), + }, + } + + for testName, testCfg := range tests { + t.Run(testName, func(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. 
+ flags := mergeFlags(testCfg.flags, map[string]string{ + "-experimental.tsdb.block-ranges-period": blockRangePeriod.String(), + "-experimental.tsdb.ship-interval": "1s", + "-experimental.tsdb.bucket-store.sync-interval": "1s", + "-experimental.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-experimental.tsdb.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components. + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier)) + + // Wait until both the distributor and querier have updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1") + require.NoError(t, err) + + // Push some series to Cortex. + series1Timestamp := time.Now() + series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series1, expectedVector1 := generateSeries("series_1", series1Timestamp) + series2, expectedVector2 := generateSeries("series_2", series2Timestamp) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 1st series, while the 2nd series is in the head. 
+ require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) + + // Push another series to further compact another block and delete the first block + // due to expired retention. + series3Timestamp := series2Timestamp.Add(blockRangePeriod * 2) + series3, expectedVector3 := generateSeries("series_3", series3Timestamp) + + res, err = c.Push(series3) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(3), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) + + // Wait until the querier has synced the newly uploaded blocks. + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_bucket_store_blocks_loaded")) + + // Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both). 
+ // TODO: apparently Thanos has a bug which causes a block to not be considered if the + // query timestamp matches the block max timestamp + series1Timestamp = series1Timestamp.Add(time.Duration(time.Millisecond)) + expectedVector1[0].Timestamp = model.Time(e2e.TimeToMilliseconds(series1Timestamp)) + + result, err := c.Query("series_1", series1Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector1, result.(model.Vector)) + + result, err = c.Query("series_2", series2Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector2, result.(model.Vector)) + + result, err = c.Query("series_3", series3Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector3, result.(model.Vector)) + + // Check the in-memory index cache metrics (in the querier). + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items")) // 2 series both for postings and series cache + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // 2 series both for postings and series cache + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(0), "cortex_querier_blocks_index_cache_hits_total")) // no cache hit because the cache was empty + + // Query back again the 1st series from storage. This time it should use the index cache. 
+ result, err = c.Query("series_1", series1Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector1, result.(model.Vector)) + + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items")) // as before + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // as before + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_blocks_index_cache_hits_total")) // this time has used the index cache + }) + } +} diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index da299766d1d..1083bc26d43 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/model" @@ -31,16 +32,20 @@ import ( // UserStore is a multi-tenant version of Thanos BucketStore type UserStore struct { - logger log.Logger - cfg tsdb.Config - bucket objstore.Bucket - client storepb.StoreClient - logLevel logging.Level - tsdbMetrics *tsdbBucketStoreMetrics + logger log.Logger + cfg tsdb.Config + bucket objstore.Bucket + client storepb.StoreClient + logLevel logging.Level + bucketStoreMetrics *tsdbBucketStoreMetrics + indexCacheMetrics *tsdbIndexCacheMetrics syncMint model.TimeOrDurationValue syncMaxt model.TimeOrDurationValue + // Index cache shared across all tenants. + indexCache storecache.IndexCache + // Keeps a bucket store for each tenant. 
storesMu sync.RWMutex stores map[string]*store.BucketStore @@ -55,16 +60,20 @@ type UserStore struct { // NewUserStore returns a new UserStore func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel logging.Level, logger log.Logger, registerer prometheus.Registerer) (*UserStore, error) { + var err error + workersCtx, workersCancel := context.WithCancel(context.Background()) + indexCacheRegistry := prometheus.NewRegistry() u := &UserStore{ - logger: logger, - cfg: cfg, - bucket: bucketClient, - stores: map[string]*store.BucketStore{}, - logLevel: logLevel, - tsdbMetrics: newTSDBBucketStoreMetrics(), - workersCancel: workersCancel, + logger: logger, + cfg: cfg, + bucket: bucketClient, + stores: map[string]*store.BucketStore{}, + logLevel: logLevel, + bucketStoreMetrics: newTSDBBucketStoreMetrics(), + indexCacheMetrics: newTSDBIndexCacheMetrics(indexCacheRegistry), + workersCancel: workersCancel, syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "cortex_querier_blocks_sync_seconds", Help: "The total time it takes to perform a sync stores", @@ -73,15 +82,20 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin } // Configure the time range to sync all blocks. - if err := u.syncMint.Set("0000-01-01T00:00:00Z"); err != nil { + if err = u.syncMint.Set("0000-01-01T00:00:00Z"); err != nil { return nil, err } - if err := u.syncMaxt.Set("9999-12-31T23:59:59Z"); err != nil { + if err = u.syncMaxt.Set("9999-12-31T23:59:59Z"); err != nil { return nil, err } + // Init the index cache. 
+ if u.indexCache, err = tsdb.NewIndexCache(cfg.BucketStore, logger, indexCacheRegistry); err != nil { + return nil, errors.Wrap(err, "create index cache") + } + if registerer != nil { - registerer.MustRegister(u.syncTimes, u.tsdbMetrics) + registerer.MustRegister(u.syncTimes, u.bucketStoreMetrics, u.indexCacheMetrics) } serv := grpc.NewServer() @@ -357,16 +371,6 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) userBkt := tsdb.NewUserBucketClient(userID, u.bucket) reg := prometheus.NewRegistry() - indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes - maxItemSizeBytes := indexCacheSizeBytes / 2 - indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(userLogger, reg, storecache.InMemoryIndexCacheConfig{ - MaxSize: storecache.Bytes(indexCacheSizeBytes), - MaxItemSize: storecache.Bytes(maxItemSizeBytes), - }) - if err != nil { - return nil, err - } - fetcher, err := block.NewMetaFetcher( userLogger, u.cfg.BucketStore.MetaSyncConcurrency, @@ -385,7 +389,7 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) userBkt, fetcher, filepath.Join(u.cfg.BucketStore.SyncDir, userID), - indexCache, + u.indexCache, uint64(u.cfg.BucketStore.MaxChunkPoolBytes), u.cfg.BucketStore.MaxSampleCount, u.cfg.BucketStore.MaxConcurrent, @@ -402,7 +406,7 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error) } u.stores[userID] = bs - u.tsdbMetrics.addUserRegistry(userID, reg) + u.bucketStoreMetrics.addUserRegistry(userID, reg) return bs, nil } diff --git a/pkg/querier/block_store_metrics.go b/pkg/querier/block_store_metrics.go index ca0aaf9bdb7..91d38116440 100644 --- a/pkg/querier/block_store_metrics.go +++ b/pkg/querier/block_store_metrics.go @@ -35,20 +35,6 @@ type tsdbBucketStoreMetrics struct { // Ignored: // blocks_meta_synced - - // Metrics gathered from Thanos storecache.InMemoryIndexCache - cacheItemsEvicted *prometheus.Desc - cacheItemsAdded *prometheus.Desc - cacheRequests 
*prometheus.Desc - cacheItemsOverflow *prometheus.Desc - cacheHits *prometheus.Desc - cacheItemsCurrentCount *prometheus.Desc - cacheItemsCurrentSize *prometheus.Desc - cacheItemsTotalCurrentSize *prometheus.Desc - - // Ignored: - // thanos_store_index_cache_max_size_bytes - // thanos_store_index_cache_max_item_size_bytes } func newTSDBBucketStoreMetrics() *tsdbBucketStoreMetrics { @@ -120,40 +106,6 @@ func newTSDBBucketStoreMetrics() *tsdbBucketStoreMetrics { "cortex_querier_bucket_store_blocks_meta_sync_duration_seconds", "TSDB: Duration of the blocks metadata synchronization in seconds", nil, nil), - - // Cache - cacheItemsEvicted: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items_evicted_total", - "TSDB: Total number of items that were evicted from the index cache.", - []string{"item_type"}, nil), - cacheItemsAdded: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items_added_total", - "TSDB: Total number of items that were added to the index cache.", - []string{"item_type"}, nil), - cacheRequests: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_requests_total", - "TSDB: Total number of requests to the cache.", - []string{"item_type"}, nil), - cacheItemsOverflow: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items_overflowed_total", - "TSDB: Total number of items that could not be added to the cache due to being too big.", - []string{"item_type"}, nil), - cacheHits: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_hits_total", - "TSDB: Total number of requests to the cache that were a hit.", - []string{"item_type"}, nil), - cacheItemsCurrentCount: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items", - "TSDB: Current number of items in the index cache.", - []string{"item_type"}, nil), - cacheItemsCurrentSize: prometheus.NewDesc( - "cortex_querier_blocks_index_cache_items_size_bytes", - "TSDB: Current byte size of items in the index cache.", - []string{"item_type"}, nil), - cacheItemsTotalCurrentSize: 
prometheus.NewDesc( - "cortex_querier_blocks_index_cache_total_size_bytes", - "TSDB: Current byte size of items (both value and key) in the index cache.", - []string{"item_type"}, nil), } } @@ -193,15 +145,6 @@ func (m *tsdbBucketStoreMetrics) Describe(out chan<- *prometheus.Desc) { out <- m.metaSyncs out <- m.metaSyncFailures out <- m.metaSyncDuration - - out <- m.cacheItemsEvicted - out <- m.cacheItemsAdded - out <- m.cacheRequests - out <- m.cacheItemsOverflow - out <- m.cacheHits - out <- m.cacheItemsCurrentCount - out <- m.cacheItemsCurrentSize - out <- m.cacheItemsTotalCurrentSize } func (m *tsdbBucketStoreMetrics) Collect(out chan<- prometheus.Metric) { @@ -227,6 +170,83 @@ func (m *tsdbBucketStoreMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfCounters(out, m.metaSyncs, "blocks_meta_syncs_total") data.SendSumOfCounters(out, m.metaSyncFailures, "blocks_meta_sync_failures_total") data.SendSumOfHistograms(out, m.metaSyncDuration, "blocks_meta_sync_duration_seconds") +} + +// This struct aggregates metrics exported by Thanos Index Cache +// and re-exports as Cortex metrics. 
+type tsdbIndexCacheMetrics struct { + reg *prometheus.Registry + + // Metrics gathered from Thanos storecache.InMemoryIndexCache + cacheItemsEvicted *prometheus.Desc + cacheItemsAdded *prometheus.Desc + cacheRequests *prometheus.Desc + cacheItemsOverflow *prometheus.Desc + cacheHits *prometheus.Desc + cacheItemsCurrentCount *prometheus.Desc + cacheItemsCurrentSize *prometheus.Desc + cacheItemsTotalCurrentSize *prometheus.Desc + + // Ignored: + // thanos_store_index_cache_max_size_bytes + // thanos_store_index_cache_max_item_size_bytes +} + +func newTSDBIndexCacheMetrics(reg *prometheus.Registry) *tsdbIndexCacheMetrics { + return &tsdbIndexCacheMetrics{ + reg: reg, + + // Cache + cacheItemsEvicted: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_evicted_total", + "TSDB: Total number of items that were evicted from the index cache.", + []string{"item_type"}, nil), + cacheItemsAdded: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_added_total", + "TSDB: Total number of items that were added to the index cache.", + []string{"item_type"}, nil), + cacheRequests: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_requests_total", + "TSDB: Total number of requests to the cache.", + []string{"item_type"}, nil), + cacheItemsOverflow: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_overflowed_total", + "TSDB: Total number of items that could not be added to the cache due to being too big.", + []string{"item_type"}, nil), + cacheHits: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_hits_total", + "TSDB: Total number of requests to the cache that were a hit.", + []string{"item_type"}, nil), + cacheItemsCurrentCount: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items", + "TSDB: Current number of items in the index cache.", + []string{"item_type"}, nil), + cacheItemsCurrentSize: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_size_bytes", + "TSDB: Current byte size of items in the index 
cache.", + []string{"item_type"}, nil), + cacheItemsTotalCurrentSize: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_total_size_bytes", + "TSDB: Current byte size of items (both value and key) in the index cache.", + []string{"item_type"}, nil), + } +} + +func (m *tsdbIndexCacheMetrics) Describe(out chan<- *prometheus.Desc) { + out <- m.cacheItemsEvicted + out <- m.cacheItemsAdded + out <- m.cacheRequests + out <- m.cacheItemsOverflow + out <- m.cacheHits + out <- m.cacheItemsCurrentCount + out <- m.cacheItemsCurrentSize + out <- m.cacheItemsTotalCurrentSize +} + +func (m *tsdbIndexCacheMetrics) Collect(out chan<- prometheus.Metric) { + data := util.BuildMetricFamiliesPerUserFromUserRegistries(map[string]*prometheus.Registry{ + "": m.reg, + }) data.SendSumOfCountersWithLabels(out, m.cacheItemsEvicted, "thanos_store_index_cache_items_evicted_total", "item_type") data.SendSumOfCountersWithLabels(out, m.cacheItemsAdded, "thanos_store_index_cache_items_added_total", "item_type") diff --git a/pkg/querier/bucket_store_metrics_test.go b/pkg/querier/bucket_store_metrics_test.go index f3facd1507a..5bf530af274 100644 --- a/pkg/querier/bucket_store_metrics_test.go +++ b/pkg/querier/bucket_store_metrics_test.go @@ -10,15 +10,15 @@ import ( "github.com/stretchr/testify/require" ) -func TestTsdbBucketStoreMetrics(t *testing.T) { +func TestTSDBBucketStoreMetrics(t *testing.T) { mainReg := prometheus.NewPedanticRegistry() tsdbMetrics := newTSDBBucketStoreMetrics() mainReg.MustRegister(tsdbMetrics) - tsdbMetrics.addUserRegistry("user1", populateTSDBBucketStore(5328)) - tsdbMetrics.addUserRegistry("user2", populateTSDBBucketStore(6908)) - tsdbMetrics.addUserRegistry("user3", populateTSDBBucketStore(10283)) + tsdbMetrics.addUserRegistry("user1", populateTSDBBucketStoreMetrics(5328)) + tsdbMetrics.addUserRegistry("user2", populateTSDBBucketStoreMetrics(6908)) + tsdbMetrics.addUserRegistry("user3", populateTSDBBucketStoreMetrics(10283)) //noinspection ALL err := 
testutil.GatherAndCompare(mainReg, bytes.NewBufferString(` @@ -141,46 +141,56 @@ func TestTsdbBucketStoreMetrics(t *testing.T) { # TYPE cortex_querier_bucket_store_series_result_series summary cortex_querier_bucket_store_series_result_series_sum 1.238545e+06 cortex_querier_bucket_store_series_result_series_count 6 +`)) + require.NoError(t, err) +} +func TestTSDBIndexCacheMetrics(t *testing.T) { + mainReg := prometheus.NewPedanticRegistry() + cacheMetrics := newTSDBIndexCacheMetrics(populateTSDBIndexCacheMetrics(5328)) + mainReg.MustRegister(cacheMetrics) + + //noinspection ALL + err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(` # HELP cortex_querier_blocks_index_cache_items_evicted_total TSDB: Total number of items that were evicted from the index cache. # TYPE cortex_querier_blocks_index_cache_items_evicted_total counter - cortex_querier_blocks_index_cache_items_evicted_total{item_type="Postings"} 1125950 - cortex_querier_blocks_index_cache_items_evicted_total{item_type="Series"} 1148469 + cortex_querier_blocks_index_cache_items_evicted_total{item_type="Postings"} 5328 + cortex_querier_blocks_index_cache_items_evicted_total{item_type="Series"} 10656 # HELP cortex_querier_blocks_index_cache_requests_total TSDB: Total number of requests to the cache. # TYPE cortex_querier_blocks_index_cache_requests_total counter - cortex_querier_blocks_index_cache_requests_total{item_type="Postings"} 1170988 - cortex_querier_blocks_index_cache_requests_total{item_type="Series"} 1193507 + cortex_querier_blocks_index_cache_requests_total{item_type="Postings"} 15984 + cortex_querier_blocks_index_cache_requests_total{item_type="Series"} 21312 # HELP cortex_querier_blocks_index_cache_hits_total TSDB: Total number of requests to the cache that were a hit. 
# TYPE cortex_querier_blocks_index_cache_hits_total counter - cortex_querier_blocks_index_cache_hits_total{item_type="Postings"} 1216026 - cortex_querier_blocks_index_cache_hits_total{item_type="Series"} 1238545 + cortex_querier_blocks_index_cache_hits_total{item_type="Postings"} 26640 + cortex_querier_blocks_index_cache_hits_total{item_type="Series"} 31968 # HELP cortex_querier_blocks_index_cache_items_added_total TSDB: Total number of items that were added to the index cache. # TYPE cortex_querier_blocks_index_cache_items_added_total counter - cortex_querier_blocks_index_cache_items_added_total{item_type="Postings"} 1261064 - cortex_querier_blocks_index_cache_items_added_total{item_type="Series"} 1283583 + cortex_querier_blocks_index_cache_items_added_total{item_type="Postings"} 37296 + cortex_querier_blocks_index_cache_items_added_total{item_type="Series"} 42624 # HELP cortex_querier_blocks_index_cache_items TSDB: Current number of items in the index cache. # TYPE cortex_querier_blocks_index_cache_items gauge - cortex_querier_blocks_index_cache_items{item_type="Postings"} 1306102 - cortex_querier_blocks_index_cache_items{item_type="Series"} 1328621 + cortex_querier_blocks_index_cache_items{item_type="Postings"} 47952 + cortex_querier_blocks_index_cache_items{item_type="Series"} 53280 # HELP cortex_querier_blocks_index_cache_items_size_bytes TSDB: Current byte size of items in the index cache. # TYPE cortex_querier_blocks_index_cache_items_size_bytes gauge - cortex_querier_blocks_index_cache_items_size_bytes{item_type="Postings"} 1351140 - cortex_querier_blocks_index_cache_items_size_bytes{item_type="Series"} 1373659 + cortex_querier_blocks_index_cache_items_size_bytes{item_type="Postings"} 58608 + cortex_querier_blocks_index_cache_items_size_bytes{item_type="Series"} 63936 # HELP cortex_querier_blocks_index_cache_total_size_bytes TSDB: Current byte size of items (both value and key) in the index cache. 
# TYPE cortex_querier_blocks_index_cache_total_size_bytes gauge - cortex_querier_blocks_index_cache_total_size_bytes{item_type="Postings"} 1396178 - cortex_querier_blocks_index_cache_total_size_bytes{item_type="Series"} 1418697 + cortex_querier_blocks_index_cache_total_size_bytes{item_type="Postings"} 69264 + cortex_querier_blocks_index_cache_total_size_bytes{item_type="Series"} 74592 # HELP cortex_querier_blocks_index_cache_items_overflowed_total TSDB: Total number of items that could not be added to the cache due to being too big. # TYPE cortex_querier_blocks_index_cache_items_overflowed_total counter - cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Postings"} 1441216 - cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Series"} 1463735 + cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Postings"} 79920 + cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Series"} 85248 `)) require.NoError(t, err) @@ -210,7 +220,7 @@ func benchmarkMetricsCollection(b *testing.B, users int) { base := 123456.0 for i := 0; i < users; i++ { - tsdbMetrics.addUserRegistry(fmt.Sprintf("user-%d", i), populateTSDBBucketStore(base*float64(i))) + tsdbMetrics.addUserRegistry(fmt.Sprintf("user-%d", i), populateTSDBBucketStoreMetrics(base*float64(i))) } b.ResetTimer() @@ -219,7 +229,7 @@ func benchmarkMetricsCollection(b *testing.B, users int) { } } -func populateTSDBBucketStore(base float64) *prometheus.Registry { +func populateTSDBBucketStoreMetrics(base float64) *prometheus.Registry { reg := prometheus.NewRegistry() m := newBucketStoreMetrics(reg) @@ -265,24 +275,30 @@ func populateTSDBBucketStore(base float64) *prometheus.Registry { m.queriesDropped.Add(31 * base) m.queriesLimit.Add(32 * base) + return reg +} + +func populateTSDBIndexCacheMetrics(base float64) *prometheus.Registry { + reg := prometheus.NewRegistry() c := newIndexStoreCacheMetrics(reg) - c.evicted.WithLabelValues(cacheTypePostings).Add(base * 50) - 
c.evicted.WithLabelValues(cacheTypeSeries).Add(base * 51) - c.requests.WithLabelValues(cacheTypePostings).Add(base * 52) - c.requests.WithLabelValues(cacheTypeSeries).Add(base * 53) - c.hits.WithLabelValues(cacheTypePostings).Add(base * 54) - c.hits.WithLabelValues(cacheTypeSeries).Add(base * 55) - c.added.WithLabelValues(cacheTypePostings).Add(base * 56) - c.added.WithLabelValues(cacheTypeSeries).Add(base * 57) - c.current.WithLabelValues(cacheTypePostings).Set(base * 58) - c.current.WithLabelValues(cacheTypeSeries).Set(base * 59) - c.currentSize.WithLabelValues(cacheTypePostings).Set(base * 60) - c.currentSize.WithLabelValues(cacheTypeSeries).Set(base * 61) - c.totalCurrentSize.WithLabelValues(cacheTypePostings).Set(base * 62) - c.totalCurrentSize.WithLabelValues(cacheTypeSeries).Set(base * 63) - c.overflow.WithLabelValues(cacheTypePostings).Add(base * 64) - c.overflow.WithLabelValues(cacheTypeSeries).Add(base * 65) + c.evicted.WithLabelValues(cacheTypePostings).Add(base * 1) + c.evicted.WithLabelValues(cacheTypeSeries).Add(base * 2) + c.requests.WithLabelValues(cacheTypePostings).Add(base * 3) + c.requests.WithLabelValues(cacheTypeSeries).Add(base * 4) + c.hits.WithLabelValues(cacheTypePostings).Add(base * 5) + c.hits.WithLabelValues(cacheTypeSeries).Add(base * 6) + c.added.WithLabelValues(cacheTypePostings).Add(base * 7) + c.added.WithLabelValues(cacheTypeSeries).Add(base * 8) + c.current.WithLabelValues(cacheTypePostings).Set(base * 9) + c.current.WithLabelValues(cacheTypeSeries).Set(base * 10) + c.currentSize.WithLabelValues(cacheTypePostings).Set(base * 11) + c.currentSize.WithLabelValues(cacheTypeSeries).Set(base * 12) + c.totalCurrentSize.WithLabelValues(cacheTypePostings).Set(base * 13) + c.totalCurrentSize.WithLabelValues(cacheTypeSeries).Set(base * 14) + c.overflow.WithLabelValues(cacheTypePostings).Add(base * 15) + c.overflow.WithLabelValues(cacheTypeSeries).Add(base * 16) + return reg } diff --git a/pkg/storage/tsdb/config.go 
b/pkg/storage/tsdb/config.go index b40d097b7b0..8af9b5af459 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -154,7 +154,7 @@ type BucketStoreConfig struct { func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.SyncDir, "experimental.tsdb.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.") f.DurationVar(&cfg.SyncInterval, "experimental.tsdb.bucket-store.sync-interval", 5*time.Minute, "How frequently scan the bucket to look for changes (new blocks shipped by ingesters and blocks removed by retention or compaction). 0 disables it.") - f.Uint64Var(&cfg.IndexCacheSizeBytes, "experimental.tsdb.bucket-store.index-cache-size-bytes", uint64(250*units.Mebibyte), "Size - in bytes - of a per-tenant in-memory index cache used to speed up blocks index lookups.") + f.Uint64Var(&cfg.IndexCacheSizeBytes, "experimental.tsdb.bucket-store.index-cache-size-bytes", uint64(1*units.Gibibyte), "Size in bytes of in-memory index cache used to speed up blocks index lookups (shared between all tenants).") f.Uint64Var(&cfg.MaxChunkPoolBytes, "experimental.tsdb.bucket-store.max-chunk-pool-bytes", uint64(2*units.Gibibyte), "Max size - in bytes - of a per-tenant chunk pool, used to reduce memory allocations.") f.Uint64Var(&cfg.MaxSampleCount, "experimental.tsdb.bucket-store.max-sample-count", 0, "Max number of samples per query when loading series from the long-term storage. 
0 disables the limit.") f.IntVar(&cfg.MaxConcurrent, "experimental.tsdb.bucket-store.max-concurrent", 20, "Max number of concurrent queries to execute against the long-term storage on a per-tenant basis.") diff --git a/pkg/storage/tsdb/index_cache.go b/pkg/storage/tsdb/index_cache.go new file mode 100644 index 00000000000..d6909706f31 --- /dev/null +++ b/pkg/storage/tsdb/index_cache.go @@ -0,0 +1,28 @@ +package tsdb + +import ( + "github.com/alecthomas/units" + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + storecache "github.com/thanos-io/thanos/pkg/store/cache" +) + +const ( + defaultMaxItemSize = storecache.Bytes(128 * units.MiB) +) + +// NewIndexCache creates a new index cache based on the input configuration. +func NewIndexCache(cfg BucketStoreConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) { + maxCacheSize := storecache.Bytes(cfg.IndexCacheSizeBytes) + + // Calculate the max item size. + maxItemSize := defaultMaxItemSize + if maxItemSize > maxCacheSize { + maxItemSize = maxCacheSize + } + + return storecache.NewInMemoryIndexCacheWithConfig(logger, registerer, storecache.InMemoryIndexCacheConfig{ + MaxSize: maxCacheSize, + MaxItemSize: maxItemSize, + }) +}