Skip to content

Commit ba3c705

Browse files
Refactoring filterRuleGroups, filterBackupRuleGroups and removed ruler address from LivenessCheckRequest gRPC call
Signed-off-by: Anand Rajagopal <[email protected]>
1 parent 4e576fa commit ba3c705

File tree

3 files changed

+81
-140
lines changed

3 files changed

+81
-140
lines changed

pkg/ruler/ruler.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ func tokenForGroup(g *rulespb.RuleGroupDesc) uint32 {
512512
return ringHasher.Sum32()
513513
}
514514

515-
func (r *Ruler) instanceOwnsRuleGroup(rr ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, instanceAddr string, forBackup bool) (bool, error) {
515+
func (r *Ruler) instanceOwnsRuleGroup(rr ring.ReadRing, g *rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, forBackup bool) (bool, error) {
516516

517517
hash := tokenForGroup(g)
518518

@@ -521,6 +521,7 @@ func (r *Ruler) instanceOwnsRuleGroup(rr ring.ReadRing, g *rulespb.RuleGroupDesc
521521
return false, errors.Wrap(err, "error reading ring to verify rule group ownership")
522522
}
523523

524+
instanceAddr := r.lifecycler.GetInstanceAddr()
524525
if forBackup {
525526
// Only the second up to the last replica are used as backup
526527
for i := 1; i < len(rlrs.Instances); i++ {
@@ -536,7 +537,7 @@ func (r *Ruler) instanceOwnsRuleGroup(rr ring.ReadRing, g *rulespb.RuleGroupDesc
536537
level.Debug(r.Logger()).Log("msg", "primary taking ownership", "user", g.User, "group", g.Name, "namespace", g.Namespace, "ruler", instanceAddr)
537538
return ownsRuleGroupOrDisable(g, disabledRuleGroups)
538539
}
539-
if ruler.Addr == instanceAddr && r.nonPrimaryInstanceOwnsRuleGroup(g, rlrs.GetAddresses()[:i], instanceAddr) {
540+
if ruler.Addr == instanceAddr && r.nonPrimaryInstanceOwnsRuleGroup(g, rlrs.GetAddresses()[:i]) {
540541
level.Info(r.Logger()).Log("msg", "non-primary ruler taking ownership", "user", g.User, "group", g.Name, "namespace", g.Namespace, "ruler", instanceAddr)
541542
return ownsRuleGroupOrDisable(g, disabledRuleGroups)
542543
}
@@ -569,7 +570,7 @@ func (r *Ruler) LivenessCheck(_ context.Context, request *LivenessCheckRequest)
569570

570571
// This function performs a liveness check against the provided replicas. If any one of the replicas responds with a state = Running, then
571572
// this Ruler should not take ownership of the rule group. Otherwise, this Ruler must take ownership of the rule group to avoid missing evaluations
572-
func (r *Ruler) nonPrimaryInstanceOwnsRuleGroup(g *rulespb.RuleGroupDesc, replicas []string, selfAddress string) bool {
573+
func (r *Ruler) nonPrimaryInstanceOwnsRuleGroup(g *rulespb.RuleGroupDesc, replicas []string) bool {
573574
userID := g.User
574575

575576
jobs := concurrency.CreateJobsFromStrings(replicas)
@@ -586,14 +587,12 @@ func (r *Ruler) nonPrimaryInstanceOwnsRuleGroup(g *rulespb.RuleGroupDesc, replic
586587
rulerClient, err := r.GetClientFor(addr)
587588
if err != nil {
588589
errorChan <- err
589-
level.Debug(r.Logger()).Log("msg", "unable to get client for ruler", "ruler addr", addr)
590+
level.Error(r.Logger()).Log("msg", "unable to get client for ruler", "ruler addr", addr)
590591
return nil
591592
}
592-
level.Debug(r.Logger()).Log("msg", "performing liveness check against", "addr", addr, "for", g.Name, "instance addr", selfAddress)
593+
level.Debug(r.Logger()).Log("msg", "performing liveness check against", "addr", addr, "for", g.Name)
593594

594-
resp, err := rulerClient.LivenessCheck(ctx, &LivenessCheckRequest{
595-
RulerAddress: selfAddress,
596-
})
595+
resp, err := rulerClient.LivenessCheck(ctx, &LivenessCheckRequest{})
597596
if err != nil {
598597
errorChan <- err
599598
level.Debug(r.Logger()).Log("msg", "liveness check failed", "addr", addr, "for", g.Name, "err", err.Error())
@@ -804,12 +803,12 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
804803
backedUpConfigs := make(map[string]rulespb.RuleGroupList)
805804
for userID, groups := range configs {
806805
ruleGroupCounts[userID] = len(groups)
807-
owned := r.filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
806+
owned := r.filterRuleGroups(userID, groups, r.ring)
808807
if len(owned) > 0 {
809808
ownedConfigs[userID] = owned
810809
}
811810
if r.cfg.RulesBackupEnabled() {
812-
backup := r.filterBackupRuleGroups(userID, groups, owned, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
811+
backup := r.filterBackupRuleGroups(userID, groups, owned, r.ring)
813812
if len(backup) > 0 {
814813
backedUpConfigs[userID] = backup
815814
}
@@ -876,10 +875,10 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
876875
ruleGroupCounts[userID] = len(groups)
877876
gLock.Unlock()
878877

879-
filterOwned := r.filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
878+
filterOwned := r.filterRuleGroups(userID, groups, userRings[userID])
880879
var filterBackup []*rulespb.RuleGroupDesc
881880
if r.cfg.RulesBackupEnabled() {
882-
filterBackup = r.filterBackupRuleGroups(userID, groups, filterOwned, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
881+
filterBackup = r.filterBackupRuleGroups(userID, groups, filterOwned, userRings[userID])
883882
}
884883
if len(filterOwned) == 0 && len(filterBackup) == 0 {
885884
continue
@@ -906,28 +905,29 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
906905
// This function only uses User, Namespace, and Name fields of individual RuleGroups.
907906
//
908907
// This method must not use r.ring, but only ring passed as parameter.
909-
func (r *Ruler) filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
908+
func (r *Ruler) filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring ring.ReadRing) []*rulespb.RuleGroupDesc {
910909
// Prune the rule group to only contain rules that this ruler is responsible for, based on ring.
911910
var result []*rulespb.RuleGroupDesc
911+
912912
for _, g := range ruleGroups {
913-
owned, err := r.instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr, false)
913+
owned, err := r.instanceOwnsRuleGroup(ring, g, r.limits.DisabledRuleGroups(userID), false)
914914
if err != nil {
915915
switch e := err.(type) {
916916
case *DisabledRuleGroupErr:
917-
level.Info(log).Log("msg", e.Message)
917+
level.Info(r.logger).Log("msg", e.Message)
918918
continue
919919
default:
920-
ringCheckErrors.Inc()
921-
level.Error(log).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
920+
r.ringCheckErrors.Inc()
921+
level.Error(r.logger).Log("msg", "failed to check if the ruler replica owns the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
922922
continue
923923
}
924924
}
925925

926926
if owned {
927-
level.Debug(log).Log("msg", "rule group owned", "user", g.User, "namespace", g.Namespace, "name", g.Name)
927+
level.Debug(r.logger).Log("msg", "rule group owned", "user", g.User, "namespace", g.Namespace, "name", g.Name)
928928
result = append(result, g)
929929
} else {
930-
level.Debug(log).Log("msg", "rule group not owned, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name)
930+
level.Debug(r.logger).Log("msg", "rule group not owned, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name)
931931
}
932932
}
933933

@@ -938,7 +938,7 @@ func (r *Ruler) filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupD
938938
// This function only uses User, Namespace, and Name fields of individual RuleGroups.
939939
//
940940
// This method must not use r.ring, but only ring passed as parameter
941-
func (r *Ruler) filterBackupRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, owned []*rulespb.RuleGroupDesc, disabledRuleGroups validation.DisabledRuleGroups, ring ring.ReadRing, instanceAddr string, log log.Logger, ringCheckErrors prometheus.Counter) []*rulespb.RuleGroupDesc {
941+
func (r *Ruler) filterBackupRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, owned []*rulespb.RuleGroupDesc, ring ring.ReadRing) []*rulespb.RuleGroupDesc {
942942
var result []*rulespb.RuleGroupDesc
943943
ownedMap := map[uint32]struct{}{}
944944
for _, g := range owned {
@@ -951,24 +951,24 @@ func (r *Ruler) filterBackupRuleGroups(userID string, ruleGroups []*rulespb.Rule
951951
if _, OK := ownedMap[hash]; OK {
952952
continue
953953
}
954-
backup, err := r.instanceOwnsRuleGroup(ring, g, disabledRuleGroups, instanceAddr, true)
954+
backup, err := r.instanceOwnsRuleGroup(ring, g, r.limits.DisabledRuleGroups(userID), true)
955955
if err != nil {
956956
switch e := err.(type) {
957957
case *DisabledRuleGroupErr:
958-
level.Info(log).Log("msg", e.Message)
958+
level.Info(r.logger).Log("msg", e.Message)
959959
continue
960960
default:
961-
ringCheckErrors.Inc()
962-
level.Error(log).Log("msg", "failed to check if the ruler replica backs up the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
961+
r.ringCheckErrors.Inc()
962+
level.Error(r.logger).Log("msg", "failed to check if the ruler replica backs up the rule group", "user", userID, "namespace", g.Namespace, "group", g.Name, "err", err)
963963
continue
964964
}
965965
}
966966

967967
if backup {
968-
level.Debug(log).Log("msg", "rule group backed up", "user", g.User, "namespace", g.Namespace, "name", g.Name)
968+
level.Debug(r.logger).Log("msg", "rule group backed up", "user", g.User, "namespace", g.Namespace, "name", g.Name)
969969
result = append(result, g)
970970
} else {
971-
level.Debug(log).Log("msg", "rule group not backed up, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name)
971+
level.Debug(r.logger).Log("msg", "rule group not backed up, ignoring", "user", g.User, "namespace", g.Namespace, "name", g.Name)
972972
}
973973
}
974974

0 commit comments

Comments
 (0)