Add shuffle sharding for compactor #4

Open · wants to merge 3 commits into base `add-metrics`
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -4140,6 +4140,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -compactor.blocks-retention-period
[compactor_blocks_retention_period: <duration> | default = 0s]

# The default tenant's shard size when the shuffle-sharding strategy is used by
# the compactor. When this setting is specified in the per-tenant overrides, a
# value of 0 disables shuffle sharding for the tenant.
# CLI flag: -compactor.tenant-shard-size
[compactor_tenant_shard_size: <int> | default = 0]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
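
For context (not part of this diff): per-tenant values such as `compactor_tenant_shard_size` are applied through the runtime overrides file. A minimal sketch, assuming the standard Cortex `overrides` layout; the tenant IDs are hypothetical:

```yaml
# Runtime overrides file (sketch; tenant IDs are hypothetical).
overrides:
  tenant-a:
    # Spread tenant-a's compaction groups across 3 compactors.
    compactor_tenant_shard_size: 3
  tenant-b:
    # 0 disables shuffle sharding for this tenant, per the reference above.
    compactor_tenant_shard_size: 0
```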
15 changes: 15 additions & 0 deletions docs/guides/shuffle-sharding.md
@@ -54,6 +54,7 @@ Cortex currently supports shuffle sharding in the following services:
- [Query-frontend / Query-scheduler](#query-frontend-and-query-scheduler-shuffle-sharding)
- [Store-gateway](#store-gateway-shuffle-sharding)
- [Ruler](#ruler-shuffle-sharding)
- [Compactor](#compactor-shuffle-sharding)

Shuffle sharding is **disabled by default** and needs to be explicitly enabled in the configuration.

@@ -154,6 +155,20 @@ Cortex ruler can run in three modes:

Note that when using a sharding strategy, each rule group is evaluated by a single ruler only; there is no replication.

### Compactor shuffle sharding

Cortex compactor can run in three modes:

1. **No sharding at all.** This is the most basic mode of the compactor. It is activated by using `-compactor.sharding-enabled=false` (default). In this mode every compactor will run every compaction.
2. **Default sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=default` (default). In this mode compactors register themselves into the ring. Each compactor will then select and evaluate only those users that it "owns".
3. **Shuffle sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=shuffle-sharding`. Similarly to default sharding, compactors use the ring to distribute workload, but compaction groups for each tenant can only be evaluated on a limited number of compactors (`-compactor.tenant-shard-size`, which can also be set per tenant as `compactor_tenant_shard_size` in the overrides); a minimal flag sketch follows this list.
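
For example, a sketch of the flags that enable mode 3 (the shard size value is illustrative, and the per-tenant override from the limits reference still applies on top of it):

```
-compactor.sharding-enabled=true
-compactor.sharding-strategy=shuffle-sharding
-compactor.tenant-shard-size=2
```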

The Cortex compactor by default shards by tenant ID when sharding is enabled.

With shuffle sharding selected as the sharding strategy, only a subset of the compactors, determined by the tenant's shard size, handles a given tenant. For example, with 20 compactors and a shard size of 3, each tenant's compaction groups are only ever evaluated by 3 of those compactors.

The idea behind using the shuffle sharding strategy for the compactor is to further enable horizontal scalability and build tolerance for compactions that may take longer than the compaction interval.

## FAQ

### Does shuffle sharding add additional overhead to the KV store?
46 changes: 37 additions & 9 deletions pkg/compactor/compactor.go
@@ -33,6 +33,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/flagext"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
@@ -46,8 +47,10 @@ var (

supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errShardingRequired = errors.New("sharding must be enabled to use the shuffle-sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge) compact.Grouper {
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ CompactorLimits, _ string) compact.Grouper {
return compact.NewDefaultGrouper(
logger,
bkt,
@@ -59,7 +62,7 @@ var (
metadata.NoneFunc)
}

ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge) compact.Grouper {
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits CompactorLimits, userID string) compact.Grouper {
return NewShuffleShardingGrouper(
logger,
bkt,
@@ -70,7 +73,11 @@ var (
garbageCollectedBlocks,
remainingPlannedCompactions,
metadata.NoneFunc,
cfg)
cfg,
ring,
ringLifecycle.Addr,
limits,
userID)
}

DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) {
@@ -104,6 +111,10 @@ type BlocksGrouperFactory func(
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
remainingPlannedCompactions prometheus.Gauge,
ring *ring.Ring,
ringLifecycler *ring.Lifecycler,
limit CompactorLimits,
userID string,
) compact.Grouper

// BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
@@ -114,6 +125,11 @@ type BlocksCompactorFactory func(
reg prometheus.Registerer,
) (compact.Compactor, compact.Planner, error)

// CompactorLimits defines limits used by the Compactor.
type CompactorLimits interface {
CompactorTenantShardSize(userID string) int
}
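
For illustration (not part of this diff): any value with a matching method satisfies `CompactorLimits`. A minimal sketch of a static implementation of the kind a test might use; the type name and field are hypothetical:

```go
// staticLimits is a hypothetical CompactorLimits implementation that
// returns the same shard size for every tenant.
type staticLimits struct {
	shardSize int
}

// CompactorTenantShardSize implements CompactorLimits.
func (l staticLimits) CompactorTenantShardSize(userID string) int {
	return l.shardSize
}
```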

// Config holds the Compactor config.
type Config struct {
BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"`
@@ -181,7 +197,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
}

func (cfg *Config) Validate() error {
func (cfg *Config) Validate(limits validation.Limits) error {
// Each block range period should be divisible by the previous one.
for i := 1; i < len(cfg.BlockRanges); i++ {
if cfg.BlockRanges[i]%cfg.BlockRanges[i-1] != 0 {
@@ -194,6 +210,14 @@ func (cfg *Config) Validate() error {
return errInvalidShardingStrategy
}

if cfg.ShardingStrategy == util.ShardingStrategyShuffle {
if !cfg.ShardingEnabled {
return errShardingRequired
} else if limits.CompactorTenantShardSize <= 0 {
return errInvalidTenantShardSize
}
}

return nil
}

@@ -214,6 +238,7 @@ type Compactor struct {
parentLogger log.Logger
registerer prometheus.Registerer
allowedTenants *util.AllowedTenants
limits CompactorLimits

// Functions that creates bucket client, grouper, planner and compactor using the context.
// Useful for injecting mock objects from tests.
@@ -259,7 +284,7 @@ type Compactor struct {
}

// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer, limits CompactorLimits) (*Compactor, error) {
bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) {
return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
}
@@ -282,7 +307,7 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi
}
}

cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory)
cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits)
if err != nil {
return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
}
@@ -299,6 +324,7 @@ func newCompactor(
bucketClientFactory func(ctx context.Context) (objstore.Bucket, error),
blocksGrouperFactory BlocksGrouperFactory,
blocksCompactorFactory BlocksCompactorFactory,
limits CompactorLimits,
) (*Compactor, error) {
var remainingPlannedCompactions prometheus.Gauge
if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
@@ -366,6 +392,7 @@ func newCompactor(
Help: "Total number of blocks marked for deletion by compactor.",
}),
remainingPlannedCompactions: remainingPlannedCompactions,
limits: limits,
}

if len(compactorCfg.EnabledTenants) > 0 {
@@ -701,7 +728,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
compactor, err := compact.NewBucketCompactor(
ulogger,
syncer,
c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks, c.remainingPlannedCompactions),
c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.ring, c.ringLifecycler, c.limits, userID),
c.blocksPlanner,
c.blocksCompactor,
path.Join(c.compactorCfg.DataDir, "compact"),
@@ -758,8 +785,9 @@ func (c *Compactor) ownUser(userID string) (bool, error) {
return false, nil
}

// Always owned if sharding is disabled.
if !c.compactorCfg.ShardingEnabled {
// Always owned if sharding is disabled or if using shuffle-sharding as shard ownership
// is determined by the shuffle sharding grouper.
if !c.compactorCfg.ShardingEnabled || c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
return true, nil
}
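
For context, a sketch (not code from this diff) of how the shuffle-sharding grouper can decide per-tenant ownership from the ring, assuming the Cortex ring package's `ShuffleShard`, `GetAllHealthy`, and `ReplicationSet.Includes` helpers behave as their names suggest:

```go
// ownsUserShuffleSharded reports whether the compactor at instanceAddr
// falls inside the tenant's shuffle shard. The operation used here
// (ring.Read) is an assumption; the grouper may use a dedicated one.
func ownsUserShuffleSharded(r *ring.Ring, limits CompactorLimits, userID, instanceAddr string) (bool, error) {
	subRing := r.ShuffleShard(userID, limits.CompactorTenantShardSize(userID))

	rs, err := subRing.GetAllHealthy(ring.Read)
	if err != nil {
		return false, err
	}
	return rs.Includes(instanceAddr), nil
}
```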
