diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 0f70a3f3e59..31b96cf490b 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4140,6 +4140,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -compactor.blocks-retention-period [compactor_blocks_retention_period: | default = 0s] +# The default tenant's shard size when the shuffle-sharding strategy is used by +# the compactor. When this setting is specified in the per-tenant overrides, a +# value of 0 disables shuffle sharding for the tenant. +# CLI flag: -compactor.tenant-shard-size +[compactor_tenant_shard_size: | default = 0] + # S3 server-side encryption type. Required to enable server-side encryption # overrides for a specific tenant. If not set, the default S3 client settings # are used. diff --git a/docs/guides/shuffle-sharding.md b/docs/guides/shuffle-sharding.md index 966b861854c..e8486f2f565 100644 --- a/docs/guides/shuffle-sharding.md +++ b/docs/guides/shuffle-sharding.md @@ -54,6 +54,7 @@ Cortex currently supports shuffle sharding in the following services: - [Query-frontend / Query-scheduler](#query-frontend-and-query-scheduler-shuffle-sharding) - [Store-gateway](#store-gateway-shuffle-sharding) - [Ruler](#ruler-shuffle-sharding) +- [Compactor](#compactor-shuffle-sharding) Shuffle sharding is **disabled by default** and needs to be explicitly enabled in the configuration. @@ -154,6 +155,20 @@ Cortex ruler can run in three modes: Note that when using sharding strategy, each rule group is evaluated by single ruler only, there is no replication. +### Compactor shuffle sharding + +Cortex compactor can run in three modes: + +1. **No sharding at all.** This is the most basic mode of the compactor. It is activated by using `-compactor.sharding-enabled=false` (default). In this mode every compactor will run every compaction. +2. **Default sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=default` (default). In this mode compactors register themselves into the ring. Each compactor will then select and evaluate only those users that it "owns". +3. **Shuffle sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=shuffle-sharding`. Similarly to default sharding, compactors use the ring to distribute workload, but compactions groups for each tenant can only be evaluated on limited number of compactors (`-compactor.tenant-shard-size`, can also be set per tenant as `compactor_tenant_shard_size` in overrides). + +The Cortex compactor by default shards by tenant ID when sharding is enabled. + +With shuffle sharding selected as the sharding strategy, a subset of the compactors will be used to handle a user based on the shard size. + +The idea behind using the shuffle sharding strategy for the compactor is to further enable horizontal scalability and build tolerance for compactions that may take longer than the compaction interval. + ## FAQ ### Does shuffle sharding add additional overhead to the KV store? diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 8dddac33eca..bf8a3190740 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -33,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/validation" ) const ( @@ -46,8 +47,10 @@ var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errShardingRequired = errors.New("sharding must be enabled to use shuffle-sharding sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ CompactorLimits, _ string) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -59,7 +62,7 @@ var ( metadata.NoneFunc) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits CompactorLimits, userID string) compact.Grouper { return NewShuffleShardingGrouper( logger, bkt, @@ -70,7 +73,11 @@ var ( garbageCollectedBlocks, remainingPlannedCompactions, metadata.NoneFunc, - cfg) + cfg, + ring, + ringLifecycle.Addr, + limits, + userID) } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) { @@ -104,6 +111,10 @@ type BlocksGrouperFactory func( blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, + ring *ring.Ring, + ringLifecycler *ring.Lifecycler, + limit CompactorLimits, + userID string, ) compact.Grouper // BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks. @@ -114,6 +125,11 @@ type BlocksCompactorFactory func( reg prometheus.Registerer, ) (compact.Compactor, compact.Planner, error) +// CompactorLimits defines limits used by the Compactor. +type CompactorLimits interface { + CompactorTenantShardSize(userID string) int +} + // Config holds the Compactor config. type Config struct { BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"` @@ -181,7 +197,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.") } -func (cfg *Config) Validate() error { +func (cfg *Config) Validate(limits validation.Limits) error { // Each block range period should be divisible by the previous one. for i := 1; i < len(cfg.BlockRanges); i++ { if cfg.BlockRanges[i]%cfg.BlockRanges[i-1] != 0 { @@ -194,6 +210,14 @@ func (cfg *Config) Validate() error { return errInvalidShardingStrategy } + if cfg.ShardingStrategy == util.ShardingStrategyShuffle { + if !cfg.ShardingEnabled { + return errShardingRequired + } else if limits.CompactorTenantShardSize <= 0 { + return errInvalidTenantShardSize + } + } + return nil } @@ -214,6 +238,7 @@ type Compactor struct { parentLogger log.Logger registerer prometheus.Registerer allowedTenants *util.AllowedTenants + limits CompactorLimits // Functions that creates bucket client, grouper, planner and compactor using the context. // Useful for injecting mock objects from tests. @@ -259,7 +284,7 @@ type Compactor struct { } // NewCompactor makes a new Compactor. -func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) { +func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer, limits CompactorLimits) (*Compactor, error) { bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) { return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer) } @@ -282,7 +307,7 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi } } - cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory) + cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits) if err != nil { return nil, errors.Wrap(err, "failed to create Cortex blocks compactor") } @@ -299,6 +324,7 @@ func newCompactor( bucketClientFactory func(ctx context.Context) (objstore.Bucket, error), blocksGrouperFactory BlocksGrouperFactory, blocksCompactorFactory BlocksCompactorFactory, + limits CompactorLimits, ) (*Compactor, error) { var remainingPlannedCompactions prometheus.Gauge if compactorCfg.ShardingStrategy == "shuffle-sharding" { @@ -366,6 +392,7 @@ func newCompactor( Help: "Total number of blocks marked for deletion by compactor.", }), remainingPlannedCompactions: remainingPlannedCompactions, + limits: limits, } if len(compactorCfg.EnabledTenants) > 0 { @@ -701,7 +728,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { compactor, err := compact.NewBucketCompactor( ulogger, syncer, - c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks, c.remainingPlannedCompactions), + c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.ring, c.ringLifecycler, c.limits, userID), c.blocksPlanner, c.blocksCompactor, path.Join(c.compactorCfg.DataDir, "compact"), @@ -758,8 +785,9 @@ func (c *Compactor) ownUser(userID string) (bool, error) { return false, nil } - // Always owned if sharding is disabled. - if !c.compactorCfg.ShardingEnabled { + // Always owned if sharding is disabled or if using shuffle-sharding as shard ownership + // is determined by the shuffle sharding grouper. + if !c.compactorCfg.ShardingEnabled || c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { return true, nil } diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index da1a83cead3..56b6e77e461 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -36,6 +36,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" @@ -87,34 +88,66 @@ func TestConfig_ShouldSupportCliFlags(t *testing.T) { func TestConfig_Validate(t *testing.T) { tests := map[string]struct { - setup func(cfg *Config) - expected string + setup func(cfg *Config) + initLimits func(*validation.Limits) + expected string }{ "should pass with the default config": { - setup: func(cfg *Config) {}, - expected: "", + setup: func(cfg *Config) {}, + initLimits: func(_ *validation.Limits) {}, + expected: "", }, "should pass with only 1 block range period": { setup: func(cfg *Config) { cfg.BlockRanges = cortex_tsdb.DurationList{time.Hour} }, - expected: "", + initLimits: func(_ *validation.Limits) {}, + expected: "", }, "should fail with non divisible block range periods": { setup: func(cfg *Config) { cfg.BlockRanges = cortex_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour, 30 * time.Hour} }, - expected: errors.Errorf(errInvalidBlockRanges, 30*time.Hour, 24*time.Hour).Error(), + initLimits: func(_ *validation.Limits) {}, + expected: errors.Errorf(errInvalidBlockRanges, 30*time.Hour, 24*time.Hour).Error(), + }, + "should pass with valid shuffle sharding config": { + setup: func(cfg *Config) { + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.ShardingEnabled = true + }, + initLimits: func(limits *validation.Limits) { + limits.CompactorTenantShardSize = 1 + }, + expected: "", + }, + "should fail with shuffle sharding strategy selected without sharding enabled": { + setup: func(cfg *Config) { + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.ShardingEnabled = false + }, + initLimits: func(_ *validation.Limits) {}, + expected: errShardingRequired.Error(), + }, + "should fail with bad compactor tenant shard size": { + setup: func(cfg *Config) { + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.ShardingEnabled = true + }, + initLimits: func(_ *validation.Limits) {}, + expected: errInvalidTenantShardSize.Error(), }, } for testName, testData := range tests { t.Run(testName, func(t *testing.T) { cfg := &Config{} - flagext.DefaultValues(cfg) + limits := validation.Limits{} + flagext.DefaultValues(cfg, &limits) testData.setup(cfg) + testData.initLimits(&limits) - if actualErr := cfg.Validate(); testData.expected != "" { + if actualErr := cfg.Validate(limits); testData.expected != "" { assert.EqualError(t, actualErr, testData.expected) } else { assert.NoError(t, actualErr) @@ -130,7 +163,7 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", []string{}, nil) cfg := prepareConfig() - c, _, _, logs, registry := prepare(t, cfg, bucketClient) + c, _, _, logs, registry := prepare(t, cfg, bucketClient, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) // Wait until a run has completed. @@ -275,7 +308,7 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket bucketClient := &bucket.ClientMock{} bucketClient.MockIter("", nil, errors.New("failed to iterate the bucket")) - c, _, _, logs, registry := prepare(t, prepareConfig(), bucketClient) + c, _, _, logs, registry := prepare(t, prepareConfig(), bucketClient, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) // Wait until all retry attempts have completed. @@ -428,7 +461,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) - c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient) + c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil) tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan")) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) @@ -479,7 +512,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) - c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient) + c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil) // Mock the planner as if there's no compaction to do, // in order to simplify tests (all in all, we just want to @@ -606,7 +639,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) - c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient) + c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil) // Mock the planner as if there's no compaction to do, // in order to simplify tests (all in all, we just want to @@ -714,7 +747,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil) bucketClient.MockDelete("user-1/bucket-index.json.gz", nil) - c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient) + c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil) // Mock the planner as if there's no compaction to do, // in order to simplify tests (all in all, we just want to @@ -823,7 +856,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni cfg.ShardingRing.InstanceAddr = "1.2.3.4" cfg.ShardingRing.KVStore.Mock = consul.NewInMemoryClient(ring.GetCodec()) - c, _, tsdbPlanner, logs, _ := prepare(t, cfg, bucketClient) + c, _, tsdbPlanner, logs, _ := prepare(t, cfg, bucketClient, nil) // Mock the planner as if there's no compaction to do, // in order to simplify tests (all in all, we just want to @@ -909,7 +942,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM cfg.ShardingRing.WaitStabilityMaxDuration = 10 * time.Second cfg.ShardingRing.KVStore.Mock = kvstore - c, _, tsdbPlanner, l, _ := prepare(t, cfg, bucketClient) + c, _, tsdbPlanner, l, _ := prepare(t, cfg, bucketClient, nil) defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck compactors = append(compactors, c) @@ -942,6 +975,165 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM } } +func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWithShuffleShardingAndMultipleInstancesRunning(t *testing.T) { + t.Parallel() + + numUsers := 3 + + // Setup user IDs + userIDs := make([]string, 0, numUsers) + for i := 1; i <= numUsers; i++ { + userIDs = append(userIDs, fmt.Sprintf("user-%d", i)) + } + + startTime := int64(1574776800000) + // Define blocks mapping block IDs to start and end times + blocks := map[string]map[string]int64{ + "01DTVP434PA9VFXSW2JKB3392D": { + "startTime": startTime, + "endTime": startTime + time.Hour.Milliseconds()*2, + }, + "01DTVP434PA9VFXSW2JKB3392E": { + "startTime": startTime, + "endTime": startTime + time.Hour.Milliseconds()*2, + }, + "01DTVP434PA9VFXSW2JKB3392F": { + "startTime": startTime + time.Hour.Milliseconds()*2, + "endTime": startTime + time.Hour.Milliseconds()*4, + }, + "01DTVP434PA9VFXSW2JKB3392G": { + "startTime": startTime + time.Hour.Milliseconds()*2, + "endTime": startTime + time.Hour.Milliseconds()*4, + }, + // Add another new block as the final block so that the previous groups will be planned for compaction + "01DTVP434PA9VFXSW2JKB3392H": { + "startTime": startTime + time.Hour.Milliseconds()*4, + "endTime": startTime + time.Hour.Milliseconds()*6, + }, + } + + // Mock the bucket to contain all users, each one with five blocks, 2 sets of overlapping blocks and 1 separate block. + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("", userIDs, nil) + + // Keys with a value greater than 1 will be groups that should be compacted + groupHashes := make(map[uint32]int) + for _, userID := range userIDs { + blockDirectory := []string{} + + for blockID, blockTimes := range blocks { + bucketClient.MockGet(userID+"/"+blockID+"/meta.json", mockBlockMetaJSONWithTime(blockID, userID, blockTimes["startTime"], blockTimes["endTime"]), nil) + bucketClient.MockGet(userID+"/"+blockID+"/deletion-mark.json", "", nil) + blockDirectory = append(blockDirectory, userID+"/"+blockID) + + // Get all of the unique group hashes so that they can be used to ensure all groups were compacted + groupHash := hashGroup(userID, blockTimes["startTime"], blockTimes["endTime"]) + groupHashes[groupHash]++ + } + + bucketClient.MockIter(userID+"/", blockDirectory, nil) + bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) + bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) + bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) + } + + // Create a shared KV Store + kvstore := consul.NewInMemoryClient(ring.GetCodec()) + + // Create four compactors + var compactors []*Compactor + var logs []*concurrency.SyncBuffer + + for i := 1; i <= 4; i++ { + cfg := prepareConfig() + cfg.ShardingEnabled = true + cfg.ShardingStrategy = util.ShardingStrategyShuffle + cfg.ShardingRing.InstanceID = fmt.Sprintf("compactor-%d", i) + cfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i) + cfg.ShardingRing.WaitStabilityMinDuration = 3 * time.Second + cfg.ShardingRing.WaitStabilityMaxDuration = 10 * time.Second + cfg.ShardingRing.KVStore.Mock = kvstore + + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.CompactorTenantShardSize = 3 + + c, _, tsdbPlanner, l, _ := prepare(t, cfg, bucketClient, limits) + defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck + + compactors = append(compactors, c) + logs = append(logs, l) + + // Mock the planner as if there's no compaction to do, + // in order to simplify tests (all in all, we just want to + // test our logic and not TSDB compactor which we expect to + // be already tested). + tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + } + + // Start all compactors + for _, c := range compactors { + require.NoError(t, c.StartAsync(context.Background())) + } + // Wait for all the compactors to get into the Running state without errors. + // Cannot use StartAndAwaitRunning as this would cause the compactions to start before + // all the compactors are initialized + for _, c := range compactors { + require.NoError(t, c.AwaitRunning(context.Background())) + } + + // Wait until a run has been completed on each compactor + for _, c := range compactors { + cortex_testutil.Poll(t, 60*time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactionRunsCompleted) + }) + } + + // Ensure that each group was only compacted by exactly one compactor + for groupHash, blockCount := range groupHashes { + + l, found, err := checkLogsForCompaction(compactors, logs, groupHash) + require.NoError(t, err) + + // If the blockCount < 2 then the group shouldn't have been compacted, therefore not found in the logs + if blockCount < 2 { + assert.False(t, found) + } else { + assert.Contains(t, l.String(), fmt.Sprintf(`msg="found compactable group for user" group_hash=%d`, groupHash)) + + } + } +} + +// checkLogsForCompaction checks the logs to see if a compaction has happened on the groupHash, +// if there has been a compaction it will return the logs of the compactor that handled the group +// and will return true. Otherwise this function will return a nil value for the logs and false +// as the group was not compacted +func checkLogsForCompaction(compactors []*Compactor, logs []*concurrency.SyncBuffer, groupHash uint32) (*concurrency.SyncBuffer, bool, error) { + var log *concurrency.SyncBuffer + + // Make sure that the group_hash is only owned by a single compactor + for _, l := range logs { + owned := strings.Contains(l.String(), fmt.Sprintf(`msg="found compactable group for user" group_hash=%d`, groupHash)) + + // Ensure the group is not owned by multiple compactors + if owned && log != nil { + return nil, false, fmt.Errorf("group with group_hash=%d owned by multiple compactors", groupHash) + } + if owned { + log = l + } + } + + // Return an false if we've not been able to find it + if log == nil { + return nil, false, nil + } + + return log, true, nil +} + func createTSDBBlock(t *testing.T, bkt objstore.Bucket, userID string, minT, maxT int64, externalLabels map[string]string) ulid.ULID { // Create a temporary dir for TSDB. tempDir, err := ioutil.TempDir(os.TempDir(), "tsdb") @@ -1098,7 +1290,7 @@ func prepareConfig() Config { return compactorCfg } -func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) { +func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) { storageCfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&storageCfg) @@ -1117,9 +1309,12 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (* logger := log.NewLogfmtLogger(logs) registry := prometheus.NewRegistry() - var limits validation.Limits - flagext.DefaultValues(&limits) - overrides, err := validation.NewOverrides(limits, nil) + if limits == nil { + limits = &validation.Limits{} + flagext.DefaultValues(limits) + } + + overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) { @@ -1130,7 +1325,14 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket) (* return tsdbCompactor, tsdbPlanner, nil } - c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, DefaultBlocksGrouperFactory, blocksCompactorFactory) + var blocksGrouperFactory BlocksGrouperFactory + if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + blocksGrouperFactory = ShuffleShardingGrouperFactory + } else { + blocksGrouperFactory = DefaultBlocksGrouperFactory + } + + c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, overrides) require.NoError(t, err) return c, tsdbCompactor, tsdbPlanner, logs, registry @@ -1184,6 +1386,32 @@ func mockBlockMetaJSON(id string) string { return string(content) } +func mockBlockMetaJSONWithTime(id string, orgID string, minTime int64, maxTime int64) string { + meta := metadata.Meta{ + Thanos: metadata.Thanos{ + Labels: map[string]string{"__org_id__": orgID}, + }, + } + + meta.BlockMeta = tsdb.BlockMeta{ + Version: 1, + ULID: ulid.MustParse(id), + MinTime: minTime, + MaxTime: maxTime, + Compaction: tsdb.BlockMetaCompaction{ + Level: 1, + Sources: []ulid.ULID{ulid.MustParse(id)}, + }, + } + + content, err := json.Marshal(meta) + if err != nil { + panic("failed to marshal mocked block meta") + } + + return string(content) +} + func mockDeletionMarkJSON(id string, deletionTime time.Time) string { meta := metadata.DeletionMark{ Version: metadata.DeletionMarkVersion1, @@ -1233,7 +1461,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) { cfg.ShardingRing.KVStore.Mock = kvstore // Each compactor will get its own temp dir for storing local files. - c, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem) + c, _, tsdbPlanner, _, _ := prepare(t, cfg, inmem, nil) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) }) @@ -1299,7 +1527,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { // Set ObservePeriod to longer than the timeout period to mock a timeout while waiting on ring to become ACTIVE cfg.ShardingRing.ObservePeriod = time.Second * 10 - c, _, _, logs, _ := prepare(t, cfg, bucketClient) + c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil) // Try to start the compactor with a bad consul kv-store. The err := services.StartAndAwaitRunning(context.Background(), c) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 2e90e87abe1..c9935744fee 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/cortexproject/cortex/pkg/ring" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" @@ -35,6 +36,11 @@ type ShuffleShardingGrouper struct { compactionFailures *prometheus.CounterVec verticalCompactions *prometheus.CounterVec compactorCfg Config + limits CompactorLimits + userID string + + ring ring.ReadRing + ringLifecyclerAddr string } func NewShuffleShardingGrouper( @@ -48,6 +54,10 @@ func NewShuffleShardingGrouper( remainingPlannedCompactions prometheus.Gauge, hashFunc metadata.HashFunc, compactorCfg Config, + ring ring.ReadRing, + ringLifecyclerAddr string, + limits CompactorLimits, + userID string, ) *ShuffleShardingGrouper { if logger == nil { logger = log.NewNopLogger() @@ -83,7 +93,11 @@ func NewShuffleShardingGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}), - compactorCfg: compactorCfg, + compactorCfg: compactorCfg, + ring: ring, + ringLifecyclerAddr: ringLifecyclerAddr, + limits: limits, + userID: userID, } } @@ -101,7 +115,19 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re // which we can parallelly compact. var outGroups []*compact.Group - i := 0 + // Check if this compactor is on the subring. + // If the compactor is not on the subring when using the userID as a identifier + // no plans generated below will be owned by the compactor so we can just return an empty array + // as there will be no planned groups + onSubring, err := g.checkSubringForCompactor() + if err != nil { + return nil, errors.Wrap(err, "unable to check sub-ring for compactor ownership") + } + if !onSubring { + level.Info(g.logger).Log("msg", "compactor is not on the current sub-ring skipping user", "user", g.userID) + return outGroups, nil + } + // Metrics for the remaining planned compactions g.remainingPlannedCompactions.Set(0) @@ -112,14 +138,20 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re continue } - // TODO: Use the group's hash to determine whether a compactor should be responsible for compacting that group - groupHash := hashGroup(group.blocks[0].Thanos.Labels["__org_id__"], group.rangeStart, group.rangeEnd) + groupHash := hashGroup(g.userID, group.rangeStart, group.rangeEnd) - g.remainingPlannedCompactions.Add(1) - groupKey := fmt.Sprintf("%v%d", groupHash, i) - i++ + if owned, err := g.ownGroup(groupHash); err != nil { + level.Warn(g.logger).Log("msg", "unable to check if user is owned by this shard", "group hash", groupHash, "err", err, "group", group.String()) + continue + } else if !owned { + level.Info(g.logger).Log("msg", "skipping group because it is not owned by this shard", "group_hash", groupHash) + continue + } + + g.remainingPlannedCompactions.Inc() + groupKey := fmt.Sprintf("%v%s", groupHash, compact.DefaultGroupKey(group.blocks[0].Thanos)) - level.Debug(g.logger).Log("msg", "found compactable group for user", "user", group.blocks[0].Thanos.Labels["__org_id__"], "plan", group.String()) + level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "group", group.String()) // All the blocks within the same group have the same downsample // resolution and external labels. @@ -179,6 +211,34 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re return outGroups, nil } +// Check whether this compactor instance owns the group. +func (g *ShuffleShardingGrouper) ownGroup(groupHash uint32) (bool, error) { + subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID)) + + rs, err := subRing.Get(groupHash, RingOp, nil, nil, nil) + if err != nil { + return false, err + } + + if len(rs.Instances) != 1 { + return false, fmt.Errorf("unexpected number of compactors in the shard (expected 1, got %d)", len(rs.Instances)) + } + + return rs.Instances[0].Addr == g.ringLifecyclerAddr, nil +} + +// Check whether this compactor exists on the subring based on user ID +func (g *ShuffleShardingGrouper) checkSubringForCompactor() (bool, error) { + subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID)) + + rs, err := subRing.GetAllHealthy(RingOp) + if err != nil { + return false, err + } + + return rs.Includes(g.ringLifecyclerAddr), nil +} + // Get the hash of a group based on the UserID, and the starting and ending time of the group's range. func hashGroup(userID string, rangeStart int64, rangeEnd int64) uint32 { groupString := fmt.Sprintf("%v%v%v", userID, rangeStart, rangeEnd) diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index dd1a9eb3c5c..7fc0b1e7498 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -4,10 +4,14 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util/validation" "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" ) @@ -33,63 +37,63 @@ func TestTimeShardingGrouper_Groups(t *testing.T) { map[ulid.ULID]*metadata.Meta{ block1ulid: { BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block2ulid: { BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 3 * time.Hour.Milliseconds(), MaxTime: 4 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block3ulid: { BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block4ulid: { BlockMeta: tsdb.BlockMeta{ULID: block4ulid, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block5ulid: { BlockMeta: tsdb.BlockMeta{ULID: block5ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block6ulid: { BlockMeta: tsdb.BlockMeta{ULID: block6ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block7ulid: { BlockMeta: tsdb.BlockMeta{ULID: block7ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block8ulid: { BlockMeta: tsdb.BlockMeta{ULID: block8ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block9ulid: { BlockMeta: tsdb.BlockMeta{ULID: block9ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "3"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "3"}}, }, block10ulid: { BlockMeta: tsdb.BlockMeta{ULID: block10ulid, MinTime: 4 * time.Hour.Milliseconds(), MaxTime: 6 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block11ulid: { BlockMeta: tsdb.BlockMeta{ULID: block11ulid, MinTime: 6 * time.Hour.Milliseconds(), MaxTime: 8 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, }, block12ulid: { BlockMeta: tsdb.BlockMeta{ULID: block12ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block13ulid: { BlockMeta: tsdb.BlockMeta{ULID: block13ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 20 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block14ulid: { BlockMeta: tsdb.BlockMeta{ULID: block14ulid, MinTime: 21 * time.Hour.Milliseconds(), MaxTime: 40 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, block15ulid: { BlockMeta: tsdb.BlockMeta{ULID: block15ulid, MinTime: 21 * time.Hour.Milliseconds(), MaxTime: 40 * time.Hour.Milliseconds()}, - Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, }, } @@ -142,16 +146,43 @@ func TestTimeShardingGrouper_Groups(t *testing.T) { BlockRanges: testData.ranges, } + limits := &validation.Limits{} + overrides, err := validation.NewOverrides(*limits, nil) + require.NoError(t, err) + + registerer := prometheus.NewRegistry() + remainingPlannedCompactions := promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_compactor_remaining_planned_compactions", + Help: "Total number of plans that remain to be compacted.", + }) + + // Setup mocking of the ring so that the grouper will own all the shards + rs := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Addr: "test-addr"}, + }, + } + subring := &RingMock{} + subring.On("GetAllHealthy", mock.Anything).Return(rs, nil) + subring.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(rs, nil) + + ring := &RingMock{} + ring.On("ShuffleShard", mock.Anything, mock.Anything).Return(subring, nil) + g := NewShuffleShardingGrouper(nil, nil, false, // Do not accept malformed indexes true, // Enable vertical compaction - prometheus.NewRegistry(), - nil, + registerer, nil, nil, + remainingPlannedCompactions, metadata.NoneFunc, - *compactorCfg) + *compactorCfg, + ring, + "test-addr", + overrides, + "") actual, err := g.Groups(testData.blocks) require.NoError(t, err) require.Len(t, actual, len(testData.expected)) @@ -495,3 +526,55 @@ func TestBlocksGroup_overlaps(t *testing.T) { assert.Equal(t, tc.expected, tc.second.overlaps(tc.first)) } } + +type RingMock struct { + mock.Mock +} + +func (r *RingMock) Collect(ch chan<- prometheus.Metric) {} + +func (r *RingMock) Describe(ch chan<- *prometheus.Desc) {} + +func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDesc, bufHosts, bufZones []string) (ring.ReplicationSet, error) { + args := r.Called(key, op, bufDescs, bufHosts, bufZones) + return args.Get(0).(ring.ReplicationSet), args.Error(1) +} + +func (r *RingMock) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) { + args := r.Called(op) + return args.Get(0).(ring.ReplicationSet), args.Error(1) +} + +func (r *RingMock) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) { + args := r.Called(op) + return args.Get(0).(ring.ReplicationSet), args.Error(1) +} + +func (r *RingMock) ReplicationFactor() int { + return 0 +} + +func (r *RingMock) InstancesCount() int { + return 0 +} + +func (r *RingMock) ShuffleShard(identifier string, size int) ring.ReadRing { + args := r.Called(identifier, size) + return args.Get(0).(ring.ReadRing) +} + +func (r *RingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) { + args := r.Called(instanceID) + return args.Get(0).(ring.InstanceState), args.Error(1) +} + +func (r *RingMock) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ring.ReadRing { + args := r.Called(identifier, size, lookbackPeriod, now) + return args.Get(0).(ring.ReadRing) +} + +func (r *RingMock) HasInstance(instanceID string) bool { + return true +} + +func (r *RingMock) CleanupShuffleShardCache(identifier string) {} diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 76fb18e0f0a..2515e26838c 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -233,7 +233,7 @@ func (c *Config) Validate(log log.Logger) error { if err := c.StoreGateway.Validate(c.LimitsConfig); err != nil { return errors.Wrap(err, "invalid store-gateway config") } - if err := c.Compactor.Validate(); err != nil { + if err := c.Compactor.Validate(c.LimitsConfig); err != nil { return errors.Wrap(err, "invalid compactor config") } if err := c.AlertmanagerStorage.Validate(); err != nil { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 488e9aadddd..fe52da1b6aa 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -731,7 +731,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) { func (t *Cortex) initCompactor() (serv services.Service, err error) { t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort - t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) + t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides) if err != nil { return } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 506e6f6fffa..371dc823940 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -93,6 +93,7 @@ type Limits struct { // Compactor. CompactorBlocksRetentionPeriod model.Duration `yaml:"compactor_blocks_retention_period" json:"compactor_blocks_retention_period"` + CompactorTenantShardSize int `yaml:"compactor_tenant_shard_size" json:"compactor_tenant_shard_size"` // This config doesn't have a CLI flag registered here because they're registered in // their own original config struct. @@ -168,6 +169,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.") f.Var(&l.CompactorBlocksRetentionPeriod, "compactor.blocks-retention-period", "Delete blocks containing samples older than the specified retention period. 0 to disable.") + f.IntVar(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") // Store-gateway. f.IntVar(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set when the store-gateway sharding is enabled with the shuffle-sharding strategy. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") @@ -496,6 +498,11 @@ func (o *Overrides) CompactorBlocksRetentionPeriod(userID string) time.Duration return time.Duration(o.getOverridesForUser(userID).CompactorBlocksRetentionPeriod) } +// CompactorTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy. +func (o *Overrides) CompactorTenantShardSize(userID string) int { + return o.getOverridesForUser(userID).CompactorTenantShardSize +} + // MetricRelabelConfigs returns the metric relabel configs for a given user. func (o *Overrides) MetricRelabelConfigs(userID string) []*relabel.Config { return o.getOverridesForUser(userID).MetricRelabelConfigs