Skip to content

Commit ccd25f2

Browse files
authored
small refactor to avoid setting the same limits 2 times on compactor (#5526)
Signed-off-by: Alan Protasio <[email protected]>
1 parent c56d882 commit ccd25f2

File tree

3 files changed

+8
-11
lines changed

3 files changed

+8
-11
lines changed

pkg/compactor/compactor.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,11 @@ type Compactor struct {
283283

284284
compactorCfg Config
285285
storageCfg cortex_tsdb.BlocksStorageConfig
286-
cfgProvider ConfigProvider
287286
logger log.Logger
288287
parentLogger log.Logger
289288
registerer prometheus.Registerer
290289
allowedTenants *util.AllowedTenants
291-
limits Limits
290+
limits *validation.Overrides
292291

293292
// Functions that creates bucket client, grouper, planner and compactor using the context.
294293
// Useful for injecting mock objects from tests.
@@ -339,7 +338,7 @@ type Compactor struct {
339338
}
340339

341340
// NewCompactor makes a new Compactor.
342-
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer, limits Limits) (*Compactor, error) {
341+
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error) {
343342
bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) {
344343
return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
345344
}
@@ -362,7 +361,7 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi
362361
}
363362
}
364363

365-
cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits)
364+
cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits)
366365
if err != nil {
367366
return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
368367
}
@@ -373,13 +372,12 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi
373372
func newCompactor(
374373
compactorCfg Config,
375374
storageCfg cortex_tsdb.BlocksStorageConfig,
376-
cfgProvider ConfigProvider,
377375
logger log.Logger,
378376
registerer prometheus.Registerer,
379377
bucketClientFactory func(ctx context.Context) (objstore.Bucket, error),
380378
blocksGrouperFactory BlocksGrouperFactory,
381379
blocksCompactorFactory BlocksCompactorFactory,
382-
limits Limits,
380+
limits *validation.Overrides,
383381
) (*Compactor, error) {
384382
var remainingPlannedCompactions prometheus.Gauge
385383
if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
@@ -391,7 +389,6 @@ func newCompactor(
391389
c := &Compactor{
392390
compactorCfg: compactorCfg,
393391
storageCfg: storageCfg,
394-
cfgProvider: cfgProvider,
395392
parentLogger: logger,
396393
logger: log.With(logger, "component", "compactor"),
397394
registerer: registerer,
@@ -510,7 +507,7 @@ func (c *Compactor) starting(ctx context.Context) error {
510507
CleanupConcurrency: c.compactorCfg.CleanupConcurrency,
511508
BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled,
512509
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
513-
}, c.bucketClient, c.usersScanner, c.cfgProvider, c.parentLogger, c.registerer)
510+
}, c.bucketClient, c.usersScanner, c.limits, c.parentLogger, c.registerer)
514511

515512
// Initialize the compactors ring if sharding is enabled.
516513
if c.compactorCfg.ShardingEnabled {
@@ -765,7 +762,7 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
765762
}
766763

767764
func (c *Compactor) compactUser(ctx context.Context, userID string) error {
768-
bucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
765+
bucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.limits)
769766

770767
reg := prometheus.NewRegistry()
771768
defer c.syncerMetrics.gatherThanosSyncerMetrics(reg)

pkg/compactor/compactor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1663,7 +1663,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li
16631663
blocksGrouperFactory = DefaultBlocksGrouperFactory
16641664
}
16651665

1666-
c, err := newCompactor(compactorCfg, storageCfg, overrides, logger, registry, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, overrides)
1666+
c, err := newCompactor(compactorCfg, storageCfg, logger, registry, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, overrides)
16671667
require.NoError(t, err)
16681668

16691669
return c, tsdbCompactor, tsdbPlanner, logs, registry

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,7 @@ func (t *Cortex) initAlertManager() (serv services.Service, err error) {
656656
func (t *Cortex) initCompactor() (serv services.Service, err error) {
657657
t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort
658658

659-
t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides)
659+
t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides)
660660
if err != nil {
661661
return
662662
}

0 commit comments

Comments
 (0)