Skip to content

Commit 4539330

Browse files
authored
Introduce cleaner visit marker (#6113)
1 parent e5f47e1 commit 4539330

10 files changed

+623
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* [ENHANCEMENT] Compactor: Differentiate retry and halt error and retry failed compaction only on retriable error. #6111
3939
* [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040
4040
* [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112
41+
* [ENHANCEMENT] Compactor: Introduce cleaner visit marker. #6113
4142
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
4243
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
4344
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018

docs/blocks-storage/compactor.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,17 @@ compactor:
295295
# CLI flag: -compactor.block-visit-marker-file-update-interval
296296
[block_visit_marker_file_update_interval: <duration> | default = 1m]
297297

298+
# How long cleaner visit marker file should be considered as expired and able
299+
# to be picked up by cleaner again. The value should be smaller than
300+
# -compactor.cleanup-interval
301+
# CLI flag: -compactor.cleaner-visit-marker-timeout
302+
[cleaner_visit_marker_timeout: <duration> | default = 10m]
303+
304+
# How frequently cleaner visit marker file should be updated when cleaning
305+
# user.
306+
# CLI flag: -compactor.cleaner-visit-marker-file-update-interval
307+
[cleaner_visit_marker_file_update_interval: <duration> | default = 5m]
308+
298309
# When enabled, index verification will ignore out of order label names.
299310
# CLI flag: -compactor.accept-malformed-index
300311
[accept_malformed_index: <boolean> | default = false]

docs/configuration/config-file-reference.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2232,6 +2232,16 @@ sharding_ring:
22322232
# CLI flag: -compactor.block-visit-marker-file-update-interval
22332233
[block_visit_marker_file_update_interval: <duration> | default = 1m]
22342234
2235+
# How long cleaner visit marker file should be considered as expired and able to
2236+
# be picked up by cleaner again. The value should be smaller than
2237+
# -compactor.cleanup-interval
2238+
# CLI flag: -compactor.cleaner-visit-marker-timeout
2239+
[cleaner_visit_marker_timeout: <duration> | default = 10m]
2240+
2241+
# How frequently cleaner visit marker file should be updated when cleaning user.
2242+
# CLI flag: -compactor.cleaner-visit-marker-file-update-interval
2243+
[cleaner_visit_marker_file_update_interval: <duration> | default = 5m]
2244+
22352245
# When enabled, index verification will ignore out of order label names.
22362246
# CLI flag: -compactor.accept-malformed-index
22372247
[accept_malformed_index: <boolean> | default = false]

pkg/compactor/blocks_cleaner.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,14 @@ type BlocksCleaner struct {
5050
bucketClient objstore.InstrumentedBucket
5151
usersScanner *cortex_tsdb.UsersScanner
5252

53+
ringLifecyclerID string
54+
5355
// Keep track of the last owned users.
5456
lastOwnedUsers []string
5557

58+
cleanerVisitMarkerTimeout time.Duration
59+
cleanerVisitMarkerFileUpdateInterval time.Duration
60+
5661
// Metrics.
5762
runsStarted *prometheus.CounterVec
5863
runsCompleted *prometheus.CounterVec
@@ -76,15 +81,21 @@ func NewBlocksCleaner(
7681
usersScanner *cortex_tsdb.UsersScanner,
7782
cfgProvider ConfigProvider,
7883
logger log.Logger,
84+
ringLifecyclerID string,
7985
reg prometheus.Registerer,
86+
cleanerVisitMarkerTimeout time.Duration,
87+
cleanerVisitMarkerFileUpdateInterval time.Duration,
8088
blocksMarkedForDeletion *prometheus.CounterVec,
8189
) *BlocksCleaner {
8290
c := &BlocksCleaner{
83-
cfg: cfg,
84-
bucketClient: bucketClient,
85-
usersScanner: usersScanner,
86-
cfgProvider: cfgProvider,
87-
logger: log.With(logger, "component", "cleaner"),
91+
cfg: cfg,
92+
bucketClient: bucketClient,
93+
usersScanner: usersScanner,
94+
cfgProvider: cfgProvider,
95+
logger: log.With(logger, "component", "cleaner"),
96+
ringLifecyclerID: ringLifecyclerID,
97+
cleanerVisitMarkerTimeout: cleanerVisitMarkerTimeout,
98+
cleanerVisitMarkerFileUpdateInterval: cleanerVisitMarkerFileUpdateInterval,
8899
runsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
89100
Name: "cortex_compactor_block_cleanup_started_total",
90101
Help: "Total number of blocks cleanup runs started.",
@@ -246,7 +257,15 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
246257
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
247258
userLogger := util_log.WithUserID(userID, c.logger)
248259
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
260+
visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
261+
if err != nil {
262+
return err
263+
}
264+
if isVisited {
265+
return nil
266+
}
249267
errChan := make(chan error, 1)
268+
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
250269
defer func() {
251270
errChan <- nil
252271
}()
@@ -273,7 +292,15 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
273292
return concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
274293
userLogger := util_log.WithUserID(userID, c.logger)
275294
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
295+
visitMarkerManager, isVisited, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
296+
if err != nil {
297+
return err
298+
}
299+
if isVisited {
300+
return nil
301+
}
276302
errChan := make(chan error, 1)
303+
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
277304
defer func() {
278305
errChan <- nil
279306
}()
@@ -307,6 +334,19 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
307334
return users, deleted, nil
308335
}
309336

337+
func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (visitMarkerManager *VisitMarkerManager, isVisited bool, err error) {
338+
cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID)
339+
visitMarkerManager = NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker)
340+
341+
existingCleanerVisitMarker := &CleanerVisitMarker{}
342+
err = visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker)
343+
if err != nil && !errors.Is(err, errorVisitMarkerNotFound) {
344+
return nil, false, errors.Wrapf(err, "failed to read cleaner visit marker")
345+
}
346+
isVisited = !errors.Is(err, errorVisitMarkerNotFound) && existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout)
347+
return visitMarkerManager, isVisited, nil
348+
}
349+
310350
// Remove blocks and remaining data for tenant marked for deletion.
311351
func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error {
312352

pkg/compactor/blocks_cleaner_test.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
8787
Help: blocksMarkedForDeletionHelp,
8888
}, append(commonLabels, reasonLabelName))
8989

