Skip to content

Commit 014235e

Browse files
committed
WIP
Signed-off-by: Emmanuel Lodovice <[email protected]>
1 parent 3624924 commit 014235e

File tree

5 files changed

+457
-57
lines changed

5 files changed

+457
-57
lines changed

pkg/ruler/manager.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ type DefaultMultiTenantManager struct {
4444
notifiers map[string]*rulerNotifier
4545
notifiersDiscoveryMetrics map[string]discovery.DiscovererMetrics
4646

47+
// rules backup
48+
rulesBackupManager *rulesBackupManager
49+
4750
managersTotal prometheus.Gauge
4851
lastReloadSuccessful *prometheus.GaugeVec
4952
lastReloadSuccessfulTimestamp *prometheus.GaugeVec
@@ -85,6 +88,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, eva
8588
mapper: newMapper(cfg.RulePath, logger),
8689
userManagers: map[string]RulesManager{},
8790
userManagerMetrics: userManagerMetrics,
91+
rulesBackupManager: newRulesBackupManager(cfg, logger),
8892
managersTotal: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
8993
Namespace: "cortex",
9094
Name: "ruler_managers_total",
@@ -142,8 +146,12 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
142146
r.managersTotal.Set(float64(len(r.userManagers)))
143147
}
144148

149+
func (r *DefaultMultiTenantManager) BackUpRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
150+
r.rulesBackupManager.backUpRuleGroups(ctx, ruleGroups)
151+
}
152+
145153
// syncRulesToManager maps the rule files to disk, detects any changes and will create/update the
146-
// the users Prometheus Rules Manager.
154+
// users Prometheus Rules Manager.
147155
func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user string, groups rulespb.RuleGroupList) {
148156
// Map the files to disk and return the file names to be passed to the users manager if they
149157
// have been updated
@@ -279,6 +287,10 @@ func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
279287
return groups
280288
}
281289

