From 9a4de98cc6b5757aaab57d55a5c2efb89defe931 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Sun, 1 Jun 2025 19:37:33 -0700 Subject: [PATCH 1/3] support caching bucket for cleaner Signed-off-by: yeya24 --- pkg/compactor/compactor.go | 20 +++++++++------ pkg/storage/tsdb/caching_bucket.go | 41 ++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 602fd0bddbe..fae9eec8b69 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -650,8 +650,17 @@ func (c *Compactor) starting(ctx context.Context) error { // Wrap the bucket client to write block deletion marks in the global location too. c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient) + cleanerBucketClient := c.bucketClient + + if c.compactorCfg.CachingBucketEnabled { + cleanerBucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, true, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "cleaner"}, c.registerer)) + if err != nil { + return errors.Wrap(err, "create caching bucket") + } + } + // Create the users scanner. - c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger) + c.usersScanner = cortex_tsdb.NewUsersScanner(cleanerBucketClient, c.ownUserForCleanUp, c.parentLogger) var cleanerRingLifecyclerID = "default-cleaner" // Initialize the compactors ring if sharding is enabled. @@ -727,16 +736,11 @@ func (c *Compactor) starting(ctx context.Context) error { TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, ShardingStrategy: c.compactorCfg.ShardingStrategy, CompactionStrategy: c.compactorCfg.CompactionStrategy, - }, c.bucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval, + }, cleanerBucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval, c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions) if c.compactorCfg.CachingBucketEnabled { - matchers := cortex_tsdb.NewMatchers() - // Do not cache tenant deletion marker and block deletion marker for compactor - matchers.SetMetaFileMatcher(func(name string) bool { - return strings.HasSuffix(name, "/"+metadata.MetaFilename) - }) - c.bucketClient, err = cortex_tsdb.CreateCachingBucket(cortex_tsdb.ChunksCacheConfig{}, c.storageCfg.BucketStore.MetadataCache, matchers, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) + c.bucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, false, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) if err != nil { return errors.Wrap(err, "create caching bucket") } diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go index 79a4f96463b..c77d94cf88d 100644 --- a/pkg/storage/tsdb/caching_bucket.go +++ b/pkg/storage/tsdb/caching_bucket.go @@ -245,6 +245,47 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata return storecache.NewCachingBucket(bkt, cfg, logger, reg) } +func CreateCachingBucketForCompactor(metadataConfig MetadataCacheConfig, cleaner bool, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) { + matchers := NewMatchers() + // Do not cache block deletion marker for compactor + matchers.SetMetaFileMatcher(func(name string) bool { + return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkFile) + }) + cfg := cache.NewCachingBucketConfig() + cachingConfigured := false + + metadataCache, err := createMetadataCache("metadata-cache", &metadataConfig.MetadataCacheBackend, logger, reg) + if err != nil { + return nil, errors.Wrapf(err, "metadata-cache") + } + if metadataCache != nil { + cachingConfigured = true + metadataCache = cache.NewTracingCache(metadataCache) + + codec := snappyIterCodec{storecache.JSONIterCodec{}} + cfg.CacheIter("tenants-iter", metadataCache, matchers.GetTenantsIterMatcher(), metadataConfig.TenantsListTTL, codec, "") + cfg.CacheAttributes("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileAttributesTTL) + + // Don't cache bucket index get and tenant blocks iter if it is cleaner. + if !cleaner { + cfg.CacheExists("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) + cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL) + cfg.CacheGet("bucket-index", metadataCache, matchers.GetBucketIndexMatcher(), metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0) + cfg.CacheIter("tenant-blocks-iter", metadataCache, matchers.GetTenantBlocksIterMatcher(), metadataConfig.TenantBlocksListTTL, codec, "") + } else { + // Cache only GET for metadata and don't cache exists and not exists. + cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, 0, 0) + } + } + + if !cachingConfigured { + // No caching is configured. + return bkt, nil + } + + return storecache.NewCachingBucket(bkt, cfg, logger, reg) +} + func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) { switch cacheBackend.Backend { case "": From 898582ed21739b2c1c3d427d74a5790292afbda2 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Sun, 1 Jun 2025 22:02:13 -0700 Subject: [PATCH 2/3] add changelog Signed-off-by: yeya24 --- CHANGELOG.md | 1 + pkg/compactor/compactor.go | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 49aaedda58b..1b0314012b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ * [ENHANCEMENT] Querier: Add `querier.ingester-query-max-attempts` to retry on partial data. #6714 * [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766 * [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769 +* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. #6778 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index fae9eec8b69..a2be271953e 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -300,8 +300,9 @@ type Config struct { CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"` CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"` - AcceptMalformedIndex bool `yaml:"accept_malformed_index"` - CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` + AcceptMalformedIndex bool `yaml:"accept_malformed_index"` + CachingBucketEnabled bool `yaml:"caching_bucket_enabled"` + CleanerCachingBucketEnabled bool `yaml:"cleaner_caching_bucket_enabled"` } // RegisterFlags registers the Compactor flags. @@ -345,6 +346,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.") f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status") + f.BoolVar(&cfg.CleanerCachingBucketEnabled, "compactor.cleaner-caching-bucket-enabled", false, "When enabled, caching bucket will be used for cleaner") f.DurationVar(&cfg.ShardingPlannerDelay, "compactor.sharding-planner-delay", 10*time.Second, "How long shuffle sharding planner would wait before running planning code. This delay would prevent double compaction when two compactors claimed same partition in grouper at same time.") } @@ -652,10 +654,10 @@ func (c *Compactor) starting(ctx context.Context) error { cleanerBucketClient := c.bucketClient - if c.compactorCfg.CachingBucketEnabled { + if c.compactorCfg.CleanerCachingBucketEnabled { cleanerBucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, true, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "cleaner"}, c.registerer)) if err != nil { - return errors.Wrap(err, "create caching bucket") + return errors.Wrap(err, "create caching bucket for cleaner") } } @@ -742,7 +744,7 @@ func (c *Compactor) starting(ctx context.Context) error { if c.compactorCfg.CachingBucketEnabled { c.bucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, false, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer)) if err != nil { - return errors.Wrap(err, "create caching bucket") + return errors.Wrap(err, "create caching bucket for compactor") } } return nil From 01ecacbfe5cb64cfc3dc2d87848deca54ce52cdb Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 3 Jun 2025 22:25:15 +0000 Subject: [PATCH 3/3] add separate flag Signed-off-by: Ben Ye --- CHANGELOG.md | 2 +- docs/blocks-storage/compactor.md | 4 ++++ docs/configuration/config-file-reference.md | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b0314012b0..3514d1c13ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,7 +36,7 @@ * [ENHANCEMENT] Querier: Add `querier.ingester-query-max-attempts` to retry on partial data. #6714 * [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766 * [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769 -* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. #6778 +* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 0d0b2fcf1c9..dc7daeb8a91 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -333,4 +333,8 @@ compactor: # service, which serves as the source of truth for block status # CLI flag: -compactor.caching-bucket-enabled [caching_bucket_enabled: | default = false] + + # When enabled, caching bucket will be used for cleaner + # CLI flag: -compactor.cleaner-caching-bucket-enabled + [cleaner_caching_bucket_enabled: | default = false] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 29b5b979f86..6882ca45c45 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2413,6 +2413,10 @@ sharding_ring: # service, which serves as the source of truth for block status # CLI flag: -compactor.caching-bucket-enabled [caching_bucket_enabled: | default = false] + +# When enabled, caching bucket will be used for cleaner +# CLI flag: -compactor.cleaner-caching-bucket-enabled +[cleaner_caching_bucket_enabled: | default = false] ``` ### `configs_config`