Skip to content

Commit 9305f6b

Browse files
Minimize missed rule group evaluations
Signed-off-by: Anand Rajagopal <[email protected]>
1 parent ce9c4ba commit 9305f6b

File tree

9 files changed

+1064
-82
lines changed

9 files changed

+1064
-82
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
* [CHANGE] Server: Instrument `cortex_request_duration_seconds` metric with native histogram. If `native-histograms` feature is enabled in monitoring Prometheus then the metric name needs to be updated in your dashboards. #6056
77
* [CHANGE] Distributor/Ingester: Change `cortex_distributor_ingester_appends_total`, `cortex_distributor_ingester_append_failures_total`, `cortex_distributor_ingester_queries_total`, and `cortex_distributor_ingester_query_failures_total` metrics to use the ingester ID instead of its IP as the label value. #6078
88
* [CHANGE] OTLP: Set `AddMetricSuffixes` to true to always enable metric name normalization. #6136
9+
* [FEATURE] Ruler: Minimize missed rule group evaluations via the `-ruler.enable-ha-evaluation` flag. #6129
910
* [FEATURE] Ingester/Distributor: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 #6010 #6020
1011
* [FEATURE] Querier: Enable querying native histogram chunks. #5944 #6031
1112
* [FEATURE] Query Frontend: Support native histogram in query frontend response. #5996 #6043

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4347,6 +4347,10 @@ ring:
43474347
# CLI flag: -ruler.ring.final-sleep
43484348
[final_sleep: <duration> | default = 0s]
43494349
4350+
# Keep instance in the ring on shutdown.
4351+
# CLI flag: -ruler.ring.keep-instance-in-the-ring-on-shutdown
4352+
[keep_instance_in_the_ring_on_shutdown: <boolean> | default = false]
4353+
43504354
# Period with which to attempt to flush rule groups.
43514355
# CLI flag: -ruler.flush-period
43524356
[flush_period: <duration> | default = 1m]
@@ -4381,6 +4385,14 @@ ring:
43814385
# Disable the rule_group label on exported metrics
43824386
# CLI flag: -ruler.disable-rule-group-label
43834387
[disable_rule_group_label: <boolean> | default = false]
4388+
4389+
# Enable high availability
4390+
# CLI flag: -ruler.enable-ha-evaluation
4391+
[enable_ha_evaluation: <boolean> | default = false]
4392+
4393+
# Timeout for fanout calls to other rulers
4394+
# CLI flag: -ruler.list-rules-fanout-timeout
4395+
[list_rules_fanout_timeout: <duration> | default = 2m]
43844396
```
43854397

43864398
### `ruler_storage_config`

integration/ruler_test.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,6 +1093,147 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
10931093
})
10941094
}
10951095

1096+
func TestRulerHA(t *testing.T) {
1097+
const numRulesGroups = 20
1098+
1099+
random := rand.New(rand.NewSource(time.Now().UnixNano()))
1100+
s, err := e2e.NewScenario(networkName)
1101+
require.NoError(t, err)
1102+
defer s.Close()
1103+
1104+
// Generate multiple rule groups, with 1 rule each.
1105+
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
1106+
expectedNames := make([]string, numRulesGroups)
1107+
alertCount := 0
1108+
evalInterval, _ := model.ParseDuration("5s")
1109+
for i := 0; i < numRulesGroups; i++ {
1110+
num := random.Intn(10)
1111+
var ruleNode yaml.Node
1112+
var exprNode yaml.Node
1113+
1114+
ruleNode.SetString(fmt.Sprintf("rule_%d", i))
1115+
exprNode.SetString(strconv.Itoa(i))
1116+
ruleName := fmt.Sprintf("test_%d", i)
1117+
1118+
expectedNames[i] = ruleName
1119+
1120+
if num%2 == 0 {
1121+
alertCount++
1122+
ruleGroups[i] = rulefmt.RuleGroup{
1123+
Name: ruleName,
1124+
Interval: evalInterval,
1125+
Rules: []rulefmt.RuleNode{{
1126+
Alert: ruleNode,
1127+
Expr: exprNode,
1128+
}},
1129+
}
1130+
} else {
1131+
ruleGroups[i] = rulefmt.RuleGroup{
1132+
Name: ruleName,
1133+
Interval: evalInterval,
1134+
Rules: []rulefmt.RuleNode{{
1135+
Record: ruleNode,
1136+
Expr: exprNode,
1137+
}},
1138+
}
1139+
}
1140+
}
1141+
1142+
// Start dependencies.
1143+
consul := e2edb.NewConsul()
1144+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
1145+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1146+
1147+
// Configure the ruler.
1148+
overrides := map[string]string{
1149+
// Since we're not going to run any rule, we don't need the
1150+
// store-gateway to be configured to a valid address.
1151+
"-querier.store-gateway-addresses": "localhost:12345",
1152+
// Enable the bucket index so we can skip the initial bucket scan.
1153+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
1154+
"-ruler.ring.replication-factor": "2",
1155+
"-ruler.enable-ha-evaluation": "true",
1156+
"-ruler.poll-interval": "5s",
1157+
"-ruler.list-rules-fanout-timeout": "2s",
1158+
}
1159+
1160+
rulerFlags := mergeFlags(
1161+
BlocksStorageFlags(),
1162+
RulerFlags(),
1163+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
1164+
overrides,
1165+
)
1166+
1167+
// Start rulers.
1168+
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1169+
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1170+
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1171+
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
1172+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))
1173+
1174+
// Upload rule groups to one of the rulers.
1175+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
1176+
require.NoError(t, err)
1177+
namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
1178+
namespaceNameCount := make([]int, 5)
1179+
nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
1180+
for _, ruleGroup := range ruleGroups {
1181+
index := nsRand.Intn(len(namespaceNames))
1182+
namespaceNameCount[index] = namespaceNameCount[index] + 1
1183+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
1184+
}
1185+
1186+
// Wait until rulers have loaded all rules.
1187+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
1188+
1189+
ruler1SyncTotal, err := ruler1.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
1190+
require.NoError(t, err)
1191+
ruler3SyncTotal, err := ruler3.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
1192+
require.NoError(t, err)
1193+
1194+
err = consul.Kill() // kill consul so the rulers will operate with the tokens/instances they already have
1195+
require.NoError(t, err)
1196+
1197+
err = ruler2.Kill()
1198+
require.NoError(t, err)
1199+
1200+
// wait for another sync
1201+
require.NoError(t, ruler1.WaitSumMetrics(e2e.Greater(ruler1SyncTotal[0]), "cortex_ruler_sync_rules_total"))
1202+
require.NoError(t, ruler3.WaitSumMetrics(e2e.Greater(ruler3SyncTotal[0]), "cortex_ruler_sync_rules_total"))
1203+
1204+
rulers = e2ecortex.NewCompositeCortexService(ruler1, ruler3)
1205+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
1206+
1207+
t.Log(ruler1.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
1208+
t.Log(ruler3.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
1209+
1210+
c3, err := e2ecortex.NewClient("", "", "", ruler3.HTTPEndpoint(), "user-1")
1211+
require.NoError(t, err)
1212+
1213+
ruler1Rules, err := c.GetRuleGroups()
1214+
require.NoError(t, err)
1215+
1216+
ruler3Rules, err := c3.GetRuleGroups()
1217+
require.NoError(t, err)
1218+
1219+
ruleCount := 0
1220+
countFunc := func(ruleGroups map[string][]rulefmt.RuleGroup) {
1221+
for _, v := range ruleGroups {
1222+
ruleCount += len(v)
1223+
}
1224+
}
1225+
1226+
countFunc(ruler1Rules)
1227+
require.Equal(t, numRulesGroups, ruleCount)
1228+
ruleCount = 0
1229+
countFunc(ruler3Rules)
1230+
require.Equal(t, numRulesGroups, ruleCount)
1231+
1232+
results, err := c.GetPrometheusRules(e2ecortex.RuleFilter{})
1233+
require.NoError(t, err)
1234+
require.Equal(t, numRulesGroups, len(results))
1235+
}
1236+
10961237
func TestRulerKeepFiring(t *testing.T) {
10971238
s, err := e2e.NewScenario(networkName)
10981239
require.NoError(t, err)

pkg/ruler/client_pool_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/cortexproject/cortex/pkg/util/flagext"
1616
"github.com/cortexproject/cortex/pkg/util/grpcclient"
17+
"github.com/cortexproject/cortex/pkg/util/services"
1718
)
1819

1920
func Test_newRulerClientFactory(t *testing.T) {
@@ -63,6 +64,12 @@ func Test_newRulerClientFactory(t *testing.T) {
6364

6465
type mockRulerServer struct{}
6566

67+
func (m *mockRulerServer) LivenessCheck(ctx context.Context, request *LivenessCheckRequest) (*LivenessCheckResponse, error) {
68+
return &LivenessCheckResponse{
69+
State: int32(services.Running),
70+
}, nil
71+
}
72+
6673
func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) {
6774
return &RulesResponse{}, nil
6875
}

0 commit comments

Comments (0)