|
8 | 8 | "crypto/x509"
|
9 | 9 | "crypto/x509/pkix"
|
10 | 10 | "fmt"
|
| 11 | + "github.com/cortexproject/cortex/pkg/ruler" |
11 | 12 | "math"
|
| 13 | + "math/rand" |
12 | 14 | "net/http"
|
13 | 15 | "os"
|
14 | 16 | "path/filepath"
|
@@ -389,6 +391,163 @@ func TestRulerSharding(t *testing.T) {
|
389 | 391 | assert.ElementsMatch(t, expectedNames, actualNames)
|
390 | 392 | }
|
391 | 393 |
|
| 394 | +func TestRulerAPISharding(t *testing.T) { |
| 395 | + const numRulesGroups = 100 |
| 396 | + |
| 397 | + random := rand.New(rand.NewSource(time.Now().UnixNano())) |
| 398 | + s, err := e2e.NewScenario(networkName) |
| 399 | + require.NoError(t, err) |
| 400 | + defer s.Close() |
| 401 | + |
| 402 | + // Generate multiple rule groups, with 1 rule each. |
| 403 | + ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups) |
| 404 | + expectedNames := make([]string, numRulesGroups) |
| 405 | + alertCount := 0 |
| 406 | + for i := 0; i < numRulesGroups; i++ { |
| 407 | + num := random.Intn(100) |
| 408 | + var ruleNode yaml.Node |
| 409 | + var exprNode yaml.Node |
| 410 | + |
| 411 | + ruleNode.SetString(fmt.Sprintf("rule_%d", i)) |
| 412 | + exprNode.SetString(strconv.Itoa(i)) |
| 413 | + ruleName := fmt.Sprintf("test_%d", i) |
| 414 | + |
| 415 | + expectedNames[i] = ruleName |
| 416 | + if num%2 == 0 { |
| 417 | + alertCount++ |
| 418 | + ruleGroups[i] = rulefmt.RuleGroup{ |
| 419 | + Name: ruleName, |
| 420 | + Interval: 60, |
| 421 | + Rules: []rulefmt.RuleNode{{ |
| 422 | + Alert: ruleNode, |
| 423 | + Expr: exprNode, |
| 424 | + }}, |
| 425 | + } |
| 426 | + } else { |
| 427 | + ruleGroups[i] = rulefmt.RuleGroup{ |
| 428 | + Name: ruleName, |
| 429 | + Interval: 60, |
| 430 | + Rules: []rulefmt.RuleNode{{ |
| 431 | + Record: ruleNode, |
| 432 | + Expr: exprNode, |
| 433 | + }}, |
| 434 | + } |
| 435 | + } |
| 436 | + } |
| 437 | + |
| 438 | + // Start dependencies. |
| 439 | + consul := e2edb.NewConsul() |
| 440 | + minio := e2edb.NewMinio(9000, rulestoreBucketName) |
| 441 | + require.NoError(t, s.StartAndWaitReady(consul, minio)) |
| 442 | + |
| 443 | + // Configure the ruler. |
| 444 | + rulerFlags := mergeFlags( |
| 445 | + BlocksStorageFlags(), |
| 446 | + RulerFlags(), |
| 447 | + RulerShardingFlags(consul.NetworkHTTPEndpoint()), |
| 448 | + map[string]string{ |
| 449 | + // Since we're not going to run any rule, we don't need the |
| 450 | + // store-gateway to be configured to a valid address. |
| 451 | + "-querier.store-gateway-addresses": "localhost:12345", |
| 452 | + // Enable the bucket index so we can skip the initial bucket scan. |
| 453 | + "-blocks-storage.bucket-store.bucket-index.enabled": "true", |
| 454 | + }, |
| 455 | + ) |
| 456 | + |
| 457 | + // Start rulers. |
| 458 | + ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "") |
| 459 | + ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "") |
| 460 | + rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2) |
| 461 | + require.NoError(t, s.StartAndWaitReady(ruler1, ruler2)) |
| 462 | + |
| 463 | + // Upload rule groups to one of the rulers. |
| 464 | + c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1") |
| 465 | + require.NoError(t, err) |
| 466 | + |
| 467 | + namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"} |
| 468 | + namespaceNameCount := make([]int, 5) |
| 469 | + nsRand := rand.New(rand.NewSource(time.Now().UnixNano())) |
| 470 | + for _, ruleGroup := range ruleGroups { |
| 471 | + index := nsRand.Intn(len(namespaceNames)) |
| 472 | + namespaceNameCount[index] = namespaceNameCount[index] + 1 |
| 473 | + require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index])) |
| 474 | + } |
| 475 | + |
| 476 | + // Wait until rulers have loaded all rules. |
| 477 | + require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics)) |
| 478 | + |
| 479 | + // Since rulers have loaded all rules, we expect that rules have been sharded |
| 480 | + // between the two rulers. |
| 481 | + require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules")) |
| 482 | + require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules")) |
| 483 | + |
| 484 | + testCases := map[string]struct { |
| 485 | + filter e2ecortex.RuleFilter |
| 486 | + resultCheckFn func(assert.TestingT, []*ruler.RuleGroup) |
| 487 | + }{ |
| 488 | + "Filter for Alert Rules": { |
| 489 | + filter: e2ecortex.RuleFilter{ |
| 490 | + RuleType: "alert", |
| 491 | + }, |
| 492 | + resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) { |
| 493 | + assert.Len(t, ruleGroups, alertCount, "Expected %d rules but got %d", alertCount, len(ruleGroups)) |
| 494 | + }, |
| 495 | + }, |
| 496 | + "Filter for Recording Rules": { |
| 497 | + filter: e2ecortex.RuleFilter{ |
| 498 | + RuleType: "record", |
| 499 | + }, |
| 500 | + resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) { |
| 501 | + assert.Len(t, ruleGroups, numRulesGroups-alertCount, "Expected %d rules but got %d", numRulesGroups-alertCount, len(ruleGroups)) |
| 502 | + }, |
| 503 | + }, |
| 504 | + "Filter by Namespace Name": { |
| 505 | + filter: e2ecortex.RuleFilter{ |
| 506 | + Namespaces: []string{namespaceNames[2]}, |
| 507 | + }, |
| 508 | + resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) { |
| 509 | + assert.Len(t, ruleGroups, namespaceNameCount[2], "Expected %d rules but got %d", namespaceNameCount[2], len(ruleGroups)) |
| 510 | + }, |
| 511 | + }, |
| 512 | + "Filter by Namespace Name and Alert Rules": { |
| 513 | + filter: e2ecortex.RuleFilter{ |
| 514 | + RuleType: "alert", |
| 515 | + Namespaces: []string{namespaceNames[2]}, |
| 516 | + }, |
| 517 | + resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) { |
| 518 | + for _, ruleGroup := range ruleGroups { |
| 519 | + rule := ruleGroup.Rules[0].(map[string]interface{}) |
| 520 | + ruleType := rule["type"] |
| 521 | + assert.Equal(t, "alerting", ruleType, "Expected 'alerting' rule type but got %s", ruleType) |
| 522 | + } |
| 523 | + }, |
| 524 | + }, |
| 525 | + "Filter by Rule Names": { |
| 526 | + filter: e2ecortex.RuleFilter{ |
| 527 | + RuleNames: []string{"rule_3", "rule_64", "rule_99"}, |
| 528 | + }, |
| 529 | + resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) { |
| 530 | + ruleNames := []string{} |
| 531 | + for _, ruleGroup := range ruleGroups { |
| 532 | + rule := ruleGroup.Rules[0].(map[string]interface{}) |
| 533 | + ruleName := rule["name"] |
| 534 | + ruleNames = append(ruleNames, ruleName.(string)) |
| 535 | + |
| 536 | + } |
| 537 | + assert.Len(t, ruleNames, 3, "Expected %d rules but got %d", 3, len(ruleNames)) |
| 538 | + }, |
| 539 | + }, |
| 540 | + } |
| 541 | + // For each test case, fetch the rules with configured filters, and ensure the results match. |
| 542 | + for name, tc := range testCases { |
| 543 | + t.Run(name, func(t *testing.T) { |
| 544 | + actualGroups, err := c.GetPrometheusRulesWithFilter(tc.filter) |
| 545 | + require.NoError(t, err) |
| 546 | + tc.resultCheckFn(t, actualGroups) |
| 547 | + }) |
| 548 | + } |
| 549 | +} |
| 550 | + |
392 | 551 | func TestRulerAlertmanager(t *testing.T) {
|
393 | 552 | var namespaceOne = "test_/encoded_+namespace/?"
|
394 | 553 | ruleGroup := createTestRuleGroup(t)
|
|
0 commit comments