
Commit db57caa

Let blocks_cleaner delete blocks concurrently
Signed-off-by: wangguoliang <[email protected]>
1 parent 061b348 commit db57caa

6 files changed: +71 -23 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -5,6 +5,7 @@
 * [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
 * [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
 * [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
+* [ENHANCEMENT] Let blocks_cleaner delete blocks concurrently. Added `-compactor.blocks-delete-concurrency` to configure the number of goroutines used for deleting blocks during compaction. #5028
 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972
 * [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk. #5000

docs/blocks-storage/compactor.md

Lines changed: 5 additions & 0 deletions

@@ -162,6 +162,11 @@ compactor:
   # CLI flag: -compactor.blocks-fetch-concurrency
   [blocks_fetch_concurrency: <int> | default = 3]

+  # Number of goroutines to use when deleting blocks from object storage when
+  # compacting.
+  # CLI flag: -compactor.blocks-delete-concurrency
+  [blocks_delete_concurrency: <int> | default = 16]
+
   # When enabled, at compactor startup the bucket will be scanned and all found
   # deletion marks inside the block location will be copied to the markers
   # global location too. This option can (and should) be safely disabled as soon

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions

@@ -3811,6 +3811,11 @@ The `compactor_config` configures the compactor for the blocks storage.
 # CLI flag: -compactor.blocks-fetch-concurrency
 [blocks_fetch_concurrency: <int> | default = 3]

+# Number of goroutines to use when deleting blocks from object storage when
+# compacting.
+# CLI flag: -compactor.blocks-delete-concurrency
+[blocks_delete_concurrency: <int> | default = 16]
+
 # When enabled, at compactor startup the bucket will be scanned and all found
 # deletion marks inside the block location will be copied to the markers global
 # location too. This option can (and should) be safely disabled as soon as the

pkg/compactor/blocks_cleaner.go

Lines changed: 40 additions & 11 deletions

@@ -3,6 +3,7 @@ package compactor
 import (
     "context"
     "fmt"
+    "sync"
     "time"

     "github.com/go-kit/log"
@@ -24,12 +25,17 @@ import (
     "github.com/cortexproject/cortex/pkg/util/services"
 )

+const (
+    defaultDeleteBlocksConcurrency = 16
+)
+
 type BlocksCleanerConfig struct {
     DeletionDelay                      time.Duration
     CleanupInterval                    time.Duration
     CleanupConcurrency                 int
     BlockDeletionMarksMigrationEnabled bool // TODO Discuss whether we should remove it in Cortex 1.8.0 and document that upgrading to 1.7.0 before 1.8.0 is required.
     TenantCleanupDelay                 time.Duration // Delay before removing tenant deletion mark and "debug".
+    DeleteBlocksConcurrency            int
 }

 type BlocksCleaner struct {
@@ -344,23 +350,34 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b

     // Delete blocks marked for deletion. We iterate over a copy of deletion marks because
     // we'll need to manipulate the index (removing blocks which get deleted).
+    blocksToDelete := make([]interface{}, 0, len(idx.BlockDeletionMarks))
+    var mux sync.Mutex
     for _, mark := range idx.BlockDeletionMarks.Clone() {
         if time.Since(mark.GetDeletionTime()).Seconds() <= c.cfg.DeletionDelay.Seconds() {
             continue
         }
+        blocksToDelete = append(blocksToDelete, mark.ID)
+    }

-        if err := block.Delete(ctx, userLogger, userBucket, mark.ID); err != nil {
+    // Concurrently delete blocks marked for deletion, and remove deleted blocks from the index.
+    _ = concurrency.ForEach(ctx, blocksToDelete, c.cfg.DeleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
+        blockID := job.(ulid.ULID)
+
+        if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil {
             c.blocksFailedTotal.Inc()
-            level.Warn(userLogger).Log("msg", "failed to delete block marked for deletion", "block", mark.ID, "err", err)
-            continue
+            level.Warn(userLogger).Log("msg", "failed to delete block marked for deletion", "block", blockID, "err", err)
+            return nil
         }

         // Remove the block from the bucket index too.
-        idx.RemoveBlock(mark.ID)
+        mux.Lock()
+        idx.RemoveBlock(blockID)
+        mux.Unlock()

         c.blocksCleanedTotal.Inc()
-        level.Info(userLogger).Log("msg", "deleted block marked for deletion", "block", mark.ID)
-    }
+        level.Info(userLogger).Log("msg", "deleted block marked for deletion", "block", blockID)
+        return nil
+    })

     // Partial blocks with a deletion mark can be cleaned up. This is a best effort, so we don't return
     // error if the cleanup of partial blocks fail.
@@ -383,39 +400,51 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
 }

 // cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
-// is updated accordingly.
+// and index are updated accordingly.
 func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
+    // Collect all blocks with a missing meta.json into a slice, to delete them concurrently below.
+    blocks := make([]interface{}, 0, len(partials))
+
     for blockID, blockErr := range partials {
         // We can safely delete only blocks which are partial because the meta.json is missing.
         if !errors.Is(blockErr, bucketindex.ErrBlockMetaNotFound) {
             continue
         }
+        blocks = append(blocks, blockID)
+    }
+
+    var mux sync.Mutex

+    _ = concurrency.ForEach(ctx, blocks, c.cfg.DeleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
+        blockID := job.(ulid.ULID)
         // We can safely delete only partial blocks with a deletion mark.
         err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), &metadata.DeletionMark{})
         if errors.Is(err, metadata.ErrorMarkerNotFound) {
-            continue
+            return nil
         }
         if err != nil {
             level.Warn(userLogger).Log("msg", "error reading partial block deletion mark", "block", blockID, "err", err)
-            continue
+            return nil
         }

         // Hard-delete partial blocks having a deletion mark, even if the deletion threshold has not
         // been reached yet.
         if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil {
             c.blocksFailedTotal.Inc()
             level.Warn(userLogger).Log("msg", "error deleting partial block marked for deletion", "block", blockID, "err", err)
-            continue
+            return nil
         }

         // Remove the block from the bucket index too.
+        mux.Lock()
         idx.RemoveBlock(blockID)
         delete(partials, blockID)
+        mux.Unlock()

         c.blocksCleanedTotal.Inc()
         level.Info(userLogger).Log("msg", "deleted partial block marked for deletion", "block", blockID)
-    }
+        return nil
+    })
 }

 // applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period.
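
The change above first collects the eligible block IDs, then fans the deletions out over `c.cfg.DeleteBlocksConcurrency` goroutines via Cortex's `concurrency.ForEach` helper, guarding the shared bucket index with a mutex. The snippet below is a minimal, self-contained sketch of that pattern, not the Cortex code: it swaps in `errgroup` for `concurrency.ForEach` to stay dependency-light, and `blockIndex`, `deleteBlock` and the block IDs are hypothetical stand-ins for the bucket index and `block.Delete`.

```go
package main

import (
    "context"
    "fmt"
    "sync"

    "golang.org/x/sync/errgroup"
)

// blockIndex is a stand-in for the bucket index: a shared structure that must
// only be mutated under its mutex once deletions run in parallel.
type blockIndex struct {
    mu     sync.Mutex
    blocks map[string]struct{}
}

func (i *blockIndex) RemoveBlock(id string) {
    i.mu.Lock()
    defer i.mu.Unlock()
    delete(i.blocks, id)
}

// deleteBlock is a placeholder for deleting one block from object storage.
func deleteBlock(ctx context.Context, id string) error {
    fmt.Println("deleted block", id)
    return nil
}

func main() {
    idx := &blockIndex{blocks: map[string]struct{}{"01A": {}, "01B": {}, "01C": {}}}

    // Collect the IDs first, then fan them out to a bounded number of workers,
    // mirroring -compactor.blocks-delete-concurrency (default 16).
    toDelete := []string{"01A", "01B", "01C"}

    g, ctx := errgroup.WithContext(context.Background())
    g.SetLimit(16)
    for _, id := range toDelete {
        id := id
        g.Go(func() error {
            if err := deleteBlock(ctx, id); err != nil {
                // Log and keep going: one failed delete must not abort the whole cleanup.
                fmt.Println("failed to delete block", id, ":", err)
                return nil
            }
            // Only the index update needs the lock; the deletes themselves are independent.
            idx.RemoveBlock(id)
            return nil
        })
    }
    _ = g.Wait()
    fmt.Println("blocks left in index:", len(idx.blocks))
}
```

Returning nil from a failed delete mirrors the diff's behaviour of logging and continuing, so a single bad block does not stop the rest of the cleanup.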

pkg/compactor/blocks_cleaner_test.go

Lines changed: 17 additions & 12 deletions

@@ -115,6 +115,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
         CleanupConcurrency:                 options.concurrency,
         BlockDeletionMarksMigrationEnabled: options.markersMigrationEnabled,
         TenantCleanupDelay:                 options.tenantDeletionDelay,
+        DeleteBlocksConcurrency:            defaultDeleteBlocksConcurrency,
     }

     reg := prometheus.NewPedanticRegistry()
@@ -256,9 +257,10 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
     }

     cfg := BlocksCleanerConfig{
-        DeletionDelay:      deletionDelay,
-        CleanupInterval:    time.Minute,
-        CleanupConcurrency: 1,
+        DeletionDelay:           deletionDelay,
+        CleanupInterval:         time.Minute,
+        CleanupConcurrency:      1,
+        DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency,
     }

     logger := log.NewNopLogger()
@@ -316,9 +318,10 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
     require.NoError(t, bucketClient.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid!}")))

     cfg := BlocksCleanerConfig{
-        DeletionDelay:      deletionDelay,
-        CleanupInterval:    time.Minute,
-        CleanupConcurrency: 1,
+        DeletionDelay:           deletionDelay,
+        CleanupInterval:         time.Minute,
+        CleanupConcurrency:      1,
+        DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency,
     }

     logger := log.NewNopLogger()
@@ -365,9 +368,10 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
     createTSDBBlock(t, bucketClient, "user-2", 30, 40, nil)

     cfg := BlocksCleanerConfig{
-        DeletionDelay:      time.Hour,
-        CleanupInterval:    time.Minute,
-        CleanupConcurrency: 1,
+        DeletionDelay:           time.Hour,
+        CleanupInterval:         time.Minute,
+        CleanupConcurrency:      1,
+        DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency,
     }

     ctx := context.Background()
@@ -496,9 +500,10 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
     block4 := createTSDBBlock(t, bucketClient, "user-2", ts(-8), ts(-6), nil)

     cfg := BlocksCleanerConfig{
-        DeletionDelay:      time.Hour,
-        CleanupInterval:    time.Minute,
-        CleanupConcurrency: 1,
+        DeletionDelay:           time.Hour,
+        CleanupInterval:         time.Minute,
+        CleanupConcurrency:      1,
+        DeleteBlocksConcurrency: defaultDeleteBlocksConcurrency,
     }

     ctx := context.Background()
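
The test changes above simply thread the new `DeleteBlocksConcurrency` field through the existing `BlocksCleanerConfig` fixtures. As a complement, here is a hedged, self-contained test sketch (not part of the Cortex suite) that drives the same bounded-concurrency deletion pattern as the earlier sketch and checks that every block ends up removed from the shared index; the package and test names are made up for illustration, and `go test -race` would flag any unsynchronized index access.

```go
package cleaner

import (
    "sync"
    "testing"

    "golang.org/x/sync/errgroup"
)

func TestConcurrentDeleteRemovesAllBlocks(t *testing.T) {
    var (
        mu    sync.Mutex
        index = map[string]struct{}{}
    )
    ids := []string{"b1", "b2", "b3", "b4", "b5"}
    for _, id := range ids {
        index[id] = struct{}{}
    }

    var g errgroup.Group
    g.SetLimit(2) // stand-in for DeleteBlocksConcurrency

    for _, id := range ids {
        id := id
        g.Go(func() error {
            // "Delete" the block, then remove it from the shared index under the
            // lock, mirroring the mutex around idx.RemoveBlock in the real change.
            mu.Lock()
            delete(index, id)
            mu.Unlock()
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
    if len(index) != 0 {
        t.Fatalf("expected empty index, got %d entries", len(index))
    }
}
```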

pkg/compactor/compactor.go

Lines changed: 3 additions & 0 deletions

@@ -181,6 +181,7 @@ type Config struct {
     SkipBlocksWithOutOfOrderChunksEnabled bool `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
     BlockFilesConcurrency                 int  `yaml:"block_files_concurrency"`
     BlocksFetchConcurrency                int  `yaml:"blocks_fetch_concurrency"`
+    BlocksDeleteConcurrency               int  `yaml:"blocks_delete_concurrency"`

     // Whether the migration of block deletion marks to the global markers location is enabled.
     BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`
@@ -236,6 +237,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.BoolVar(&cfg.SkipBlocksWithOutOfOrderChunksEnabled, "compactor.skip-blocks-with-out-of-order-chunks-enabled", false, "When enabled, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction.")
     f.IntVar(&cfg.BlockFilesConcurrency, "compactor.block-files-concurrency", 10, "Number of goroutines to use when fetching/uploading block files from object storage.")
     f.IntVar(&cfg.BlocksFetchConcurrency, "compactor.blocks-fetch-concurrency", 3, "Number of goroutines to use when fetching blocks from object storage when compacting.")
+    f.IntVar(&cfg.BlocksDeleteConcurrency, "compactor.blocks-delete-concurrency", 16, "Number of goroutines to use when deleting blocks from object storage when compacting.")

     f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
     f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
@@ -505,6 +507,7 @@ func (c *Compactor) starting(ctx context.Context) error {
     CleanupConcurrency:                 c.compactorCfg.CleanupConcurrency,
     BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
     TenantCleanupDelay:                 c.compactorCfg.TenantCleanupDelay,
+    DeleteBlocksConcurrency:            c.compactorCfg.BlocksDeleteConcurrency,
 }, c.bucketClient, c.usersScanner, c.cfgProvider, c.parentLogger, c.registerer)

 // Initialize the compactors ring if sharding is enabled.
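
The compactor change registers the new flag with a default of 16 and copies it into `BlocksCleanerConfig` at startup. Below is a minimal sketch, not the Cortex code, of how a flag like `-compactor.blocks-delete-concurrency` can back a YAML-tagged config field; the `Config` type and `RegisterFlags` method here are illustrative stand-ins that only reuse the flag name and default from the diff.

```go
package main

import (
    "flag"
    "fmt"
)

// Config mirrors the idea of compactor.Config: the YAML key and the CLI flag
// refer to the same field, so either source can set the value.
type Config struct {
    BlocksDeleteConcurrency int `yaml:"blocks_delete_concurrency"`
}

// RegisterFlags wires the field to its CLI flag with the same default (16)
// used by the commit.
func (c *Config) RegisterFlags(f *flag.FlagSet) {
    f.IntVar(&c.BlocksDeleteConcurrency, "compactor.blocks-delete-concurrency", 16,
        "Number of goroutines to use when deleting blocks from object storage when compacting.")
}

func main() {
    fs := flag.NewFlagSet("compactor", flag.ExitOnError)
    cfg := &Config{}
    cfg.RegisterFlags(fs)

    // Simulate overriding the default on the command line.
    _ = fs.Parse([]string{"-compactor.blocks-delete-concurrency=32"})

    fmt.Println("delete concurrency:", cfg.BlocksDeleteConcurrency) // prints 32
}
```

Passing `-compactor.blocks-delete-concurrency=32` overrides the default of 16, much as the real flag value is handed to `BlocksCleanerConfig.DeleteBlocksConcurrency` when the compactor starts.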