290+
func (r *DefaultMultiTenantManager) GetBackupRules(userID string) []*promRules.Group {
291+
return r.rulesBackupManager.getRuleGroups(userID)
292+
}
293+
282294
func (r *DefaultMultiTenantManager) Stop() {
283295
r.notifiersMtx.Lock()
284296
for _, n := range r.notifiers {

pkg/ruler/merger.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package ruler
2+
3+
import (
4+
"strings"
5+
"time"
6+
7+
"golang.org/x/exp/slices"
8+
9+
promRules "github.com/prometheus/prometheus/rules"
10+
)
11+
12+
func mergeGroupStateDesc(in []*GroupStateDesc) []*GroupStateDesc {
13+
states := make(map[string]*GroupStateDesc)
14+
rgTime := make(map[string]time.Time)
15+
for _, state := range in {
16+
latestTs := state.EvaluationTimestamp
17+
for _, r := range state.ActiveRules {
18+
if latestTs.Before(r.EvaluationTimestamp) {
19+
latestTs = r.EvaluationTimestamp
20+
}
21+
}
22+
key := promRules.GroupKey(state.Group.Namespace, state.Group.Name)
23+
ts, ok := rgTime[key]
24+
if !ok || ts.Before(latestTs) {
25+
states[key] = state
26+
rgTime[key] = latestTs
27+
}
28+
}
29+
groups := make([]*GroupStateDesc, 0, len(states))
30+
for _, state := range states {
31+
groups = append(groups, state)
32+
}
33+
slices.SortFunc(groups, func(a, b *GroupStateDesc) int {
34+
fileCompare := strings.Compare(a.Group.Namespace, b.Group.Namespace)
35+
36+
// If the namespace is the same, check the group name
37+
if fileCompare != 0 {
38+
return fileCompare
39+
}
40+
return strings.Compare(a.Group.Name, b.Group.Name)
41+
})
42+
return groups
43+
}

pkg/ruler/rule_backup_manager.go

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package ruler
2+
3+
import (
4+
"context"
5+
"errors"
6+
"strings"
7+
"sync"
8+
9+
"github.com/go-kit/log"
10+
"github.com/go-kit/log/level"
11+
"github.com/prometheus/prometheus/model/rulefmt"
12+
"github.com/prometheus/prometheus/promql/parser"
13+
promRules "github.com/prometheus/prometheus/rules"
14+
15+
"github.com/cortexproject/cortex/pkg/ruler/rulespb"
16+
)
17+
18+
// Implements GroupLoader interface but instead of reading from a file when Load is called, it returns the
19+
// rulefmt.RuleGroup it has stored
20+
type loader struct {
21+
ruleGroups map[string][]rulefmt.RuleGroup
22+
}
23+
24+
func (r *loader) Load(identifier string) (*rulefmt.RuleGroups, []error) {
25+
return &rulefmt.RuleGroups{
26+
Groups: r.ruleGroups[identifier],
27+
}, nil
28+
}
29+
30+
func (r *loader) Parse(query string) (parser.Expr, error) {
31+
return parser.ParseExpr(query)
32+
}
33+
34+
type rulesBackupManager struct {
35+
backupRuleGroupsMtx sync.RWMutex
36+
backupRuleGroups map[string][]*promRules.Group
37+
cfg Config
38+
39+
logger log.Logger
40+
}
41+
42+
func newRulesBackupManager(cfg Config, logger log.Logger) *rulesBackupManager {
43+
return &rulesBackupManager{
44+
backupRuleGroups: make(map[string][]*promRules.Group),
45+
cfg: cfg,
46+
logger: logger,
47+
}
48+
}
49+
50+
func (r *rulesBackupManager) backUpRuleGroups(_ context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
51+
r.backupRuleGroupsMtx.Lock()
52+
defer r.backupRuleGroupsMtx.Unlock()
53+
backupRuleGroups := make(map[string][]*promRules.Group)
54+
for user, groups := range ruleGroups {
55+
g, err := r.ruleGroupListToPromGroups(user, groups)
56+
if err != nil {
57+
// TODO: Increment a metric
58+
level.Error(r.logger).Log("msg", "unable to back up rules", "user", user, "err", err)
59+
continue
60+
}
61+
backupRuleGroups[user] = g
62+
}
63+
r.backupRuleGroups = backupRuleGroups
64+
}
65+
66+
// ruleGroupListToPromGroups converts rulespb.RuleGroupList to []*promRules.Group by creating a single use
67+
// promRules.Manager and calling its LoadGroups method.
68+
func (r *rulesBackupManager) ruleGroupListToPromGroups(user string, ruleGroups rulespb.RuleGroupList) ([]*promRules.Group, error) {
69+
rgs := ruleGroups.Formatted()
70+
71+
loader := &loader{
72+
ruleGroups: rgs,
73+
}
74+
promManager := promRules.NewManager(&promRules.ManagerOptions{
75+
ExternalURL: r.cfg.ExternalURL.URL,
76+
GroupLoader: loader,
77+
})
78+
79+
namespaces := make([]string, 0, len(rgs))
80+
for k := range rgs {
81+
namespaces = append(namespaces, k)
82+
}
83+
level.Info(r.logger).Log("msg", "backup rules for user", "user", user, "namespaces", strings.Join(namespaces, ","))
84+
loadedGroups, errs := promManager.LoadGroups(r.cfg.EvaluationInterval, r.cfg.ExternalLabels, r.cfg.ExternalURL.String(), nil, namespaces...)
85+
if errs != nil {
86+
for _, e := range errs {
87+
level.Error(r.logger).Log("msg", "loading groups to backup failed", "user", user, "namespaces", namespaces, "err", e)
88+
}
89+
return nil, errors.New("error loading rules to backup")
90+
}
91+
92+
groups := make([]*promRules.Group, 0, len(loadedGroups))
93+
for _, g := range loadedGroups {
94+
groups = append(groups, g)
95+
}
96+
return groups, nil
97+
}
98+
99+
func (r *rulesBackupManager) getRuleGroups(userID string) []*promRules.Group {
100+
var result []*promRules.Group
101+
r.backupRuleGroupsMtx.RLock()
102+
defer r.backupRuleGroupsMtx.RUnlock()
103+
if groups, exists := r.backupRuleGroups[userID]; exists {
104+
result = groups
105+
}
106+
return result
107+
}

0 commit comments

Comments
 (0)