Commit 72ba1d5

Merge pull request #5661 from yeya24/async-backfilling
Make multilevel cache backfilling async
2 parents 050632f + 3c231a1

19 files changed, +208 −82 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -3,6 +3,7 @@
 ## master / unreleased
 * [CHANGE] Azure Storage: Upgraded objstore dependency and support Azure Workload Identity Authentication. Added `connection_string` to support authenticating via SAS token. Marked `msi_resource` config as deprecating. #5645
 * [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619
+* [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661
 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
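
The core behavior change: instead of writing items back into faster cache levels inline on the read path, the multilevel index cache now hands backfill writes to a Thanos cacheutil.AsyncOperationProcessor sized by the two new flags, and drops the item (incrementing the new counter) when the queue is full. A minimal sketch of that pattern, using the cacheutil calls exactly as the diff below uses them; everything else here is illustrative:

package example

import (
	"errors"

	"github.com/thanos-io/thanos/pkg/cacheutil"
)

// exampleBackfill reports whether storeItem was enqueued for async execution.
func exampleBackfill(storeItem func()) bool {
	// Defaults mirror the new flags: max-async-buffer-size=10000, max-async-concurrency=50.
	p := cacheutil.NewAsyncOperationProcessor(10000, 50)

	if err := p.EnqueueAsync(storeItem); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
		// The item is dropped rather than blocking the fetch; in the diff this is
		// counted by cortex_store_multilevel_index_cache_backfill_dropped_items_total.
		return false
	}
	return true
}

In the actual change the processor is created once, in newMultiLevelCache (see pkg/storage/tsdb/multilevel_cache.go below), not per call.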

docs/blocks-storage/querier.md

Lines changed: 11 additions & 0 deletions

@@ -708,6 +708,17 @@ blocks_storage:
       # CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items
       [enabled_items: <list of string> | default = []]

+    multilevel:
+      # The maximum number of concurrent asynchronous operations can occur
+      # when backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency
+      [max_async_concurrency: <int> | default = 50]
+
+      # The maximum number of enqueued asynchronous operations allowed when
+      # backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
+      [max_async_buffer_size: <int> | default = 10000]
+
   chunks_cache:
     # Backend for chunks cache, if not empty. Supported values: memcached.
     # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend

docs/blocks-storage/store-gateway.md

Lines changed: 11 additions & 0 deletions

@@ -823,6 +823,17 @@ blocks_storage:
       # CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items
       [enabled_items: <list of string> | default = []]

+    multilevel:
+      # The maximum number of concurrent asynchronous operations can occur
+      # when backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency
+      [max_async_concurrency: <int> | default = 50]
+
+      # The maximum number of enqueued asynchronous operations allowed when
+      # backfilling cache items.
+      # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
+      [max_async_buffer_size: <int> | default = 10000]
+
   chunks_cache:
     # Backend for chunks cache, if not empty. Supported values: memcached.
     # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend

docs/configuration/config-file-reference.md

Lines changed: 11 additions & 0 deletions

@@ -1257,6 +1257,17 @@ bucket_store:
     # CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items
     [enabled_items: <list of string> | default = []]

+  multilevel:
+    # The maximum number of concurrent asynchronous operations can occur when
+    # backfilling cache items.
+    # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency
+    [max_async_concurrency: <int> | default = 50]
+
+    # The maximum number of enqueued asynchronous operations allowed when
+    # backfilling cache items.
+    # CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
+    [max_async_buffer_size: <int> | default = 10000]
+
 chunks_cache:
   # Backend for chunks cache, if not empty. Supported values: memcached.
   # CLI flag: -blocks-storage.bucket-store.chunks-cache.backend

go.mod

Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@ require (
 	github.com/stretchr/testify v1.8.4
 	github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98
 	github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591
-	github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c
+	github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e
 	github.com/uber/jaeger-client-go v2.30.0+incompatible
 	github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
 	go.etcd.io/etcd/api/v3 v3.5.10

go.sum

Lines changed: 2 additions & 2 deletions

@@ -1519,8 +1519,8 @@ github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98 h1:gx2MTto1UQRu
 github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98/go.mod h1:JauBAcJ61tRSv9widgISVmA6akQXDeUMXBrVmWW4xog=
 github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 h1:6bZbFM+Mvy2kL8BeL8TJ5+5pV3sUR2PSLaZyw911rtQ=
 github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591/go.mod h1:vfXJv1JXNdLfHnjsHsLLJl5tyI7KblF76Wo5lZ9YC4Q=
-github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c h1:hMpXd1ybZB/vnR3+zex93va42rQ++2E0qi2wVSf3AwY=
-github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c/go.mod h1:q+0MQPBugkBKZBFSOec4WV4EcuKJU6tgMI0i4M2znpY=
+github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e h1:ej5fKlojY+r8qty//Q4b7nyNA4QEkJ5uWms77Itf75E=
+github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e/go.mod h1:qeDC74QOf5hWzTlvIrLT8WlNGg67nORFON0T2VF4qgg=
 github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
 github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=

pkg/ingester/ingester.go

Lines changed: 6 additions & 3 deletions

@@ -256,7 +256,8 @@ type userTSDB struct {
 	lastUpdate atomic.Int64

 	// Thanos shipper used to ship blocks to the storage.
-	shipper Shipper
+	shipper                 Shipper
+	shipperMetadataFilePath string

 	// When deletion marker is found for the tenant (checked before shipping),
 	// shipping stops and TSDB is closed before reaching idle timeout time (if enabled).
@@ -435,7 +436,7 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {

 // updateCachedShipperBlocks reads the shipper meta file and updates the cached shipped blocks.
 func (u *userTSDB) updateCachedShippedBlocks() error {
-	shipperMeta, err := shipper.ReadMetaFile(u.db.Dir())
+	shipperMeta, err := shipper.ReadMetaFile(u.shipperMetadataFilePath)
 	if os.IsNotExist(err) || os.IsNotExist(errors.Cause(err)) {
 		// If the meta file doesn't exist it means the shipper hasn't run yet.
 		shipperMeta = &shipper.Meta{}
@@ -606,7 +607,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
 	}
 }

-// NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage.
+// New returns a new Ingester that uses Cortex block storage instead of chunks storage.
 func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
 	defaultInstanceLimits = &cfg.DefaultLimits
 	if cfg.ingesterClientFactory == nil {
@@ -2050,7 +2051,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 		},
 		true, // Allow out of order uploads. It's fine in Cortex's context.
 		metadata.NoneFunc,
+		"",
 	)
+	userDB.shipperMetadataFilePath = filepath.Join(userDB.db.Dir(), filepath.Clean(shipper.DefaultMetaFilename))

 	// Initialise the shipper blocks cache.
 	if err := userDB.updateCachedShippedBlocks(); err != nil {
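
The ingester edits follow from the Thanos bump in go.mod above: shipper.ReadMetaFile and shipper.WriteMetaFile now take the path of the meta file itself rather than the TSDB directory, and the shipper constructor appears to take an extra meta-filename argument (passed as "" here, presumably keeping the default). A self-contained sketch of the new path construction, assuming shipper.DefaultMetaFilename is "thanos.shipper.json" as in Thanos:

package main

import (
	"fmt"
	"path/filepath"
)

// defaultMetaFilename stands in for shipper.DefaultMetaFilename (assumed to be "thanos.shipper.json").
const defaultMetaFilename = "thanos.shipper.json"

func main() {
	dbDir := "/data/tsdb/user-1" // hypothetical per-tenant TSDB directory
	// Mirrors the diff: filepath.Join(userDB.db.Dir(), filepath.Clean(shipper.DefaultMetaFilename)).
	metaPath := filepath.Join(dbDir, filepath.Clean(defaultMetaFilename))
	fmt.Println(metaPath) // /data/tsdb/user-1/thanos.shipper.json
}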

pkg/ingester/ingester_test.go

Lines changed: 4 additions & 4 deletions

@@ -2821,7 +2821,7 @@ func TestIngester_sholdUpdateCacheShippedBlocks(t *testing.T) {
 	require.Equal(t, len(db.getCachedShippedBlocks()), 0)
 	shippedBlock, _ := ulid.Parse("01D78XZ44G0000000000000000")

-	require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{
+	require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{
 		Version:  shipper.MetaVersion1,
 		Uploaded: []ulid.ULID{shippedBlock},
 	}))
@@ -2858,7 +2858,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP

 	// Mock the shipper meta (no blocks).
 	db := i.getTSDB(userID)
-	require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{
+	require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{
 		Version: shipper.MetaVersion1,
 	}))

@@ -3788,7 +3788,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
 	`, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

 	// Saying that we have shipped the second block, so only that should get deleted.
-	require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
+	require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{
 		Version:  shipper.MetaVersion1,
 		Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
 	}))
@@ -3816,7 +3816,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
 	`, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

 	// Shipping 2 more blocks, hence all the blocks from first round.
-	require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
+	require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{
 		Version:  shipper.MetaVersion1,
 		Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
 	}))

pkg/storage/tsdb/index_cache.go

Lines changed: 35 additions & 5 deletions

@@ -42,13 +42,16 @@
 	errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend")
 	errDuplicatedIndexCacheBackend  = errors.New("duplicated index cache backend")
 	errNoIndexCacheAddresses        = errors.New("no index cache backend addresses")
+	errInvalidMaxAsyncConcurrency   = errors.New("invalid max_async_concurrency, must greater than 0")
+	errInvalidMaxAsyncBufferSize    = errors.New("invalid max_async_buffer_size, must greater than 0")
 )

 type IndexCacheConfig struct {
-	Backend   string                    `yaml:"backend"`
-	InMemory  InMemoryIndexCacheConfig  `yaml:"inmemory"`
-	Memcached MemcachedIndexCacheConfig `yaml:"memcached"`
-	Redis     RedisIndexCacheConfig     `yaml:"redis"`
+	Backend    string                     `yaml:"backend"`
+	InMemory   InMemoryIndexCacheConfig   `yaml:"inmemory"`
+	Memcached  MemcachedIndexCacheConfig  `yaml:"memcached"`
+	Redis      RedisIndexCacheConfig      `yaml:"redis"`
+	MultiLevel MultiLevelIndexCacheConfig `yaml:"multilevel"`
 }

 func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) {
@@ -64,6 +67,7 @@ func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix str
 	cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
 	cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
 	cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
+	cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.")
 }

 // Validate the config.
@@ -72,6 +76,12 @@ func (cfg *IndexCacheConfig) Validate() error {
 	splitBackends := strings.Split(cfg.Backend, ",")
 	configuredBackends := map[string]struct{}{}

+	if len(splitBackends) > 1 {
+		if err := cfg.MultiLevel.Validate(); err != nil {
+			return err
+		}
+	}
+
 	for _, backend := range splitBackends {
 		if !util.StringsContain(supportedIndexCacheBackends, backend) {
 			return errUnsupportedIndexCacheBackend
@@ -101,6 +111,26 @@ func (cfg *IndexCacheConfig) Validate() error {
 	return nil
 }

+type MultiLevelIndexCacheConfig struct {
+	MaxAsyncConcurrency int `yaml:"max_async_concurrency"`
+	MaxAsyncBufferSize  int `yaml:"max_async_buffer_size"`
+}
+
+func (cfg *MultiLevelIndexCacheConfig) Validate() error {
+	if cfg.MaxAsyncBufferSize <= 0 {
+		return errInvalidMaxAsyncBufferSize
+	}
+	if cfg.MaxAsyncConcurrency <= 0 {
+		return errInvalidMaxAsyncConcurrency
+	}
+	return nil
+}
+
+func (cfg *MultiLevelIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
+	f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 50, "The maximum number of concurrent asynchronous operations can occur when backfilling cache items.")
+	f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.")
+}
+
 type InMemoryIndexCacheConfig struct {
 	MaxSizeBytes uint64   `yaml:"max_size_bytes"`
 	EnabledItems []string `yaml:"enabled_items"`
@@ -210,7 +240,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
 		}
 	}

-	return newMultiLevelCache(registerer, caches...), nil
+	return newMultiLevelCache(registerer, cfg.MultiLevel, caches...), nil
 }

 func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
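
Note that cfg.MultiLevel.Validate() only runs when more than one backend is configured, so single-backend setups never hit the new errors. A standalone restatement of the added validation rules, for illustration only (it mirrors the diff; it is not the package code itself):

package main

import (
	"errors"
	"fmt"
)

// MultiLevelIndexCacheConfig and its Validate mirror the additions above.
type MultiLevelIndexCacheConfig struct {
	MaxAsyncConcurrency int
	MaxAsyncBufferSize  int
}

var (
	errInvalidMaxAsyncConcurrency = errors.New("invalid max_async_concurrency, must greater than 0")
	errInvalidMaxAsyncBufferSize  = errors.New("invalid max_async_buffer_size, must greater than 0")
)

func (cfg *MultiLevelIndexCacheConfig) Validate() error {
	if cfg.MaxAsyncBufferSize <= 0 {
		return errInvalidMaxAsyncBufferSize
	}
	if cfg.MaxAsyncConcurrency <= 0 {
		return errInvalidMaxAsyncConcurrency
	}
	return nil
}

func main() {
	cfg := MultiLevelIndexCacheConfig{MaxAsyncConcurrency: 50, MaxAsyncBufferSize: 10000} // the flag defaults
	fmt.Println(cfg.Validate()) // <nil>

	cfg.MaxAsyncBufferSize = 0
	fmt.Println(cfg.Validate()) // invalid max_async_buffer_size, must greater than 0
}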

pkg/storage/tsdb/multilevel_cache.go

Lines changed: 41 additions & 21 deletions

@@ -2,13 +2,15 @@ package tsdb

 import (
 	"context"
+	"errors"
 	"sync"

 	"github.com/oklog/ulid"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
+	"github.com/thanos-io/thanos/pkg/cacheutil"
 	storecache "github.com/thanos-io/thanos/pkg/store/cache"
 )

@@ -21,8 +23,10 @@ const (
 type multiLevelCache struct {
 	caches []storecache.IndexCache

-	fetchLatency    *prometheus.HistogramVec
-	backFillLatency *prometheus.HistogramVec
+	fetchLatency         *prometheus.HistogramVec
+	backFillLatency      *prometheus.HistogramVec
+	backfillProcessor    *cacheutil.AsyncOperationProcessor
+	backfillDroppedItems *prometheus.CounterVec
 }

 func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
@@ -44,9 +48,11 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U

 	misses = keys
 	hits = map[labels.Label][]byte{}
-	backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{}
+	backfillItems := make([][]map[labels.Label][]byte, len(m.caches)-1)
 	for i, c := range m.caches {
-		backfillMap[c] = []map[labels.Label][]byte{}
+		if i < len(m.caches)-1 {
+			backfillItems[i] = []map[labels.Label][]byte{}
+		}
 		if ctx.Err() != nil {
 			return
 		}
@@ -58,7 +64,7 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U
 		}

 		if i > 0 {
-			backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h)
+			backfillItems[i-1] = append(backfillItems[i-1], h)
 		}

 		if len(misses) == 0 {
@@ -69,13 +75,14 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U
 	defer func() {
 		backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
 		defer backFillTimer.ObserveDuration()
-		for cache, hit := range backfillMap {
+		for i, hit := range backfillItems {
 			for _, values := range hit {
-				for l, b := range values {
-					if ctx.Err() != nil {
-						return
+				for lbl, b := range values {
+					if err := m.backfillProcessor.EnqueueAsync(func() {
+						m.caches[i].StorePostings(blockID, lbl, b, tenant)
+					}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
+						m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc()
 					}
-					cache.StorePostings(blockID, l, b, tenant)
 				}
 			}
 		}
@@ -108,7 +115,11 @@ func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID uli
 		if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
 			if i > 0 {
 				backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings))
-				m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
+				if err := m.backfillProcessor.EnqueueAsync(func() {
+					m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
+				}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
+					m.backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings).Inc()
+				}
 				backFillTimer.ObserveDuration()
 			}
 			return d, h
@@ -137,10 +148,12 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI

 	misses = ids
 	hits = map[storage.SeriesRef][]byte{}
-	backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{}
+	backfillItems := make([][]map[storage.SeriesRef][]byte, len(m.caches)-1)

 	for i, c := range m.caches {
-		backfillMap[c] = []map[storage.SeriesRef][]byte{}
+		if i < len(m.caches)-1 {
+			backfillItems[i] = []map[storage.SeriesRef][]byte{}
+		}
 		if ctx.Err() != nil {
 			return
 		}
@@ -152,7 +165,7 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 		}

 		if i > 0 && len(h) > 0 {
-			backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h)
+			backfillItems[i-1] = append(backfillItems[i-1], h)
 		}

 		if len(misses) == 0 {
@@ -163,13 +176,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 	defer func() {
 		backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
 		defer backFillTimer.ObserveDuration()
-		for cache, hit := range backfillMap {
+		for i, hit := range backfillItems {
 			for _, values := range hit {
-				for m, b := range values {
-					if ctx.Err() != nil {
-						return
+				for ref, b := range values {
+					if err := m.backfillProcessor.EnqueueAsync(func() {
+						m.caches[i].StoreSeries(blockID, ref, b, tenant)
+					}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
+						m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc()
 					}
-					cache.StoreSeries(blockID, m, b, tenant)
 				}
 			}
 		}
@@ -178,12 +192,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
 	return hits, misses
 }

-func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) storecache.IndexCache {
+func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, c ...storecache.IndexCache) storecache.IndexCache {
 	if len(c) == 1 {
 		return c[0]
 	}
+
 	return &multiLevelCache{
-		caches: c,
+		caches:            c,
+		backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
 		fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
 			Name:    "cortex_store_multilevel_index_cache_fetch_duration_seconds",
 			Help:    "Histogram to track latency to fetch items from multi level index cache",
@@ -194,5 +210,9 @@ func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) s
 			Help:    "Histogram to track latency to backfill items from multi level index cache",
 			Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
 		}, []string{"item_type"}),
+		backfillDroppedItems: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
+			Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ",
+		}, []string{"item_type"}),
 	}
 }
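
Conceptually, the AsyncOperationProcessor used above is a bounded queue drained by a fixed pool of worker goroutines, and EnqueueAsync rejects rather than blocks when the queue is full. A toy self-contained equivalent of that behavior (not the actual Thanos implementation):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errBufferFull plays the role of cacheutil.ErrAsyncBufferFull.
var errBufferFull = errors.New("the async buffer is full")

// asyncProcessor: a bounded channel of operations drained by worker goroutines.
type asyncProcessor struct {
	queue chan func()
	wg    sync.WaitGroup
}

func newAsyncProcessor(bufferSize, concurrency int) *asyncProcessor {
	p := &asyncProcessor{queue: make(chan func(), bufferSize)}
	p.wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer p.wg.Done()
			for op := range p.queue {
				op()
			}
		}()
	}
	return p
}

// enqueue never blocks the caller: a full buffer drops the operation, which is
// exactly the event the new dropped-items counter records.
func (p *asyncProcessor) enqueue(op func()) error {
	select {
	case p.queue <- op:
		return nil
	default:
		return errBufferFull
	}
}

func main() {
	p := newAsyncProcessor(10000, 50) // the new flags' default buffer size and concurrency
	dropped := 0
	if err := p.enqueue(func() { /* backfill one item into a faster level */ }); errors.Is(err, errBufferFull) {
		dropped++
	}
	close(p.queue)
	p.wg.Wait()
	fmt.Println("dropped:", dropped)
}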
