diff --git a/CHANGELOG.md b/CHANGELOG.md index fc857f5e71..60326acfad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased +* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129 * [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869 * [ENHANCEMENT] Ingester/Ring: New `READONLY` status on ring to be used by Ingester. New ingester API to change mode of ingester #6163 * [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index c664fa6a96..7db8a27273 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4142,6 +4142,10 @@ ruler_client: # CLI flag: -ruler.client.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] + # Timeout for downstream rulers. + # CLI flag: -ruler.client.remote-timeout + [remote_timeout: | default = 2m] + # How frequently to evaluate rules # CLI flag: -ruler.evaluation-interval [evaluation_interval: | default = 1m] @@ -4340,6 +4344,10 @@ ring: # CLI flag: -ruler.ring.final-sleep [final_sleep: | default = 0s] + # Keep instance in the ring on shut down. + # CLI flag: -ruler.ring.keep-instance-in-the-ring-on-shutdown + [keep_instance_in_the_ring_on_shutdown: | default = false] + # Period with which to attempt to flush rule groups. # CLI flag: -ruler.flush-period [flush_period: | default = 1m] @@ -4374,6 +4382,10 @@ ring: # Disable the rule_group label on exported metrics # CLI flag: -ruler.disable-rule-group-label [disable_rule_group_label: | default = false] + +# Enable high availability +# CLI flag: -ruler.enable-ha-evaluation +[enable_ha_evaluation: | default = false] ``` ### `ruler_storage_config` diff --git a/integration/ruler_test.go b/integration/ruler_test.go index d56bb696aa..aec4478af8 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -981,6 +981,152 @@ func TestRulerDisablesRuleGroups(t *testing.T) { }) } +func TestRulerHAEvaluation(t *testing.T) { + const numRulesGroups = 20 + + random := rand.New(rand.NewSource(time.Now().UnixNano())) + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Generate multiple rule groups, with 1 rule each. + ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups) + expectedNames := make([]string, numRulesGroups) + evalInterval, _ := model.ParseDuration("2s") + for i := 0; i < numRulesGroups; i++ { + num := random.Intn(10) + var ruleNode yaml.Node + var exprNode yaml.Node + + ruleNode.SetString(fmt.Sprintf("rule_%d", i)) + exprNode.SetString(strconv.Itoa(i)) + ruleName := fmt.Sprintf("test_%d", i) + + expectedNames[i] = ruleName + + if num%2 == 0 { + ruleGroups[i] = rulefmt.RuleGroup{ + Name: ruleName, + Interval: evalInterval, + Rules: []rulefmt.RuleNode{{ + Alert: ruleNode, + Expr: exprNode, + }}, + } + } else { + ruleGroups[i] = rulefmt.RuleGroup{ + Name: ruleName, + Interval: evalInterval, + Rules: []rulefmt.RuleNode{{ + Record: ruleNode, + Expr: exprNode, + }}, + } + } + } + + // Start dependencies. 
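+	// Consul backs the ruler ring and Minio provides the bucket used as the rule store.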
+ consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, rulestoreBucketName) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Configure the ruler. + overrides := map[string]string{ + // Since we're not going to run any rule, we don't need the + // store-gateway to be configured to a valid address. + "-querier.store-gateway-addresses": "localhost:12345", + // Enable the bucket index so we can skip the initial bucket scan. + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-ruler.ring.replication-factor": "2", + "-ruler.enable-ha-evaluation": "true", + "-ruler.poll-interval": "5s", + "-ruler.client.remote-timeout": "10ms", + } + + rulerFlags := mergeFlags( + BlocksStorageFlags(), + RulerFlags(), + RulerShardingFlags(consul.NetworkHTTPEndpoint()), + overrides, + ) + + // Start rulers. + ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "") + ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "") + ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "") + rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3) + require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3)) + + // Upload rule groups to one of the rulers. + c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1") + require.NoError(t, err) + namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"} + namespaceNameCount := make([]int, len(namespaceNames)) + nsRand := rand.New(rand.NewSource(time.Now().UnixNano())) + for _, ruleGroup := range ruleGroups { + index := nsRand.Intn(len(namespaceNames)) + namespaceNameCount[index] = namespaceNameCount[index] + 1 + require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index])) + } + + // Wait until rulers have loaded all rules. 
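+	// Each group above holds exactly one rule, so the cortex_prometheus_rule_group_rules
+	// metric summed across the rulers should equal the number of rule groups.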
+ require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics)) + + ruler1SyncTotal, err := ruler1.SumMetrics([]string{"cortex_ruler_sync_rules_total"}) + require.NoError(t, err) + ruler3SyncTotal, err := ruler3.SumMetrics([]string{"cortex_ruler_sync_rules_total"}) + require.NoError(t, err) + + err = consul.Kill() // kill consul so the rulers will operate with the tokens/instances they already have + require.NoError(t, err) + + err = ruler2.Kill() + require.NoError(t, err) + + // wait for another sync + require.NoError(t, ruler1.WaitSumMetrics(e2e.Greater(ruler1SyncTotal[0]), "cortex_ruler_sync_rules_total")) + require.NoError(t, ruler3.WaitSumMetrics(e2e.Greater(ruler3SyncTotal[0]), "cortex_ruler_sync_rules_total")) + + rulers = e2ecortex.NewCompositeCortexService(ruler1, ruler3) + require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics)) + + t.Log(ruler1.SumMetrics([]string{"cortex_prometheus_rule_group_rules"})) + t.Log(ruler3.SumMetrics([]string{"cortex_prometheus_rule_group_rules"})) + + c3, err := e2ecortex.NewClient("", "", "", ruler3.HTTPEndpoint(), "user-1") + require.NoError(t, err) + + ruler1Rules, err := c.GetRuleGroups() + require.NoError(t, err) + + ruler3Rules, err := c3.GetRuleGroups() + require.NoError(t, err) + + ruleCount := 0 + countFunc := func(ruleGroups map[string][]rulefmt.RuleGroup) { + for _, v := range ruleGroups { + ruleCount += len(v) + } + } + + countFunc(ruler1Rules) + require.Equal(t, numRulesGroups, ruleCount) + ruleCount = 0 + countFunc(ruler3Rules) + require.Equal(t, numRulesGroups, ruleCount) + + // each rule group in this test is set to evaluate at a 2 second interval. If a Ruler is down and another Ruler + // assumes ownership, it might not immediately evaluate until it's time to evaluate. The following sleep is to ensure the + // rulers have evaluated the rule groups + time.Sleep(2100 * time.Millisecond) + results, err := c.GetPrometheusRules(e2ecortex.RuleFilter{}) + require.NoError(t, err) + require.Equal(t, numRulesGroups, len(results)) + for _, v := range results { + require.False(t, v.LastEvaluation.IsZero()) + } +} + func TestRulerKeepFiring(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) @@ -1125,7 +1271,12 @@ type Alert struct { Value string `json:"value"` } -func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup { +func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher { + return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName)) +} + +func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup { + // Prepare rule group with invalid rule. 
var recordNode = yaml.Node{} var exprNode = yaml.Node{} @@ -1136,19 +1287,13 @@ func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression st Name: groupName, Interval: 10, Rules: []rulefmt.RuleNode{{ - Alert: recordNode, - Expr: exprNode, - KeepFiringFor: keepFiring, + Record: recordNode, + Expr: exprNode, }}, } } -func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher { - return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName)) -} - -func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup { - // Prepare rule group with invalid rule. +func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup { var recordNode = yaml.Node{} var exprNode = yaml.Node{} @@ -1159,8 +1304,9 @@ func ruleGroupWithRule(groupName string, ruleName string, expression string) rul Name: groupName, Interval: 10, Rules: []rulefmt.RuleNode{{ - Record: recordNode, - Expr: exprNode, + Alert: recordNode, + Expr: exprNode, + KeepFiringFor: keepFiring, }}, } } diff --git a/pkg/ruler/client_pool_test.go b/pkg/ruler/client_pool_test.go index 11c2ce4c2b..66fe273a68 100644 --- a/pkg/ruler/client_pool_test.go +++ b/pkg/ruler/client_pool_test.go @@ -14,6 +14,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/grpcclient" + "github.com/cortexproject/cortex/pkg/util/services" ) func Test_newRulerClientFactory(t *testing.T) { @@ -63,6 +64,12 @@ func Test_newRulerClientFactory(t *testing.T) { type mockRulerServer struct{} +func (m *mockRulerServer) LivenessCheck(ctx context.Context, request *LivenessCheckRequest) (*LivenessCheckResponse, error) { + return &LivenessCheckResponse{ + State: int32(services.Running), + }, nil +} + func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) { return &RulesResponse{}, nil } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index a910896ad2..fa405babcb 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -37,7 +37,6 @@ import ( util_api "github.com/cortexproject/cortex/pkg/util/api" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/grpcclient" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -81,6 +80,8 @@ const ( unknownHealthFilter string = "unknown" okHealthFilter string = "ok" errHealthFilter string = "err" + + livenessCheckTimeout = 100 * time.Millisecond ) type DisabledRuleGroupErr struct { @@ -98,7 +99,7 @@ type Config struct { // Labels to add to all alerts ExternalLabels labels.Labels `yaml:"external_labels,omitempty" doc:"nocli|description=Labels to add to all alerts."` // GRPC Client configuration. - ClientTLSConfig grpcclient.Config `yaml:"ruler_client"` + ClientTLSConfig ClientConfig `yaml:"ruler_client"` // How frequently to evaluate rules by default. EvaluationInterval time.Duration `yaml:"evaluation_interval"` // How frequently to poll for updated rules. 
@@ -151,6 +152,8 @@ type Config struct { EnableQueryStats bool `yaml:"query_stats_enabled"` DisableRuleGroupLabel bool `yaml:"disable_rule_group_label"` + + EnableHAEvaluation bool `yaml:"enable_ha_evaluation"` } // Validate config and returns error on failure @@ -175,7 +178,7 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", "", f) + cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", f) cfg.Ring.RegisterFlags(f) cfg.Notifier.RegisterFlags(f) @@ -220,6 +223,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableQueryStats, "ruler.query-stats-enabled", false, "Report query statistics for ruler queries to complete as a per user metric and as an info level log message.") f.BoolVar(&cfg.DisableRuleGroupLabel, "ruler.disable-rule-group-label", false, "Disable the rule_group label on exported metrics") + f.BoolVar(&cfg.EnableHAEvaluation, "ruler.enable-ha-evaluation", false, "Enable high availability") + cfg.RingCheckPeriod = 5 * time.Second } @@ -304,7 +309,7 @@ type Ruler struct { // NewRuler creates a new ruler from a distributor and chunk store. func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits) (*Ruler, error) { - return newRuler(cfg, manager, reg, logger, ruleStore, limits, newRulerClientPool(cfg.ClientTLSConfig, logger, reg)) + return newRuler(cfg, manager, reg, logger, ruleStore, limits, newRulerClientPool(cfg.ClientTLSConfig.Config, logger, reg)) } func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) { @@ -381,7 +386,9 @@ func enableSharding(r *Ruler, ringStore kv.Client) error { // Define lifecycler delegates in reverse order (last to be called defined first because they're // chained via "next delegate"). delegate := ring.BasicLifecyclerDelegate(r) - delegate = ring.NewLeaveOnStoppingDelegate(delegate, r.logger) + if !r.Config().Ring.KeepInstanceInTheRingOnShutdown { + delegate = ring.NewLeaveOnStoppingDelegate(delegate, r.logger) + } delegate = ring.NewTokensPersistencyDelegate(r.cfg.Ring.TokensFilePath, ring.JOINING, delegate, r.logger) delegate = ring.NewAutoForgetDelegate(r.cfg.Ring.HeartbeatTimeout*ringAutoForgetUnhealthyPeriods, delegate, r.logger) @@ -399,6 +406,18 @@ func enableSharding(r *Ruler, ringStore kv.Client) error { return nil } +func (r *Ruler) Logger() log.Logger { + return r.logger +} + +func (r *Ruler) GetClientFor(addr string) (RulerClient, error) { + return r.clientsPool.GetClientFor(addr) +} + +func (r *Ruler) Config() Config { + return r.cfg +} + func (r *Ruler) starting(ctx context.Context) error { // If sharding is enabled, start the used subservices. 
if r.cfg.EnableSharding { @@ -490,34 +509,110 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 { return ringHasher.Sum32() } -func instanceOwnsRuleGroup(r ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string, forBackup bool) (bool, error) { +func (r *Ruler) instanceOwnsRuleGroup(rr ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, forBackup bool) (bool, error) { hash := tokenForGroup(g) - rlrs, err := r.Get(hash, RingOp, nil, nil, nil) + rlrs, err := rr.Get(hash, RingOp, nil, nil, nil) if err != nil { return false, errors.Wrap(err, "error reading ring to verify rule group ownership") } - var ownsRuleGroup bool + instanceAddr := r.lifecycler.GetInstanceAddr() if forBackup { // Only the second up to the last replica are used as backup for i := 1; i < len(rlrs.Instances); i++ { if rlrs.Instances[i].Addr == instanceAddr { - ownsRuleGroup = true - break + return ownsRuleGroupOrDisable(g, disabledRuleGroups) } } - } else { - // Even if the replication factor is set to a number bigger than 1, only the first ruler evaluates the rule group - ownsRuleGroup = rlrs.Instances[0].Addr == instanceAddr + return false, nil + } + if r.Config().EnableHAEvaluation { + for i, ruler := range rlrs.Instances { + if ruler.Addr == instanceAddr && i == 0 { + level.Debug(r.Logger()).Log("msg", "primary taking ownership", "user", g.User, "group", g.Name, "namespace", g.Namespace, "ruler", instanceAddr) + return ownsRuleGroupOrDisable(g, disabledRuleGroups) + } + if ruler.Addr == instanceAddr && r.nonPrimaryInstanceOwnsRuleGroup(g, rlrs.GetAddresses()[:i]) { + level.Info(r.Logger()).Log("msg", "non-primary ruler taking ownership", "user", g.User, "group", g.Name, "namespace", g.Namespace, "ruler", instanceAddr) + return ownsRuleGroupOrDisable(g, disabledRuleGroups) + } + } + return false, nil } + // Even if the replication factor is set to a number bigger than 1, only the first ruler evaluates the rule group + if rlrs.Instances[0].Addr == instanceAddr { + return ownsRuleGroupOrDisable(g, disabledRuleGroups) + } + return false, nil +} - if ownsRuleGroup && ruleGroupDisabled(g, disabledRuleGroups) { +func ownsRuleGroupOrDisable(g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups) (bool, error) { + if ruleGroupDisabled(g, disabledRuleGroups) { return false, &DisabledRuleGroupErr{Message: fmt.Sprintf("rule group %s, namespace %s, user %s is disabled", g.Name, g.Namespace, g.User)} } + return true, nil +} - return ownsRuleGroup, nil +func (r *Ruler) LivenessCheck(_ context.Context, request *LivenessCheckRequest) (*LivenessCheckResponse, error) { + if r.lifecycler.ServiceContext().Err() != nil || r.subservices.IsStopped() { + return nil, errors.New("ruler's context is canceled and might be stopping soon") + } + if !r.subservices.IsHealthy() { + return nil, errors.New("not all subservices are in healthy state") + } + return &LivenessCheckResponse{State: int32(r.State())}, nil +} + +// This function performs a liveness check against the provided replicas. If any one of the replicas responds with a state = Running, then +// this Ruler should not take ownership of the rule group. 
Otherwise, this Ruler must take ownership of the rule group to avoid missing evaluations +func (r *Ruler) nonPrimaryInstanceOwnsRuleGroup(g *rulespb.RuleGroupDesc, replicas []string) bool { + userID := g.User + + jobs := concurrency.CreateJobsFromStrings(replicas) + + errorChan := make(chan error, len(jobs)) + responseChan := make(chan *LivenessCheckResponse, len(jobs)) + + ctx := user.InjectOrgID(context.Background(), userID) + ctx, cancel := context.WithTimeout(ctx, livenessCheckTimeout) + defer cancel() + + err := concurrency.ForEach(ctx, jobs, len(jobs), func(ctx context.Context, job interface{}) error { + addr := job.(string) + rulerClient, err := r.GetClientFor(addr) + if err != nil { + errorChan <- err + level.Error(r.Logger()).Log("msg", "unable to get client for ruler", "ruler addr", addr) + return nil + } + level.Debug(r.Logger()).Log("msg", "performing liveness check against", "addr", addr, "for", g.Name) + + resp, err := rulerClient.LivenessCheck(ctx, &LivenessCheckRequest{}) + if err != nil { + errorChan <- err + level.Debug(r.Logger()).Log("msg", "liveness check failed", "addr", addr, "for", g.Name, "err", err.Error()) + return nil + } + level.Debug(r.Logger()).Log("msg", "liveness check succeeded ", "addr", addr, "for", g.Name, "ruler state", services.State(resp.GetState())) + responseChan <- resp + return nil + }) + + close(errorChan) + close(responseChan) + + if len(errorChan) == len(jobs) || err != nil { + return true + } + + for resp := range responseChan { + if services.State(resp.GetState()) == services.Running { + return false + } + } + return true } func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -579,7 +674,7 @@ func (r *Ruler) run(ctx context.Context) error { } func (r *Ruler) syncRules(ctx context.Context, reason string) { - level.Debug(r.logger).Log("msg", "syncing rules", "reason", reason) + level.Info(r.logger).Log("msg", "syncing rules", "reason", reason) r.rulerSync.WithLabelValues(reason).Inc() timer := prometheus.NewTimer(nil) @@ -593,6 +688,10 @@ func (r *Ruler) syncRules(ctx context.Context, reason string) { return } + if ctx.Err() != nil { + level.Info(r.logger).Log("msg", "context is canceled. not syncing rules") + return + } // This will also delete local group files for users that are no longer in 'configs' map. 
r.manager.SyncRuleGroups(ctx, loadedConfigs) @@ -701,12 +800,12 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp backedUpConfigs := make(map[string]rulespb.RuleGroupList) for userID, groups := range configs { ruleGroupCounts[userID] = len(groups) - owned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + owned := r.filterRuleGroups(userID, groups, r.ring) if len(owned) > 0 { ownedConfigs[userID] = owned } if r.cfg.RulesBackupEnabled() { - backup := filterBackupRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + backup := r.filterBackupRuleGroups(userID, groups, owned, r.ring) if len(backup) > 0 { backedUpConfigs[userID] = backup } @@ -773,10 +872,10 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp ruleGroupCounts[userID] = len(groups) gLock.Unlock() - filterOwned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + filterOwned := r.filterRuleGroups(userID, groups, userRings[userID]) var filterBackup []*rulespb.RuleGroupDesc if r.cfg.RulesBackupEnabled() { - filterBackup = filterBackupRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors) + filterBackup = r.filterBackupRuleGroups(userID, groups, filterOwned, userRings[userID]) } if len(filterOwned) == 0 && len(filterBackup) == 0 { continue @@ -802,30 +901,30 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp // filterRuleGroups returns map of rule groups that given instance "owns" based on supplied ring. // This function only uses User, Namespace, and Name fields of individual RuleGroups. // -// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring, -// but only ring passed as parameter. -func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc { +// This method must not use r.ring, but only ring passed as parameter. +func (r *Ruler) filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing) []*rulespb.RuleGroupDesc { // Prune the rule group to only contain rules that this ruler is responsible for, based on ring. 
var result []*rulespb.RuleGroupDesc + for _, g := range ruleGroups { - owned, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr, false) + owned, err := r.instanceOwnsRuleGroup(ring, g, r.limits.DisabledRuleGroups(userID), false) if err != nil { switch e := err.(type) { case *DisabledRuleGroupErr: - level.Info(log).Log("msg", e.Message) + level.Info(r.logger).Log("msg", e.Message) continue default: - ringCheckErrors.Inc() - level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) + r.ringCheckErrors.Inc() + level.Error(r.logger).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) continue } } if owned { - level.Debug(log).Log("msg", "rule group owned", "user", g.User, "namespace", g.Namespace, "name", g.Name) + level.Debug(r.logger).Log("msg", "rule group owned", "user", g.User, "namespace", g.Namespace, "name", g.Name) result = append(result, g) } else { - level.Debug(log).Log("msg", "rule group not owned, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name) + level.Debug(r.logger).Log("msg", "rule group not owned, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name) } } @@ -835,29 +934,38 @@ func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabl // filterBackupRuleGroups returns map of rule groups that given instance backs up based on supplied ring. // This function only uses User, Namespace, and Name fields of individual RuleGroups. // -// Reason why this function is not a method on Ruler is to make sure we don't accidentally use r.ring, -// but only ring passed as parameter. -func filterBackupRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc { +// This method must not use r.ring, but only ring passed as parameter +func (r *Ruler) filterBackupRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, owned []*rulespb.RuleGroupDesc, ring ring.ReadRing) []*rulespb.RuleGroupDesc { var result []*rulespb.RuleGroupDesc + ownedMap := map[uint32]struct{}{} + for _, g := range owned { + hash := tokenForGroup(g) + ownedMap[hash] = struct{}{} + } for _, g := range ruleGroups { - backup, err := instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr, true) + hash := tokenForGroup(g) + // if already owned for eval, don't take backup ownership + if _, OK := ownedMap[hash]; OK { + continue + } + backup, err := r.instanceOwnsRuleGroup(ring, g, r.limits.DisabledRuleGroups(userID), true) if err != nil { switch e := err.(type) { case *DisabledRuleGroupErr: - level.Info(log).Log("msg", e.Message) + level.Info(r.logger).Log("msg", e.Message) continue default: - ringCheckErrors.Inc() - level.Error(log).Log("msg", "failed to check if the ruler replica backs up the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) + r.ringCheckErrors.Inc() + level.Error(r.logger).Log("msg", "failed to check if the ruler replica backs up the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err) continue } } if backup { - level.Debug(log).Log("msg", "rule group backed up", "user", g.User, "namespace", g.Namespace, "name", g.Name) + level.Debug(r.logger).Log("msg", "rule group backed up", "user", g.User, 
"namespace", g.Namespace, "name", g.Name) result = append(result, g) } else { - level.Debug(log).Log("msg", "rule group not backed up, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name) + level.Debug(r.logger).Log("msg", "rule group not backed up, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name) } } @@ -1189,6 +1297,8 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest return errors.Wrapf(err, "unable to get client for ruler %s", addr) } + ctx, cancel := context.WithTimeout(ctx, r.cfg.ClientTLSConfig.RemoteTimeout) + defer cancel() newGrps, err := rulerClient.Rules(ctx, &RulesRequest{ RuleNames: rulesRequest.GetRuleNames(), RuleGroupNames: rulesRequest.GetRuleGroupNames(), diff --git a/pkg/ruler/ruler.pb.go b/pkg/ruler/ruler.pb.go index 5eb36c7a6a..b0078f4fbf 100644 --- a/pkg/ruler/ruler.pb.go +++ b/pkg/ruler/ruler.pb.go @@ -137,6 +137,84 @@ func (m *RulesRequest) GetExcludeAlerts() bool { return false } +type LivenessCheckRequest struct { +} + +func (m *LivenessCheckRequest) Reset() { *m = LivenessCheckRequest{} } +func (*LivenessCheckRequest) ProtoMessage() {} +func (*LivenessCheckRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_9ecbec0a4cfddea6, []int{1} +} +func (m *LivenessCheckRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LivenessCheckRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LivenessCheckRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LivenessCheckRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_LivenessCheckRequest.Merge(m, src) +} +func (m *LivenessCheckRequest) XXX_Size() int { + return m.Size() +} +func (m *LivenessCheckRequest) XXX_DiscardUnknown() { + xxx_messageInfo_LivenessCheckRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_LivenessCheckRequest proto.InternalMessageInfo + +type LivenessCheckResponse struct { + State int32 `protobuf:"varint,1,opt,name=state,proto3" json:"state,omitempty"` +} + +func (m *LivenessCheckResponse) Reset() { *m = LivenessCheckResponse{} } +func (*LivenessCheckResponse) ProtoMessage() {} +func (*LivenessCheckResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_9ecbec0a4cfddea6, []int{2} +} +func (m *LivenessCheckResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *LivenessCheckResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_LivenessCheckResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *LivenessCheckResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_LivenessCheckResponse.Merge(m, src) +} +func (m *LivenessCheckResponse) XXX_Size() int { + return m.Size() +} +func (m *LivenessCheckResponse) XXX_DiscardUnknown() { + xxx_messageInfo_LivenessCheckResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_LivenessCheckResponse proto.InternalMessageInfo + +func (m *LivenessCheckResponse) GetState() int32 { + if m != nil { + return m.State + } + return 0 +} + type RulesResponse struct { Groups []*GroupStateDesc `protobuf:"bytes,1,rep,name=groups,proto3" json:"groups,omitempty"` } @@ -144,7 +222,7 @@ type RulesResponse struct { func (m *RulesResponse) Reset() { *m = RulesResponse{} } func 
(*RulesResponse) ProtoMessage() {} func (*RulesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_9ecbec0a4cfddea6, []int{1} + return fileDescriptor_9ecbec0a4cfddea6, []int{3} } func (m *RulesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -191,7 +269,7 @@ type GroupStateDesc struct { func (m *GroupStateDesc) Reset() { *m = GroupStateDesc{} } func (*GroupStateDesc) ProtoMessage() {} func (*GroupStateDesc) Descriptor() ([]byte, []int) { - return fileDescriptor_9ecbec0a4cfddea6, []int{2} + return fileDescriptor_9ecbec0a4cfddea6, []int{4} } func (m *GroupStateDesc) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -262,7 +340,7 @@ type RuleStateDesc struct { func (m *RuleStateDesc) Reset() { *m = RuleStateDesc{} } func (*RuleStateDesc) ProtoMessage() {} func (*RuleStateDesc) Descriptor() ([]byte, []int) { - return fileDescriptor_9ecbec0a4cfddea6, []int{3} + return fileDescriptor_9ecbec0a4cfddea6, []int{5} } func (m *RuleStateDesc) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -356,7 +434,7 @@ type AlertStateDesc struct { func (m *AlertStateDesc) Reset() { *m = AlertStateDesc{} } func (*AlertStateDesc) ProtoMessage() {} func (*AlertStateDesc) Descriptor() ([]byte, []int) { - return fileDescriptor_9ecbec0a4cfddea6, []int{4} + return fileDescriptor_9ecbec0a4cfddea6, []int{6} } func (m *AlertStateDesc) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -443,6 +521,8 @@ func (m *AlertStateDesc) GetKeepFiringSince() time.Time { func init() { proto.RegisterType((*RulesRequest)(nil), "ruler.RulesRequest") + proto.RegisterType((*LivenessCheckRequest)(nil), "ruler.LivenessCheckRequest") + proto.RegisterType((*LivenessCheckResponse)(nil), "ruler.LivenessCheckResponse") proto.RegisterType((*RulesResponse)(nil), "ruler.RulesResponse") proto.RegisterType((*GroupStateDesc)(nil), "ruler.GroupStateDesc") proto.RegisterType((*RuleStateDesc)(nil), "ruler.RuleStateDesc") @@ -452,57 +532,60 @@ func init() { func init() { proto.RegisterFile("ruler.proto", fileDescriptor_9ecbec0a4cfddea6) } var fileDescriptor_9ecbec0a4cfddea6 = []byte{ - // 793 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x55, 0x4d, 0x6b, 0x13, 0x41, - 0x18, 0xde, 0x49, 0x9a, 0xaf, 0x49, 0x3f, 0x70, 0x1a, 0x65, 0x0d, 0x65, 0x13, 0xa2, 0x48, 0x10, - 0xdc, 0x40, 0x2c, 0x88, 0x87, 0x2a, 0x29, 0x6d, 0xbd, 0x88, 0x94, 0x8d, 0x7a, 0x0d, 0x93, 0xcd, - 0x64, 0xb3, 0x76, 0xb3, 0xbb, 0xce, 0xcc, 0x86, 0x7a, 0xf3, 0x27, 0xf4, 0xe8, 0xd9, 0x93, 0x3f, - 0xa5, 0xc7, 0xe2, 0xa9, 0x88, 0x54, 0x9b, 0x5e, 0x3c, 0x49, 0x7f, 0x82, 0xcc, 0xcc, 0x6e, 0x93, - 0xd4, 0x0a, 0x0d, 0xd2, 0x4b, 0x3b, 0xef, 0xc7, 0xf3, 0xce, 0xfb, 0x3e, 0xf3, 0xec, 0x1b, 0x58, - 0xa4, 0x91, 0x47, 0xa8, 0x19, 0xd2, 0x80, 0x07, 0x28, 0x23, 0x8d, 0x72, 0xc9, 0x09, 0x9c, 0x40, - 0x7a, 0x1a, 0xe2, 0xa4, 0x82, 0x65, 0xc3, 0x09, 0x02, 0xc7, 0x23, 0x0d, 0x69, 0x75, 0xa3, 0x7e, - 0xa3, 0x17, 0x51, 0xcc, 0xdd, 0xc0, 0x8f, 0xe3, 0x95, 0xcb, 0x71, 0xee, 0x0e, 0x09, 0xe3, 0x78, - 0x18, 0xc6, 0x09, 0x4f, 0x1d, 0x97, 0x0f, 0xa2, 0xae, 0x69, 0x07, 0xc3, 0x86, 0x1d, 0x50, 0x4e, - 0xf6, 0x43, 0x1a, 0xbc, 0x23, 0x36, 0x8f, 0xad, 0x46, 0xb8, 0xe7, 0x24, 0x81, 0x6e, 0x7c, 0x88, - 0xa1, 0x1b, 0xd7, 0x81, 0xca, 0xe6, 0xe5, 0x5f, 0x16, 0x76, 0xd5, 0x7f, 0x05, 0xaf, 0xfd, 0x06, - 0x70, 0xd1, 0x12, 0xb6, 0x45, 0xde, 0x47, 0x84, 0x71, 0xb4, 0x06, 0x0b, 0x22, 0xfe, 0x0a, 0x0f, - 0x09, 0xd3, 0x41, 0x35, 0x5d, 0x2f, 0x58, 0x13, 0x07, 0x7a, 0x00, 0x97, 0x85, 0xf1, 0x82, 0x06, - 0x51, 0xa8, 0x52, 0x52, 0x32, 
0xe5, 0x92, 0x17, 0x95, 0x60, 0xa6, 0xef, 0x7a, 0x84, 0xe9, 0x69, - 0x19, 0x56, 0x06, 0x42, 0x70, 0x81, 0x7f, 0x08, 0x89, 0xbe, 0x50, 0x05, 0xf5, 0x82, 0x25, 0xcf, - 0x22, 0x93, 0x71, 0xcc, 0x89, 0x9e, 0x91, 0x4e, 0x65, 0xa0, 0x3b, 0x30, 0x3b, 0x20, 0xd8, 0xe3, - 0x03, 0x3d, 0x2b, 0xdd, 0xb1, 0x85, 0xca, 0x30, 0x3f, 0xc4, 0xdc, 0x1e, 0x10, 0xca, 0xf4, 0x9c, - 0x2c, 0x7d, 0x61, 0xa3, 0xfb, 0x70, 0x89, 0xec, 0xdb, 0x5e, 0xd4, 0x23, 0x2d, 0x8f, 0x50, 0xce, - 0xf4, 0x7c, 0x15, 0xd4, 0xf3, 0xd6, 0xac, 0xb3, 0xf6, 0x0c, 0x2e, 0xc5, 0xf3, 0xb2, 0x30, 0xf0, - 0x19, 0x41, 0x8f, 0x60, 0xd6, 0x11, 0x8d, 0xab, 0x69, 0x8b, 0xcd, 0xdb, 0xa6, 0x7a, 0x77, 0x39, - 0x4d, 0x5b, 0x74, 0xb3, 0x45, 0x98, 0x6d, 0xc5, 0x49, 0xb5, 0xcf, 0x29, 0xb8, 0x3c, 0x1b, 0x42, - 0x0f, 0x61, 0x46, 0x06, 0x75, 0x50, 0x05, 0xf5, 0x62, 0xb3, 0x64, 0x2a, 0x82, 0xad, 0x84, 0x12, - 0x89, 0x57, 0x29, 0xe8, 0x09, 0x5c, 0xc4, 0x36, 0x77, 0x47, 0xa4, 0x23, 0x93, 0x24, 0x7d, 0x09, - 0x84, 0x4a, 0xc8, 0xe4, 0xca, 0xa2, 0xca, 0x94, 0xed, 0xa2, 0xb7, 0x70, 0x95, 0x8c, 0xb0, 0x17, - 0x49, 0x5d, 0xbd, 0x4e, 0xf4, 0xa3, 0xa7, 0xe5, 0x95, 0x65, 0x53, 0x29, 0xcc, 0x4c, 0x14, 0x66, - 0x5e, 0x64, 0x6c, 0xe6, 0x0f, 0x4f, 0x2a, 0xda, 0xc1, 0x8f, 0x0a, 0xb0, 0xae, 0x2a, 0x80, 0xda, - 0x10, 0x4d, 0xdc, 0x5b, 0xb1, 0x6e, 0xe5, 0x0b, 0x15, 0x9b, 0x77, 0xff, 0x2a, 0x9b, 0x24, 0xa8, - 0xaa, 0x9f, 0x44, 0xd5, 0x2b, 0xe0, 0xb5, 0xef, 0x29, 0xc5, 0xf2, 0x84, 0xa3, 0x7b, 0x70, 0x41, - 0x8c, 0x18, 0x53, 0xb4, 0x32, 0x45, 0x91, 0x1c, 0x55, 0x06, 0x27, 0x5a, 0x48, 0x5d, 0xad, 0x85, - 0xf4, 0x8c, 0x16, 0xd6, 0x60, 0xc1, 0xc3, 0x8c, 0x6f, 0x53, 0x1a, 0xd0, 0x58, 0x52, 0x13, 0x87, - 0x78, 0x56, 0xac, 0x64, 0x90, 0x99, 0x79, 0x56, 0x29, 0x83, 0xa9, 0x67, 0x55, 0x49, 0xff, 0xa2, - 0x37, 0x7b, 0x33, 0xf4, 0xe6, 0xfe, 0x8f, 0xde, 0xaf, 0x19, 0xb8, 0x3c, 0x3b, 0xc7, 0x84, 0x3a, - 0x30, 0x4d, 0x9d, 0x0f, 0xb3, 0x1e, 0xee, 0x12, 0x2f, 0xd1, 0xd9, 0xaa, 0x99, 0x2c, 0x11, 0xf3, - 0xa5, 0xf0, 0xef, 0x62, 0x97, 0x6e, 0xb6, 0xc4, 0x5d, 0xdf, 0x4e, 0x2a, 0x73, 0x2d, 0x21, 0x85, - 0x6f, 0xf5, 0x70, 0xc8, 0x09, 0xb5, 0xe2, 0x5b, 0xd0, 0x3e, 0x2c, 0x62, 0xdf, 0x0f, 0xb8, 0x6c, - 0x53, 0x7d, 0xfc, 0x37, 0x77, 0xe9, 0xf4, 0x55, 0x62, 0x7e, 0xc1, 0x93, 0xda, 0x2d, 0xc0, 0x52, - 0x06, 0x6a, 0xc1, 0x42, 0xfc, 0xb5, 0x61, 0x2e, 0x17, 0xcc, 0x75, 0xdf, 0x32, 0xaf, 0x60, 0x2d, - 0x8e, 0x9e, 0xc3, 0x7c, 0xdf, 0xa5, 0xa4, 0x27, 0x2a, 0xcc, 0xa3, 0x86, 0x9c, 0x44, 0xb5, 0x38, - 0xda, 0x86, 0x45, 0x4a, 0x58, 0xe0, 0x8d, 0x54, 0x8d, 0xdc, 0x1c, 0x35, 0x60, 0x02, 0x6c, 0x71, - 0xb4, 0x03, 0x17, 0x85, 0xb8, 0x3b, 0x8c, 0xf8, 0x5c, 0xd4, 0xc9, 0xcf, 0x53, 0x47, 0x20, 0xdb, - 0xc4, 0xe7, 0xaa, 0x9d, 0x11, 0xf6, 0xdc, 0x5e, 0x27, 0xf2, 0xb9, 0xeb, 0xe9, 0x85, 0x79, 0xca, - 0x48, 0xe0, 0x1b, 0x81, 0x43, 0xbb, 0xf0, 0xd6, 0x1e, 0x21, 0x61, 0xa7, 0xef, 0x52, 0xd7, 0x77, - 0x3a, 0xcc, 0xf5, 0x6d, 0xa2, 0xc3, 0x39, 0x8a, 0xad, 0x08, 0xf8, 0x8e, 0x44, 0xb7, 0x05, 0xb8, - 0xb9, 0x01, 0x33, 0x62, 0x1d, 0x50, 0xb4, 0xae, 0x0e, 0x0c, 0xad, 0x4e, 0x6d, 0xc5, 0xe4, 0xf7, - 0xa9, 0x5c, 0x9a, 0x75, 0xaa, 0x25, 0x5e, 0xd3, 0x36, 0xd7, 0x8f, 0x4e, 0x0d, 0xed, 0xf8, 0xd4, - 0xd0, 0xce, 0x4f, 0x0d, 0xf0, 0x71, 0x6c, 0x80, 0x2f, 0x63, 0x03, 0x1c, 0x8e, 0x0d, 0x70, 0x34, - 0x36, 0xc0, 0xcf, 0xb1, 0x01, 0x7e, 0x8d, 0x0d, 0xed, 0x7c, 0x6c, 0x80, 0x83, 0x33, 0x43, 0x3b, - 0x3a, 0x33, 0xb4, 0xe3, 0x33, 0x43, 0xeb, 0x66, 0x65, 0x8f, 0x8f, 0xff, 0x04, 0x00, 0x00, 0xff, - 0xff, 0x3a, 0xeb, 0x6f, 0xc9, 0xec, 0x07, 0x00, 0x00, + // 845 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 
0x02, 0xff, 0xb4, 0x55, 0xcf, 0x6f, 0xe3, 0x44, + 0x14, 0xf6, 0xb4, 0x75, 0x9a, 0xbc, 0xb4, 0x5d, 0x31, 0xcd, 0xae, 0x4c, 0xa8, 0x9c, 0x28, 0x20, + 0x14, 0x21, 0xad, 0x23, 0x85, 0x95, 0x10, 0x07, 0x40, 0x29, 0xbb, 0xcb, 0xa5, 0x42, 0x2b, 0x07, + 0xb8, 0x46, 0x13, 0x67, 0xe2, 0x98, 0x3a, 0xb6, 0x99, 0x19, 0x47, 0xe5, 0xc6, 0x9d, 0xcb, 0x1e, + 0x39, 0x73, 0xe2, 0x4f, 0xd9, 0x63, 0xc5, 0x69, 0x85, 0xd0, 0x42, 0xd3, 0x0b, 0x27, 0xb4, 0x7f, + 0x02, 0x9a, 0x1f, 0x6e, 0xe2, 0x12, 0xa4, 0x8d, 0x50, 0x2f, 0xed, 0xbc, 0x1f, 0xdf, 0x9b, 0x79, + 0xdf, 0xfb, 0xf2, 0x0c, 0x75, 0x96, 0xc7, 0x94, 0x79, 0x19, 0x4b, 0x45, 0x8a, 0x6d, 0x65, 0x34, + 0x1b, 0x61, 0x1a, 0xa6, 0xca, 0xd3, 0x93, 0x27, 0x1d, 0x6c, 0xba, 0x61, 0x9a, 0x86, 0x31, 0xed, + 0x29, 0x6b, 0x9c, 0x4f, 0x7b, 0x93, 0x9c, 0x11, 0x11, 0xa5, 0x89, 0x89, 0xb7, 0x6e, 0xc7, 0x45, + 0x34, 0xa7, 0x5c, 0x90, 0x79, 0x66, 0x12, 0x3e, 0x0e, 0x23, 0x31, 0xcb, 0xc7, 0x5e, 0x90, 0xce, + 0x7b, 0x41, 0xca, 0x04, 0xbd, 0xc8, 0x58, 0xfa, 0x2d, 0x0d, 0x84, 0xb1, 0x7a, 0xd9, 0x79, 0x58, + 0x04, 0xc6, 0xe6, 0x60, 0xa0, 0x9f, 0xbc, 0x09, 0x54, 0x3d, 0x5e, 0xfd, 0xe5, 0xd9, 0x58, 0xff, + 0xd7, 0xf0, 0xce, 0xdf, 0x08, 0x0e, 0x7c, 0x69, 0xfb, 0xf4, 0xbb, 0x9c, 0x72, 0x81, 0x4f, 0xa0, + 0x26, 0xe3, 0x5f, 0x92, 0x39, 0xe5, 0x0e, 0x6a, 0xef, 0x76, 0x6b, 0xfe, 0xca, 0x81, 0xdf, 0x87, + 0x23, 0x69, 0x7c, 0xc1, 0xd2, 0x3c, 0xd3, 0x29, 0x3b, 0x2a, 0xe5, 0x96, 0x17, 0x37, 0xc0, 0x9e, + 0x46, 0x31, 0xe5, 0xce, 0xae, 0x0a, 0x6b, 0x03, 0x63, 0xd8, 0x13, 0xdf, 0x67, 0xd4, 0xd9, 0x6b, + 0xa3, 0x6e, 0xcd, 0x57, 0x67, 0x99, 0xc9, 0x05, 0x11, 0xd4, 0xb1, 0x95, 0x53, 0x1b, 0xf8, 0x01, + 0x54, 0x66, 0x94, 0xc4, 0x62, 0xe6, 0x54, 0x94, 0xdb, 0x58, 0xb8, 0x09, 0xd5, 0x39, 0x11, 0xc1, + 0x8c, 0x32, 0xee, 0xec, 0xab, 0xd2, 0x37, 0x36, 0x7e, 0x0f, 0x0e, 0xe9, 0x45, 0x10, 0xe7, 0x13, + 0x3a, 0x88, 0x29, 0x13, 0xdc, 0xa9, 0xb6, 0x51, 0xb7, 0xea, 0x97, 0x9d, 0x9d, 0x07, 0xd0, 0x38, + 0x8b, 0x16, 0x34, 0xa1, 0x9c, 0x7f, 0x3e, 0xa3, 0xc1, 0xb9, 0xe9, 0xbb, 0xf3, 0x10, 0xee, 0xdf, + 0xf2, 0xf3, 0x2c, 0x4d, 0xf8, 0xda, 0x03, 0x51, 0x1b, 0x75, 0x6d, 0xf3, 0xc0, 0xce, 0xa7, 0x70, + 0x68, 0x68, 0x33, 0x69, 0x0f, 0xa1, 0x12, 0xca, 0xfe, 0x35, 0x69, 0xf5, 0xfe, 0x7d, 0x4f, 0xcb, + 0x47, 0x91, 0x32, 0x94, 0x98, 0xc7, 0x94, 0x07, 0xbe, 0x49, 0xea, 0xfc, 0xbc, 0x03, 0x47, 0xe5, + 0x10, 0xfe, 0x00, 0x6c, 0x15, 0x54, 0x17, 0xd5, 0xfb, 0x0d, 0x4f, 0xcf, 0xc9, 0x2f, 0x98, 0x55, + 0x78, 0x9d, 0x82, 0x3f, 0x82, 0x03, 0x12, 0x88, 0x68, 0x41, 0x47, 0x2a, 0x49, 0x4d, 0xa1, 0x80, + 0x30, 0x05, 0x59, 0x5d, 0x59, 0xd7, 0x99, 0xea, 0xb9, 0xf8, 0x1b, 0x38, 0xa6, 0x0b, 0x12, 0xe7, + 0x4a, 0x9e, 0x5f, 0x15, 0x32, 0x74, 0x76, 0xd5, 0x95, 0x4d, 0x4f, 0x0b, 0xd5, 0x2b, 0x84, 0xea, + 0xdd, 0x64, 0x9c, 0x56, 0x5f, 0xbc, 0x6a, 0x59, 0xcf, 0xff, 0x68, 0x21, 0x7f, 0x53, 0x01, 0x3c, + 0x04, 0xbc, 0x72, 0x3f, 0x36, 0xf2, 0x57, 0x83, 0xae, 0xf7, 0xdf, 0xfe, 0x57, 0xd9, 0x22, 0x41, + 0x57, 0xfd, 0x49, 0x56, 0xdd, 0x00, 0xef, 0xfc, 0xbe, 0xa3, 0x59, 0x5e, 0x71, 0xf4, 0x2e, 0xec, + 0xc9, 0x16, 0x0d, 0x45, 0xf7, 0xd6, 0x28, 0x52, 0xad, 0xaa, 0xe0, 0x6a, 0x62, 0x3b, 0x9b, 0x25, + 0xb5, 0x5b, 0x92, 0xd4, 0x09, 0xd4, 0x62, 0xc2, 0xc5, 0x13, 0xc6, 0x52, 0x66, 0x94, 0xb9, 0x72, + 0xc8, 0xb1, 0x12, 0xad, 0x26, 0xbb, 0x34, 0x56, 0xa5, 0xa6, 0xb5, 0xb1, 0xea, 0xa4, 0xff, 0xa2, + 0xb7, 0x72, 0x37, 0xf4, 0xee, 0xff, 0x3f, 0x7a, 0x7f, 0xb5, 0xe1, 0xa8, 0xdc, 0x47, 0x59, 0xec, + 0x37, 0xd4, 0x25, 0x50, 0x89, 0xc9, 0x98, 0xc6, 0x85, 0xce, 0x8e, 0xbd, 0x62, 0x17, 0x79, 0x67, + 0xd2, 0xff, 0x8c, 0x44, 0xec, 0x74, 0x20, 0xef, 0xfa, 0xed, 0x55, 0x6b, 
0xab, 0x5d, 0xa6, 0xf1, + 0x83, 0x09, 0xc9, 0x04, 0x65, 0xbe, 0xb9, 0x05, 0x5f, 0x40, 0x9d, 0x24, 0x49, 0x2a, 0xd4, 0x33, + 0xf5, 0x0e, 0xb9, 0xbb, 0x4b, 0xd7, 0xaf, 0x92, 0xfd, 0x4b, 0x9e, 0xf4, 0x8a, 0x42, 0xbe, 0x36, + 0xf0, 0x00, 0x6a, 0xe6, 0xd7, 0x46, 0x84, 0xda, 0x53, 0x6f, 0x3a, 0xcb, 0xaa, 0x86, 0x0d, 0x04, + 0xfe, 0x0c, 0xaa, 0xd3, 0x88, 0xd1, 0x89, 0xac, 0xb0, 0x8d, 0x1a, 0xf6, 0x15, 0x6a, 0x20, 0xf0, + 0x13, 0xa8, 0x33, 0xca, 0xd3, 0x78, 0xa1, 0x6b, 0xec, 0x6f, 0x51, 0x03, 0x0a, 0xe0, 0x40, 0xe0, + 0xa7, 0x70, 0x20, 0xc5, 0x3d, 0xe2, 0x34, 0x11, 0xb2, 0x4e, 0x75, 0x9b, 0x3a, 0x12, 0x39, 0xa4, + 0x89, 0xd0, 0xcf, 0x59, 0x90, 0x38, 0x9a, 0x8c, 0xf2, 0x44, 0x44, 0xb1, 0x53, 0xdb, 0xa6, 0x8c, + 0x02, 0x7e, 0x2d, 0x71, 0xf8, 0x19, 0xbc, 0x75, 0x4e, 0x69, 0x36, 0x9a, 0x46, 0x2c, 0x4a, 0xc2, + 0x11, 0x8f, 0x92, 0x80, 0x3a, 0xb0, 0x45, 0xb1, 0x7b, 0x12, 0xfe, 0x54, 0xa1, 0x87, 0x12, 0xdc, + 0xff, 0x11, 0x81, 0x2d, 0xf7, 0x01, 0xc3, 0x8f, 0xf4, 0x81, 0xe3, 0xe3, 0xb5, 0xb5, 0x58, 0x7c, + 0xe7, 0x9a, 0x8d, 0xb2, 0x53, 0x6f, 0xf1, 0x8e, 0x85, 0xcf, 0xe0, 0xb0, 0xf4, 0x1d, 0xc0, 0xef, + 0x98, 0xc4, 0x4d, 0x5f, 0x8d, 0xe6, 0xc9, 0xe6, 0x60, 0x51, 0xed, 0xf4, 0xd1, 0xe5, 0x95, 0x6b, + 0xbd, 0xbc, 0x72, 0xad, 0xd7, 0x57, 0x2e, 0xfa, 0x61, 0xe9, 0xa2, 0x5f, 0x96, 0x2e, 0x7a, 0xb1, + 0x74, 0xd1, 0xe5, 0xd2, 0x45, 0x7f, 0x2e, 0x5d, 0xf4, 0xd7, 0xd2, 0xb5, 0x5e, 0x2f, 0x5d, 0xf4, + 0xfc, 0xda, 0xb5, 0x2e, 0xaf, 0x5d, 0xeb, 0xe5, 0xb5, 0x6b, 0x8d, 0x2b, 0xaa, 0xe5, 0x0f, 0xff, + 0x09, 0x00, 0x00, 0xff, 0xff, 0xc4, 0x1f, 0x82, 0x20, 0x82, 0x08, 0x00, 0x00, } func (this *RulesRequest) Equal(that interface{}) bool { @@ -570,6 +653,51 @@ func (this *RulesRequest) Equal(that interface{}) bool { } return true } +func (this *LivenessCheckRequest) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*LivenessCheckRequest) + if !ok { + that2, ok := that.(LivenessCheckRequest) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + return true +} +func (this *LivenessCheckResponse) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*LivenessCheckResponse) + if !ok { + that2, ok := that.(LivenessCheckResponse) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.State != that1.State { + return false + } + return true +} func (this *RulesResponse) Equal(that interface{}) bool { if that == nil { return this == nil @@ -762,6 +890,25 @@ func (this *RulesRequest) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *LivenessCheckRequest) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 4) + s = append(s, "&ruler.LivenessCheckRequest{") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *LivenessCheckResponse) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&ruler.LivenessCheckResponse{") + s = append(s, "State: "+fmt.Sprintf("%#v", this.State)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *RulesResponse) GoString() string { if this == nil { return "nil" @@ -852,6 +999,7 @@ const _ = grpc.SupportPackageIsVersion4 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. 
type RulerClient interface { Rules(ctx context.Context, in *RulesRequest, opts ...grpc.CallOption) (*RulesResponse, error) + LivenessCheck(ctx context.Context, in *LivenessCheckRequest, opts ...grpc.CallOption) (*LivenessCheckResponse, error) } type rulerClient struct { @@ -871,9 +1019,19 @@ func (c *rulerClient) Rules(ctx context.Context, in *RulesRequest, opts ...grpc. return out, nil } +func (c *rulerClient) LivenessCheck(ctx context.Context, in *LivenessCheckRequest, opts ...grpc.CallOption) (*LivenessCheckResponse, error) { + out := new(LivenessCheckResponse) + err := c.cc.Invoke(ctx, "/ruler.Ruler/LivenessCheck", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // RulerServer is the server API for Ruler service. type RulerServer interface { Rules(context.Context, *RulesRequest) (*RulesResponse, error) + LivenessCheck(context.Context, *LivenessCheckRequest) (*LivenessCheckResponse, error) } // UnimplementedRulerServer can be embedded to have forward compatible implementations. @@ -883,6 +1041,9 @@ type UnimplementedRulerServer struct { func (*UnimplementedRulerServer) Rules(ctx context.Context, req *RulesRequest) (*RulesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Rules not implemented") } +func (*UnimplementedRulerServer) LivenessCheck(ctx context.Context, req *LivenessCheckRequest) (*LivenessCheckResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method LivenessCheck not implemented") +} func RegisterRulerServer(s *grpc.Server, srv RulerServer) { s.RegisterService(&_Ruler_serviceDesc, srv) @@ -906,6 +1067,24 @@ func _Ruler_Rules_Handler(srv interface{}, ctx context.Context, dec func(interfa return interceptor(ctx, in, info, handler) } +func _Ruler_LivenessCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(LivenessCheckRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RulerServer).LivenessCheck(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ruler.Ruler/LivenessCheck", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RulerServer).LivenessCheck(ctx, req.(*LivenessCheckRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _Ruler_serviceDesc = grpc.ServiceDesc{ ServiceName: "ruler.Ruler", HandlerType: (*RulerServer)(nil), @@ -914,6 +1093,10 @@ var _Ruler_serviceDesc = grpc.ServiceDesc{ MethodName: "Rules", Handler: _Ruler_Rules_Handler, }, + { + MethodName: "LivenessCheck", + Handler: _Ruler_LivenessCheck_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "ruler.proto", @@ -1009,6 +1192,57 @@ func (m *RulesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *LivenessCheckRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LivenessCheckRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *LivenessCheckRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *LivenessCheckResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := 
m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *LivenessCheckResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *LivenessCheckResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.State != 0 { + i = encodeVarintRuler(dAtA, i, uint64(m.State)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *RulesResponse) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1368,6 +1602,27 @@ func (m *RulesRequest) Size() (n int) { return n } +func (m *LivenessCheckRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *LivenessCheckResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.State != 0 { + n += 1 + sovRuler(uint64(m.State)) + } + return n +} + func (m *RulesResponse) Size() (n int) { if m == nil { return 0 @@ -1504,6 +1759,25 @@ func (this *RulesRequest) String() string { }, "") return s } +func (this *LivenessCheckRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&LivenessCheckRequest{`, + `}`, + }, "") + return s +} +func (this *LivenessCheckResponse) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&LivenessCheckResponse{`, + `State:` + fmt.Sprintf("%v", this.State) + `,`, + `}`, + }, "") + return s +} func (this *RulesResponse) String() string { if this == nil { return "nil" @@ -1882,6 +2156,131 @@ func (m *RulesRequest) Unmarshal(dAtA []byte) error { } return nil } +func (m *LivenessCheckRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRuler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LivenessCheckRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LivenessCheckRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRuler(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRuler + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRuler + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *LivenessCheckResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRuler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: LivenessCheckResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: LivenessCheckResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d 
for field State", wireType) + } + m.State = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRuler + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.State |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRuler(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRuler + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRuler + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *RulesResponse) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/ruler/ruler.proto b/pkg/ruler/ruler.proto index 42828f7352..22745ead97 100644 --- a/pkg/ruler/ruler.proto +++ b/pkg/ruler/ruler.proto @@ -17,6 +17,7 @@ option (gogoproto.unmarshaler_all) = true; service Ruler { rpc Rules(RulesRequest) returns (RulesResponse) {}; + rpc LivenessCheck(LivenessCheckRequest) returns (LivenessCheckResponse){}; } message RulesRequest { @@ -30,6 +31,12 @@ message RulesRequest { bool excludeAlerts = 8; } +message LivenessCheckRequest{} + +message LivenessCheckResponse{ + int32 state = 1; +} + message RulesResponse { repeated GroupStateDesc groups = 1; } diff --git a/pkg/ruler/ruler_client_config.go b/pkg/ruler/ruler_client_config.go new file mode 100644 index 0000000000..c4d019b660 --- /dev/null +++ b/pkg/ruler/ruler_client_config.go @@ -0,0 +1,18 @@ +package ruler + +import ( + "flag" + "time" + + "github.com/cortexproject/cortex/pkg/util/grpcclient" +) + +type ClientConfig struct { + grpcclient.Config `yaml:",inline"` + RemoteTimeout time.Duration `yaml:"remote_timeout"` +} + +func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.Config.RegisterFlagsWithPrefix(prefix, "", f) + f.DurationVar(&cfg.RemoteTimeout, prefix+".remote-timeout", 2*time.Minute, "Timeout for downstream rulers.") +} diff --git a/pkg/ruler/ruler_ring.go b/pkg/ruler/ruler_ring.go index 534dbb67da..215a711f02 100644 --- a/pkg/ruler/ruler_ring.go +++ b/pkg/ruler/ruler_ring.go @@ -53,8 +53,8 @@ type RingConfig struct { InstanceZone string `yaml:"instance_availability_zone" doc:"hidden"` NumTokens int `yaml:"num_tokens"` - FinalSleep time.Duration `yaml:"final_sleep"` - + FinalSleep time.Duration `yaml:"final_sleep"` + KeepInstanceInTheRingOnShutdown bool `yaml:"keep_instance_in_the_ring_on_shutdown"` // Injected internally ListenPort int `yaml:"-"` @@ -86,6 +86,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.InstanceID, "ruler.ring.instance-id", hostname, "Instance ID to register in the ring.") f.StringVar(&cfg.InstanceZone, "ruler.ring.instance-availability-zone", "", "The availability zone where this instance is running. 
Required if zone-awareness is enabled.") f.IntVar(&cfg.NumTokens, "ruler.ring.num-tokens", 128, "Number of tokens for each ruler.") + f.BoolVar(&cfg.KeepInstanceInTheRingOnShutdown, "ruler.ring.keep-instance-in-the-ring-on-shutdown", false, "Keep instance in the ring on shut down.") } // ToLifecyclerConfig returns a LifecyclerConfig based on the ruler @@ -99,13 +100,14 @@ func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecycl instancePort := ring.GetInstancePort(cfg.InstancePort, cfg.ListenPort) return ring.BasicLifecyclerConfig{ - ID: cfg.InstanceID, - Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort), - Zone: cfg.InstanceZone, - HeartbeatPeriod: cfg.HeartbeatPeriod, - TokensObservePeriod: 0, - NumTokens: cfg.NumTokens, - FinalSleep: cfg.FinalSleep, + ID: cfg.InstanceID, + Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort), + Zone: cfg.InstanceZone, + HeartbeatPeriod: cfg.HeartbeatPeriod, + TokensObservePeriod: 0, + NumTokens: cfg.NumTokens, + FinalSleep: cfg.FinalSleep, + KeepInstanceInTheRingOnShutdown: cfg.KeepInstanceInTheRingOnShutdown, }, nil } diff --git a/pkg/ruler/ruler_ring_test.go b/pkg/ruler/ruler_ring_test.go index c40ada8964..4b740eea69 100644 --- a/pkg/ruler/ruler_ring_test.go +++ b/pkg/ruler/ruler_ring_test.go @@ -71,6 +71,38 @@ func TestGetReplicationSetForListRule(t *testing.T) { expectedSet: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3"}, expectedMaxError: 1, }, + "max errors must be 0 when RF=3 and healthy instances=1": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "", 128, true)}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 3, + expectedSet: []string{"127.0.0.1"}, + expectedMaxError: 0, + }, + "max errors must be 1 when RF=3 and healthy instances=2": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "", 128, true)}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "", 128, true)}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 3, + expectedSet: []string{"127.0.0.1", "127.0.0.2"}, + expectedMaxError: 1, + }, + "max errors must be 1 when RF=2 and healthy instances=5": { + ringInstances: map[string]ring.InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "", 128, true)}, + "instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "", 128, true)}, + "instance-3": {Addr: "127.0.0.3", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-3", "", 128, true)}, + "instance-4": {Addr: "127.0.0.4", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-4", "", 128, true)}, + "instance-5": {Addr: "127.0.0.5", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-5", "", 128, true)}, + }, + ringHeartbeatTimeout: time.Minute, + ringReplicationFactor: 2, + expectedSet: []string{"127.0.0.1", "127.0.0.2", "127.0.0.3", "127.0.0.4", "127.0.0.5"}, + 
+			expectedMaxError:      1,
+		},
 		"should succeed on 2 unhealthy instances and RF=3": {
 			ringInstances: map[string]ring.InstanceDesc{
 				"instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "", 128, true)},
@@ -144,7 +176,7 @@ func TestGetReplicationSetForListRule(t *testing.T) {
 			},
 			expectedMaxUnavailableZones: 2,
 		},
-		"should fail on 3 unhealthy instances in 3 zonez and RF=3 zone replication enabled": {
+		"should fail on 3 unhealthy instances in 3 zones and RF=3 zone replication enabled": {
 			ringInstances: map[string]ring.InstanceDesc{
 				"instance-1": {Addr: "127.0.0.1", State: ring.ACTIVE, Timestamp: now.Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-1", "z1", 128, true), Zone: "z1"},
 				"instance-2": {Addr: "127.0.0.2", State: ring.ACTIVE, Timestamp: now.Add(-10 * time.Second).Unix(), Tokens: g.GenerateTokens(ring.NewDesc(), "instance-2", "z2", 128, true), Zone: "z2"},
diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go
index af338443ca..94f990f375 100644
--- a/pkg/ruler/ruler_test.go
+++ b/pkg/ruler/ruler_test.go
@@ -2,6 +2,7 @@ package ruler
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"math/rand"
@@ -203,6 +204,16 @@ func (c *mockRulerClient) Rules(ctx context.Context, in *RulesRequest, _ ...grpc
 	return c.ruler.Rules(ctx, in)
 }
 
+func (c *mockRulerClient) LivenessCheck(ctx context.Context, in *LivenessCheckRequest, opts ...grpc.CallOption) (*LivenessCheckResponse, error) {
+
+	if c.ruler.State() == services.Terminated {
+		return nil, errors.New("ruler is terminated")
+	}
+	return &LivenessCheckResponse{
+		State: int32(services.Running),
+	}, nil
+}
+
 func (p *mockRulerClientsPool) GetClientFor(addr string) (RulerClient, error) {
 	for _, r := range p.rulerAddrMap {
 		if r.lifecycler.GetInstanceAddr() == addr {
@@ -218,7 +229,7 @@ func (p *mockRulerClientsPool) GetClientFor(addr string) (RulerClient, error) {
 
 func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer, rulerAddrMap map[string]*Ruler) *mockRulerClientsPool {
 	return &mockRulerClientsPool{
-		ClientsPool:  newRulerClientPool(cfg.ClientTLSConfig, logger, reg),
+		ClientsPool:  newRulerClientPool(cfg.ClientTLSConfig.Config, logger, reg),
 		cfg:          cfg,
 		rulerAddrMap: rulerAddrMap,
 	}
@@ -1330,6 +1341,233 @@ func TestGetRulesFromBackup(t *testing.T) {
 	require.Equal(t, "rtest_user1_1", ruleStateDescriptions[0].ActiveRules[0].Rule.Record)
 }
 
+func TestGetRules_HA(t *testing.T) {
+	t.Run("Test RF = 2", getRulesHATest(2))
+	t.Run("Test RF = 3", getRulesHATest(3))
+}
+
+func getRulesHATest(replicationFactor int) func(t *testing.T) {
+	return func(t *testing.T) {
+		// ruler ID -> (user ID -> list of groups).
+		type expectedRulesMap map[string]map[string]rulespb.RuleGroupList
+
+		rule := []*rulespb.RuleDesc{
+			{
+				Record: "rtest_user1_1",
+				Expr:   "sum(rate(node_cpu_seconds_total[3h:10m]))",
+			},
+			{
+				Alert: "atest_user1_1",
+				Expr:  "sum(rate(node_cpu_seconds_total[3h:10m]))",
+			},
+			{
+				Record: "rtest_user1_2",
+				Expr:   "sum(rate(node_cpu_seconds_total[3h:10m]))",
+				Labels: []cortexpb.LabelAdapter{
+					{Name: "key", Value: "val"},
+				},
+			},
+			{
+				Alert: "atest_user1_2",
+				Expr:  "sum(rate(node_cpu_seconds_total[3h:10m]))",
+				Labels: []cortexpb.LabelAdapter{
+					{Name: "key", Value: "val"},
+				},
+				Annotations: []cortexpb.LabelAdapter{
+					{Name: "aKey", Value: "aVal"},
+				},
+				For:           10 * time.Second,
+				KeepFiringFor: 20 * time.Second,
+			},
+		}
+
+		tenantId := "user1"
+
+		rulerStateMapOnePending := map[string]ring.InstanceState{
+			"ruler1": ring.PENDING,
+			"ruler2": ring.ACTIVE,
+			"ruler3": ring.ACTIVE,
+		}
+
+		rulerAZEvenSpread := map[string]string{
+			"ruler1": "a",
+			"ruler2": "b",
+			"ruler3": "c",
+		}
+
+		expectedRules := expectedRulesMap{
+			"ruler1": map[string]rulespb.RuleGroupList{
+				tenantId: {
+					&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "l1", Interval: 10 * time.Minute, Limit: 10, Rules: rule},
+					&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "l2", Interval: 0, Rules: rule},
+				},
+			},
+			"ruler2": map[string]rulespb.RuleGroupList{
+				tenantId: {
+					&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "b1", Interval: 10 * time.Minute, Limit: 10, Rules: rule},
+					&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace", Name: "b2", Interval: 0, Rules: rule},
+				},
+			},
+			"ruler3": map[string]rulespb.RuleGroupList{
+				tenantId: {
+					&rulespb.RuleGroupDesc{User: "user1", Namespace: "namespace2", Name: "b3", Interval: 0, Rules: rule},
+				},
+			},
+		}
+
+		kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
+		t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) })
+		allRulesByUser := map[string]rulespb.RuleGroupList{}
+		allTokensByRuler := map[string][]uint32{}
+		rulerAddrMap := map[string]*Ruler{}
+
+		createRuler := func(id string) *Ruler {
+			store := newMockRuleStore(allRulesByUser, nil)
+			cfg := defaultRulerConfig(t)
+
+			cfg.ShardingStrategy = util.ShardingStrategyShuffle
+			cfg.EnableSharding = true
+			cfg.EnableHAEvaluation = true
+			cfg.EvaluationInterval = 5 * time.Minute
+
+			cfg.Ring = RingConfig{
+				InstanceID:   id,
+				InstanceAddr: id,
+				KVStore: kv.Config{
+					Mock: kvStore,
+				},
+				ReplicationFactor:    replicationFactor,
+				ZoneAwarenessEnabled: true,
+				InstanceZone:         rulerAZEvenSpread[id],
+			}
+
+			r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap)
+			r.limits = ruleLimits{tenantShard: 3}
+			rulerAddrMap[id] = r
+			if r.ring != nil {
+				require.NoError(t, services.StartAndAwaitRunning(context.Background(), r.ring))
+				t.Cleanup(r.ring.StopAsync)
+			}
+			return r
+		}
+
+		for rID, r := range expectedRules {
+			createRuler(rID)
+			for u, rules := range r {
+				allRulesByUser[u] = append(allRulesByUser[u], rules...)
+				allTokensByRuler[rID] = generateTokenForGroups(rules, 1)
+			}
+		}
+
+		err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
+			d, _ := in.(*ring.Desc)
+			if d == nil {
+				d = ring.NewDesc()
+			}
+			for rID, tokens := range allTokensByRuler {
+				d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, ring.ACTIVE, time.Now())
+			}
+			return d, true, nil
+		})
+		require.NoError(t, err)
+		// Wait a bit to make sure ruler's ring is updated.
+		time.Sleep(100 * time.Millisecond)
+
+		forEachRuler := func(f func(rID string, r *Ruler)) {
+			for rID, r := range rulerAddrMap {
+				f(rID, r)
+			}
+		}
+
+		// Sync rules.
+		forEachRuler(func(_ string, r *Ruler) {
+			r.syncRules(context.Background(), rulerSyncReasonInitial)
+		})
+
+		// Update the state of the rulers in the ring based on rulerStateMapOnePending.
+		err = kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) {
+			d, _ := in.(*ring.Desc)
+			if d == nil {
+				d = ring.NewDesc()
+			}
+			for rID, tokens := range allTokensByRuler {
+				d.AddIngester(rID, rulerAddrMap[rID].lifecycler.GetInstanceAddr(), rulerAddrMap[rID].lifecycler.GetInstanceZone(), tokens, rulerStateMapOnePending[rID], time.Now())
+			}
+			return d, true, nil
+		})
+		require.NoError(t, err)
+		// Wait a bit to make sure ruler's ring is updated.
+		time.Sleep(100 * time.Millisecond)
+
+		rulerAddrMap["ruler1"].Service.StopAsync()
+		if err := rulerAddrMap["ruler1"].Service.AwaitTerminated(context.Background()); err != nil {
+			t.Errorf("ruler %s was not terminated with error %s", "ruler1", err.Error())
+		}
+
+		rulerAddrMap["ruler2"].syncRules(context.Background(), rulerSyncReasonPeriodic)
+		rulerAddrMap["ruler3"].syncRules(context.Background(), rulerSyncReasonPeriodic)
+
+		requireGroupStateEqual := func(a *GroupStateDesc, b *GroupStateDesc) {
+			require.Equal(t, a.Group.Interval, b.Group.Interval)
+			require.Equal(t, a.Group.User, b.Group.User)
+			require.Equal(t, a.Group.Limit, b.Group.Limit)
+			require.Equal(t, a.EvaluationTimestamp, b.EvaluationTimestamp)
+			require.Equal(t, a.EvaluationDuration, b.EvaluationDuration)
+			require.Equal(t, len(a.ActiveRules), len(b.ActiveRules))
+			for i, aRule := range a.ActiveRules {
+				bRule := b.ActiveRules[i]
+				require.Equal(t, aRule.EvaluationTimestamp, bRule.EvaluationTimestamp)
+				require.Equal(t, aRule.EvaluationDuration, bRule.EvaluationDuration)
+				require.Equal(t, aRule.Health, bRule.Health)
+				require.Equal(t, aRule.LastError, bRule.LastError)
+				require.Equal(t, aRule.Rule.Expr, bRule.Rule.Expr)
+				require.Equal(t, len(aRule.Rule.Labels), len(bRule.Rule.Labels))
+				require.Equal(t, fmt.Sprintf("%+v", aRule.Rule.Labels), fmt.Sprintf("%+v", bRule.Rule.Labels))
+				if aRule.Rule.Alert != "" {
+					require.Equal(t, fmt.Sprintf("%+v", aRule.Rule.Annotations), fmt.Sprintf("%+v", bRule.Rule.Annotations))
+					require.Equal(t, aRule.Rule.Alert, bRule.Rule.Alert)
+					require.Equal(t, aRule.Rule.For, bRule.Rule.For)
+					require.Equal(t, aRule.Rule.KeepFiringFor, bRule.Rule.KeepFiringFor)
+					require.Equal(t, aRule.State, bRule.State)
+					require.Equal(t, aRule.Alerts, bRule.Alerts)
+				} else {
+					require.Equal(t, aRule.Rule.Record, bRule.Rule.Record)
+				}
+			}
+		}
+
+		getRules := func(ruler string) {
+			ctx := user.InjectOrgID(context.Background(), tenantId)
+			ruleStateDescriptions, err := rulerAddrMap[ruler].GetRules(ctx, RulesRequest{})
+			require.NoError(t, err)
+			require.Equal(t, 5, len(ruleStateDescriptions))
+			stateByKey := map[string]*GroupStateDesc{}
+			for _, state := range ruleStateDescriptions {
+				stateByKey[state.Group.Namespace+";"+state.Group.Name] = state
+			}
+			// Rule groups whose names start with "b" come from the backup and those whose names start with "l" are being
+			// evaluated. Apart from the Name, the details of each "b" group should be equal to the corresponding "l" group,
+			// since the config is the same. This test confirms that the way we convert rulespb.RuleGroupList to
+			// GroupStateDesc is consistent with how we convert promRules.Group to GroupStateDesc.
+			requireGroupStateEqual(stateByKey["namespace;l1"], stateByKey["namespace;b1"])
+			requireGroupStateEqual(stateByKey["namespace;l2"], stateByKey["namespace;b2"])
+		}
+
+		getRules("ruler3")
+		getRules("ruler2")
+
+		ctx := user.InjectOrgID(context.Background(), tenantId)
+
+		ruleResponse, err := rulerAddrMap["ruler2"].Rules(ctx, &RulesRequest{})
+		require.NoError(t, err)
+		require.Equal(t, 5, len(ruleResponse.Groups))
+
+		ruleResponse, err = rulerAddrMap["ruler3"].Rules(ctx, &RulesRequest{})
+		require.NoError(t, err)
+		require.Equal(t, 5, len(ruleResponse.Groups))
+	}
+}
+
 func TestSharding(t *testing.T) {
 	const (
 		user1 = "user1"