
Commit 151c577

Shared in-memory index cache for queriers with blocks storage (#2189)

* Shift to a shared in-memory index cache for queriers with blocks storage

Signed-off-by: Marco Pracucci <[email protected]>

1 parent 92ab6cb · commit 151c577

10 files changed: +339 -130 lines

CHANGELOG.md (1 addition, 0 deletions)

@@ -25,6 +25,7 @@
 * [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088
 * [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency is now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
 * [CHANGE] Remove fluentd-based billing infrastructure and flags such as `-distributor.enable-billing`. #1491
+* [CHANGE] Experimental TSDB: the querier in-memory index cache used by the experimental blocks storage shifted from per-tenant to per-querier. The `-experimental.tsdb.bucket-store.index-cache-size-bytes` now configures the per-querier index cache max size instead of a per-tenant cache and its default has been increased to 1GB. #2189
 * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
   * `--experimental.distributor.user-subring-size`

docs/configuration/config-file-reference.md (3 additions, 3 deletions)

@@ -2139,10 +2139,10 @@ bucket_store:
   # CLI flag: -experimental.tsdb.bucket-store.sync-interval
   [sync_interval: <duration> | default = 5m0s]

-  # Size - in bytes - of a per-tenant in-memory index cache used to speed up
-  # blocks index lookups.
+  # Size in bytes of in-memory index cache used to speed up blocks index lookups
+  # (shared between all tenants).
   # CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
-  [index_cache_size_bytes: <int> | default = 262144000]
+  [index_cache_size_bytes: <int> | default = 1073741824]

   # Max size - in bytes - of a per-tenant chunk pool, used to reduce memory
   # allocations.
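The `-experimental.tsdb.bucket-store.index-cache-size-bytes` value now sizes a single in-memory index cache shared by every tenant served by a querier, rather than one cache per tenant. As a rough illustration of what such a byte budget maps to, the sketch below builds one Thanos in-memory index cache with `storecache.NewInMemoryIndexCacheWithConfig` (the same call the removed per-tenant code used). The `MaxItemSize = size/2` heuristic is carried over from that removed code and is an assumption here, since the shared cache is actually built by `tsdb.NewIndexCache`, whose body is not part of this diff; the helper name `buildSharedIndexCache` is illustrative only.

```go
package main

import (
	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"
	storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

// buildSharedIndexCache turns a byte budget such as the default 1073741824
// (1GB) into a single in-memory index cache, to be shared by all per-tenant
// bucket stores in a querier. Illustrative sketch only: the real wiring lives
// in tsdb.NewIndexCache, and the half-size MaxItemSize heuristic is taken
// from the per-tenant code this commit removes.
func buildSharedIndexCache(sizeBytes uint64, logger log.Logger, reg *prometheus.Registry) (storecache.IndexCache, error) {
	return storecache.NewInMemoryIndexCacheWithConfig(logger, reg, storecache.InMemoryIndexCacheConfig{
		MaxSize:     storecache.Bytes(sizeBytes),
		MaxItemSize: storecache.Bytes(sizeBytes / 2),
	})
}
```

Because there is one such cache per querier process, the index-cache memory is now bounded per querier instead of growing with the number of tenants, which is why the default was raised from 250MB to 1GB.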

docs/operations/blocks-storage.md (3 additions, 3 deletions)

@@ -133,10 +133,10 @@ tsdb:
   # CLI flag: -experimental.tsdb.bucket-store.sync-interval
   [sync_interval: <duration> | default = 5m0s]

-  # Size - in bytes - of a per-tenant in-memory index cache used to speed up
-  # blocks index lookups.
+  # Size in bytes of in-memory index cache used to speed up blocks index
+  # lookups (shared between all tenants).
   # CLI flag: -experimental.tsdb.bucket-store.index-cache-size-bytes
-  [index_cache_size_bytes: <int> | default = 262144000]
+  [index_cache_size_bytes: <int> | default = 1073741824]

   # Max size - in bytes - of a per-tenant chunk pool, used to reduce memory
   # allocations.

integration/configs.go (1 addition, 0 deletions)

@@ -50,6 +50,7 @@ var (
 		"-experimental.tsdb.bucket-store.sync-interval": "5s",
 		"-experimental.tsdb.retention-period":           "5m",
 		"-experimental.tsdb.ship-interval":              "1m",
+		"-experimental.tsdb.head-compaction-interval":   "1s",
 		"-experimental.tsdb.s3.access-key-id":           e2edb.MinioAccessKey,
 		"-experimental.tsdb.s3.secret-access-key":       e2edb.MinioSecretKey,
 		"-experimental.tsdb.s3.bucket-name":             "cortex",

integration/querier_test.go (139 additions, new file)

// +build integration

package main

import (
	"testing"
	"time"

	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/integration/e2e"
	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
	"github.com/cortexproject/cortex/integration/e2ecortex"
)

func TestQuerierWithBlocksStorage(t *testing.T) {
	tests := map[string]struct {
		flags map[string]string
	}{
		"querier running with ingester gRPC streaming disabled": {
			flags: mergeFlags(BlocksStorageFlags, map[string]string{
				"-querier.ingester-streaming": "false",
			}),
		},
	}

	for testName, testCfg := range tests {
		t.Run(testName, func(t *testing.T) {
			const blockRangePeriod = 5 * time.Second

			s, err := e2e.NewScenario(networkName)
			require.NoError(t, err)
			defer s.Close()

			// Configure the blocks storage to frequently compact the TSDB head
			// and ship blocks to the storage.
			flags := mergeFlags(testCfg.flags, map[string]string{
				"-experimental.tsdb.block-ranges-period":        blockRangePeriod.String(),
				"-experimental.tsdb.ship-interval":              "1s",
				"-experimental.tsdb.bucket-store.sync-interval": "1s",
				"-experimental.tsdb.retention-period":           ((blockRangePeriod * 2) - 1).String(),
			})

			// Start dependencies.
			consul := e2edb.NewConsul()
			minio := e2edb.NewMinio(9000, flags["-experimental.tsdb.s3.bucket-name"])
			require.NoError(t, s.StartAndWaitReady(consul, minio))

			// Start Cortex components.
			distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
			ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
			querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
			require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

			// Wait until both the distributor and querier have updated the ring.
			require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

			c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "user-1")
			require.NoError(t, err)

			// Push some series to Cortex.
			series1Timestamp := time.Now()
			series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
			series1, expectedVector1 := generateSeries("series_1", series1Timestamp)
			series2, expectedVector2 := generateSeries("series_2", series2Timestamp)

			res, err := c.Push(series1)
			require.NoError(t, err)
			require.Equal(t, 200, res.StatusCode)

			res, err = c.Push(series2)
			require.NoError(t, err)
			require.Equal(t, 200, res.StatusCode)

			// Wait until the TSDB head is compacted and shipped to the storage.
			// The shipped block contains the 1st series, while the 2nd series is in the head.
			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total"))
			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total"))
			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))

			// Push another series to force compaction of another block and deletion of the
			// first block due to the expired retention.
			series3Timestamp := series2Timestamp.Add(blockRangePeriod * 2)
			series3, expectedVector3 := generateSeries("series_3", series3Timestamp)

			res, err = c.Push(series3)
			require.NoError(t, err)
			require.Equal(t, 200, res.StatusCode)

			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_shipper_uploads_total"))
			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(3), "cortex_ingester_memory_series_created_total"))
			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total"))
			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))

			// Wait until the querier has synced the newly uploaded blocks.
			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_bucket_store_blocks_loaded"))

			// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
			// TODO: apparently Thanos has a bug which causes a block to not be considered if the
			// query timestamp matches the block max timestamp.
			series1Timestamp = series1Timestamp.Add(time.Duration(time.Millisecond))
			expectedVector1[0].Timestamp = model.Time(e2e.TimeToMilliseconds(series1Timestamp))

			result, err := c.Query("series_1", series1Timestamp)
			require.NoError(t, err)
			require.Equal(t, model.ValVector, result.Type())
			assert.Equal(t, expectedVector1, result.(model.Vector))

			result, err = c.Query("series_2", series2Timestamp)
			require.NoError(t, err)
			require.Equal(t, model.ValVector, result.Type())
			assert.Equal(t, expectedVector2, result.(model.Vector))

			result, err = c.Query("series_3", series3Timestamp)
			require.NoError(t, err)
			require.Equal(t, model.ValVector, result.Type())
			assert.Equal(t, expectedVector3, result.(model.Vector))

			// Check the in-memory index cache metrics (in the querier).
			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items"))             // 2 series, in both the postings and the series cache
			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // 2 series, in both the postings and the series cache
			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(0), "cortex_querier_blocks_index_cache_hits_total"))          // no cache hits because the cache was empty

			// Query the 1st series from storage again. This time it should use the index cache.
			result, err = c.Query("series_1", series1Timestamp)
			require.NoError(t, err)
			require.Equal(t, model.ValVector, result.Type())
			assert.Equal(t, expectedVector1, result.(model.Vector))

			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items"))             // as before
			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2*2), "cortex_querier_blocks_index_cache_items_added_total")) // as before
			require.NoError(t, querier.WaitSumMetrics(e2e.Equals(2), "cortex_querier_blocks_index_cache_hits_total"))          // this time the index cache was used
		})
	}
}
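The test above assembles its CLI flags by layering flag maps with `mergeFlags`, a helper that already exists in the integration package and is not part of this commit. A minimal sketch of such a helper, assuming it simply overlays later maps on top of earlier ones (the name below is a hypothetical stand-in, not the real function):

```go
// mergeFlagMaps is a hypothetical stand-in for the integration package's
// mergeFlags helper: keys from later maps override keys from earlier ones,
// and none of the input maps are mutated.
func mergeFlagMaps(flagMaps ...map[string]string) map[string]string {
	merged := map[string]string{}
	for _, flags := range flagMaps {
		for key, value := range flags {
			merged[key] = value
		}
	}
	return merged
}
```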

pkg/querier/block_store.go (32 additions, 28 deletions)

@@ -12,6 +12,7 @@ import (

 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/model"
@@ -31,16 +32,20 @@ import (

 // UserStore is a multi-tenant version of Thanos BucketStore
 type UserStore struct {
-	logger      log.Logger
-	cfg         tsdb.Config
-	bucket      objstore.Bucket
-	client      storepb.StoreClient
-	logLevel    logging.Level
-	tsdbMetrics *tsdbBucketStoreMetrics
+	logger             log.Logger
+	cfg                tsdb.Config
+	bucket             objstore.Bucket
+	client             storepb.StoreClient
+	logLevel           logging.Level
+	bucketStoreMetrics *tsdbBucketStoreMetrics
+	indexCacheMetrics  *tsdbIndexCacheMetrics

 	syncMint model.TimeOrDurationValue
 	syncMaxt model.TimeOrDurationValue

+	// Index cache shared across all tenants.
+	indexCache storecache.IndexCache
+
 	// Keeps a bucket store for each tenant.
 	storesMu sync.RWMutex
 	stores   map[string]*store.BucketStore
@@ -55,16 +60,20 @@ type UserStore struct {

 // NewUserStore returns a new UserStore
 func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel logging.Level, logger log.Logger, registerer prometheus.Registerer) (*UserStore, error) {
+	var err error
+
 	workersCtx, workersCancel := context.WithCancel(context.Background())
+	indexCacheRegistry := prometheus.NewRegistry()

 	u := &UserStore{
-		logger:        logger,
-		cfg:           cfg,
-		bucket:        bucketClient,
-		stores:        map[string]*store.BucketStore{},
-		logLevel:      logLevel,
-		tsdbMetrics:   newTSDBBucketStoreMetrics(),
-		workersCancel: workersCancel,
+		logger:             logger,
+		cfg:                cfg,
+		bucket:             bucketClient,
+		stores:             map[string]*store.BucketStore{},
+		logLevel:           logLevel,
+		bucketStoreMetrics: newTSDBBucketStoreMetrics(),
+		indexCacheMetrics:  newTSDBIndexCacheMetrics(indexCacheRegistry),
+		workersCancel:      workersCancel,
 		syncTimes: prometheus.NewHistogram(prometheus.HistogramOpts{
 			Name: "cortex_querier_blocks_sync_seconds",
 			Help: "The total time it takes to perform a sync stores",
@@ -73,15 +82,20 @@ func NewUserStore(cfg tsdb.Config, bucketClient objstore.Bucket, logLevel loggin
 	}

 	// Configure the time range to sync all blocks.
-	if err := u.syncMint.Set("0000-01-01T00:00:00Z"); err != nil {
+	if err = u.syncMint.Set("0000-01-01T00:00:00Z"); err != nil {
 		return nil, err
 	}
-	if err := u.syncMaxt.Set("9999-12-31T23:59:59Z"); err != nil {
+	if err = u.syncMaxt.Set("9999-12-31T23:59:59Z"); err != nil {
 		return nil, err
 	}

+	// Init the index cache.
+	if u.indexCache, err = tsdb.NewIndexCache(cfg.BucketStore, logger, indexCacheRegistry); err != nil {
+		return nil, errors.Wrap(err, "create index cache")
+	}
+
 	if registerer != nil {
-		registerer.MustRegister(u.syncTimes, u.tsdbMetrics)
+		registerer.MustRegister(u.syncTimes, u.bucketStoreMetrics, u.indexCacheMetrics)
 	}

 	serv := grpc.NewServer()
@@ -357,16 +371,6 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error)
 	userBkt := tsdb.NewUserBucketClient(userID, u.bucket)

 	reg := prometheus.NewRegistry()
-	indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes
-	maxItemSizeBytes := indexCacheSizeBytes / 2
-	indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(userLogger, reg, storecache.InMemoryIndexCacheConfig{
-		MaxSize:     storecache.Bytes(indexCacheSizeBytes),
-		MaxItemSize: storecache.Bytes(maxItemSizeBytes),
-	})
-	if err != nil {
-		return nil, err
-	}
-
 	fetcher, err := block.NewMetaFetcher(
 		userLogger,
 		u.cfg.BucketStore.MetaSyncConcurrency,
@@ -385,7 +389,7 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error)
 		userBkt,
 		fetcher,
 		filepath.Join(u.cfg.BucketStore.SyncDir, userID),
-		indexCache,
+		u.indexCache,
 		uint64(u.cfg.BucketStore.MaxChunkPoolBytes),
 		u.cfg.BucketStore.MaxSampleCount,
 		u.cfg.BucketStore.MaxConcurrent,
@@ -402,7 +406,7 @@ func (u *UserStore) getOrCreateStore(userID string) (*store.BucketStore, error)
 	}

 	u.stores[userID] = bs
-	u.tsdbMetrics.addUserRegistry(userID, reg)
+	u.bucketStoreMetrics.addUserRegistry(userID, reg)

 	return bs, nil
 }
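The structural change in this file is that the index cache is created once in `NewUserStore` and then handed to every per-tenant `BucketStore` from `getOrCreateStore`, instead of being built per tenant. The sketch below shows that pattern in isolation: one shared cache plus lazily created per-tenant stores guarded by an `RWMutex`. All type and function names here (`sharedCacheStores`, `tenantStore`, `newTenantStore`) are illustrative stand-ins, not the Cortex types; only the shape of the pattern follows the diff above.

```go
package main

import (
	"sync"

	storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

// tenantStore stands in for a per-tenant Thanos BucketStore.
type tenantStore struct{}

// newTenantStore stands in for the wiring done in getOrCreateStore, where the
// real code passes the shared cache into store.NewBucketStore.
func newTenantStore(userID string, cache storecache.IndexCache) *tenantStore {
	_, _ = userID, cache
	return &tenantStore{}
}

// sharedCacheStores holds one index cache shared across all tenants and a map
// of lazily created per-tenant stores, mirroring UserStore's indexCache,
// storesMu and stores fields.
type sharedCacheStores struct {
	indexCache storecache.IndexCache // shared across all tenants

	mu     sync.RWMutex
	stores map[string]*tenantStore // one bucket store per tenant
}

func newSharedCacheStores(cache storecache.IndexCache) *sharedCacheStores {
	return &sharedCacheStores{
		indexCache: cache,
		stores:     map[string]*tenantStore{},
	}
}

func (s *sharedCacheStores) getOrCreateStore(userID string) *tenantStore {
	// Fast path: the tenant's store already exists.
	s.mu.RLock()
	st := s.stores[userID]
	s.mu.RUnlock()
	if st != nil {
		return st
	}

	// Slow path: create the store, re-checking under the write lock so two
	// concurrent callers don't create it twice. Every store receives the same
	// shared index cache.
	s.mu.Lock()
	defer s.mu.Unlock()
	if st = s.stores[userID]; st != nil {
		return st
	}
	st = newTenantStore(userID, s.indexCache)
	s.stores[userID] = st
	return st
}
```

Note that the per-tenant Prometheus registry (`reg`) is still created for each bucket store's own metrics, while the index cache metrics now go to the separate `indexCacheRegistry` created once in `NewUserStore`.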
