Skip to content

Commit ba730c9

Browse files
authored
Add compactor start duration in seconds metric (#5683)
* Add additional metrics to record compactor start and stop duration in seconds Signed-off-by: Alex Le <[email protected]> * update CHANGELOG Signed-off-by: Alex Le <[email protected]> * removed stop duration metric replaced it by log Signed-off-by: Alex Le <[email protected]> * fix test Signed-off-by: Alex Le <[email protected]> --------- Signed-off-by: Alex Le <[email protected]>
1 parent 9013059 commit ba730c9

File tree

3 files changed

+34
-2
lines changed

3 files changed

+34
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661
77
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
88
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
9+
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
910
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
1011

1112
## 1.16.0 2023-11-20

pkg/compactor/compactor.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ type Compactor struct {
321321
ringSubservicesWatcher *services.FailureWatcher
322322

323323
// Metrics.
324+
CompactorStartDurationSeconds prometheus.Gauge
324325
compactionRunsStarted prometheus.Counter
325326
compactionRunsInterrupted prometheus.Counter
326327
compactionRunsCompleted prometheus.Counter
@@ -403,6 +404,10 @@ func newCompactor(
403404
blocksCompactorFactory: blocksCompactorFactory,
404405
allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
405406

407+
CompactorStartDurationSeconds: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
408+
Name: "cortex_compactor_start_duration_seconds",
409+
Help: "Time in seconds spent by compactor running start function",
410+
}),
406411
compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
407412
Name: "cortex_compactor_runs_started_total",
408413
Help: "Total number of compaction runs started.",
@@ -485,6 +490,12 @@ func newCompactor(
485490

486491
// Start the compactor.
487492
func (c *Compactor) starting(ctx context.Context) error {
493+
begin := time.Now()
494+
defer func() {
495+
c.CompactorStartDurationSeconds.Set(time.Since(begin).Seconds())
496+
level.Info(c.logger).Log("msg", "compactor started", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
497+
}()
498+
488499
var err error
489500

490501
// Create bucket client.
@@ -581,6 +592,11 @@ func (c *Compactor) starting(ctx context.Context) error {
581592
}
582593

583594
func (c *Compactor) stopping(_ error) error {
595+
begin := time.Now()
596+
defer func() {
597+
level.Info(c.logger).Log("msg", "compactor stopped", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
598+
}()
599+
584600
ctx := context.Background()
585601

586602
services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck

pkg/compactor/compactor_test.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,11 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
214214
assert.Equal(t, []string{
215215
`level=info component=cleaner msg="started blocks cleanup and maintenance"`,
216216
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
217+
`level=info component=compactor msg="compactor started"`,
217218
`level=info component=compactor msg="discovering users from bucket"`,
218219
`level=info component=compactor msg="discovered users from bucket" users=0`,
219-
}, strings.Split(strings.TrimSpace(logs.String()), "\n"))
220+
`level=info component=compactor msg="compactor stopped"`,
221+
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
220222

221223
assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
222224
# TYPE cortex_compactor_runs_started_total counter
@@ -365,9 +367,11 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket
365367
assert.Equal(t, []string{
366368
`level=info component=cleaner msg="started blocks cleanup and maintenance"`,
367369
`level=error component=cleaner msg="failed to run blocks cleanup and maintenance" err="failed to discover users from bucket: failed to iterate the bucket"`,
370+
`level=info component=compactor msg="compactor started"`,
368371
`level=info component=compactor msg="discovering users from bucket"`,
369372
`level=error component=compactor msg="failed to discover users from bucket" err="failed to iterate the bucket"`,
370-
}, strings.Split(strings.TrimSpace(logs.String()), "\n"))
373+
`level=info component=compactor msg="compactor stopped"`,
374+
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
371375

372376
assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(`
373377
# TYPE cortex_compactor_runs_started_total counter
@@ -661,6 +665,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
661665
`level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`,
662666
`level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`,
663667
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
668+
`level=info component=compactor msg="compactor started"`,
664669
`level=info component=compactor msg="discovering users from bucket"`,
665670
`level=info component=compactor msg="discovered users from bucket" users=2`,
666671
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`,
@@ -675,6 +680,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
675680
`level=info component=compactor org_id=user-2 msg="start of compactions"`,
676681
`level=info component=compactor org_id=user-2 msg="compaction iterations done"`,
677682
`level=info component=compactor msg="successfully compacted user blocks" user=user-2`,
683+
`level=info component=compactor msg="compactor stopped"`,
678684
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
679685

680686
// Instead of testing for shipper metrics, we only check our metrics here.
@@ -794,6 +800,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
794800
`level=info component=cleaner org_id=user-1 msg="deleted block marked for deletion" block=01DTW0ZCPDDNV4BV83Q2SV4QAZ`,
795801
`level=info component=cleaner org_id=user-1 msg="completed blocks cleanup and maintenance"`,
796802
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
803+
`level=info component=compactor msg="compactor started"`,
797804
`level=info component=compactor msg="discovering users from bucket"`,
798805
`level=info component=compactor msg="discovered users from bucket" users=1`,
799806
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`,
@@ -802,6 +809,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
802809
`level=info component=compactor org_id=user-1 msg="start of compactions"`,
803810
`level=info component=compactor org_id=user-1 msg="compaction iterations done"`,
804811
`level=info component=compactor msg="successfully compacted user blocks" user=user-1`,
812+
`level=info component=compactor msg="compactor stopped"`,
805813
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
806814

807815
// Instead of testing for shipper metrics, we only check our metrics here.
@@ -986,9 +994,11 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
986994
`level=info component=cleaner org_id=user-1 msg="deleted blocks for tenant marked for deletion" deletedBlocks=1`,
987995
`level=info component=cleaner org_id=user-1 msg="updating finished time in tenant deletion mark"`,
988996
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
997+
`level=info component=compactor msg="compactor started"`,
989998
`level=info component=compactor msg="discovering users from bucket"`,
990999
`level=info component=compactor msg="discovered users from bucket" users=1`,
9911000
`level=debug component=compactor msg="skipping user because it is marked for deletion" user=user-1`,
1001+
`level=info component=compactor msg="compactor stopped"`,
9921002
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
9931003

9941004
// Instead of testing for shipper metrics, we only check our metrics here.
@@ -1178,6 +1188,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
11781188
`level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`,
11791189
`level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`,
11801190
`level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`,
1191+
`level=info component=compactor msg="compactor started"`,
11811192
`level=info component=compactor msg="discovering users from bucket"`,
11821193
`level=info component=compactor msg="discovered users from bucket" users=2`,
11831194
`level=info component=compactor msg="starting compaction of user blocks" user=user-1`,
@@ -1192,6 +1203,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
11921203
`level=info component=compactor org_id=user-2 msg="start of compactions"`,
11931204
`level=info component=compactor org_id=user-2 msg="compaction iterations done"`,
11941205
`level=info component=compactor msg="successfully compacted user blocks" user=user-2`,
1206+
`level=info component=compactor msg="compactor stopped"`,
11951207
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))
11961208
}
11971209

@@ -1599,6 +1611,7 @@ func removeIgnoredLogs(input []string) []string {
15991611

16001612
out := make([]string, 0, len(input))
16011613
durationRe := regexp.MustCompile(`\s?duration=\S+`)
1614+
durationMsRe := regexp.MustCompile(`\s?duration_ms=\S+`)
16021615

16031616
for i := 0; i < len(input); i++ {
16041617
log := input[i]
@@ -1612,6 +1625,7 @@ func removeIgnoredLogs(input []string) []string {
16121625

16131626
// Remove any duration from logs.
16141627
log = durationRe.ReplaceAllString(log, "")
1628+
log = durationMsRe.ReplaceAllString(log, "")
16151629

16161630
out = append(out, log)
16171631
}
@@ -1941,6 +1955,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
19411955
assert.Equal(t, context.DeadlineExceeded, err)
19421956

19431957
assert.ElementsMatch(t, []string{
1958+
`level=info component=compactor msg="compactor started"`,
19441959
`level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`,
19451960
`level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`,
19461961
}, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))

0 commit comments

Comments
 (0)