Skip to content

Commit b87ec68

Browse files
committed
Address comments
Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent b977bcb commit b87ec68

File tree

4 files changed

+18
-17
lines changed

4 files changed

+18
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
4444
* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
4545
* [ENHANCEMENT] Query Frontend: Enhance the performance of the JSON codec. #6816
46-
* [ENHANCEMENT] Compactor: Split cleaner logic. #6827
46+
* [ENHANCEMENT] Compactor: Emit partition metrics separate from cleaner job. #6827
4747
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
4848
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
4949
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

pkg/compactor/blocks_cleaner.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
249249
metricsChan = make(chan *cleanerJob)
250250
defer close(metricsChan)
251251
go func() {
252-
c.runEmitMetricsWorker(ctx, metricsChan)
252+
c.runEmitPartitionMetricsWorker(ctx, metricsChan)
253253
}()
254254
}
255255

@@ -315,12 +315,12 @@ func (c *BlocksCleaner) checkRunError(runType string, err error) {
315315
}
316316
}
317317

318-
func (c *BlocksCleaner) runEmitMetricsWorker(ctx context.Context, jobChan <-chan *cleanerJob) {
318+
func (c *BlocksCleaner) runEmitPartitionMetricsWorker(ctx context.Context, jobChan <-chan *cleanerJob) {
319319
for job := range jobChan {
320320
err := concurrency.ForEachUser(ctx, job.users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
321321
userLogger := util_log.WithUserID(userID, c.logger)
322322
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
323-
c.emitUserMetrics(ctx, userLogger, userBucket, userID)
323+
c.emitUserParititionMetrics(ctx, userLogger, userBucket, userID)
324324
return nil
325325
})
326326

@@ -819,10 +819,11 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
819819
}
820820
}
821821

822-
func (c *BlocksCleaner) emitUserMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
822+
func (c *BlocksCleaner) emitUserParititionMetrics(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) {
823823
existentPartitionedGroupInfo, err := c.iterPartitionGroups(ctx, userBucket, userLogger)
824824
if err != nil {
825-
level.Warn(userLogger).Log("msg", "error return when going through partitioned group directory", "err", err)
825+
level.Warn(userLogger).Log("msg", "error listing partitioned group directory to emit metrics", "err", err)
826+
return
826827
}
827828

828829
remainingCompactions := 0
@@ -831,13 +832,11 @@ func (c *BlocksCleaner) emitUserMetrics(ctx context.Context, userLogger log.Logg
831832
defer func() {
832833
c.remainingPlannedCompactions.WithLabelValues(userID).Set(float64(remainingCompactions))
833834
c.inProgressCompactions.WithLabelValues(userID).Set(float64(inProgressCompactions))
834-
if c.oldestPartitionGroupOffset != nil {
835-
if oldestPartitionGroup != nil {
836-
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
837-
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
838-
} else {
839-
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
840-
}
835+
if oldestPartitionGroup != nil {
836+
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(float64(time.Now().Unix() - oldestPartitionGroup.CreationTime))
837+
level.Debug(userLogger).Log("msg", "partition group info with oldest creation time", "partitioned_group_id", oldestPartitionGroup.PartitionedGroupID, "creation_time", oldestPartitionGroup.CreationTime)
838+
} else {
839+
c.oldestPartitionGroupOffset.WithLabelValues(userID).Set(0)
841840
}
842841
}()
843842
for partitionedGroupInfo, extraInfo := range existentPartitionedGroupInfo {

pkg/compactor/blocks_cleaner_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1226,7 +1226,7 @@ func TestBlocksCleaner_EmitUserMetrics(t *testing.T) {
12261226
err = v4Manager.updateVisitMarker(ctx)
12271227
require.NoError(t, err)
12281228

1229-
cleaner.emitUserMetrics(ctx, logger, userBucket, userID)
1229+
cleaner.emitUserParititionMetrics(ctx, logger, userBucket, userID)
12301230

12311231
metricNames := []string{
12321232
"cortex_compactor_remaining_planned_compactions",

pkg/storage/tsdb/caching_bucket.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (cfg *MetadataCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix
198198
f.DurationVar(&cfg.BlockIndexAttributesTTL, prefix+"block-index-attributes-ttl", 168*time.Hour, "How long to cache attributes of the block index.")
199199
f.DurationVar(&cfg.BucketIndexContentTTL, prefix+"bucket-index-content-ttl", 5*time.Minute, "How long to cache content of the bucket index.")
200200
f.IntVar(&cfg.BucketIndexMaxSize, prefix+"bucket-index-max-size-bytes", 1*1024*1024, "Maximum size of bucket index content to cache in bytes. Caching will be skipped if the content exceeds this size. This is useful to avoid network round trip for large content if the configured caching backend has an hard limit on cached items size (in this case, you should set this limit to the same limit in the caching backend).")
201-
f.DurationVar(&cfg.PartitionedGroupsListTTL, prefix+"partitioned-groups-list-ttl", 5*time.Minute, "How long to cache list of partitioned groups for an user.")
201+
f.DurationVar(&cfg.PartitionedGroupsListTTL, prefix+"partitioned-groups-list-ttl", 0, "How long to cache list of partitioned groups for an user. 0 disables caching")
202202
}
203203

204204
func (cfg *MetadataCacheConfig) Validate() error {
@@ -279,8 +279,10 @@ func CreateCachingBucketForCompactor(metadataConfig MetadataCacheConfig, cleaner
279279
// Cache only GET for metadata and don't cache exists and not exists.
280280
cfg.CacheGet("metafile", metadataCache, matchers.GetMetafileMatcher(), metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, 0, 0)
281281

282-
//Avoid double iter when running cleanActiveUser and emitUserMetrics
283-
cfg.CacheIter("partitioned-groups-iter", metadataCache, matchers.GetPartitionedGroupsIterMatcher(), metadataConfig.PartitionedGroupsListTTL, codec, "")
282+
if metadataConfig.PartitionedGroupsListTTL > 0 {
283+
//Avoid double iter when running cleanActiveUser and emitUserMetrics
284+
cfg.CacheIter("partitioned-groups-iter", metadataCache, matchers.GetPartitionedGroupsIterMatcher(), metadataConfig.PartitionedGroupsListTTL, codec, "")
285+
}
284286
}
285287
}
286288

0 commit comments

Comments
 (0)