|
64 | 64 | garbageCollectedBlocks,
|
65 | 65 | blocksMarkedForNoCompaction,
|
66 | 66 | metadata.NoneFunc,
|
67 |
| - cfg.BlockFilesConcurrency) |
| 67 | + cfg.BlockFilesConcurrency, |
| 68 | + cfg.BlocksFetchConcurrency) |
68 | 69 | }
|
69 | 70 |
|
70 | 71 | ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
|
|
84 | 85 | ringLifecycle.Addr,
|
85 | 86 | limits,
|
86 | 87 | userID,
|
87 |
| - cfg.BlockFilesConcurrency) |
| 88 | + cfg.BlockFilesConcurrency, |
| 89 | + cfg.BlocksFetchConcurrency) |
88 | 90 | }
|
89 | 91 |
|
90 | 92 | DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
|
@@ -166,6 +168,7 @@ type Config struct {
|
166 | 168 | TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"`
|
167 | 169 | SkipBlocksWithOutOfOrderChunksEnabled bool `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
|
168 | 170 | BlockFilesConcurrency int `yaml:"block_files_concurrency"`
|
| 171 | + BlocksFetchConcurrency int `yaml:"blocks_fetch_concurrency"` |
169 | 172 |
|
170 | 173 | // Whether the migration of block deletion marks to the global markers location is enabled.
|
171 | 174 | BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`
|
@@ -216,6 +219,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
|
216 | 219 | f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", false, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.")
|
217 | 220 | f.BoolVar(&cfg.SkipBlocksWithOutOfOrderChunksEnabled, "compactor.skip-blocks-with-out-of-order-chunks-enabled", false, "When enabled, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction.")
|
218 | 221 | f.IntVar(&cfg.BlockFilesConcurrency, "compactor.block-files-concurrency", 10, "Number of goroutines to use when fetching/uploading block files from object storage.")
|
| 222 | + f.IntVar(&cfg.BlocksFetchConcurrency, "compactor.blocks-fetch-concurrency", 3, "Number of goroutines to use when fetching blocks from object storage when compacting.") |
219 | 223 |
|
220 | 224 | f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
|
221 | 225 | f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
|
|
0 commit comments