90-
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
90+
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
9191

9292
// Clean User with no error
9393
cleaner.bucketClient = bkt
@@ -194,7 +194,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
194194
Help: blocksMarkedForDeletionHelp,
195195
}, append(commonLabels, reasonLabelName))
196196

197-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
197+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
198198
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
199199
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck
200200

@@ -355,7 +355,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
355355
Help: blocksMarkedForDeletionHelp,
356356
}, append(commonLabels, reasonLabelName))
357357

358-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
358+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
359359
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
360360
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck
361361

@@ -419,7 +419,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
419419
Help: blocksMarkedForDeletionHelp,
420420
}, append(commonLabels, reasonLabelName))
421421

422-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion)
422+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", nil, time.Minute, 30*time.Second, blocksMarkedForDeletion)
423423
require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner))
424424
defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck
425425

@@ -477,7 +477,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
477477
Help: blocksMarkedForDeletionHelp,
478478
}, append(commonLabels, reasonLabelName))
479479

480-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
480+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
481481
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
482482
require.NoError(t, err)
483483
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, true))
@@ -618,7 +618,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
618618
Help: blocksMarkedForDeletionHelp,
619619
}, append(commonLabels, reasonLabelName))
620620

621-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion)
621+
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, "test-cleaner", reg, time.Minute, 30*time.Second, blocksMarkedForDeletion)
622622

623623
assertBlockExists := func(user string, block ulid.ULID, expectExists bool) {
624624
exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename))
@@ -628,6 +628,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
628628

