Make multilevel cache backfilling async #5661

Merged: 6 commits, Nov 21, 2023
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## master / unreleased
* [CHANGE] Azure Storage: Upgraded the objstore dependency and added support for Azure Workload Identity Authentication. Added `connection_string` to support authenticating via SAS token. Marked the `msi_resource` config as deprecated. #5645
* [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619
* [CHANGE] Index Cache: Multilevel cache backfilling is now asynchronous. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs, and the metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` to count dropped items. #5661
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638

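The new counter gives operators a way to see backfills being dropped. As an illustrative check (not part of this PR), a Prometheus query along these lines would surface sustained drops, which typically mean the buffer or concurrency limits are too low for the workload:

    sum by (item_type) (rate(cortex_store_multilevel_index_cache_backfill_dropped_items_total[5m]))
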
11 changes: 11 additions & 0 deletions docs/blocks-storage/querier.md
@@ -708,6 +708,17 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items
[enabled_items: <list of string> | default = []]

multilevel:
# The maximum number of concurrent asynchronous operations that can
# occur when backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency
[max_async_concurrency: <int> | default = 50]

# The maximum number of enqueued asynchronous operations allowed when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
11 changes: 11 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -823,6 +823,17 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items
[enabled_items: <list of string> | default = []]

multilevel:
# The maximum number of concurrent asynchronous operations that can
# occur when backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency
[max_async_concurrency: <int> | default = 50]

# The maximum number of enqueued asynchronous operations allowed when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
11 changes: 11 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -1257,6 +1257,17 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.index-cache.redis.enabled-items
[enabled_items: <list of string> | default = []]

multilevel:
# The maximum number of concurrent asynchronous operations that can occur
# when backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency
[max_async_concurrency: <int> | default = 50]

# The maximum number of enqueued asynchronous operations allowed when
# backfilling cache items.
# CLI flag: -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size
[max_async_buffer_size: <int> | default = 10000]

chunks_cache:
# Backend for chunks cache, if not empty. Supported values: memcached.
# CLI flag: -blocks-storage.bucket-store.chunks-cache.backend
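All three documentation blocks above describe the same pair of settings. As a minimal sketch of how they could be set in a Cortex configuration file, assuming a two-level setup with the in-memory cache in front of memcached (the `backend` value is a comma-separated list, per the validation code later in this diff):

    blocks_storage:
      bucket_store:
        index_cache:
          backend: inmemory,memcached
          multilevel:
            max_async_concurrency: 50
            max_async_buffer_size: 10000
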
2 changes: 1 addition & 1 deletion go.mod
@@ -53,7 +53,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591
github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c
github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/weaveworks/common v0.0.0-20221201103051-7c2720a9024d
go.etcd.io/etcd/api/v3 v3.5.10
4 changes: 2 additions & 2 deletions go.sum
@@ -1519,8 +1519,8 @@ github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98 h1:gx2MTto1UQRu
github.com/thanos-io/objstore v0.0.0-20231112185854-37752ee64d98/go.mod h1:JauBAcJ61tRSv9widgISVmA6akQXDeUMXBrVmWW4xog=
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 h1:6bZbFM+Mvy2kL8BeL8TJ5+5pV3sUR2PSLaZyw911rtQ=
github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591/go.mod h1:vfXJv1JXNdLfHnjsHsLLJl5tyI7KblF76Wo5lZ9YC4Q=
github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c h1:hMpXd1ybZB/vnR3+zex93va42rQ++2E0qi2wVSf3AwY=
github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c/go.mod h1:q+0MQPBugkBKZBFSOec4WV4EcuKJU6tgMI0i4M2znpY=
github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e h1:ej5fKlojY+r8qty//Q4b7nyNA4QEkJ5uWms77Itf75E=
github.com/thanos-io/thanos v0.32.5-0.20231120163350-0a4f5ae3310e/go.mod h1:qeDC74QOf5hWzTlvIrLT8WlNGg67nORFON0T2VF4qgg=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU=
github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
9 changes: 6 additions & 3 deletions pkg/ingester/ingester.go
@@ -256,7 +256,8 @@ type userTSDB struct {
lastUpdate atomic.Int64

// Thanos shipper used to ship blocks to the storage.
shipper Shipper
shipper Shipper
shipperMetadataFilePath string

// When deletion marker is found for the tenant (checked before shipping),
// shipping stops and TSDB is closed before reaching idle timeout time (if enabled).
@@ -435,7 +436,7 @@ func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {

// updateCachedShippedBlocks reads the shipper meta file and updates the cached shipped blocks.
func (u *userTSDB) updateCachedShippedBlocks() error {
shipperMeta, err := shipper.ReadMetaFile(u.db.Dir())
shipperMeta, err := shipper.ReadMetaFile(u.shipperMetadataFilePath)
if os.IsNotExist(err) || os.IsNotExist(errors.Cause(err)) {
// If the meta file doesn't exist it means the shipper hasn't run yet.
shipperMeta = &shipper.Meta{}
@@ -606,7 +607,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
}
}

// NewV2 returns a new Ingester that uses Cortex block storage instead of chunks storage.
// New returns a new Ingester that uses Cortex block storage instead of chunks storage.
func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
defaultInstanceLimits = &cfg.DefaultLimits
if cfg.ingesterClientFactory == nil {
@@ -2050,7 +2051,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
},
true, // Allow out of order uploads. It's fine in Cortex's context.
metadata.NoneFunc,
"",
)
userDB.shipperMetadataFilePath = filepath.Join(userDB.db.Dir(), filepath.Clean(shipper.DefaultMetaFilename))

// Initialise the shipper blocks cache.
if err := userDB.updateCachedShippedBlocks(); err != nil {
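The ingester changes track an upstream Thanos shipper API change: `ReadMetaFile` and `WriteMetaFile` now take the path of the metadata file itself rather than the TSDB directory, so the ingester computes that path once when the per-tenant TSDB is created and caches it in `shipperMetadataFilePath`. A sketch of the path construction, mirroring the `createTSDB` line above (`shipper.DefaultMetaFilename` is the Thanos-provided constant):

    // The shipper metadata file lives inside the per-tenant TSDB directory.
    metaPath := filepath.Join(userDB.db.Dir(), filepath.Clean(shipper.DefaultMetaFilename))
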
8 changes: 4 additions & 4 deletions pkg/ingester/ingester_test.go
@@ -2821,7 +2821,7 @@ func TestIngester_sholdUpdateCacheShippedBlocks(t *testing.T) {
require.Equal(t, len(db.getCachedShippedBlocks()), 0)
shippedBlock, _ := ulid.Parse("01D78XZ44G0000000000000000")

require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{shippedBlock},
}))
@@ -2858,7 +2858,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP

// Mock the shipper meta (no blocks).
db := i.getTSDB(userID)
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.db.Dir(), &shipper.Meta{
require.NoError(t, shipper.WriteMetaFile(log.NewNopLogger(), db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
}))

@@ -3788,7 +3788,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
`, oldBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Saying that we have shipped the second block, so only that should get deleted.
require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
}))
@@ -3816,7 +3816,7 @@
`, newBlocks[0].Meta().ULID.Time()/1000)), "cortex_ingester_oldest_unshipped_block_timestamp_seconds"))

// Shipping 2 more blocks, hence all the blocks from first round.
require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
require.Nil(t, shipper.WriteMetaFile(nil, db.shipperMetadataFilePath, &shipper.Meta{
Version: shipper.MetaVersion1,
Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
}))
40 changes: 35 additions & 5 deletions pkg/storage/tsdb/index_cache.go
@@ -42,13 +42,16 @@
errUnsupportedIndexCacheBackend = errors.New("unsupported index cache backend")
errDuplicatedIndexCacheBackend = errors.New("duplicated index cache backend")
errNoIndexCacheAddresses = errors.New("no index cache backend addresses")
errInvalidMaxAsyncConcurrency = errors.New("invalid max_async_concurrency, must be greater than 0")
errInvalidMaxAsyncBufferSize = errors.New("invalid max_async_buffer_size, must be greater than 0")
)

type IndexCacheConfig struct {
Backend string `yaml:"backend"`
InMemory InMemoryIndexCacheConfig `yaml:"inmemory"`
Memcached MemcachedIndexCacheConfig `yaml:"memcached"`
Redis RedisIndexCacheConfig `yaml:"redis"`
Backend string `yaml:"backend"`
InMemory InMemoryIndexCacheConfig `yaml:"inmemory"`
Memcached MemcachedIndexCacheConfig `yaml:"memcached"`
Redis RedisIndexCacheConfig `yaml:"redis"`
MultiLevel MultiLevelIndexCacheConfig `yaml:"multilevel"`
}

func (cfg *IndexCacheConfig) RegisterFlags(f *flag.FlagSet) {
@@ -64,6 +67,7 @@ func (cfg *IndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix str
cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.")
cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.")
}

// Validate the config.
@@ -72,6 +76,12 @@ func (cfg *IndexCacheConfig) Validate() error {
splitBackends := strings.Split(cfg.Backend, ",")
configuredBackends := map[string]struct{}{}

if len(splitBackends) > 1 {
if err := cfg.MultiLevel.Validate(); err != nil {
return err
}
}

for _, backend := range splitBackends {
if !util.StringsContain(supportedIndexCacheBackends, backend) {
return errUnsupportedIndexCacheBackend
@@ -101,6 +111,26 @@ func (cfg *IndexCacheConfig) Validate() error {
return nil
}

type MultiLevelIndexCacheConfig struct {
MaxAsyncConcurrency int `yaml:"max_async_concurrency"`
MaxAsyncBufferSize int `yaml:"max_async_buffer_size"`
}

func (cfg *MultiLevelIndexCacheConfig) Validate() error {
if cfg.MaxAsyncBufferSize <= 0 {
return errInvalidMaxAsyncBufferSize
}
if cfg.MaxAsyncConcurrency <= 0 {
return errInvalidMaxAsyncConcurrency
}
return nil
}

func (cfg *MultiLevelIndexCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.IntVar(&cfg.MaxAsyncConcurrency, prefix+"max-async-concurrency", 50, "The maximum number of concurrent asynchronous operations that can occur when backfilling cache items.")
f.IntVar(&cfg.MaxAsyncBufferSize, prefix+"max-async-buffer-size", 10000, "The maximum number of enqueued asynchronous operations allowed when backfilling cache items.")
}

type InMemoryIndexCacheConfig struct {
MaxSizeBytes uint64 `yaml:"max_size_bytes"`
EnabledItems []string `yaml:"enabled_items"`
@@ -210,7 +240,7 @@ func NewIndexCache(cfg IndexCacheConfig, logger log.Logger, registerer prometheu
}
}

return newMultiLevelCache(registerer, caches...), nil
return newMultiLevelCache(registerer, cfg.MultiLevel, caches...), nil
}

func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, registerer prometheus.Registerer) (storecache.IndexCache, error) {
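Note that `Validate` only checks the multilevel settings when more than one backend is configured, so the new flags are meaningful only with a comma-separated backend list. An illustrative command line (the multilevel flag names come from this diff; the `...index-cache.backend` flag name is assumed from the prefix convention):

    -blocks-storage.bucket-store.index-cache.backend=inmemory,memcached
    -blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency=50
    -blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size=10000
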
62 changes: 41 additions & 21 deletions pkg/storage/tsdb/multilevel_cache.go
@@ -2,13 +2,15 @@ package tsdb

import (
"context"
"errors"
"sync"

"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/cacheutil"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

@@ -21,8 +23,10 @@
type multiLevelCache struct {
caches []storecache.IndexCache

fetchLatency *prometheus.HistogramVec
backFillLatency *prometheus.HistogramVec
fetchLatency *prometheus.HistogramVec
backFillLatency *prometheus.HistogramVec
backfillProcessor *cacheutil.AsyncOperationProcessor
backfillDroppedItems *prometheus.CounterVec
}

func (m *multiLevelCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
@@ -44,9 +48,11 @@ func (m *multiLevelCache) FetchMultiPostings(ctx context.Context, blockID ulid.U

misses = keys
hits = map[labels.Label][]byte{}
backfillMap := map[storecache.IndexCache][]map[labels.Label][]byte{}
backfillItems := make([][]map[labels.Label][]byte, len(m.caches)-1)
for i, c := range m.caches {
backfillMap[c] = []map[labels.Label][]byte{}
if i < len(m.caches)-1 {
backfillItems[i] = []map[labels.Label][]byte{}
}
if ctx.Err() != nil {
return
}
@@ -58,7 +64,7 @@
}

if i > 0 {
backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h)
backfillItems[i-1] = append(backfillItems[i-1], h)
}

if len(misses) == 0 {
@@ -69,13 +75,14 @@
defer func() {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypePostings))
defer backFillTimer.ObserveDuration()
for cache, hit := range backfillMap {
for i, hit := range backfillItems {
for _, values := range hit {
for l, b := range values {
if ctx.Err() != nil {
return
for lbl, b := range values {
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StorePostings(blockID, lbl, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypePostings).Inc()
}
cache.StorePostings(blockID, l, b, tenant)
}
}
}
@@ -108,7 +115,11 @@ func (m *multiLevelCache) FetchExpandedPostings(ctx context.Context, blockID uli
if d, h := c.FetchExpandedPostings(ctx, blockID, matchers, tenant); h {
if i > 0 {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeExpandedPostings))
m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i-1].StoreExpandedPostings(blockID, matchers, d, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypeExpandedPostings).Inc()
}
backFillTimer.ObserveDuration()
}
return d, h
@@ -137,10 +148,12 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI

misses = ids
hits = map[storage.SeriesRef][]byte{}
backfillMap := map[storecache.IndexCache][]map[storage.SeriesRef][]byte{}
backfillItems := make([][]map[storage.SeriesRef][]byte, len(m.caches)-1)

for i, c := range m.caches {
backfillMap[c] = []map[storage.SeriesRef][]byte{}
if i < len(m.caches)-1 {
backfillItems[i] = []map[storage.SeriesRef][]byte{}
}
if ctx.Err() != nil {
return
}
@@ -152,7 +165,7 @@
}

if i > 0 && len(h) > 0 {
backfillMap[m.caches[i-1]] = append(backfillMap[m.caches[i-1]], h)
backfillItems[i-1] = append(backfillItems[i-1], h)
}

if len(misses) == 0 {
@@ -163,13 +176,14 @@
defer func() {
backFillTimer := prometheus.NewTimer(m.backFillLatency.WithLabelValues(cacheTypeSeries))
defer backFillTimer.ObserveDuration()
for cache, hit := range backfillMap {
for i, hit := range backfillItems {
for _, values := range hit {
for m, b := range values {
if ctx.Err() != nil {
return
for ref, b := range values {
if err := m.backfillProcessor.EnqueueAsync(func() {
m.caches[i].StoreSeries(blockID, ref, b, tenant)
}); errors.Is(err, cacheutil.ErrAsyncBufferFull) {
m.backfillDroppedItems.WithLabelValues(cacheTypeSeries).Inc()
}
cache.StoreSeries(blockID, m, b, tenant)
}
}
}
Expand All @@ -178,12 +192,14 @@ func (m *multiLevelCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULI
return hits, misses
}

func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) storecache.IndexCache {
func newMultiLevelCache(reg prometheus.Registerer, cfg MultiLevelIndexCacheConfig, c ...storecache.IndexCache) storecache.IndexCache {
if len(c) == 1 {
return c[0]
}

return &multiLevelCache{
caches: c,
caches: c,
backfillProcessor: cacheutil.NewAsyncOperationProcessor(cfg.MaxAsyncBufferSize, cfg.MaxAsyncConcurrency),
fetchLatency: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_store_multilevel_index_cache_fetch_duration_seconds",
Help: "Histogram to track latency to fetch items from multi level index cache",
@@ -194,5 +210,9 @@ func newMultiLevelCache(reg prometheus.Registerer, c ...storecache.IndexCache) s
Help: "Histogram to track latency to backfill items from multi level index cache",
Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 10, 15, 20, 25, 30, 40, 50, 60, 90},
}, []string{"item_type"}),
backfillDroppedItems: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_store_multilevel_index_cache_backfill_dropped_items_total",
Help: "Total number of items dropped due to async buffer full when backfilling multilevel cache ",
}, []string{"item_type"}),
}
}
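The heart of the change is Thanos's `cacheutil.AsyncOperationProcessor`: a bounded queue drained by a fixed pool of worker goroutines, whose `EnqueueAsync` fails fast with `ErrAsyncBufferFull` instead of blocking the read path. A self-contained sketch of that pattern follows; all names here are invented for illustration, and this is not the actual cacheutil implementation:

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    // errBufferFull mirrors cacheutil.ErrAsyncBufferFull: enqueueing fails
    // fast instead of blocking the caller.
    var errBufferFull = errors.New("async buffer full")

    // asyncProcessor is an invented stand-in for the async operation
    // processor: a bounded queue drained by a fixed pool of workers.
    type asyncProcessor struct {
        queue chan func()
        wg    sync.WaitGroup
    }

    func newAsyncProcessor(bufferSize, concurrency int) *asyncProcessor {
        p := &asyncProcessor{queue: make(chan func(), bufferSize)}
        p.wg.Add(concurrency)
        for i := 0; i < concurrency; i++ {
            go func() {
                defer p.wg.Done()
                for op := range p.queue {
                    op() // e.g. StorePostings/StoreSeries on a lower cache level
                }
            }()
        }
        return p
    }

    // enqueueAsync submits op without blocking; a full buffer is reported
    // back so the caller can count the item as dropped.
    func (p *asyncProcessor) enqueueAsync(op func()) error {
        select {
        case p.queue <- op:
            return nil
        default:
            return errBufferFull
        }
    }

    // stop drains outstanding operations and waits for the workers.
    func (p *asyncProcessor) stop() {
        close(p.queue)
        p.wg.Wait()
    }

    func main() {
        p := newAsyncProcessor(10000, 50) // the defaults added by this PR
        dropped := 0
        for i := 0; i < 100; i++ {
            if err := p.enqueueAsync(func() {}); errors.Is(err, errBufferFull) {
                dropped++ // the PR increments the dropped-items counter here
            }
        }
        p.stop()
        fmt.Println("dropped:", dropped)
    }

The trade-off is deliberate: a backfill that cannot be queued is dropped and counted rather than retried, so a slow lower cache level can no longer add latency to reads.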