Fixed no-compact blocks being grouped in shuffle sharding grouper #5055

Merged 2 commits on Dec 19, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@
* [FEATURE] Build ARM docker images. #5041
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
* [BUGFIX] Fixed no-compact blocks being grouped in the shuffle sharding grouper. #5055

## 1.14.0 2022-12-02

10 changes: 6 additions & 4 deletions pkg/compactor/compactor.go
@@ -52,7 +52,7 @@ var (
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string, _ *compact.GatherNoCompactionMarkFilter) compact.Grouper {
return compact.NewDefaultGrouper(
logger,
bkt,
@@ -67,7 +67,7 @@ var (
cfg.BlocksFetchConcurrency)
}

ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Grouper {
return NewShuffleShardingGrouper(
ctx,
logger,
@@ -91,7 +91,8 @@ var (
cfg.CompactionConcurrency,
cfg.BlockVisitMarkerTimeout,
blockVisitMarkerReadFailed,
blockVisitMarkerWriteFailed)
blockVisitMarkerWriteFailed,
noCompactionMarkFilter.NoCompactMarkedBlocks)
}

DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
@@ -138,6 +139,7 @@ type BlocksGrouperFactory func(
ringLifecycler *ring.Lifecycler,
limit Limits,
userID string,
noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
) compact.Grouper

// BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
@@ -814,7 +816,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
compactor, err := compact.NewBucketCompactor(
ulogger,
syncer,
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID),
c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter),
c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed),
c.blocksCompactor,
path.Join(c.compactorCfg.DataDir, "compact"),
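To make the new wiring concrete, below is a minimal, self-contained Go sketch of the pattern this change adopts: the grouper factory now also receives the no-compaction mark filter and passes its accessor function to the grouper, so every Groups call sees the current set of marked blocks. The types and names here (markFilter, simpleGrouper, newGrouperFactory) are simplified stand-ins for compact.GatherNoCompactionMarkFilter and ShuffleShardingGrouper, not the actual Cortex code.

```go
package main

import "fmt"

// noCompactMark stands in for metadata.NoCompactMark; the real type carries
// more fields (reason, timestamp, ...).
type noCompactMark struct{ Reason string }

// markFilter stands in for compact.GatherNoCompactionMarkFilter: it is
// refreshed elsewhere (during meta sync) and exposes the current marks.
type markFilter struct{ marks map[string]*noCompactMark }

func (f *markFilter) NoCompactMarkedBlocks() map[string]*noCompactMark { return f.marks }

// simpleGrouper stands in for ShuffleShardingGrouper: it keeps the accessor
// function rather than a snapshot of the marks, so each Groups call reads
// fresh data.
type simpleGrouper struct {
	noCompBlocksFunc func() map[string]*noCompactMark
}

func (g *simpleGrouper) Groups(blocks []string) []string {
	marked := g.noCompBlocksFunc()
	var out []string
	for _, b := range blocks {
		if _, excluded := marked[b]; excluded {
			continue // skip blocks marked for no compaction
		}
		out = append(out, b)
	}
	return out
}

// newGrouperFactory mirrors the extra *GatherNoCompactionMarkFilter parameter
// added to BlocksGrouperFactory: the factory closes over the filter and passes
// its method value to the grouper.
func newGrouperFactory(filter *markFilter) *simpleGrouper {
	return &simpleGrouper{noCompBlocksFunc: filter.NoCompactMarkedBlocks}
}

func main() {
	filter := &markFilter{marks: map[string]*noCompactMark{"block-C": {Reason: "out-of-order chunks"}}}
	g := newGrouperFactory(filter)
	fmt.Println(g.Groups([]string{"block-A", "block-B", "block-C"})) // [block-A block-B]
}
```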
11 changes: 9 additions & 2 deletions pkg/compactor/shuffle_sharding_grouper.go
@@ -53,6 +53,8 @@ type ShuffleShardingGrouper struct {
blockVisitMarkerTimeout time.Duration
blockVisitMarkerReadFailed prometheus.Counter
blockVisitMarkerWriteFailed prometheus.Counter

noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark
}

func NewShuffleShardingGrouper(
@@ -79,6 +81,7 @@ func NewShuffleShardingGrouper(
blockVisitMarkerTimeout time.Duration,
blockVisitMarkerReadFailed prometheus.Counter,
blockVisitMarkerWriteFailed prometheus.Counter,
noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark,
) *ShuffleShardingGrouper {
if logger == nil {
logger = log.NewNopLogger()
@@ -129,17 +132,21 @@ func NewShuffleShardingGrouper(
blockVisitMarkerTimeout: blockVisitMarkerTimeout,
blockVisitMarkerReadFailed: blockVisitMarkerReadFailed,
blockVisitMarkerWriteFailed: blockVisitMarkerWriteFailed,
noCompBlocksFunc: noCompBlocksFunc,
}
}

// Groups function modified from https://github.com/cortexproject/cortex/pull/2616
func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) {
noCompactMarked := g.noCompBlocksFunc()
// First of all we have to group blocks using the Thanos default
// grouping (based on downsample resolution + external labels).
mainGroups := map[string][]*metadata.Meta{}
for _, b := range blocks {
key := b.Thanos.GroupKey()
mainGroups[key] = append(mainGroups[key], b)
if _, excluded := noCompactMarked[b.ULID]; !excluded {
key := b.Thanos.GroupKey()
mainGroups[key] = append(mainGroups[key], b)
}
}

// For each group, we have to further split it into set of blocks
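For intuition on the Groups change above, here is a small standalone sketch of the first grouping pass: blocks are bucketed by a group key (in Thanos, derived from downsample resolution plus external labels), and any block present in the no-compact map is skipped before bucketing. The metaLite type and buildMainGroups function are illustrative simplifications, not the real implementation.

```go
package main

import "fmt"

type metaLite struct {
	ULID     string
	GroupKey string // in Thanos: downsample resolution + external labels
}

// buildMainGroups mirrors the modified loop in Groups(): consult the
// no-compact map first, then bucket the remaining blocks by group key.
func buildMainGroups(blocks map[string]metaLite, noCompact map[string]struct{}) map[string][]metaLite {
	mainGroups := map[string][]metaLite{}
	for _, b := range blocks {
		if _, excluded := noCompact[b.ULID]; excluded {
			continue // marked blocks are invisible to the grouper
		}
		mainGroups[b.GroupKey] = append(mainGroups[b.GroupKey], b)
	}
	return mainGroups
}

func main() {
	blocks := map[string]metaLite{
		"b1": {ULID: "b1", GroupKey: "0@ext1"},
		"b2": {ULID: "b2", GroupKey: "0@ext1"},
		"b3": {ULID: "b3", GroupKey: "0@ext2"},
	}
	noCompact := map[string]struct{}{"b2": {}}
	// b2 is excluded; b1 and b3 each land in their own group.
	fmt.Println(buildMainGroups(blocks, noCompact))
}
```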
24 changes: 22 additions & 2 deletions pkg/compactor/shuffle_sharding_grouper_test.go
@@ -127,8 +127,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
compactorID string
isExpired bool
}
expected [][]ulid.ULID
metrics string
expected [][]ulid.ULID
metrics string
noCompactBlocks map[ulid.ULID]*metadata.NoCompactMark
}{
"test basic grouping": {
concurrency: 3,
@@ -306,6 +307,20 @@
cortex_compactor_remaining_planned_compactions 2
`,
},
"test should skip block with no compact marker": {
concurrency: 2,
ranges: []time.Duration{4 * time.Hour},
blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid]},
expected: [][]ulid.ULID{
{block1hto2hExt2Ulid, block0hto1hExt2Ulid},
{block1hto2hExt1Ulid, block0hto1hExt1Ulid},
},
metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted.
# TYPE cortex_compactor_remaining_planned_compactions gauge
cortex_compactor_remaining_planned_compactions 2
`,
[Review comment from a Contributor] We should really improve such test cases; using CollectAndCompare would be better IMHO. This is just a note, we don't need to do anything in this PR. Let me create an issue to track it. (A sketch of that approach appears after this diff.)

noCompactBlocks: map[ulid.ULID]*metadata.NoCompactMark{block2hto3hExt1Ulid: {}},
},
}

for testName, testData := range tests {
@@ -364,6 +379,10 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
bkt.MockUpload(mock.Anything, nil)
bkt.MockGet(mock.Anything, "", nil)

noCompactFilter := func() map[ulid.ULID]*metadata.NoCompactMark {
return testData.noCompactBlocks
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
g := NewShuffleShardingGrouper(
@@ -390,6 +409,7 @@
blockVisitMarkerTimeout,
blockVisitMarkerReadFailed,
blockVisitMarkerWriteFailed,
noCompactFilter,
)
actual, err := g.Groups(testData.blocks)
require.NoError(t, err)
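Following up on the review comment above: this is a minimal, hypothetical sketch of the CollectAndCompare style of metrics assertion using prometheus/client_golang's testutil package. The test name and the way the gauge gets its value are illustrative; in the real test the value would come from running the grouper.

```go
package compactor_test

import (
	"strings"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"
)

// TestRemainingPlannedCompactionsMetric shows the CollectAndCompare pattern:
// collect directly from the collector and diff against the expected
// exposition text instead of carrying raw metrics strings in each test case.
func TestRemainingPlannedCompactionsMetric(t *testing.T) {
	remaining := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "cortex_compactor_remaining_planned_compactions",
		Help: "Total number of plans that remain to be compacted.",
	})

	// Stand-in for the value the grouper would set while planning compactions.
	remaining.Set(2)

	expected := `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted.
# TYPE cortex_compactor_remaining_planned_compactions gauge
cortex_compactor_remaining_planned_compactions 2
`
	require.NoError(t, testutil.CollectAndCompare(remaining, strings.NewReader(expected),
		"cortex_compactor_remaining_planned_compactions"))
}
```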