diff --git a/CHANGELOG.md b/CHANGELOG.md
index b79f7913477..f475fe7e479 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@
 * [FEATURE] Ruler: Add support for group labels. #6665
 * [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716
 * [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
+* [FEATURE] Compactor: Add support for percentage-based sharding for compactors. #6738
 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
 * [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. Add a `-ingester.return-all-metadata` flag to make the metadata API run when the deployment. Please set this flag to `false` to use the metadata API with the limits later. #6681 #6744
 * [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 8d1ac8f1fae..29b5b979f86 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -3708,9 +3708,10 @@ query_rejection:
 
 # The default tenant's shard size when the shuffle-sharding strategy is used by
 # the compactor. When this setting is specified in the per-tenant overrides, a
-# value of 0 disables shuffle sharding for the tenant.
+# value of 0 disables shuffle sharding for the tenant. If the value is > 0 and
+# < 1, the shard size will be a percentage of the total number of compactors.
 # CLI flag: -compactor.tenant-shard-size
-[compactor_tenant_shard_size: <int> | default = 0]
+[compactor_tenant_shard_size: <float> | default = 0]
 
 # Index size limit in bytes for each compaction partition. 0 means no limit
 # CLI flag: -compactor.partition-index-size-bytes
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index 608f3d3340b..c18a874b5df 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -244,7 +244,7 @@ type BlockDeletableCheckerFactory func(
 
 // Limits defines limits used by the Compactor.
 type Limits interface {
-	CompactorTenantShardSize(userID string) int
+	CompactorTenantShardSize(userID string) float64
 	CompactorPartitionIndexSizeBytes(userID string) int64
 	CompactorPartitionSeriesCount(userID string) int64
 }
@@ -1122,6 +1122,10 @@ func (c *Compactor) ownUserForCleanUp(userID string) (bool, error) {
 	return c.ownUser(userID, true)
 }
 
+func (c *Compactor) getShardSizeForUser(userID string) int {
+	return util.DynamicShardSize(c.limits.CompactorTenantShardSize(userID), c.ring.InstancesCount())
+}
+
 func (c *Compactor) ownUser(userID string, isCleanUp bool) (bool, error) {
 	if !c.allowedTenants.IsAllowed(userID) {
 		return false, nil
@@ -1135,7 +1139,8 @@ func (c *Compactor) ownUser(userID string, isCleanUp bool) (bool, error) {
 	// If we aren't cleaning up user blocks, and we are using shuffle-sharding, ownership is determined by a subring
 	// Cleanup should only be owned by a single compactor, as there could be race conditions during block deletion
 	if !isCleanUp && c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
-		subRing := c.ring.ShuffleShard(userID, c.limits.CompactorTenantShardSize(userID))
+		shardSize := c.getShardSizeForUser(userID)
+		subRing := c.ring.ShuffleShard(userID, shardSize)
 
 		rs, err := subRing.GetAllHealthy(RingOp)
 		if err != nil {
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go
index be18521ba97..c482e6c7329 100644
--- a/pkg/compactor/compactor_test.go
+++ b/pkg/compactor/compactor_test.go
@@ -14,6 +14,7 @@ import (
 	"regexp"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -2206,3 +2207,184 @@ func TestCompactor_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T
 		return len(healthy) == 1 && len(unhealthy) == 0
 	})
 }
+
+func TestCompactor_GetShardSizeForUser(t *testing.T) {
+
+	// Users with their tenant shard size and the expected shard sizes before/after scale up
+	users := []struct {
+		userID                        string
+		tenantShardSize               float64
+		expectedShardSize             int
+		expectedShardSizeAfterScaleup int
+	}{
+		{
+			userID:                        "user-1",
+			tenantShardSize:               6,
+			expectedShardSize:             6,
+			expectedShardSizeAfterScaleup: 6,
+		},
+		{
+			userID:                        "user-2",
+			tenantShardSize:               1,
+			expectedShardSize:             1,
+			expectedShardSizeAfterScaleup: 1,
+		},
+		{
+			userID:                        "user-3",
+			tenantShardSize:               0.4,
+			expectedShardSize:             2,
+			expectedShardSizeAfterScaleup: 4,
+		},
+		{
+			userID:                        "user-4",
+			tenantShardSize:               0.01,
+			expectedShardSize:             1,
+			expectedShardSizeAfterScaleup: 1,
+		},
+	}
+
+	inmem := objstore.WithNoopInstr(objstore.NewInMemBucket())
+	tenantLimits := newMockTenantLimits(map[string]*validation.Limits{})
+
+	for _, user := range users {
+		id, err := ulid.New(ulid.Now(), rand.Reader)
+		require.NoError(t, err)
+		require.NoError(t, inmem.Upload(context.Background(), user.userID+"/"+id.String()+"/meta.json", strings.NewReader(mockBlockMetaJSON(id.String()))))
+		limits := validation.Limits{}
+		flagext.DefaultValues(&limits)
+		limits.CompactorTenantShardSize = user.tenantShardSize
+		tenantLimits.setLimits(user.userID, &limits)
+	}
+
+	// Create a shared KV Store
+	kvstore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
+	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+
+	// Create compactors
+	var compactors []*Compactor
+	for i := 0; i < 5; i++ {
+		// Setup config
+		cfg := prepareConfig()
+
+		cfg.ShardingEnabled = true
+		cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i)
+		cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
+		cfg.ShardingRing.WaitStabilityMinDuration = time.Second
+		cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
+		cfg.ShardingRing.KVStore.Mock = kvstore
+
+		// Compactor will get its own temp dir for storing local files.
+		overrides, _ := validation.NewOverrides(validation.Limits{}, tenantLimits)
+		compactor, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
+		compactor.limits = overrides
+		//compactor.limits.tenantLimits = tenantLimits
+		compactor.logger = log.NewNopLogger()
+		defer services.StopAndAwaitTerminated(context.Background(), compactor) //nolint:errcheck
+
+		compactors = append(compactors, compactor)
+
+		// Mock the planner as if there's no compaction to do,
+		// in order to simplify tests (all in all, we just want to
+		// test our logic and not TSDB compactor which we expect to
+		// be already tested).
+		tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
+	}
+
+	// Start all compactors
+	for _, c := range compactors {
+		require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
+	}
+
+	// Wait until a run has been completed on each compactor
+	for _, c := range compactors {
+		cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
+			return prom_testutil.ToFloat64(c.CompactionRunsCompleted) >= 1
+		})
+	}
+
+	assert.Equal(t, 5, compactors[0].ring.InstancesCount())
+
+	for _, user := range users {
+		assert.Equal(t, user.expectedShardSize, compactors[0].getShardSizeForUser(user.userID))
+	}
+
+	// Scaleup compactors
+	// Create compactors
+	var compactors2 []*Compactor
+	for i := 5; i < 10; i++ {
+		// Setup config
+		cfg := prepareConfig()
+
+		cfg.ShardingEnabled = true
+		cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i)
+		cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
+		cfg.ShardingRing.WaitStabilityMinDuration = time.Second
+		cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
+		cfg.ShardingRing.KVStore.Mock = kvstore
+
+		// Compactor will get its own temp dir for storing local files.
+		overrides, _ := validation.NewOverrides(validation.Limits{}, tenantLimits)
+		compactor, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
+		compactor.limits = overrides
+		//compactor.limits.tenantLimits = tenantLimits
+		compactor.logger = log.NewNopLogger()
+		defer services.StopAndAwaitTerminated(context.Background(), compactor) //nolint:errcheck
+
+		compactors2 = append(compactors2, compactor)
+
+		// Mock the planner as if there's no compaction to do,
+		// in order to simplify tests (all in all, we just want to
+		// test our logic and not TSDB compactor which we expect to
+		// be already tested).
+		tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
+	}
+
+	// Start all compactors
+	for _, c := range compactors2 {
+		require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
+	}
+
+	// Wait until a run has been completed on each compactor
+	for _, c := range compactors2 {
+		cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
+			return prom_testutil.ToFloat64(c.CompactionRunsCompleted) >= 1
+		})
+	}
+
+	assert.Equal(t, 10, compactors[0].ring.InstancesCount())
+
+	for _, user := range users {
+		assert.Equal(t, user.expectedShardSizeAfterScaleup, compactors[0].getShardSizeForUser(user.userID))
+	}
+}
+
+type mockTenantLimits struct {
+	limits map[string]*validation.Limits
+	m      sync.Mutex
+}
+
+// newMockTenantLimits creates a new mockTenantLimits that returns per-tenant limits based on
+// the given map
+func newMockTenantLimits(limits map[string]*validation.Limits) *mockTenantLimits {
+	return &mockTenantLimits{
+		limits: limits,
+	}
+}
+
+func (l *mockTenantLimits) ByUserID(userID string) *validation.Limits {
+	l.m.Lock()
+	defer l.m.Unlock()
+	return l.limits[userID]
+}
+
+func (l *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
+	l.m.Lock()
+	defer l.m.Unlock()
+	return l.limits
+}
+
+func (l *mockTenantLimits) setLimits(userID string, limits *validation.Limits) {
+	l.m.Lock()
+	defer l.m.Unlock()
+	l.limits[userID] = limits
+}
diff --git a/pkg/compactor/partition_compaction_grouper.go b/pkg/compactor/partition_compaction_grouper.go
index 53f7762df87..d39f0da226a 100644
--- a/pkg/compactor/partition_compaction_grouper.go
+++ b/pkg/compactor/partition_compaction_grouper.go
@@ -22,6 +22,7 @@ import (
 
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb"
+	"github.com/cortexproject/cortex/pkg/util"
 )
 
 var (
@@ -146,7 +147,8 @@ func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta)
 
 // Check whether this compactor exists on the subring based on user ID
 func (g *PartitionCompactionGrouper) checkSubringForCompactor() (bool, error) {
-	subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID))
+	shardSize := util.DynamicShardSize(g.limits.CompactorTenantShardSize(g.userID), g.ring.InstancesCount())
+	subRing := g.ring.ShuffleShard(g.userID, shardSize)
 
 	rs, err := subRing.GetAllHealthy(RingOp)
 	if err != nil {
diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go
index f6328b8fb5b..f770756edfb 100644
--- a/pkg/compactor/shuffle_sharding_grouper.go
+++ b/pkg/compactor/shuffle_sharding_grouper.go
@@ -19,6 +19,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/compact"
 
 	"github.com/cortexproject/cortex/pkg/ring"
+	"github.com/cortexproject/cortex/pkg/util"
 )
 
 type ShuffleShardingGrouper struct {
@@ -279,7 +280,8 @@ func (g *ShuffleShardingGrouper) isGroupVisited(blocks []*metadata.Meta, compact
 
 // Check whether this compactor exists on the subring based on user ID
 func (g *ShuffleShardingGrouper) checkSubringForCompactor() (bool, error) {
-	subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID))
+	shardSize := util.DynamicShardSize(g.limits.CompactorTenantShardSize(g.userID), g.ring.InstancesCount())
+	subRing := g.ring.ShuffleShard(g.userID, shardSize)
 
 	rs, err := subRing.GetAllHealthy(RingOp)
 	if err != nil {
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index a40d2f9d378..e598422f9f1 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -197,7 +197,7 @@ type Limits struct {
 
 	// Compactor.
 	CompactorBlocksRetentionPeriod model.Duration `yaml:"compactor_blocks_retention_period" json:"compactor_blocks_retention_period"`
-	CompactorTenantShardSize int `yaml:"compactor_tenant_shard_size" json:"compactor_tenant_shard_size"`
+	CompactorTenantShardSize float64 `yaml:"compactor_tenant_shard_size" json:"compactor_tenant_shard_size"`
 	CompactorPartitionIndexSizeBytes int64 `yaml:"compactor_partition_index_size_bytes" json:"compactor_partition_index_size_bytes"`
 	CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`
 
@@ -294,7 +294,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 
 	f.Var(&l.RulerQueryOffset, "ruler.query-offset", "Duration to offset all rule evaluation queries per-tenant.")
 	f.Var(&l.CompactorBlocksRetentionPeriod, "compactor.blocks-retention-period", "Delete blocks containing samples older than the specified retention period. 0 to disable.")
-	f.IntVar(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
+	f.Float64Var(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is > 0 and < 1, the shard size will be a percentage of the total number of compactors.")
 	// Default to 64GB because this is the hard limit of index size in Cortex
 	f.Int64Var(&l.CompactorPartitionIndexSizeBytes, "compactor.partition-index-size-bytes", 68719476736, "Index size limit in bytes for each compaction partition. 0 means no limit")
 	f.Int64Var(&l.CompactorPartitionSeriesCount, "compactor.partition-series-count", 0, "Time series count limit for each compaction partition. 0 means no limit")
@@ -830,7 +830,7 @@ func (o *Overrides) CompactorBlocksRetentionPeriod(userID string) time.Duration
 }
 
 // CompactorTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy.
-func (o *Overrides) CompactorTenantShardSize(userID string) int {
+func (o *Overrides) CompactorTenantShardSize(userID string) float64 {
 	return o.GetOverridesForUser(userID).CompactorTenantShardSize
 }
 
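
Note: the new code paths resolve the effective shard size through util.DynamicShardSize, which is not included in this diff. A minimal sketch of the assumed behavior, consistent with the test expectations above (a value in (0, 1) is treated as a fraction of the ring size and rounded up, a value >= 1 as an absolute count):

package util

import "math"

// DynamicShardSize returns the effective shard size for a tenant.
// Assumed semantics (the real implementation lives in pkg/util and is not
// part of this diff): values between 0 and 1 are interpreted as a fraction
// of the total number of instances in the ring, rounded up; values >= 1 are
// used as an absolute shard size; 0 keeps shuffle sharding disabled.
func DynamicShardSize(value float64, numInstances int) int {
	if value > 0 && value < 1 {
		return int(math.Ceil(value * float64(numInstances)))
	}
	return int(value)
}

For example, with -compactor.tenant-shard-size=0.4 and 5 compactors in the ring, a tenant would be sharded across 2 compactors, growing to 4 once the ring is scaled up to 10, which matches user-3 in TestCompactor_GetShardSizeForUser above.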