Skip to content

Commit 0a69b1c

Browse files
committed
Let blocks_cleaner delete blocks concurrently
Signed-off-by: wangguoliang <[email protected]>
1 parent 8cf2583 commit 0a69b1c

File tree

2 files changed

+40
-11
lines changed

2 files changed

+40
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
66
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
77
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
8+
* [ENHANCEMENT] Let blocks_cleaner delete blocks concurrently(default 16 goroutines). #5028
89
* [ENHANCEMENT] Query Frontend/Query Scheduler: Increase upper bound to 60s for queue duration histogram metric. #5029
910
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
1011
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972

pkg/compactor/blocks_cleaner.go

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package compactor
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"time"
78

89
"github.com/go-kit/log"
@@ -24,6 +25,10 @@ import (
2425
"github.com/cortexproject/cortex/pkg/util/services"
2526
)
2627

28+
const (
29+
defaultDeleteBlocksConcurrency = 16
30+
)
31+
2732
type BlocksCleanerConfig struct {
2833
DeletionDelay time.Duration
2934
CleanupInterval time.Duration
@@ -344,23 +349,34 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
344349

345350
// Delete blocks marked for deletion. We iterate over a copy of deletion marks because
346351
// we'll need to manipulate the index (removing blocks which get deleted).
352+
blocksToDelete := make([]interface{}, 0, len(idx.BlockDeletionMarks))
353+
var mux sync.Mutex
347354
for _, mark := range idx.BlockDeletionMarks.Clone() {
348355
if time.Since(mark.GetDeletionTime()).Seconds() <= c.cfg.DeletionDelay.Seconds() {
349356
continue
350357
}
358+
blocksToDelete = append(blocksToDelete, mark.ID)
359+
}
351360

352-
if err := block.Delete(ctx, userLogger, userBucket, mark.ID); err != nil {
361+
// Concurrently deletes blocks marked for deletion, and removes blocks from index.
362+
_ = concurrency.ForEach(ctx, blocksToDelete, defaultDeleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
363+
blockID := job.(ulid.ULID)
364+
365+
if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil {
353366
c.blocksFailedTotal.Inc()
354-
level.Warn(userLogger).Log("msg", "failed to delete block marked for deletion", "block", mark.ID, "err", err)
355-
continue
367+
level.Warn(userLogger).Log("msg", "failed to delete block marked for deletion", "block", blockID, "err", err)
368+
return nil
356369
}
357370

358371
// Remove the block from the bucket index too.
359-
idx.RemoveBlock(mark.ID)
372+
mux.Lock()
373+
idx.RemoveBlock(blockID)
374+
mux.Unlock()
360375

361376
c.blocksCleanedTotal.Inc()
362-
level.Info(userLogger).Log("msg", "deleted block marked for deletion", "block", mark.ID)
363-
}
377+
level.Info(userLogger).Log("msg", "deleted block marked for deletion", "block", blockID)
378+
return nil
379+
})
364380

365381
// Partial blocks with a deletion mark can be cleaned up. This is a best effort, so we don't return
366382
// error if the cleanup of partial blocks fail.
@@ -383,39 +399,51 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
383399
}
384400

385401
// cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
386-
// is updated accordingly.
402+
// and index are updated accordingly.
387403
func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
404+
// Collect all blocks with missing meta.json into buffered channel.
405+
blocks := make([]interface{}, 0, len(partials))
406+
388407
for blockID, blockErr := range partials {
389408
// We can safely delete only blocks which are partial because the meta.json is missing.
390409
if !errors.Is(blockErr, bucketindex.ErrBlockMetaNotFound) {
391410
continue
392411
}
412+
blocks = append(blocks, blockID)
413+
}
414+
415+
var mux sync.Mutex
393416

417+
_ = concurrency.ForEach(ctx, blocks, defaultDeleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
418+
blockID := job.(ulid.ULID)
394419
// We can safely delete only partial blocks with a deletion mark.
395420
err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), &metadata.DeletionMark{})
396421
if errors.Is(err, metadata.ErrorMarkerNotFound) {
397-
continue
422+
return nil
398423
}
399424
if err != nil {
400425
level.Warn(userLogger).Log("msg", "error reading partial block deletion mark", "block", blockID, "err", err)
401-
continue
426+
return nil
402427
}
403428

404429
// Hard-delete partial blocks having a deletion mark, even if the deletion threshold has not
405430
// been reached yet.
406431
if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil {
407432
c.blocksFailedTotal.Inc()
408433
level.Warn(userLogger).Log("msg", "error deleting partial block marked for deletion", "block", blockID, "err", err)
409-
continue
434+
return nil
410435
}
411436

412437
// Remove the block from the bucket index too.
438+
mux.Lock()
413439
idx.RemoveBlock(blockID)
414440
delete(partials, blockID)
441+
mux.Unlock()
415442

416443
c.blocksCleanedTotal.Inc()
417444
level.Info(userLogger).Log("msg", "deleted partial block marked for deletion", "block", blockID)
418-
}
445+
return nil
446+
})
419447
}
420448

421449
// applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period.

0 commit comments

Comments
 (0)