diff --git a/CHANGELOG.md b/CHANGELOG.md
index 470aefe5228..725e90c2346 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
 * [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686
 * [CHANGE] Enable Thanos series limiter in store-gateway. #4702
+* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks whose index contains out-of-order chunks for no compaction instead of halting the compaction.
 
 ## 1.12.0 in progress
 
diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md
index d52f88bb4e3..3d3bb0159f5 100644
--- a/docs/blocks-storage/compactor.md
+++ b/docs/blocks-storage/compactor.md
@@ -147,6 +147,11 @@ compactor:
   # CLI flag: -compactor.tenant-cleanup-delay
   [tenant_cleanup_delay: <duration> | default = 6h]
 
+  # When enabled, mark blocks whose index contains out-of-order chunks for no
+  # compaction instead of halting the compaction.
+  # CLI flag: -compactor.skip-blocks-with-out-of-order-chunks-enabled
+  [skip_blocks_with_out_of_order_chunks_enabled: <boolean> | default = false]
+
   # When enabled, at compactor startup the bucket will be scanned and all found
   # deletion marks inside the block location will be copied to the markers
   # global location too. This option can (and should) be safely disabled as soon
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 7407c7c9221..e6f3c5ea7a2 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -5294,6 +5294,11 @@ The `compactor_config` configures the compactor for the blocks storage.
 # CLI flag: -compactor.tenant-cleanup-delay
 [tenant_cleanup_delay: <duration> | default = 6h]
 
+# When enabled, mark blocks whose index contains out-of-order chunks for no
+# compaction instead of halting the compaction.
+# CLI flag: -compactor.skip-blocks-with-out-of-order-chunks-enabled
+[skip_blocks_with_out_of_order_chunks_enabled: <boolean> | default = false]
+
 # When enabled, at compactor startup the bucket will be scanned and all found
 # deletion marks inside the block location will be copied to the markers global
 # location too. This option can (and should) be safely disabled as soon as the
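For context on the flag the docs above describe, here is a minimal, self-contained sketch (not part of this diff) of the flag's behaviour through Go's standard `flag` package. The flag name and `false` default mirror the registration added to `pkg/compactor/compactor.go` below; the FlagSet name and abbreviated help text are illustrative only.

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	// Register the flag as the compactor does (name and default taken from
	// the RegisterFlags change below); the help text is abbreviated.
	fs := flag.NewFlagSet("compactor", flag.ExitOnError)
	skip := fs.Bool("compactor.skip-blocks-with-out-of-order-chunks-enabled", false,
		"Mark blocks whose index contains out-of-order chunks for no compaction instead of halting the compaction.")

	// Operators must opt in explicitly; the default keeps the old behaviour
	// of halting compaction on an out-of-order chunk.
	if err := fs.Parse([]string{"-compactor.skip-blocks-with-out-of-order-chunks-enabled=true"}); err != nil {
		panic(err)
	}
	fmt.Println(*skip) // true
}
```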
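Since `BlocksGrouperFactory` is a public extension point, any downstream code supplying its own factory must adopt the widened signature. A minimal sketch of a conforming factory follows — hypothetical code assumed to live in package `compactor` with this file's existing imports; the two boolean arguments elided by the hunks above are assumptions, flagged in comments.

```go
// customGrouperFactory is a hypothetical BlocksGrouperFactory showing the new
// contract: the caller-provided blocksMarkedForNoCompact counter replaces the
// throwaway prometheus.NewCounter each factory allocated before this change.
var customGrouperFactory BlocksGrouperFactory = func(
	ctx context.Context,
	cfg Config,
	bkt objstore.Bucket,
	logger log.Logger,
	reg prometheus.Registerer,
	blocksMarkedForDeletion prometheus.Counter,
	garbageCollectedBlocks prometheus.Counter,
	blocksMarkedForNoCompact prometheus.Counter,
) compact.Grouper {
	return compact.NewDefaultGrouper(
		logger,
		bkt,
		false, // acceptMalformedIndex — assumed; elided from the hunk above
		true,  // enableVerticalCompaction — assumed; elided from the hunk above
		reg,
		blocksMarkedForDeletion,
		garbageCollectedBlocks,
		blocksMarkedForNoCompact, // incremented when a block is marked for no compaction
		metadata.NoneFunc)
}
```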
@@ -120,18 +121,19 @@
 // Config holds the Compactor config.
 type Config struct {
-	BlockRanges           cortex_tsdb.DurationList `yaml:"block_ranges"`
-	BlockSyncConcurrency  int                      `yaml:"block_sync_concurrency"`
-	MetaSyncConcurrency   int                      `yaml:"meta_sync_concurrency"`
-	ConsistencyDelay      time.Duration            `yaml:"consistency_delay"`
-	DataDir               string                   `yaml:"data_dir"`
-	CompactionInterval    time.Duration            `yaml:"compaction_interval"`
-	CompactionRetries     int                      `yaml:"compaction_retries"`
-	CompactionConcurrency int                      `yaml:"compaction_concurrency"`
-	CleanupInterval       time.Duration            `yaml:"cleanup_interval"`
-	CleanupConcurrency    int                      `yaml:"cleanup_concurrency"`
-	DeletionDelay         time.Duration            `yaml:"deletion_delay"`
-	TenantCleanupDelay    time.Duration            `yaml:"tenant_cleanup_delay"`
+	BlockRanges                           cortex_tsdb.DurationList `yaml:"block_ranges"`
+	BlockSyncConcurrency                  int                      `yaml:"block_sync_concurrency"`
+	MetaSyncConcurrency                   int                      `yaml:"meta_sync_concurrency"`
+	ConsistencyDelay                      time.Duration            `yaml:"consistency_delay"`
+	DataDir                               string                   `yaml:"data_dir"`
+	CompactionInterval                    time.Duration            `yaml:"compaction_interval"`
+	CompactionRetries                     int                      `yaml:"compaction_retries"`
+	CompactionConcurrency                 int                      `yaml:"compaction_concurrency"`
+	CleanupInterval                       time.Duration            `yaml:"cleanup_interval"`
+	CleanupConcurrency                    int                      `yaml:"cleanup_concurrency"`
+	DeletionDelay                         time.Duration            `yaml:"deletion_delay"`
+	TenantCleanupDelay                    time.Duration            `yaml:"tenant_cleanup_delay"`
+	SkipBlocksWithOutOfOrderChunksEnabled bool                     `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
 
 	// Whether the migration of block deletion marks to the global markers location is enabled.
 	BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`
 
@@ -180,6 +182,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 		"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
 	f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is time between deleting of last block, and doing final cleanup (marker files, debug files) of the tenant.")
 	f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", false, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.")
+	f.BoolVar(&cfg.SkipBlocksWithOutOfOrderChunksEnabled, "compactor.skip-blocks-with-out-of-order-chunks-enabled", false, "When enabled, mark blocks whose index contains out-of-order chunks for no compaction instead of halting the compaction.")
 	f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
 	f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
 
@@ -256,6 +259,7 @@ type Compactor struct {
 	compactionRunInterval   prometheus.Gauge
 	blocksMarkedForDeletion prometheus.Counter
 	garbageCollectedBlocks  prometheus.Counter
+	blocksMarkedForNoCompact prometheus.Counter
 
 	// TSDB syncer metrics
 	syncerMetrics *syncerMetrics
@@ -361,6 +365,10 @@
 			Name: "cortex_compactor_garbage_collected_blocks_total",
 			Help: "Total number of blocks marked for deletion by compactor.",
 		}),
+		blocksMarkedForNoCompact: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+			Name: "cortex_compactor_blocks_marked_for_no_compact_total",
+			Help: "Total number of blocks marked for no compact.",
+		}),
 	}
 
 	if len(compactorCfg.EnabledTenants) > 0 {
@@ -696,13 +704,13 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 	compactor, err := compact.NewBucketCompactor(
 		ulogger,
 		syncer,
-		c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks),
+		c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks, c.blocksMarkedForNoCompact),
 		c.blocksPlanner,
 		c.blocksCompactor,
 		path.Join(c.compactorCfg.DataDir, "compact"),
 		bucket,
 		c.compactorCfg.CompactionConcurrency,
-		false,
+		c.compactorCfg.SkipBlocksWithOutOfOrderChunksEnabled,
 	)
 	if err != nil {
 		return errors.Wrap(err, "failed to create bucket compactor")
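When the bucket compactor hits an out-of-order chunk with skipping enabled, it writes a `no-compact-mark.json` into the block directory (and bumps the counter wired in above) instead of halting. The sketch below prints roughly what that marker contains. It is illustrative only: the `NoCompactMark` struct, version constant, and reason constant are taken from Thanos's `block/metadata` package and are assumptions, not part of this diff; the block ULID is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

func main() {
	// Hypothetical block ULID, for illustration only.
	id := ulid.MustParse("01FN3VCQV5X342W2ZKMQQXAZRX")

	mark := metadata.NoCompactMark{
		ID:            id,
		Version:       metadata.NoCompactMarkVersion1,
		NoCompactTime: time.Now().Unix(),
		Reason:        metadata.OutOfOrderChunksNoCompactReason,
		Details:       "out-of-order chunks detected during compaction",
	}

	// Roughly the JSON the compactor leaves next to the block's meta.json.
	out, _ := json.MarshalIndent(mark, "", "  ")
	fmt.Println(string(out))
}
```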
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go
index 2f8c6c79640..77eea11abd4 100644
--- a/pkg/compactor/compactor_test.go
+++ b/pkg/compactor/compactor_test.go
@@ -17,6 +17,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
+	cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
 	"github.com/go-kit/log"
 	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
@@ -30,16 +32,17 @@ import (
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact"
 	"github.com/thanos-io/thanos/pkg/objstore"
+	"github.com/thanos-io/thanos/pkg/testutil"
 	"gopkg.in/yaml.v2"
 
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
 	"github.com/cortexproject/cortex/pkg/storage/bucket"
 	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
+	cortex_storage_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
 	"github.com/cortexproject/cortex/pkg/util/concurrency"
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/cortexproject/cortex/pkg/util/services"
-	cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
 	"github.com/cortexproject/cortex/pkg/util/validation"
 )
 
@@ -800,6 +803,58 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
 		`), testedMetrics...))
 }
 
+func TestCompactor_ShouldSkipOutOfOrderBlocks(t *testing.T) {
+	bucketClient, tmpDir := cortex_storage_testutil.PrepareFilesystemBucket(t)
+	bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
+
+	b1 := createTSDBBlock(t, bucketClient, "user-1", 10, 20, map[string]string{"__name__": "Teste"})
+	b2 := createTSDBBlock(t, bucketClient, "user-1", 20, 30, map[string]string{"__name__": "Teste"})
+
+	err := testutil.PutOutOfOrderIndex(path.Join(tmpDir, "user-1", b1.String()), 10, 20)
+	require.NoError(t, err)
+
+	cfg := prepareConfig()
+	cfg.SkipBlocksWithOutOfOrderChunksEnabled = true
+	c, tsdbCompac, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient)
+
+	tsdbCompac.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(b1, nil)
+
+	tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{
+		{
+			BlockMeta: tsdb.BlockMeta{
+				ULID:    b1,
+				MinTime: 10,
+				MaxTime: 20,
+			},
+		},
+		{
+			BlockMeta: tsdb.BlockMeta{
+				ULID:    b2,
+				MinTime: 20,
+				MaxTime: 30,
+			},
+		},
+	}, nil)
+
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
+
+	defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck
+
+	// Wait until a run has completed and the out-of-order block has been marked.
+	cortex_testutil.Poll(t, 5*time.Second, true, func() interface{} {
+		if _, err := os.Stat(path.Join(tmpDir, "user-1", b1.String(), "no-compact-mark.json")); err == nil {
+			return true
+		}
+		return false
+	})
+
+	assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
+		# HELP cortex_compactor_blocks_marked_for_no_compact_total Total number of blocks marked for no compact.
+		# TYPE cortex_compactor_blocks_marked_for_no_compact_total counter
+		cortex_compactor_blocks_marked_for_no_compact_total 1
+	`), "cortex_compactor_blocks_marked_for_no_compact_total"))
+}
+
 func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunning(t *testing.T) {
 	t.Parallel()
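Outside of tests, the same check the test performs with `os.Stat` can be made against the live bucket via the objstore API already imported above. A small hypothetical helper (the function name is illustrative, not part of this diff); the `<tenant>/<block ULID>/no-compact-mark.json` layout matches the path the test polls:

```go
package compactor

import (
	"context"
	"path"

	"github.com/thanos-io/thanos/pkg/objstore"
)

// blockMarkedForNoCompact is a hypothetical helper reporting whether the
// compactor has written a no-compact marker for the given tenant's block.
func blockMarkedForNoCompact(ctx context.Context, bkt objstore.Bucket, tenantID, blockID string) (bool, error) {
	return bkt.Exists(ctx, path.Join(tenantID, blockID, "no-compact-mark.json"))
}
```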