Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type DefaultMultiTenantManager struct {
ruleCache map[string][]*promRules.Group
ruleCacheMtx sync.RWMutex
syncRuleMtx sync.Mutex

ruleGroupIterationFunc promRules.GroupEvalIterationFunc
}

func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
Expand Down Expand Up @@ -122,15 +124,25 @@ func NewDefaultMultiTenantManager(cfg Config, limits RulesLimits, managerFactory
Name: "ruler_config_updates_total",
Help: "Total number of config updates triggered by a user",
}, []string{"user"}),
registry: reg,
logger: logger,
registry: reg,
logger: logger,
ruleGroupIterationFunc: defaultRuleGroupIterationFunc,
}
if cfg.RulesBackupEnabled() {
m.rulesBackupManager = newRulesBackupManager(cfg, logger, reg)
}
return m, nil
}

func NewDefaultMultiTenantManagerWithIterationFunc(iterFunc promRules.GroupEvalIterationFunc, cfg Config, limits RulesLimits, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
manager, err := NewDefaultMultiTenantManager(cfg, limits, managerFactory, evalMetrics, reg, logger)
if err != nil {
return nil, err
}
manager.ruleGroupIterationFunc = iterFunc
return manager, nil
}

func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) {
// this is a safety lock to ensure this method is executed sequentially
r.syncRuleMtx.Lock()
Expand Down Expand Up @@ -214,7 +226,7 @@ func (r *DefaultMultiTenantManager) syncRulesToManager(ctx context.Context, user
if (rulesUpdated || externalLabelsUpdated) && existing {
r.updateRuleCache(user, manager.RuleGroups())
}
err = manager.Update(r.cfg.EvaluationInterval, files, externalLabels, r.cfg.ExternalURL.String(), ruleGroupIterationFunc)
err = manager.Update(r.cfg.EvaluationInterval, files, externalLabels, r.cfg.ExternalURL.String(), r.ruleGroupIterationFunc)
r.deleteRuleCache(user)
if err != nil {
r.lastReloadSuccessful.WithLabelValues(user).Set(0)
Expand Down Expand Up @@ -257,7 +269,7 @@ func (r *DefaultMultiTenantManager) createRulesManager(user string, ctx context.
return manager
}

func ruleGroupIterationFunc(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
func defaultRuleGroupIterationFunc(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {
logMessage := []interface{}{
"component", "ruler",
"rule_group", g.Name(),
Expand Down
26 changes: 24 additions & 2 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,26 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.Tes
return ruler, manager
}

func buildRulerWithIterFunc(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, store rulestore.RuleStore, rulerAddrMap map[string]*Ruler, ruleGroupIterFunc promRules.GroupEvalIterationFunc) (*Ruler, *DefaultMultiTenantManager) {
engine, queryable, pusher, logger, overrides, reg := testSetup(t, querierTestConfig)
metrics := NewRuleEvalMetrics(rulerConfig, reg)
managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, metrics, reg)
manager, err := NewDefaultMultiTenantManagerWithIterationFunc(ruleGroupIterFunc, rulerConfig, &ruleLimits{}, managerFactory, metrics, reg, log.NewNopLogger())
require.NoError(t, err)

ruler, err := newRuler(
rulerConfig,
manager,
reg,
logger,
store,
overrides,
newMockClientsPool(rulerConfig, logger, reg, rulerAddrMap),
)
require.NoError(t, err)
return ruler, manager
}

func newTestRuler(t *testing.T, rulerConfig Config, store rulestore.RuleStore, querierTestConfig *querier.TestConfig) *Ruler {
ruler, _ := buildRuler(t, rulerConfig, querierTestConfig, store, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))
Expand Down Expand Up @@ -2776,8 +2796,10 @@ func TestRecoverAlertsPostOutage(t *testing.T) {
querier.UseAlwaysQueryable(newEmptyQueryable()),
}

// create a ruler but don't start it. instead, we'll evaluate the rule groups manually.
r, _ := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil)
// Define a no-op GroupEvalIterationFunc to avoid races between the scheduled Eval() execution and the evaluations invoked by this test.
evalFunc := func(ctx context.Context, g *promRules.Group, evalTimestamp time.Time) {}

r, _ := buildRulerWithIterFunc(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, store, nil, evalFunc)
r.syncRules(context.Background(), rulerSyncReasonInitial)

// assert initial state of rule group
Expand Down
Loading