Commit b73a2bb

Adding compactor.blocks-fetch-concurrency
Signed-off-by: alanprot <[email protected]>
1 parent: 2a37238

2 files changed: +16 −8 lines
pkg/compactor/compactor.go

Lines changed: 6 additions & 2 deletions

@@ -64,7 +64,8 @@ var (
 			garbageCollectedBlocks,
 			blocksMarkedForNoCompaction,
 			metadata.NoneFunc,
-			cfg.BlockFilesConcurrency)
+			cfg.BlockFilesConcurrency,
+			cfg.BlocksFetchConcurrency)
 	}

 	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {

@@ -84,7 +85,8 @@ var (
 			ringLifecycle.Addr,
 			limits,
 			userID,
-			cfg.BlockFilesConcurrency)
+			cfg.BlockFilesConcurrency,
+			cfg.BlocksFetchConcurrency)
 	}

 	DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {

@@ -166,6 +168,7 @@ type Config struct {
 	TenantCleanupDelay                    time.Duration `yaml:"tenant_cleanup_delay"`
 	SkipBlocksWithOutOfOrderChunksEnabled bool          `yaml:"skip_blocks_with_out_of_order_chunks_enabled"`
 	BlockFilesConcurrency                 int           `yaml:"block_files_concurrency"`
+	BlocksFetchConcurrency                int           `yaml:"blocks_fetch_concurrency"`

 	// Whether the migration of block deletion marks to the global markers location is enabled.
 	BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"`

@@ -216,6 +219,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", false, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.")
 	f.BoolVar(&cfg.SkipBlocksWithOutOfOrderChunksEnabled, "compactor.skip-blocks-with-out-of-order-chunks-enabled", false, "When enabled, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction.")
 	f.IntVar(&cfg.BlockFilesConcurrency, "compactor.block-files-concurrency", 10, "Number of goroutines to use when fetching/uploading block files from object storage.")
+	f.IntVar(&cfg.BlocksFetchConcurrency, "compactor.blocks-fetch-concurrency", 10, "Number of goroutines to use when fetching blocks from object storage when compacting.")

 	f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.")
 	f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
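
For reference, a minimal standalone sketch (not Cortex code) of how the new flag registration behaves: the Config type below is a trimmed stand-in containing only the two concurrency fields touched by this commit, and parsing overrides the new flag while the existing one keeps its default of 10. Because the field also carries the blocks_fetch_concurrency yaml tag, the same setting is exposed in the compactor config file as well as on the command line.

package main

import (
	"flag"
	"fmt"
)

// Config is a trimmed stand-in for the compactor Config; only the two
// concurrency fields from this commit are reproduced here.
type Config struct {
	BlockFilesConcurrency  int
	BlocksFetchConcurrency int
}

// RegisterFlags mirrors the two IntVar registrations from the diff above.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&cfg.BlockFilesConcurrency, "compactor.block-files-concurrency", 10, "Number of goroutines to use when fetching/uploading block files from object storage.")
	f.IntVar(&cfg.BlocksFetchConcurrency, "compactor.blocks-fetch-concurrency", 10, "Number of goroutines to use when fetching blocks from object storage when compacting.")
}

func main() {
	var cfg Config
	fs := flag.NewFlagSet("compactor", flag.ExitOnError)
	cfg.RegisterFlags(fs)

	// Override only the new flag; block-files-concurrency keeps its default of 10.
	_ = fs.Parse([]string{"-compactor.blocks-fetch-concurrency=20"})
	fmt.Println(cfg.BlockFilesConcurrency, cfg.BlocksFetchConcurrency) // prints: 10 20
}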

pkg/compactor/shuffle_sharding_grouper.go

Lines changed: 10 additions & 6 deletions

@@ -41,6 +41,7 @@ type ShuffleShardingGrouper struct {
 	limits                 Limits
 	userID                 string
 	blockFilesConcurrency  int
+	blocksFetchConcurrency int

 	ring               ring.ReadRing
 	ringLifecyclerAddr string

@@ -63,6 +64,7 @@ func NewShuffleShardingGrouper(
 	limits Limits,
 	userID string,
 	blockFilesConcurrency int,
+	blocksFetchConcurrency int,
 ) *ShuffleShardingGrouper {
 	if logger == nil {
 		logger = log.NewNopLogger()

@@ -100,12 +102,13 @@ func NewShuffleShardingGrouper(
 			Name: "thanos_compact_group_vertical_compactions_total",
 			Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.",
 		}, []string{"group"}),
-		compactorCfg:          compactorCfg,
-		ring:                  ring,
-		ringLifecyclerAddr:    ringLifecyclerAddr,
-		limits:                limits,
-		userID:                userID,
-		blockFilesConcurrency: blockFilesConcurrency,
+		compactorCfg:           compactorCfg,
+		ring:                   ring,
+		ringLifecyclerAddr:     ringLifecyclerAddr,
+		limits:                 limits,
+		userID:                 userID,
+		blockFilesConcurrency:  blockFilesConcurrency,
+		blocksFetchConcurrency: blocksFetchConcurrency,
 	}
 }

@@ -184,6 +187,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re
 			g.blocksMarkedForNoCompact,
 			g.hashFunc,
 			g.blockFilesConcurrency,
+			g.blocksFetchConcurrency,
 		)
 		if err != nil {
 			return nil, errors.Wrap(err, "create compaction group")