629629
// Existing behaviour - retention period disabled.
630630
{
631+
// clean up cleaner visit marker before running test
632+
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
633+
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
634+
631635
cfgProvider.userRetentionPeriods["user-1"] = 0
632636
cfgProvider.userRetentionPeriods["user-2"] = 0
633637

@@ -662,6 +666,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
662666

663667
// Retention enabled only for a single user, but does nothing.
664668
{
669+
// clean up cleaner visit marker before running test
670+
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
671+
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
672+
665673
cfgProvider.userRetentionPeriods["user-1"] = 9 * time.Hour
666674

667675
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
@@ -677,6 +685,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
677685
// Retention enabled only for a single user, marking a single block.
678686
// Note the block won't be deleted yet due to deletion delay.
679687
{
688+
// clean up cleaner visit marker before running test
689+
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
690+
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
691+
680692
cfgProvider.userRetentionPeriods["user-1"] = 7 * time.Hour
681693

682694
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
@@ -710,6 +722,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
710722

711723
// Marking the block again, before the deletion occurs, should not cause an error.
712724
{
725+
// clean up cleaner visit marker before running test
726+
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
727+
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
728+
713729
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
714730
require.NoError(t, err)
715731
require.NoError(t, cleaner.cleanUpActiveUsers(ctx, activeUsers, false))
@@ -722,6 +738,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
722738

723739
// Reduce the deletion delay. Now the block will be deleted.
724740
{
741+
// clean up cleaner visit marker before running test
742+
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
743+
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
744+
725745
cleaner.cfg.DeletionDelay = 0
726746

727747
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)
@@ -755,6 +775,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
755775

756776
// Retention enabled for other user; test deleting multiple blocks.
757777
{
778+
// clean up cleaner visit marker before running test
779+
bucketClient.Delete(ctx, path.Join("user-1", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
780+
bucketClient.Delete(ctx, path.Join("user-2", GetCleanerVisitMarkerFilePath())) //nolint:errcheck
781+
758782
cfgProvider.userRetentionPeriods["user-2"] = 5 * time.Hour
759783

760784
activeUsers, deleteUsers, err := cleaner.scanUsers(ctx)

pkg/compactor/cleaner_visit_marker.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package compactor
2+
3+
import (
4+
"fmt"
5+
"path"
6+
"time"
7+
8+
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
9+
)
10+
11+
const (
12+
// CleanerVisitMarkerName is the name of cleaner visit marker file.
13+
CleanerVisitMarkerName = "cleaner-visit-marker.json"
14+
// CleanerVisitMarkerVersion1 is the current supported version of cleaner visit mark file.
15+
CleanerVisitMarkerVersion1 = 1
16+
)
17+
18+
type CleanerVisitMarker struct {
19+
CompactorID string `json:"compactorID"`
20+
Status VisitStatus `json:"status"`
21+
// VisitTime is a unix timestamp of when the partition was visited (mark updated).
22+
VisitTime int64 `json:"visitTime"`
23+
// Version of the file.
24+
Version int `json:"version"`
25+
}
26+
27+
func NewCleanerVisitMarker(compactorID string) *CleanerVisitMarker {
28+
return &CleanerVisitMarker{
29+
CompactorID: compactorID,
30+
Version: CleanerVisitMarkerVersion1,
31+
}
32+
}
33+
34+
func (b *CleanerVisitMarker) IsExpired(cleanerVisitMarkerTimeout time.Duration) bool {
35+
return !time.Now().Before(time.Unix(b.VisitTime, 0).Add(cleanerVisitMarkerTimeout))
36+
}
37+
38+
func (b *CleanerVisitMarker) IsVisited(cleanerVisitMarkerTimeout time.Duration) bool {
39+
return !(b.GetStatus() == Completed) && !(b.GetStatus() == Failed) && !b.IsExpired(cleanerVisitMarkerTimeout)
40+
}
41+
42+
func (b *CleanerVisitMarker) GetStatus() VisitStatus {
43+
return b.Status
44+
}
45+
46+
func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string {
47+
return GetCleanerVisitMarkerFilePath()
48+
}
49+
50+
func (b *CleanerVisitMarker) UpdateStatus(ownerIdentifier string, status VisitStatus) {
51+
b.CompactorID = ownerIdentifier
52+
b.Status = status
53+
b.VisitTime = time.Now().Unix()
54+
}
55+
56+
func (b *CleanerVisitMarker) String() string {
57+
return fmt.Sprintf("compactor_id=%s status=%s visit_time=%s",
58+
b.CompactorID,
59+
b.Status,
60+
time.Unix(b.VisitTime, 0).String(),
61+
)
62+
}
63+
64+
func GetCleanerVisitMarkerFilePath() string {
65+
return path.Join(bucketindex.MarkersPathname, CleanerVisitMarkerName)
66+
}

pkg/compactor/compactor.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,10 @@ type Config struct {
216216
BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"`
217217
BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"`
218218

219+
// Cleaner visit marker file config
220+
CleanerVisitMarkerTimeout time.Duration `yaml:"cleaner_visit_marker_timeout"`
221+
CleanerVisitMarkerFileUpdateInterval time.Duration `yaml:"cleaner_visit_marker_file_update_interval"`
222+
219223
AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
220224
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
221225
}
@@ -255,6 +259,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
255259
f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long block visit marker file should be considered as expired and able to be picked up by compactor again.")
256260
f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently block visit marker file should be updated duration compaction.")
257261

262+
f.DurationVar(&cfg.CleanerVisitMarkerTimeout, "compactor.cleaner-visit-marker-timeout", 10*time.Minute, "How long cleaner visit marker file should be considered as expired and able to be picked up by cleaner again. The value should be smaller than -compactor.cleanup-interval")
263+
f.DurationVar(&cfg.CleanerVisitMarkerFileUpdateInterval, "compactor.cleaner-visit-marker-file-update-interval", 5*time.Minute, "How frequently cleaner visit marker file should be updated when cleaning user.")
264+
258265
f.BoolVar(&cfg.AcceptMalformedIndex, "compactor.accept-malformed-index", false, "When enabled, index verification will ignore out of order label names.")
259266
f.BoolVar(&cfg.CachingBucketEnabled, "compactor.caching-bucket-enabled", false, "When enabled, caching bucket will be used for compactor, except cleaner service, which serves as the source of truth for block status")
260267
}
@@ -522,15 +529,7 @@ func (c *Compactor) starting(ctx context.Context) error {
522529
// Create the users scanner.
523530
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger)
524531

525-
// Create the blocks cleaner (service).
526-
c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
527-
DeletionDelay: c.compactorCfg.DeletionDelay,
528-
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
529-
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
530-
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
531-
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
532-
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer, c.compactorMetrics.syncerBlocksMarkedForDeletion)
533-
532+
var cleanerRingLifecyclerID = "default-cleaner"
534533
// Initialize the compactors ring if sharding is enabled.
535534
if c.compactorCfg.ShardingEnabled {
536535
lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig()
@@ -539,6 +538,8 @@ func (c *Compactor) starting(ctx context.Context) error {
539538
return errors.Wrap(err, "unable to initialize compactor ring lifecycler")
540539
}
541540

541+
cleanerRingLifecyclerID = c.ringLifecycler.ID
542+
542543
c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer))
543544
if err != nil {
544545
return errors.Wrap(err, "unable to initialize compactor ring")
@@ -588,6 +589,16 @@ func (c *Compactor) starting(ctx context.Context) error {
588589
}
589590
}
590591

592+
// Create the blocks cleaner (service).
593+
c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
594+
DeletionDelay: c.compactorCfg.DeletionDelay,
595+
CleanupInterval: util.DurationWithJitter(c.compactorCfg.CleanupInterval, 0.1),
596+
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
597+
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
598+
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
599+
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
600+
c.compactorMetrics.syncerBlocksMarkedForDeletion)
601+
591602
// Ensure an initial cleanup occurred before starting the compactor.
592603
if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {
593604
c.ringSubservices.StopAsync()

0 commit comments

Comments
 (0)