1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -36,6 +36,7 @@
* [ENHANCEMENT] Querier: Add `querier.ingester-query-max-attempts` to retry on partial data. #6714
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
4 changes: 4 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -333,4 +333,8 @@ compactor:
# service, which serves as the source of truth for block status
# CLI flag: -compactor.caching-bucket-enabled
[caching_bucket_enabled: <boolean> | default = false]

# When enabled, caching bucket will be used for cleaner
# CLI flag: -compactor.cleaner-caching-bucket-enabled
[cleaner_caching_bucket_enabled: <boolean> | default = false]
```
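
For reference, here is a minimal, standalone Go sketch of registering and parsing the two caching-bucket booleans. The flag names come from this PR's diff; the small program around them is illustrative only and is not Cortex's actual `Config`/`RegisterFlags` code.

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Illustrative variables, not Cortex's compactor.Config fields.
	var (
		cachingBucketEnabled        bool
		cleanerCachingBucketEnabled bool
	)

	fs := flag.NewFlagSet("compactor", flag.ExitOnError)
	fs.BoolVar(&cachingBucketEnabled, "compactor.caching-bucket-enabled", false,
		"Use a caching bucket for the compactor, except the cleaner service.")
	fs.BoolVar(&cleanerCachingBucketEnabled, "compactor.cleaner-caching-bucket-enabled", false,
		"Use a caching bucket for the cleaner.")

	// The flags an operator would pass to enable caching on both paths.
	if err := fs.Parse([]string{
		"-compactor.caching-bucket-enabled=true",
		"-compactor.cleaner-caching-bucket-enabled=true",
	}); err != nil {
		panic(err)
	}

	fmt.Println(cachingBucketEnabled, cleanerCachingBucketEnabled) // true true
}
```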
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2413,6 +2413,10 @@ sharding_ring:
# service, which serves as the source of truth for block status
# CLI flag: -compactor.caching-bucket-enabled
[caching_bucket_enabled: <boolean> | default = false]

# When enabled, caching bucket will be used for cleaner
# CLI flag: -compactor.cleaner-caching-bucket-enabled
[cleaner_caching_bucket_enabled: <boolean> | default = false]
```

### `configs_config`
28 changes: 17 additions & 11 deletions pkg/compactor/compactor.go
@@ -300,8 +300,9 @@ type Config struct {
CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"`
CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"`

AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
CleanerCachingBucketEnabled bool `yaml:"cleaner_caching_bucket_enabled"`
}

// RegisterFlags registers the Compactor flags.
@@ -345,6 +346,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
f.BoolVar(&cfg.CleanerCachingBucketEnabled, "compactor.cleaner-caching-bucket-enabled", false, "When enabled, caching bucket will be used for cleaner")

f.DurationVar(&cfg.ShardingPlannerDelay, "compactor.sharding-planner-delay", 10*time.Second, "How long shuffle sharding planner would wait before running planning code. This delay would prevent double compaction when two compactors claimed same partition in grouper at same time.")
}
@@ -650,8 +652,17 @@ func (c *Compactor) starting(ctx context.Context) error {
// Wrap the bucket client to write block deletion marks in the global location too.
c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)

cleanerBucketClient := c.bucketClient

if c.compactorCfg.CleanerCachingBucketEnabled {
cleanerBucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, true, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "cleaner"}, c.registerer))
if err != nil {
return errors.Wrap(err, "create caching bucket for cleaner")
}
}

// Create the users scanner.
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger)
c.usersScanner = cortex_tsdb.NewUsersScanner(cleanerBucketClient, c.ownUserForCleanUp, c.parentLogger)

var cleanerRingLifecyclerID = "default-cleaner"
// Initialize the compactors ring if sharding is enabled.
@@ -727,18 +738,13 @@ func (c *Compactor) starting(ctx context.Context) error {
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
ShardingStrategy: c.compactorCfg.ShardingStrategy,
CompactionStrategy: c.compactorCfg.CompactionStrategy,
}, c.bucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
}, cleanerBucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)

if c.compactorCfg.CachingBucketEnabled {
matchers := cortex_tsdb.NewMatchers()
// Do not cache tenant deletion marker and block deletion marker for compactor
matchers.SetMetaFileMatcher(func(name string) bool {
return strings.HasSuffix(name, "/"+metadata.MetaFilename)
})
c.bucketClient, err = cortex_tsdb.CreateCachingBucket(cortex_tsdb.ChunksCacheConfig{}, c.storageCfg.BucketStore.MetadataCache, matchers, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
c.bucketClient, err = cortex_tsdb.CreateCachingBucketForCompactor(c.storageCfg.BucketStore.MetadataCache, false, c.bucketClient, c.logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "compactor"}, c.registerer))
if err != nil {
return errors.Wrap(err, "create caching bucket")
return errors.Wrap(err, "create caching bucket for compactor")
}
}
return nil
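Taken together, the ordering in `Compactor.starting` is the key detail of this diff: the cleaner's caching bucket is built from the plain (global-markers) client before the compaction path wraps `c.bucketClient` with its own caching bucket further down, so the two caching layers never stack. The following runnable toy sketch shows that wrapping order using stand-in types rather than the real Cortex clients.

```go
package main

import "fmt"

// Toy stand-ins (not the real Cortex types) to show the wrapping order used in
// Compactor.starting: the cleaner captures the bucket before the compaction path
// re-wraps it, so neither caching layer is applied on top of the other.

type bucket interface{ Name() string }

type plainBucket struct{}

func (plainBucket) Name() string { return "plain" }

type cachingBucket struct {
	inner     bucket
	component string
}

func (c cachingBucket) Name() string { return "caching(" + c.component + ") over " + c.inner.Name() }

func main() {
	var bucketClient bucket = plainBucket{}

	// Cleaner path (-compactor.cleaner-caching-bucket-enabled): wrap the plain client.
	var cleanerBucketClient bucket = cachingBucket{inner: bucketClient, component: "cleaner"}

	// Compaction path (-compactor.caching-bucket-enabled): wrap the same plain client,
	// replacing bucketClient. cleanerBucketClient is unaffected.
	bucketClient = cachingBucket{inner: bucketClient, component: "compactor"}

	fmt.Println(cleanerBucketClient.Name()) // caching(cleaner) over plain
	fmt.Println(bucketClient.Name())        // caching(compactor) over plain
}
```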
41 changes: 41 additions & 0 deletions pkg/storage/tsdb/caching_bucket.go
@@ -245,6 +245,47 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
return storecache.NewCachingBucket(bkt, cfg, logger, reg)
}

func CreateCachingBucketForCompactor(metadataConfig MetadataCacheConfig, cleaner bool, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
Review comment (Contributor): Can this and CreateCachingBucket share inner logic? They seem very similar, no?

Reply (Contributor, Author): Right, they are similar. I don't see a good way to share the same function for the two. The main difference is that those cfg.CacheXXX calls can be different for the read and compaction paths.

A hypothetical sketch of such a shared helper appears after this file's diff.

matchers := NewMatchers()
// Do not cache block deletion marker for compactor
matchers.SetMetaFileMatcher(func(name string) bool {
return strings.HasSuffix(name, "/"+metadata.MetaFilename) || strings.HasSuffix(name, "/"+TenantDeletionMarkFile)
})
cfg := cache.NewCachingBucketConfig()
cachingConfigured := false

metadataCache, err := createMetadataCache("metadata-cache", &metadataConfig.MetadataCacheBackend, logger, reg)
if err != nil {
return nil, errors.Wrapf(err, "metadata-cache")
}
if metadataCache != nil {
cachingConfigured = true
metadataCache = cache.NewTracingCache(metadataCache)

codec := snappyIterCodec{storecache.JSONIterCodec{}}
cfg.CacheIter("tenants-iter", metadataCache, matchers.GetTenantsIterMatcher(), metadataConfig.TenantsListTTL, codec, "")
cfg.CacheAttributes("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileAttributesTTL)

// Don't cache bucket index get and tenant blocks iter if it is cleaner.
if !cleaner {
cfg.CacheExists("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
cfg.CacheGet("bucket-index", metadataCache, matchers.GetBucketIndexMatcher(), metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)
cfg.CacheIter("tenant-blocks-iter", metadataCache, matchers.GetTenantBlocksIterMatcher(), metadataConfig.TenantBlocksListTTL, codec, "")
} else {
// Cache only GET for metadata and don't cache exists and not exists.
cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, 0, 0)
}
}

if !cachingConfigured {
// No caching is configured.
return bkt, nil
}

return storecache.NewCachingBucket(bkt, cfg, logger, reg)
}

func createMetadataCache(cacheName string, cacheBackend *MetadataCacheBackend, logger log.Logger, reg prometheus.Registerer) (cache.Cache, error) {
switch cacheBackend.Backend {
case "":
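Following up on the review exchange above (whether `CreateCachingBucketForCompactor` and `CreateCachingBucket` could share inner logic), here is a purely hypothetical sketch of one way a shared helper might look, with the differing `cfg.CacheXXX` calls pushed into a per-caller callback. This is not part of the PR; it assumes it would live in the same `tsdb` package, and the signatures are inferred from the surrounding diff, so they may not match the real Cortex/Thanos code exactly.

```go
// Hypothetical only — NOT part of this PR. Each caller would build its own matchers
// and capture them in the configure closure; chunk caching (handled by
// CreateCachingBucket) is intentionally left out of this sketch.
func createCachingBucketShared(
	metadataConfig MetadataCacheConfig,
	bkt objstore.InstrumentedBucket,
	logger log.Logger,
	reg prometheus.Registerer,
	configure func(cfg *cache.CachingBucketConfig, metadataCache cache.Cache),
) (objstore.InstrumentedBucket, error) {
	metadataCache, err := createMetadataCache("metadata-cache", &metadataConfig.MetadataCacheBackend, logger, reg)
	if err != nil {
		return nil, errors.Wrapf(err, "metadata-cache")
	}
	if metadataCache == nil {
		// No caching configured: return the original bucket unchanged.
		return bkt, nil
	}

	cfg := cache.NewCachingBucketConfig()
	// The caller decides what to cache, e.g. metafile GET only for the cleaner,
	// plus exists / bucket-index / tenant-blocks-iter for the compaction path.
	configure(cfg, cache.NewTracingCache(metadataCache))

	return storecache.NewCachingBucket(bkt, cfg, logger, reg)
}
```

Whether such a refactor is worth it is debatable; as the author notes in the thread, the shared part is small compared with the per-path cache configuration.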