Skip to content

Commit 1c9c53b

Browse files
Minimize missed rule group evaluations (#6129)
1 parent 907e2d2 commit 1c9c53b

File tree

11 files changed

+1088
-116
lines changed

11 files changed

+1088
-116
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## master / unreleased
44

5+
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
56
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
67
* [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163
78
* [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4142,6 +4142,10 @@ ruler_client:
41424142
# CLI flag: -ruler.client.tls-insecure-skip-verify
41434143
[tls_insecure_skip_verify: <boolean> | default = false]
41444144
4145+
# Timeout for downstream rulers.
4146+
# CLI flag: -ruler.client.remote-timeout
4147+
[remote_timeout: <duration> | default = 2m]
4148+
41454149
# How frequently to evaluate rules
41464150
# CLI flag: -ruler.evaluation-interval
41474151
[evaluation_interval: <duration> | default = 1m]
@@ -4340,6 +4344,10 @@ ring:
43404344
# CLI flag: -ruler.ring.final-sleep
43414345
[final_sleep: <duration> | default = 0s]
43424346
4347+
# Keep instance in the ring on shutdown.
4348+
# CLI flag: -ruler.ring.keep-instance-in-the-ring-on-shutdown
4349+
[keep_instance_in_the_ring_on_shutdown: <boolean> | default = false]
4350+
43434351
# Period with which to attempt to flush rule groups.
43444352
# CLI flag: -ruler.flush-period
43454353
[flush_period: <duration> | default = 1m]
@@ -4374,6 +4382,10 @@ ring:
43744382
# Disable the rule_group label on exported metrics
43754383
# CLI flag: -ruler.disable-rule-group-label
43764384
[disable_rule_group_label: <boolean> | default = false]
4385+
4386+
# Enable high availability evaluation of rule groups.
4387+
# CLI flag: -ruler.enable-ha-evaluation
4388+
[enable_ha_evaluation: <boolean> | default = false]
43774389
```
43784390

43794391
### `ruler_storage_config`

integration/ruler_test.go

Lines changed: 158 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -981,6 +981,152 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
981981
})
982982
}
983983

984+
func TestRulerHAEvaluation(t *testing.T) {
985+
const numRulesGroups = 20
986+
987+
random := rand.New(rand.NewSource(time.Now().UnixNano()))
988+
s, err := e2e.NewScenario(networkName)
989+
require.NoError(t, err)
990+
defer s.Close()
991+
992+
// Generate multiple rule groups, with 1 rule each.
993+
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
994+
expectedNames := make([]string, numRulesGroups)
995+
evalInterval, _ := model.ParseDuration("2s")
996+
for i := 0; i < numRulesGroups; i++ {
997+
num := random.Intn(10)
998+
var ruleNode yaml.Node
999+
var exprNode yaml.Node
1000+
1001+
ruleNode.SetString(fmt.Sprintf("rule_%d", i))
1002+
exprNode.SetString(strconv.Itoa(i))
1003+
ruleName := fmt.Sprintf("test_%d", i)
1004+
1005+
expectedNames[i] = ruleName
1006+
1007+
if num%2 == 0 {
1008+
ruleGroups[i] = rulefmt.RuleGroup{
1009+
Name: ruleName,
1010+
Interval: evalInterval,
1011+
Rules: []rulefmt.RuleNode{{
1012+
Alert: ruleNode,
1013+
Expr: exprNode,
1014+
}},
1015+
}
1016+
} else {
1017+
ruleGroups[i] = rulefmt.RuleGroup{
1018+
Name: ruleName,
1019+
Interval: evalInterval,
1020+
Rules: []rulefmt.RuleNode{{
1021+
Record: ruleNode,
1022+
Expr: exprNode,
1023+
}},
1024+
}
1025+
}
1026+
}
1027+
1028+
// Start dependencies.
1029+
consul := e2edb.NewConsul()
1030+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
1031+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1032+
1033+
// Configure the ruler.
1034+
overrides := map[string]string{
1035+
// Since we're not going to run any rule, we don't need the
1036+
// store-gateway to be configured to a valid address.
1037+
"-querier.store-gateway-addresses": "localhost:12345",
1038+
// Enable the bucket index so we can skip the initial bucket scan.
1039+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
1040+
"-ruler.ring.replication-factor": "2",
1041+
"-ruler.enable-ha-evaluation": "true",
1042+
"-ruler.poll-interval": "5s",
1043+
"-ruler.client.remote-timeout": "10ms",
1044+
}
1045+
1046+
rulerFlags := mergeFlags(
1047+
BlocksStorageFlags(),
1048+
RulerFlags(),
1049+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
1050+
overrides,
1051+
)
1052+
1053+
// Start rulers.
1054+
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1055+
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1056+
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1057+
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
1058+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))
1059+
1060+
// Upload rule groups to one of the rulers.
1061+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
1062+
require.NoError(t, err)
1063+
namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
1064+
namespaceNameCount := make([]int, len(namespaceNames))
1065+
nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
1066+
for _, ruleGroup := range ruleGroups {
1067+
index := nsRand.Intn(len(namespaceNames))
1068+
namespaceNameCount[index] = namespaceNameCount[index] + 1
1069+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
1070+
}
1071+
1072+
// Wait until rulers have loaded all rules.
1073+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
1074+
1075+
ruler1SyncTotal, err := ruler1.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
1076+
require.NoError(t, err)
1077+
ruler3SyncTotal, err := ruler3.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
1078+
require.NoError(t, err)
1079+
1080+
err = consul.Kill() // kill consul so the rulers will operate with the tokens/instances they already have
1081+
require.NoError(t, err)
1082+
1083+
err = ruler2.Kill()
1084+
require.NoError(t, err)
1085+
1086+
// wait for another sync
1087+
require.NoError(t, ruler1.WaitSumMetrics(e2e.Greater(ruler1SyncTotal[0]), "cortex_ruler_sync_rules_total"))
1088+
require.NoError(t, ruler3.WaitSumMetrics(e2e.Greater(ruler3SyncTotal[0]), "cortex_ruler_sync_rules_total"))
1089+
1090+
rulers = e2ecortex.NewCompositeCortexService(ruler1, ruler3)
1091+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
1092+
1093+
t.Log(ruler1.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
1094+
t.Log(ruler3.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
1095+
1096+
c3, err := e2ecortex.NewClient("", "", "", ruler3.HTTPEndpoint(), "user-1")
1097+
require.NoError(t, err)
1098+
1099+
ruler1Rules, err := c.GetRuleGroups()
1100+
require.NoError(t, err)
1101+
1102+
ruler3Rules, err := c3.GetRuleGroups()
1103+
require.NoError(t, err)
1104+
1105+
ruleCount := 0
1106+
countFunc := func(ruleGroups map[string][]rulefmt.RuleGroup) {
1107+
for _, v := range ruleGroups {
1108+
ruleCount += len(v)
1109+
}
1110+
}
1111+
1112+
countFunc(ruler1Rules)
1113+
require.Equal(t, numRulesGroups, ruleCount)
1114+
ruleCount = 0
1115+
countFunc(ruler3Rules)
1116+
require.Equal(t, numRulesGroups, ruleCount)
1117+
1118+
// each rule group in this test is set to evaluate at a 2 second interval. If a Ruler is down and another Ruler
1119+
// assumes ownership, it might not immediately evaluate until it's time to evaluate. The following sleep is to ensure the
1120+
// rulers have evaluated the rule groups
1121+
time.Sleep(2100 * time.Millisecond)
1122+
results, err := c.GetPrometheusRules(e2ecortex.RuleFilter{})
1123+
require.NoError(t, err)
1124+
require.Equal(t, numRulesGroups, len(results))
1125+
for _, v := range results {
1126+
require.False(t, v.LastEvaluation.IsZero())
1127+
}
1128+
}
1129+
9841130
func TestRulerKeepFiring(t *testing.T) {
9851131
s, err := e2e.NewScenario(networkName)
9861132
require.NoError(t, err)
@@ -1125,7 +1271,12 @@ type Alert struct {
11251271
Value string `json:"value"`
11261272
}
11271273

1128-
func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup {
1274+
func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
1275+
return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
1276+
}
1277+
1278+
func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
1279+
// Prepare rule group with invalid rule.
11291280
var recordNode = yaml.Node{}
11301281
var exprNode = yaml.Node{}
11311282

@@ -1136,19 +1287,13 @@ func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression st
11361287
Name: groupName,
11371288
Interval: 10,
11381289
Rules: []rulefmt.RuleNode{{
1139-
Alert: recordNode,
1140-
Expr: exprNode,
1141-
KeepFiringFor: keepFiring,
1290+
Record: recordNode,
1291+
Expr: exprNode,
11421292
}},
11431293
}
11441294
}
11451295

1146-
func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
1147-
return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
1148-
}
1149-
1150-
func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
1151-
// Prepare rule group with invalid rule.
1296+
func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup {
11521297
var recordNode = yaml.Node{}
11531298
var exprNode = yaml.Node{}
11541299

@@ -1159,8 +1304,9 @@ func ruleGroupWithRule(groupName string, ruleName string, expression string) rul
11591304
Name: groupName,
11601305
Interval: 10,
11611306
Rules: []rulefmt.RuleNode{{
1162-
Record: recordNode,
1163-
Expr: exprNode,
1307+
Alert: recordNode,
1308+
Expr: exprNode,
1309+
KeepFiringFor: keepFiring,
11641310
}},
11651311
}
11661312
}

pkg/ruler/client_pool_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/cortexproject/cortex/pkg/util/flagext"
1616
"github.com/cortexproject/cortex/pkg/util/grpcclient"
17+
"github.com/cortexproject/cortex/pkg/util/services"
1718
)
1819

1920
func Test_newRulerClientFactory(t *testing.T) {
@@ -63,6 +64,12 @@ func Test_newRulerClientFactory(t *testing.T) {
6364

6465
type mockRulerServer struct{}
6566

67+
func (m *mockRulerServer) LivenessCheck(ctx context.Context, request *LivenessCheckRequest) (*LivenessCheckResponse, error) {
68+
return &LivenessCheckResponse{
69+
State: int32(services.Running),
70+
}, nil
71+
}
72+
6673
func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) {
6774
return &RulesResponse{}, nil
6875
}

0 commit comments

Comments
 (0)