
Commit 0f33cea

Emit error when the rule synchronization fails
Signed-off-by: SungJin1212 <[email protected]>
1 parent 69ac3d1 commit 0f33cea

File tree: 3 files changed (+34, −13): CHANGELOG.md, pkg/ruler/ruler.go, pkg/ruler/ruler_test.go

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@
 * [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
 * [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
 * [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
+* [ENHANCEMENT] Ruler: Emit an error message when the rule synchronization fails. #6902
 * [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
 * [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859
 * [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715

pkg/ruler/ruler.go

Lines changed: 19 additions & 6 deletions
@@ -683,29 +683,40 @@ func (r *Ruler) run(ctx context.Context) error {
 		ringTickerChan = ringTicker.C
 	}
 
-	r.syncRules(ctx, rulerSyncReasonInitial)
+	syncRuleErrMsg := func(syncRulesErr error) {
+		level.Error(r.logger).Log("msg", "failed to sync rules", "err", syncRulesErr)
+	}
+
+	initialSyncErr := r.syncRules(ctx, rulerSyncReasonInitial)
+	if initialSyncErr != nil {
+		syncRuleErrMsg(initialSyncErr)
+	}
 	for {
+		var syncRulesErr error
 		select {
 		case <-ctx.Done():
 			return nil
 		case <-tick.C:
-			r.syncRules(ctx, rulerSyncReasonPeriodic)
+			syncRulesErr = r.syncRules(ctx, rulerSyncReasonPeriodic)
 		case <-ringTickerChan:
 			// We ignore the error because in case of error it will return an empty
 			// replication set which we use to compare with the previous state.
 			currRingState, _ := r.ring.GetAllHealthy(RingOp)
 
 			if ring.HasReplicationSetChanged(ringLastState, currRingState) {
 				ringLastState = currRingState
-				r.syncRules(ctx, rulerSyncReasonRingChange)
+				syncRulesErr = r.syncRules(ctx, rulerSyncReasonRingChange)
 			}
 		case err := <-r.subservicesWatcher.Chan():
 			return errors.Wrap(err, "ruler subservice failed")
 		}
+		if syncRulesErr != nil {
+			syncRuleErrMsg(syncRulesErr)
+		}
 	}
 }
 
-func (r *Ruler) syncRules(ctx context.Context, reason string) {
+func (r *Ruler) syncRules(ctx context.Context, reason string) error {
 	level.Info(r.logger).Log("msg", "syncing rules", "reason", reason)
 	r.rulerSync.WithLabelValues(reason).Inc()
 	timer := prometheus.NewTimer(nil)
@@ -717,19 +728,21 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) {
 
 	loadedConfigs, backupConfigs, err := r.loadRuleGroups(ctx)
 	if err != nil {
-		return
+		return err
 	}
 
 	if ctx.Err() != nil {
 		level.Info(r.logger).Log("msg", "context is canceled. not syncing rules")
-		return
+		return err
 	}
 	// This will also delete local group files for users that are no longer in 'configs' map.
 	r.manager.SyncRuleGroups(ctx, loadedConfigs)
 
 	if r.cfg.RulesBackupEnabled() {
 		r.manager.BackUpRuleGroups(ctx, backupConfigs)
 	}
+
+	return nil
 }
 
 func (r *Ruler) loadRuleGroups(ctx context.Context) (map[string]rulespb.RuleGroupList, map[string]rulespb.RuleGroupList, error) {
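
The run loop change above is a log-and-continue pattern: syncRules now returns its error, run logs it through the shared syncRuleErrMsg closure, and the loop keeps ticking, so a transient sync failure is surfaced without stopping future synchronization. Below is a minimal, self-contained sketch of that pattern; the names sync and run and the standard-library log package are illustrative stand-ins for the Cortex internals, not the actual API:

package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// sync stands in for (*Ruler).syncRules: it reports failure to the caller
// instead of swallowing it.
func sync(ctx context.Context, reason string) error {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	// ... load and apply rule groups here ...
	return errors.New("simulated storage error") // force a failure for the demo
}

// run mirrors the commit's loop: log each failure, then keep going.
func run(ctx context.Context) {
	logSyncErr := func(err error) { log.Printf("failed to sync rules: %v", err) }

	if err := sync(ctx, "initial"); err != nil {
		logSyncErr(err)
	}
	tick := time.NewTicker(500 * time.Millisecond)
	defer tick.Stop()
	for {
		var syncErr error // reset every iteration, as in the real change
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			syncErr = sync(ctx, "periodic")
		}
		if syncErr != nil {
			logSyncErr(syncErr)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	run(ctx)
}

Declaring syncRulesErr inside the for loop in the real change matters: the error resets each iteration, so a failure from one tick is not re-logged after a later tick succeeds.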

pkg/ruler/ruler_test.go

Lines changed: 14 additions & 7 deletions
@@ -1342,7 +1342,8 @@ func TestGetRules(t *testing.T) {
 
 	// Sync Rules
 	forEachRuler(func(_ string, r *Ruler) {
-		r.syncRules(context.Background(), rulerSyncReasonInitial)
+		err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+		require.NoError(t, err)
 	})
 
 	if tc.sharding {
@@ -1572,7 +1573,8 @@ func TestGetRulesFromBackup(t *testing.T) {
 
 	// Sync Rules
 	forEachRuler(func(_ string, r *Ruler) {
-		r.syncRules(context.Background(), rulerSyncReasonInitial)
+		err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+		require.NoError(t, err)
 	})
 
 	// update the State of the rulers in the ring based on tc.rulerStateMap
@@ -1788,7 +1790,8 @@ func getRulesHATest(replicationFactor int) func(t *testing.T) {
 
 	// Sync Rules
 	forEachRuler(func(_ string, r *Ruler) {
-		r.syncRules(context.Background(), rulerSyncReasonInitial)
+		err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+		require.NoError(t, err)
 	})
 
 	// update the State of the rulers in the ring based on tc.rulerStateMap
@@ -1811,8 +1814,10 @@ func getRulesHATest(replicationFactor int) func(t *testing.T) {
 		t.Errorf("ruler %s was not terminated with error %s", "ruler1", err.Error())
 	}
 
-	rulerAddrMap["ruler2"].syncRules(context.Background(), rulerSyncReasonPeriodic)
-	rulerAddrMap["ruler3"].syncRules(context.Background(), rulerSyncReasonPeriodic)
+	err = rulerAddrMap["ruler2"].syncRules(context.Background(), rulerSyncReasonPeriodic)
+	require.NoError(t, err)
+	err = rulerAddrMap["ruler3"].syncRules(context.Background(), rulerSyncReasonPeriodic)
+	require.NoError(t, err)
 
 	requireGroupStateEqual := func(a *GroupStateDesc, b *GroupStateDesc) {
 		require.Equal(t, a.Group.Interval, b.Group.Interval)
@@ -2800,7 +2805,8 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
 	evalFunc := func(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {}
 
 	r, _ := buildRulerWithIterFunc(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil, evalFunc)
-	r.syncRules(context.Background(), rulerSyncReasonInitial)
+	err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+	require.NoError(t, err)
 
 	// assert initial state of rule group
 	ruleGroup := r.manager.GetRules("user1")[0]
@@ -3265,7 +3271,8 @@ func TestGetShardSizeForUser(t *testing.T) {
 
 	// Sync Rules
 	forEachRuler(func(_ string, r *Ruler) {
-		r.syncRules(context.Background(), rulerSyncReasonInitial)
+		err := r.syncRules(context.Background(), rulerSyncReasonInitial)
+		require.NoError(t, err)
 	})
 
 	result := testRuler.getShardSizeForUser(tc.userID)
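
Because syncRules now returns an error, each test can assert success explicitly instead of relying on log output, which is what the repeated require.NoError additions above do. A standalone sketch of that assertion pattern, assuming the github.com/stretchr/testify dependency is available; the syncRules stub here is hypothetical, not the real ruler method:

package ruler

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
)

// syncRules is a hypothetical stub for the now-fallible (*Ruler).syncRules.
func syncRules(ctx context.Context, reason string) error {
	return ctx.Err() // nil unless the context was already canceled
}

func TestSyncRulesReportsNoError(t *testing.T) {
	// require.NoError fails the test immediately on a non-nil error,
	// mirroring the assertions added after each syncRules call above.
	err := syncRules(context.Background(), "initial")
	require.NoError(t, err)
}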
