diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a9c050e98..c33fdf80b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ * [ENHANCEMENT] Distributor: Reduce memory usage when error volume is high. #6095 * [ENHANCEMENT] Compactor: Centralize metrics used by compactor and add user label to compactor metrics. #6096 * [ENHANCEMENT] Compactor: Add unique execution ID for each compaction cycle in log for easy debugging. #6097 +* [ENHANCEMENT] Compactor: Differentiate retry and halt error and retry failed compaction only on retriable error. #6111 * [ENHANCEMENT] Ruler: Add support for filtering by `state` and `health` field on Rules API. #6040 * [BUGFIX] Configsdb: Fix endline issue in db password. #5920 * [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952 diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index e33994df8b..d32dc40549 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -83,7 +83,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, - }, append(commonLabels, ReasonLabelName)) + }, append(commonLabels, reasonLabelName)) cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) @@ -184,7 +184,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, - }, append(commonLabels, ReasonLabelName)) + }, append(commonLabels, reasonLabelName)) cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) @@ -345,7 +345,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, - }, append(commonLabels, ReasonLabelName)) + }, append(commonLabels, reasonLabelName)) cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) @@ -409,7 +409,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { blocksMarkedForDeletion := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, - }, append(commonLabels, ReasonLabelName)) + }, append(commonLabels, reasonLabelName)) cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil, blocksMarkedForDeletion) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) @@ -467,7 +467,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, - }, append(commonLabels, ReasonLabelName)) + }, append(commonLabels, reasonLabelName)) cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) require.NoError(t, cleaner.cleanUsers(ctx, true)) @@ -602,7 +602,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, - }, append(commonLabels, ReasonLabelName)) + }, append(commonLabels, reasonLabelName)) cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg, blocksMarkedForDeletion) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 7eac246c4d..0d03348d6b 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -792,10 +792,20 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e if lastErr == nil { return nil } + if ctx.Err() != nil { + return ctx.Err() + } if c.isCausedByPermissionDenied(lastErr) { level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "user", userID, "err", lastErr) + c.compactorMetrics.compactionErrorsCount.WithLabelValues(userID, unauthorizedError).Inc() return nil } + if compact.IsHaltError(lastErr) { + level.Error(c.logger).Log("msg", "compactor returned critical error", "user", userID, "err", lastErr) + c.compactorMetrics.compactionErrorsCount.WithLabelValues(userID, haltError).Inc() + return lastErr + } + c.compactorMetrics.compactionErrorsCount.WithLabelValues(userID, retriableError).Inc() retries.Wait() } diff --git a/pkg/compactor/compactor_metrics.go b/pkg/compactor/compactor_metrics.go index 3567225919..bdd3fefef0 100644 --- a/pkg/compactor/compactor_metrics.go +++ b/pkg/compactor/compactor_metrics.go @@ -37,17 +37,23 @@ type compactorMetrics struct { compactionFailures *prometheus.CounterVec verticalCompactions *prometheus.CounterVec remainingPlannedCompactions *prometheus.GaugeVec + compactionErrorsCount *prometheus.CounterVec } const ( - UserLabelName = "user" - TimeRangeLabelName = "time_range_milliseconds" - ReasonLabelName = "reason" + userLabelName = "user" + timeRangeLabelName = "time_range_milliseconds" + reasonLabelName = "reason" + compactionErrorTypesLabelName = "type" + + retriableError = "retriable" + haltError = "halt" + unauthorizedError = "unauthorized" ) var ( - commonLabels = []string{UserLabelName} - compactionLabels = []string{TimeRangeLabelName} + commonLabels = []string{userLabelName} + compactionLabels = []string{timeRangeLabelName} ) func newDefaultCompactorMetrics(reg prometheus.Registerer) *compactorMetrics { @@ -129,7 +135,7 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str m.syncerBlocksMarkedForDeletion = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: blocksMarkedForDeletionName, Help: blocksMarkedForDeletionHelp, - }, append(commonLabels, ReasonLabelName)) + }, append(commonLabels, reasonLabelName)) m.compactions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_compactor_group_compactions_total", @@ -159,6 +165,10 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str Name: "cortex_compactor_remaining_planned_compactions", Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy", }, commonLabels) + m.compactionErrorsCount = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_compactor_compaction_error_total", + Help: "Total number of errors from compactions.", + }, append(commonLabels, compactionErrorTypesLabelName)) return &m } diff --git a/pkg/compactor/compactor_metrics_test.go b/pkg/compactor/compactor_metrics_test.go index 8667d892cf..da4bb82025 100644 --- a/pkg/compactor/compactor_metrics_test.go +++ b/pkg/compactor/compactor_metrics_test.go @@ -119,6 +119,17 @@ func TestSyncerMetrics(t *testing.T) { cortex_compactor_remaining_planned_compactions{user="aaa"} 377740 cortex_compactor_remaining_planned_compactions{user="bbb"} 388850 cortex_compactor_remaining_planned_compactions{user="ccc"} 399960 + # HELP cortex_compactor_compaction_error_total Total number of errors from compactions. + # TYPE cortex_compactor_compaction_error_total counter + cortex_compactor_compaction_error_total{type="halt",user="aaa"} 444400 + cortex_compactor_compaction_error_total{type="halt",user="bbb"} 455510 + cortex_compactor_compaction_error_total{type="halt",user="ccc"} 466620 + cortex_compactor_compaction_error_total{type="retriable",user="aaa"} 411070 + cortex_compactor_compaction_error_total{type="retriable",user="bbb"} 422180 + cortex_compactor_compaction_error_total{type="retriable",user="ccc"} 433290 + cortex_compactor_compaction_error_total{type="unauthorized",user="aaa"} 477730 + cortex_compactor_compaction_error_total{type="unauthorized",user="bbb"} 488840 + cortex_compactor_compaction_error_total{type="unauthorized",user="ccc"} 499950 `)) require.NoError(t, err) @@ -163,4 +174,13 @@ func generateTestData(cm *compactorMetrics, base float64) { cm.remainingPlannedCompactions.WithLabelValues("aaa").Add(34 * base) cm.remainingPlannedCompactions.WithLabelValues("bbb").Add(35 * base) cm.remainingPlannedCompactions.WithLabelValues("ccc").Add(36 * base) + cm.compactionErrorsCount.WithLabelValues("aaa", retriableError).Add(37 * base) + cm.compactionErrorsCount.WithLabelValues("bbb", retriableError).Add(38 * base) + cm.compactionErrorsCount.WithLabelValues("ccc", retriableError).Add(39 * base) + cm.compactionErrorsCount.WithLabelValues("aaa", haltError).Add(40 * base) + cm.compactionErrorsCount.WithLabelValues("bbb", haltError).Add(41 * base) + cm.compactionErrorsCount.WithLabelValues("ccc", haltError).Add(42 * base) + cm.compactionErrorsCount.WithLabelValues("aaa", unauthorizedError).Add(43 * base) + cm.compactionErrorsCount.WithLabelValues("bbb", unauthorizedError).Add(44 * base) + cm.compactionErrorsCount.WithLabelValues("ccc", unauthorizedError).Add(45 * base) } diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 6015adff86..89facd5969 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1646,8 +1646,8 @@ func (m *tsdbPlannerMock) getNoCompactBlocks() []string { return result } -func mockBlockMetaJSON(id string) string { - meta := tsdb.BlockMeta{ +func mockBlockMeta(id string) tsdb.BlockMeta { + return tsdb.BlockMeta{ Version: 1, ULID: ulid.MustParse(id), MinTime: 1574776800000, @@ -1657,6 +1657,10 @@ func mockBlockMetaJSON(id string) string { Sources: []ulid.ULID{ulid.MustParse(id)}, }, } +} + +func mockBlockMetaJSON(id string) string { + meta := mockBlockMeta(id) content, err := json.Marshal(meta) if err != nil { @@ -2019,3 +2023,105 @@ func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFromBucket(t require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) } + +func TestCompactor_FailedWithRetriableError(t *testing.T) { + t.Parallel() + + ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion} + content, err := json.Marshal(ss) + require.NoError(t, err) + + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("__markers__", []string{}, nil) + bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) + bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, errors.New("test retriable error")) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockIter("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil, errors.New("test retriable error")) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) + bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil) + bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) + + cfg := prepareConfig() + cfg.CompactionRetries = 2 + + c, _, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{{BlockMeta: mockBlockMeta("01DTVP434PA9VFXSW2JKB3392D")}, {BlockMeta: mockBlockMeta("01DTW0ZCPDDNV4BV83Q2SV4QAZ")}}, nil) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + + cortex_testutil.Poll(t, 1*time.Second, 2.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactorMetrics.compactionErrorsCount.WithLabelValues("user-1", retriableError)) + }) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP cortex_compactor_compaction_error_total Total number of errors from compactions. + # TYPE cortex_compactor_compaction_error_total counter + cortex_compactor_compaction_error_total{type="retriable", user="user-1"} 2 + `), + "cortex_compactor_compaction_retry_error_total", + "cortex_compactor_compaction_error_total", + )) +} + +func TestCompactor_FailedWithHaltError(t *testing.T) { + t.Parallel() + + ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion} + content, err := json.Marshal(ss) + require.NoError(t, err) + + bucketClient := &bucket.ClientMock{} + bucketClient.MockIter("__markers__", []string{}, nil) + bucketClient.MockIter("", []string{"user-1"}, nil) + bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil) + bucketClient.MockIter("user-1/markers/", nil, nil) + bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil) + bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", nil, compact.HaltError{}) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockIter("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil, compact.HaltError{}) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) + bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil) + bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) + + cfg := prepareConfig() + cfg.CompactionRetries = 2 + + c, _, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient, nil) + tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{{BlockMeta: mockBlockMeta("01DTVP434PA9VFXSW2JKB3392D")}, {BlockMeta: mockBlockMeta("01DTW0ZCPDDNV4BV83Q2SV4QAZ")}}, nil) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + + cortex_testutil.Poll(t, 1*time.Second, 1.0, func() interface{} { + return prom_testutil.ToFloat64(c.compactorMetrics.compactionErrorsCount.WithLabelValues("user-1", haltError)) + }) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP cortex_compactor_compaction_error_total Total number of errors from compactions. + # TYPE cortex_compactor_compaction_error_total counter + cortex_compactor_compaction_error_total{type="halt", user="user-1"} 1 + `), + "cortex_compactor_compaction_retry_error_total", + "cortex_compactor_compaction_error_total", + )) +}