From 0a69b1ca7a2abf73cbf2585b0f86945ba80a07b2 Mon Sep 17 00:00:00 2001
From: wangguoliang
Date: Thu, 8 Dec 2022 21:04:31 +0800
Subject: [PATCH] Let blocks_cleaner delete blocks concurrently

Signed-off-by: wangguoliang
---
 CHANGELOG.md                    |  1 +
 pkg/compactor/blocks_cleaner.go | 50 +++++++++++++++++++++++++--------
 2 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a437992eb4..8565ccf40bc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 * [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
 * [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
 * [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
+* [ENHANCEMENT] Let blocks_cleaner delete blocks concurrently (default 16 goroutines). #5028
 * [ENHANCEMENT] Query Frontend/Query Scheduler: Increase upper bound to 60s for queue duration histogram metric. #5029
 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972
diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go
index d9cf7ff2353..59e7fd1c404 100644
--- a/pkg/compactor/blocks_cleaner.go
+++ b/pkg/compactor/blocks_cleaner.go
@@ -3,6 +3,7 @@ package compactor
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/go-kit/log"
@@ -24,6 +25,10 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
+const (
+	defaultDeleteBlocksConcurrency = 16
+)
+
 type BlocksCleanerConfig struct {
 	DeletionDelay      time.Duration
 	CleanupInterval    time.Duration
@@ -344,23 +349,34 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
 
 	// Delete blocks marked for deletion. We iterate over a copy of deletion marks because
 	// we'll need to manipulate the index (removing blocks which get deleted).
+	blocksToDelete := make([]interface{}, 0, len(idx.BlockDeletionMarks))
+	var mux sync.Mutex
 	for _, mark := range idx.BlockDeletionMarks.Clone() {
 		if time.Since(mark.GetDeletionTime()).Seconds() <= c.cfg.DeletionDelay.Seconds() {
 			continue
 		}
+		blocksToDelete = append(blocksToDelete, mark.ID)
+	}
 
-		if err := block.Delete(ctx, userLogger, userBucket, mark.ID); err != nil {
+	// Concurrently delete blocks marked for deletion, and remove them from the bucket index.
+	_ = concurrency.ForEach(ctx, blocksToDelete, defaultDeleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
+		blockID := job.(ulid.ULID)
+
+		if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil {
 			c.blocksFailedTotal.Inc()
-			level.Warn(userLogger).Log("msg", "failed to delete block marked for deletion", "block", mark.ID, "err", err)
-			continue
+			level.Warn(userLogger).Log("msg", "failed to delete block marked for deletion", "block", blockID, "err", err)
+			return nil
 		}
 
 		// Remove the block from the bucket index too.
-		idx.RemoveBlock(mark.ID)
+		mux.Lock()
+		idx.RemoveBlock(blockID)
+		mux.Unlock()
 
 		c.blocksCleanedTotal.Inc()
-		level.Info(userLogger).Log("msg", "deleted block marked for deletion", "block", mark.ID)
-	}
+		level.Info(userLogger).Log("msg", "deleted block marked for deletion", "block", blockID)
+		return nil
+	})
 
 	// Partial blocks with a deletion mark can be cleaned up. This is a best effort, so we don't return
 	// error if the cleanup of partial blocks fail.
@@ -383,22 +399,31 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
 }
 
 // cleanUserPartialBlocks delete partial blocks which are safe to be deleted. The provided partials map
-// is updated accordingly.
+// and index are updated accordingly.
 func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map[ulid.ULID]error, idx *bucketindex.Index, userBucket objstore.InstrumentedBucket, userLogger log.Logger) {
+	// Collect all blocks with a missing meta.json into a slice of deletion jobs.
+	blocks := make([]interface{}, 0, len(partials))
+
 	for blockID, blockErr := range partials {
 		// We can safely delete only blocks which are partial because the meta.json is missing.
 		if !errors.Is(blockErr, bucketindex.ErrBlockMetaNotFound) {
 			continue
 		}
+		blocks = append(blocks, blockID)
+	}
+
+	var mux sync.Mutex
+	_ = concurrency.ForEach(ctx, blocks, defaultDeleteBlocksConcurrency, func(ctx context.Context, job interface{}) error {
+		blockID := job.(ulid.ULID)
 
 		// We can safely delete only partial blocks with a deletion mark.
 		err := metadata.ReadMarker(ctx, userLogger, userBucket, blockID.String(), &metadata.DeletionMark{})
 		if errors.Is(err, metadata.ErrorMarkerNotFound) {
-			continue
+			return nil
 		}
 		if err != nil {
 			level.Warn(userLogger).Log("msg", "error reading partial block deletion mark", "block", blockID, "err", err)
-			continue
+			return nil
 		}
 
 		// Hard-delete partial blocks having a deletion mark, even if the deletion threshold has not
@@ -406,16 +431,19 @@ func (c *BlocksCleaner) cleanUserPartialBlocks(ctx context.Context, partials map
 		if err := block.Delete(ctx, userLogger, userBucket, blockID); err != nil {
 			c.blocksFailedTotal.Inc()
 			level.Warn(userLogger).Log("msg", "error deleting partial block marked for deletion", "block", blockID, "err", err)
-			continue
+			return nil
 		}
 
 		// Remove the block from the bucket index too.
+		mux.Lock()
 		idx.RemoveBlock(blockID)
 		delete(partials, blockID)
+		mux.Unlock()
 
 		c.blocksCleanedTotal.Inc()
 		level.Info(userLogger).Log("msg", "deleted partial block marked for deletion", "block", blockID)
-	}
+		return nil
+	})
 }
 
 // applyUserRetentionPeriod marks blocks for deletion which have aged past the retention period.
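
For context on the change above: the pattern the patch relies on is a simple bounded fan-out. The block IDs are collected into a slice first, then processed by at most defaultDeleteBlocksConcurrency (16) goroutines, and the shared in-memory state touched from the workers (the bucket index and the partials map) is guarded by a mutex. The sketch below is a minimal, stdlib-only illustration of that pattern under stated assumptions; forEachBounded and the string block IDs are hypothetical stand-ins, not the actual concurrency.ForEach helper from pkg/util/concurrency.

package main

import (
	"context"
	"fmt"
	"sync"
)

// forEachBounded runs fn for every job with at most maxConcurrency goroutines
// in flight. It is a stand-in for a helper like concurrency.ForEach.
func forEachBounded(ctx context.Context, jobs []interface{}, maxConcurrency int, fn func(ctx context.Context, job interface{}) error) {
	sem := make(chan struct{}, maxConcurrency) // counting semaphore bounding parallelism
	var wg sync.WaitGroup

	for _, job := range jobs {
		job := job
		wg.Add(1)
		sem <- struct{}{} // acquire a slot

		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			// Errors are ignored here, mirroring how the cleaner logs a failed
			// block deletion and keeps going instead of aborting the whole run.
			_ = fn(ctx, job)
		}()
	}

	wg.Wait()
}

func main() {
	// Hypothetical block IDs standing in for ulid.ULID values.
	blocks := []interface{}{"block-1", "block-2", "block-3", "block-4"}

	var (
		mux     sync.Mutex
		deleted []string // shared state, guarded like idx.RemoveBlock / delete(partials, ...)
	)

	forEachBounded(context.Background(), blocks, 2, func(ctx context.Context, job interface{}) error {
		blockID := job.(string)

		// Simulate the slow object-store deletion performed by block.Delete.
		fmt.Println("deleting", blockID)

		mux.Lock()
		deleted = append(deleted, blockID)
		mux.Unlock()
		return nil
	})

	fmt.Println("deleted blocks:", deleted)
}

The mutex only needs to cover the in-memory index and map mutations; the object-store deletions themselves are independent of each other, which is what makes the bounded fan-out both safe and worthwhile.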