1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
* [FEATURE] Ruler: Add support for group labels. #6665
* [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
* [FEATURE] Compactor: Add support for percentage-based sharding for compactors. #6738
* [ENHANCEMENT] Query Frontend: Return 400 when tenant resolution fails. #6715
* [ENHANCEMENT] Querier: Support query parameters on the metadata API (/api/v1/metadata) to allow users to limit the metadata returned. Add an `-ingester.return-all-metadata` flag to keep returning all metadata during the rollout; set this flag to `false` afterwards to use the metadata API with the limits. #6681 #6744
* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695
5 changes: 3 additions & 2 deletions docs/configuration/config-file-reference.md
@@ -3708,9 +3708,10 @@ query_rejection:

# The default tenant's shard size when the shuffle-sharding strategy is used by
# the compactor. When this setting is specified in the per-tenant overrides, a
# value of 0 disables shuffle sharding for the tenant.
# value of 0 disables shuffle sharding for the tenant. If the value is greater
# than 0 and less than 1, the shard size is that percentage of the total number
# of compactors.
# CLI flag: -compactor.tenant-shard-size
[compactor_tenant_shard_size: <int> | default = 0]
[compactor_tenant_shard_size: <float> | default = 0]

# Index size limit in bytes for each compaction partition. 0 means no limit
# CLI flag: -compactor.partition-index-size-bytes
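For readers skimming the diff: the fractional semantics documented above come down to scaling the tenant's shard size with the current ring size. The helper below is a minimal sketch of what a dynamic shard-size computation such as `util.DynamicShardSize` is expected to do, inferred from the documentation and from the expectations in `TestCompactor_GetShardSizeForUser` further down (0.4 of 5 compactors → 2, 0.4 of 10 → 4); it is not the actual Cortex implementation.

```go
package main

import (
	"fmt"
	"math"
)

// dynamicShardSize sketches the documented behaviour: values >= 1 are used as
// an absolute shard size, while values strictly between 0 and 1 are treated as
// a percentage of the instances currently in the compactor ring, rounded up so
// that any non-zero fraction keeps at least one instance. Inferred from the
// docs and test expectations in this PR, not copied from util.DynamicShardSize.
func dynamicShardSize(value float64, numInstances int) int {
	if value > 0 && value < 1 {
		return int(math.Ceil(value * float64(numInstances)))
	}
	return int(value)
}

func main() {
	// These match the expectations asserted in TestCompactor_GetShardSizeForUser.
	fmt.Println(dynamicShardSize(6, 5))    // 6  (absolute shard size)
	fmt.Println(dynamicShardSize(0.4, 5))  // 2  (40% of 5, rounded up)
	fmt.Println(dynamicShardSize(0.4, 10)) // 4  (grows as the ring scales up)
	fmt.Println(dynamicShardSize(0.01, 5)) // 1  (never rounds down to zero)
	fmt.Println(dynamicShardSize(0, 5))    // 0  (shuffle sharding disabled)
}
```

Because any non-zero fraction rounds up to at least one compactor, a percentage-configured tenant always keeps an owner and automatically gains capacity when the ring is scaled out, which is exactly what the scale-up half of the new test asserts.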
9 changes: 7 additions & 2 deletions pkg/compactor/compactor.go
@@ -244,7 +244,7 @@ type BlockDeletableCheckerFactory func(

// Limits defines limits used by the Compactor.
type Limits interface {
CompactorTenantShardSize(userID string) int
CompactorTenantShardSize(userID string) float64
CompactorPartitionIndexSizeBytes(userID string) int64
CompactorPartitionSeriesCount(userID string) int64
}
@@ -1122,6 +1122,10 @@ func (c *Compactor) ownUserForCleanUp(userID string) (bool, error) {
return c.ownUser(userID, true)
}

func (c *Compactor) getShardSizeForUser(userID string) int {
return util.DynamicShardSize(c.limits.CompactorTenantShardSize(userID), c.ring.InstancesCount())
}

func (c *Compactor) ownUser(userID string, isCleanUp bool) (bool, error) {
if !c.allowedTenants.IsAllowed(userID) {
return false, nil
@@ -1135,7 +1139,8 @@ func (c *Compactor) ownUser(userID string, isCleanUp bool) (bool, error) {
// If we aren't cleaning up user blocks, and we are using shuffle-sharding, ownership is determined by a subring
// Cleanup should only be owned by a single compactor, as there could be race conditions during block deletion
if !isCleanUp && c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
subRing := c.ring.ShuffleShard(userID, c.limits.CompactorTenantShardSize(userID))
shardSize := c.getShardSizeForUser(userID)
subRing := c.ring.ShuffleShard(userID, shardSize)

rs, err := subRing.GetAllHealthy(RingOp)
if err != nil {
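The `ownUser` change above only swaps in the dynamically computed shard size; ownership is still decided by whether this compactor lands inside the tenant's shuffle shard. The snippet below is a simplified, self-contained illustration of that gating logic: `pickShard` is a deterministic stand-in for the real `ring.ShuffleShard` (which hashes the tenant ID onto the ring), so only the shape of the check mirrors the PR, not the actual member selection.

```go
package main

import "fmt"

// pickShard stands in for ring.ShuffleShard: it returns the sub-ring a tenant
// is assigned to. A shard size of 0 (shuffle sharding disabled) or a size
// covering the whole ring uses every instance; otherwise we just take a prefix
// to keep the example self-contained.
func pickShard(ringAddrs []string, shardSize int) []string {
	if shardSize <= 0 || shardSize >= len(ringAddrs) {
		return ringAddrs
	}
	return ringAddrs[:shardSize]
}

// ownsTenant sketches the ownership check in Compactor.ownUser: the compactor
// owns the tenant only if its own address is part of the tenant's sub-ring.
func ownsTenant(localAddr string, ringAddrs []string, shardSize int) bool {
	for _, addr := range pickShard(ringAddrs, shardSize) {
		if addr == localAddr {
			return true
		}
	}
	return false
}

func main() {
	ringAddrs := []string{"127.0.0.0", "127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4"}
	// With -compactor.tenant-shard-size=0.4 and 5 instances the effective shard
	// size is 2, so only 2 of the 5 compactors end up owning the tenant.
	for _, addr := range ringAddrs {
		fmt.Printf("%s owns tenant: %v\n", addr, ownsTenant(addr, ringAddrs, 2))
	}
}
```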
182 changes: 182 additions & 0 deletions pkg/compactor/compactor_test.go
@@ -14,6 +14,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"testing"
"time"

@@ -2206,3 +2207,184 @@ func TestCompactor_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T
return len(healthy) == 1 && len(unhealthy) == 0
})
}

func TestCompactor_GetShardSizeForUser(t *testing.T) {

// Tenants and the shard sizes we expect before and after scaling up the compactor ring.
users := []struct {
userID string
tenantShardSize float64
expectedShardSize int
expectedShardSizeAfterScaleup int
}{
{
userID: "user-1",
tenantShardSize: 6,
expectedShardSize: 6,
expectedShardSizeAfterScaleup: 6,
},
{
userID: "user-2",
tenantShardSize: 1,
expectedShardSize: 1,
expectedShardSizeAfterScaleup: 1,
},
{
userID: "user-3",
tenantShardSize: 0.4,
expectedShardSize: 2,
expectedShardSizeAfterScaleup: 4,
},
{
userID: "user-4",
tenantShardSize: 0.01,
expectedShardSize: 1,
expectedShardSizeAfterScaleup: 1,
},
}

inmem := objstore.WithNoopInstr(objstore.NewInMemBucket())
tenantLimits := newMockTenantLimits(map[string]*validation.Limits{})

for _, user := range users {
id, err := ulid.New(ulid.Now(), rand.Reader)
require.NoError(t, err)
require.NoError(t, inmem.Upload(context.Background(), user.userID+"/"+id.String()+"/meta.json", strings.NewReader(mockBlockMetaJSON(id.String()))))
limits := validation.Limits{}
flagext.DefaultValues(&limits)
limits.CompactorTenantShardSize = user.tenantShardSize
tenantLimits.setLimits(user.userID, &limits)
}

// Create a shared KV Store
kvstore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

// Create compactors
var compactors []*Compactor
for i := 0; i < 5; i++ {
// Setup config
cfg := prepareConfig()

cfg.ShardingEnabled = true
cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i)
cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
cfg.ShardingRing.WaitStabilityMinDuration = time.Second
cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
cfg.ShardingRing.KVStore.Mock = kvstore

// Compactor will get its own temp dir for storing local files.
overrides, err := validation.NewOverrides(validation.Limits{}, tenantLimits)
require.NoError(t, err)
compactor, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
compactor.limits = overrides
compactor.logger = log.NewNopLogger()
defer services.StopAndAwaitTerminated(context.Background(), compactor) //nolint:errcheck

compactors = append(compactors, compactor)

// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
}

// Start all compactors
for _, c := range compactors {
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
}

// Wait until a run has been completed on each compactor
for _, c := range compactors {
cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
return prom_testutil.ToFloat64(c.CompactionRunsCompleted) >= 1
})
}

assert.Equal(t, 5, compactors[0].ring.InstancesCount())

for _, user := range users {
assert.Equal(t, user.expectedShardSize, compactors[0].getShardSizeForUser(user.userID))
}

// Scaleup compactors
// Create compactors
var compactors2 []*Compactor
for i := 5; i < 10; i++ {
// Setup config
cfg := prepareConfig()

cfg.ShardingEnabled = true
cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i)
cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i)
cfg.ShardingRing.WaitStabilityMinDuration = time.Second
cfg.ShardingRing.WaitStabilityMaxDuration = 5 * time.Second
cfg.ShardingRing.KVStore.Mock = kvstore

// Compactor will get its own temp dir for storing local files.
overrides, err := validation.NewOverrides(validation.Limits{}, tenantLimits)
require.NoError(t, err)
compactor, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil)
compactor.limits = overrides
compactor.logger = log.NewNopLogger()
defer services.StopAndAwaitTerminated(context.Background(), compactor) //nolint:errcheck

compactors2 = append(compactors2, compactor)

// Mock the planner as if there's no compaction to do,
// in order to simplify tests (all in all, we just want to
// test our logic and not TSDB compactor which we expect to
// be already tested).
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
}

// Start all compactors
for _, c := range compactors2 {
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
}

// Wait until a run has been completed on each compactor
for _, c := range compactors2 {
cortex_testutil.Poll(t, 120*time.Second, true, func() interface{} {
return prom_testutil.ToFloat64(c.CompactionRunsCompleted) >= 1
})
}

assert.Equal(t, 10, compactors[0].ring.InstancesCount())

for _, user := range users {
assert.Equal(t, user.expectedShardSizeAfterScaleup, compactors[0].getShardSizeForUser(user.userID))
}
}

type mockTenantLimits struct {
limits map[string]*validation.Limits
m sync.Mutex
}

// newMockTenantLimits creates a new mockTenantLimits that returns per-tenant limits based on
// the given map
func newMockTenantLimits(limits map[string]*validation.Limits) *mockTenantLimits {
return &mockTenantLimits{
limits: limits,
}
}

func (l *mockTenantLimits) ByUserID(userID string) *validation.Limits {
l.m.Lock()
defer l.m.Unlock()
return l.limits[userID]
}

func (l *mockTenantLimits) AllByUserID() map[string]*validation.Limits {
l.m.Lock()
defer l.m.Unlock()
return l.limits
}

func (l *mockTenantLimits) setLimits(userID string, limits *validation.Limits) {
l.m.Lock()
defer l.m.Unlock()
l.limits[userID] = limits
}
4 changes: 3 additions & 1 deletion pkg/compactor/partition_compaction_grouper.go
@@ -22,6 +22,7 @@ import (

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
)

var (
@@ -146,7 +147,8 @@ func (g *PartitionCompactionGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta)

// Check whether this compactor exists on the subring based on user ID
func (g *PartitionCompactionGrouper) checkSubringForCompactor() (bool, error) {
subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID))
shardSize := util.DynamicShardSize(g.limits.CompactorTenantShardSize(g.userID), g.ring.InstancesCount())
subRing := g.ring.ShuffleShard(g.userID, shardSize)

rs, err := subRing.GetAllHealthy(RingOp)
if err != nil {
4 changes: 3 additions & 1 deletion pkg/compactor/shuffle_sharding_grouper.go
@@ -19,6 +19,7 @@ import (
"github.com/thanos-io/thanos/pkg/compact"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
)

type ShuffleShardingGrouper struct {
@@ -279,7 +280,8 @@ func (g *ShuffleShardingGrouper) isGroupVisited(blocks []*metadata.Meta, compact

// Check whether this compactor exists on the subring based on user ID
func (g *ShuffleShardingGrouper) checkSubringForCompactor() (bool, error) {
subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID))
shardSize := util.DynamicShardSize(g.limits.CompactorTenantShardSize(g.userID), g.ring.InstancesCount())
subRing := g.ring.ShuffleShard(g.userID, shardSize)

rs, err := subRing.GetAllHealthy(RingOp)
if err != nil {
6 changes: 3 additions & 3 deletions pkg/util/validation/limits.go
@@ -197,7 +197,7 @@ type Limits struct {

// Compactor.
CompactorBlocksRetentionPeriod model.Duration `yaml:"compactor_blocks_retention_period" json:"compactor_blocks_retention_period"`
CompactorTenantShardSize int `yaml:"compactor_tenant_shard_size" json:"compactor_tenant_shard_size"`
CompactorTenantShardSize float64 `yaml:"compactor_tenant_shard_size" json:"compactor_tenant_shard_size"`
CompactorPartitionIndexSizeBytes int64 `yaml:"compactor_partition_index_size_bytes" json:"compactor_partition_index_size_bytes"`
CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"`

@@ -294,7 +294,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Var(&l.RulerQueryOffset, "ruler.query-offset", "Duration to offset all rule evaluation queries per-tenant.")

f.Var(&l.CompactorBlocksRetentionPeriod, "compactor.blocks-retention-period", "Delete blocks containing samples older than the specified retention period. 0 to disable.")
f.IntVar(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
f.Float64Var(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is greater than 0 and less than 1, the shard size is that percentage of the total number of compactors.")
// Default to 64GB because this is the hard limit of index size in Cortex
f.Int64Var(&l.CompactorPartitionIndexSizeBytes, "compactor.partition-index-size-bytes", 68719476736, "Index size limit in bytes for each compaction partition. 0 means no limit")
f.Int64Var(&l.CompactorPartitionSeriesCount, "compactor.partition-series-count", 0, "Time series count limit for each compaction partition. 0 means no limit")
@@ -830,7 +830,7 @@ func (o *Overrides) CompactorBlocksRetentionPeriod(userID string) time.Duration
}

// CompactorTenantShardSize returns the shard size (number of compactors, or a percentage of them when the value is between 0 and 1) used by this tenant when using the shuffle-sharding strategy.
func (o *Overrides) CompactorTenantShardSize(userID string) int {
func (o *Overrides) CompactorTenantShardSize(userID string) float64 {
return o.GetOverridesForUser(userID).CompactorTenantShardSize
}

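Because the limit is now registered with `f.Float64Var`, the same `-compactor.tenant-shard-size` flag accepts both the old absolute form and the new percentage form. The standalone sketch below is hypothetical (not Cortex code) and shows how a parsed value would translate into an effective shard size for a ring of a given size; the `-instances` flag exists only for the demo.

```go
package main

import (
	"flag"
	"fmt"
	"math"
)

func main() {
	// Mirrors the idea behind the flag registration in limits.go: one float64
	// flag covers 0 (disabled), >= 1 (absolute size) and 0 < x < 1 (percentage).
	shardSize := flag.Float64("compactor.tenant-shard-size", 0,
		"0 disables shuffle sharding; >= 1 is an absolute shard size; 0 < x < 1 is a percentage of compactors")
	instances := flag.Int("instances", 10, "number of compactors in the ring (demo only)")
	flag.Parse()

	effective := *shardSize
	if effective > 0 && effective < 1 {
		effective = math.Ceil(effective * float64(*instances))
	}
	fmt.Printf("effective shard size with %d compactors: %d\n", *instances, int(effective))
}
```

Running it with `-compactor.tenant-shard-size=0.25 -instances=12` prints an effective shard size of 3, while `-compactor.tenant-shard-size=6` stays at 6 regardless of the ring size.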