From e1c1d01bed7b513338e0e2d588bed9466f60d21b Mon Sep 17 00:00:00 2001 From: Emmanuel Lodovice Date: Tue, 13 Feb 2024 16:36:19 -0800 Subject: [PATCH] Remove user specific evaluation metrics when rule manager is removed Signed-off-by: Emmanuel Lodovice --- CHANGELOG.md | 1 + integration/ruler_test.go | 16 +++++----- pkg/cortex/modules.go | 9 +++--- pkg/ruler/compat.go | 40 +++++-------------------- pkg/ruler/manager.go | 13 +++++--- pkg/ruler/manager_metrics.go | 49 +++++++++++++++++++++++++++++++ pkg/ruler/manager_metrics_test.go | 39 ++++++++++++++++++++++++ pkg/ruler/manager_test.go | 44 ++++++++++++++++++++++++++- pkg/ruler/ruler_test.go | 10 ++++--- 9 files changed, 167 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6233ee8de98..7fee8384c85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661 * [CHANGE] Ingester: Disable uploading compacted blocks and overlapping compaction in ingester. #5735 * [CHANGE] Distributor: Count the number of rate-limited samples in `distributor_samples_in_total`. #5714 +* [CHANGE] Ruler: Remove `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, `cortex_ruler_queries_failed_total`, and `cortex_ruler_query_seconds_total` metrics for the tenant when the ruler deletes the manager for the tenant. #5772 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477 * [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605 * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731 diff --git a/integration/ruler_test.go b/integration/ruler_test.go index f8de7223d0a..cd2b37a29a2 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -741,7 +741,6 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) { } matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user) - var totalQueries = []float64{0} // Verify that user-failures don't increase cortex_ruler_queries_failed_total for groupName, expression := range map[string]string{ @@ -769,19 +768,20 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) { require.NoError(t, err) require.Equal(t, float64(0), sum[0]) + // Check that cortex_ruler_queries_total went up + totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher)) + require.NoError(t, err) + require.Greater(t, totalQueries[0], float64(0)) + // Delete rule before checkin "cortex_ruler_queries_total", as we want to reuse value for next test. require.NoError(t, c.DeleteRuleGroup(namespace, groupName)) // Wait until ruler has unloaded the group. We don't use any matcher, so there should be no groups (in fact, metric disappears). require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics)) - // Check that cortex_ruler_queries_total went up since last test. - newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher)) - require.NoError(t, err) - require.Greater(t, newTotalQueries[0], totalQueries[0]) - - // Remember totalQueries for next test. - totalQueries = newTotalQueries + // Deleting the rule group should clean up the cortex_ruler_queries_total metrics + _, err = ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher)) + require.EqualError(t, err, "metric=cortex_ruler_queries_total service=ruler: metric not found") }) } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 08f7dba55fb..291791ac51e 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -549,6 +549,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { } t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort + metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer) if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil { rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) @@ -577,15 +578,15 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { queryEngine = promql.NewEngine(opts) } - managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, prometheus.DefaultRegisterer) - manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger) + managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer) + manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) } else { rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) // TODO: Consider wrapping logger to differentiate from querier module logger queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger) - managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer) - manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger) + managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer) + manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger) } if err != nil { diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 8443c431155..98aa758647e 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -8,7 +8,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -279,32 +278,7 @@ type RulesManager interface { // ManagerFactory is a function that creates new RulesManager for given user and notifier.Manager. type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager -func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine v1.QueryEngine, overrides RulesLimits, reg prometheus.Registerer) ManagerFactory { - totalWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ruler_write_requests_total", - Help: "Number of write requests to ingesters.", - }, []string{"user"}) - failedWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ruler_write_requests_failed_total", - Help: "Number of failed write requests to ingesters.", - }, []string{"user"}) - - totalQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ruler_queries_total", - Help: "Number of queries executed by ruler.", - }, []string{"user"}) - failedQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ruler_queries_failed_total", - Help: "Number of failed queries by ruler.", - }, []string{"user"}) - var rulerQuerySeconds *prometheus.CounterVec - if cfg.EnableQueryStats { - rulerQuerySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ruler_query_seconds_total", - Help: "Total amount of wall clock time spent processing queries by the ruler.", - }, []string{"user"}) - } - +func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine v1.QueryEngine, overrides RulesLimits, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer) ManagerFactory { // Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors // and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors. // Errors from PromQL are always "user" errors. @@ -312,14 +286,14 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager { var queryTime prometheus.Counter - if rulerQuerySeconds != nil { - queryTime = rulerQuerySeconds.WithLabelValues(userID) + if evalMetrics.RulerQuerySeconds != nil { + queryTime = evalMetrics.RulerQuerySeconds.WithLabelValues(userID) } - failedQueries := failedQueriesVec.WithLabelValues(userID) - totalQueries := totalQueriesVec.WithLabelValues(userID) - totalWrites := totalWritesVec.WithLabelValues(userID) - failedWrites := failedWritesVec.WithLabelValues(userID) + failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID) + totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID) + totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID) + failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID) engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID) metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries) diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go index ff742492b6b..50dc6aebe18 100644 --- a/pkg/ruler/manager.go +++ b/pkg/ruler/manager.go @@ -26,9 +26,10 @@ import ( ) type DefaultMultiTenantManager struct { - cfg Config - notifierCfg *config.Config - managerFactory ManagerFactory + cfg Config + notifierCfg *config.Config + managerFactory ManagerFactory + ruleEvalMetrics *RuleEvalMetrics mapper *mapper @@ -51,7 +52,7 @@ type DefaultMultiTenantManager struct { logger log.Logger } -func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) { +func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) { ncfg, err := buildNotifierConfig(&cfg) if err != nil { return nil, err @@ -78,6 +79,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg cfg: cfg, notifierCfg: ncfg, managerFactory: managerFactory, + ruleEvalMetrics: evalMetrics, notifiers: map[string]*rulerNotifier{}, notifiersDiscoveryMetrics: notifiersDiscoveryMetrics, mapper: newMapper(cfg.RulePath, logger), @@ -130,6 +132,9 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID) r.configUpdatesTotal.DeleteLabelValues(userID) r.userManagerMetrics.RemoveUserRegistry(userID) + if r.ruleEvalMetrics != nil { + r.ruleEvalMetrics.deletePerUserMetrics(userID) + } level.Info(r.logger).Log("msg", "deleted rule manager and local rule files", "user", userID) } } diff --git a/pkg/ruler/manager_metrics.go b/pkg/ruler/manager_metrics.go index 7bb3d43c90a..130387407c3 100644 --- a/pkg/ruler/manager_metrics.go +++ b/pkg/ruler/manager_metrics.go @@ -2,6 +2,7 @@ package ruler import ( "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/cortexproject/cortex/pkg/util" ) @@ -222,3 +223,51 @@ func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) { data.SendSumOfGaugesPerUser(out, m.NotificationQueueCapacity, "prometheus_notifications_queue_capacity") data.SendSumOfGaugesPerUser(out, m.AlertmanagersDiscovered, "prometheus_notifications_alertmanagers_discovered") } + +type RuleEvalMetrics struct { + TotalWritesVec *prometheus.CounterVec + FailedWritesVec *prometheus.CounterVec + TotalQueriesVec *prometheus.CounterVec + FailedQueriesVec *prometheus.CounterVec + RulerQuerySeconds *prometheus.CounterVec +} + +func NewRuleEvalMetrics(cfg Config, reg prometheus.Registerer) *RuleEvalMetrics { + m := &RuleEvalMetrics{ + TotalWritesVec: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ruler_write_requests_total", + Help: "Number of write requests to ingesters.", + }, []string{"user"}), + FailedWritesVec: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ruler_write_requests_failed_total", + Help: "Number of failed write requests to ingesters.", + }, []string{"user"}), + TotalQueriesVec: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ruler_queries_total", + Help: "Number of queries executed by ruler.", + }, []string{"user"}), + FailedQueriesVec: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ruler_queries_failed_total", + Help: "Number of failed queries by ruler.", + }, []string{"user"}), + } + if cfg.EnableQueryStats { + m.RulerQuerySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ruler_query_seconds_total", + Help: "Total amount of wall clock time spent processing queries by the ruler.", + }, []string{"user"}) + } + + return m +} + +func (m *RuleEvalMetrics) deletePerUserMetrics(userID string) { + m.TotalWritesVec.DeleteLabelValues(userID) + m.FailedWritesVec.DeleteLabelValues(userID) + m.TotalQueriesVec.DeleteLabelValues(userID) + m.FailedQueriesVec.DeleteLabelValues(userID) + + if m.RulerQuerySeconds != nil { + m.RulerQuerySeconds.DeleteLabelValues(userID) + } +} diff --git a/pkg/ruler/manager_metrics_test.go b/pkg/ruler/manager_metrics_test.go index d0ac38c81a9..4b851ef1920 100644 --- a/pkg/ruler/manager_metrics_test.go +++ b/pkg/ruler/manager_metrics_test.go @@ -10,6 +10,8 @@ import ( dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util" ) func TestManagerMetricsWithRuleGroupLabel(t *testing.T) { @@ -556,3 +558,40 @@ func TestMetricsArePerUser(t *testing.T) { assert.True(t, foundUserLabel, "user label not found for metric %s", desc.String()) } } + +func TestRuleEvalMetricsDeletePerUserMetrics(t *testing.T) { + dir := t.TempDir() + reg := prometheus.NewPedanticRegistry() + + m := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg) + m.TotalWritesVec.WithLabelValues("fake1").Add(10) + m.TotalWritesVec.WithLabelValues("fake2").Add(10) + m.FailedWritesVec.WithLabelValues("fake1").Add(10) + m.FailedWritesVec.WithLabelValues("fake2").Add(10) + m.TotalQueriesVec.WithLabelValues("fake1").Add(10) + m.TotalQueriesVec.WithLabelValues("fake2").Add(10) + m.FailedQueriesVec.WithLabelValues("fake1").Add(10) + m.FailedQueriesVec.WithLabelValues("fake2").Add(10) + m.RulerQuerySeconds.WithLabelValues("fake1").Add(10) + m.RulerQuerySeconds.WithLabelValues("fake2").Add(10) + + metricNames := []string{"cortex_ruler_write_requests_total", "cortex_ruler_write_requests_failed_total", "cortex_ruler_queries_total", "cortex_ruler_queries_failed_total", "cortex_ruler_query_seconds_total"} + gm, err := reg.Gather() + require.NoError(t, err) + mfm, err := util.NewMetricFamilyMap(gm) + require.NoError(t, err) + for _, name := range metricNames { + require.Contains(t, mfm[name].String(), "value:\"fake1\"") + require.Contains(t, mfm[name].String(), "value:\"fake2\"") + } + + m.deletePerUserMetrics("fake1") + gm, err = reg.Gather() + require.NoError(t, err) + mfm, err = util.NewMetricFamilyMap(gm) + require.NoError(t, err) + for _, name := range metricNames { + require.NotContains(t, mfm[name].String(), "value:\"fake1\"") + require.Contains(t, mfm[name].String(), "value:\"fake2\"") + } +} diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go index e0650082969..c76888fc801 100644 --- a/pkg/ruler/manager_test.go +++ b/pkg/ruler/manager_test.go @@ -14,13 +14,14 @@ import ( "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/ruler/rulespb" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/test" ) func TestSyncRuleGroups(t *testing.T) { dir := t.TempDir() - m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, log.NewNopLogger()) + m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, nil, log.NewNopLogger()) require.NoError(t, err) const user = "testUser" @@ -96,6 +97,47 @@ func TestSyncRuleGroups(t *testing.T) { }) } +func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) { + dir := t.TempDir() + reg := prometheus.NewPedanticRegistry() + evalMetrics := NewRuleEvalMetrics(Config{RulePath: dir, EnableQueryStats: true}, reg) + m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, evalMetrics, reg, log.NewNopLogger()) + require.NoError(t, err) + + const user = "testUser" + + evalMetrics.TotalWritesVec.WithLabelValues(user).Add(10) + + userRules := map[string]rulespb.RuleGroupList{ + user: { + &rulespb.RuleGroupDesc{ + Name: "group1", + Namespace: "ns", + Interval: 1 * time.Minute, + User: user, + }, + }, + } + m.SyncRuleGroups(context.Background(), userRules) + gm, err := reg.Gather() + require.NoError(t, err) + mfm, err := util.NewMetricFamilyMap(gm) + require.NoError(t, err) + require.Contains(t, mfm["cortex_ruler_write_requests_total"].String(), "value:\""+user+"\"") + require.Contains(t, mfm["cortex_ruler_config_last_reload_successful"].String(), "value:\""+user+"\"") + + // Passing empty map / nil stops all managers. + m.SyncRuleGroups(context.Background(), nil) + require.Nil(t, getManager(m, user)) + + gm, err = reg.Gather() + require.NoError(t, err) + mfm, err = util.NewMetricFamilyMap(gm) + require.NoError(t, err) + require.NotContains(t, mfm["cortex_ruler_write_requests_total"].String(), "value:\""+user+"\"") + require.NotContains(t, mfm["cortex_ruler_config_last_reload_successful"].String(), "value:\""+user+"\"") +} + func getManager(m *DefaultMultiTenantManager, user string) RulesManager { m.userManagerMtx.Lock() defer m.userManagerMtx.Unlock() diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 35e6fdb16df..b046bc7be99 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -175,7 +175,9 @@ func testSetup(t *testing.T, querierTestConfig *querier.TestConfig) (*promql.Eng func newManager(t *testing.T, cfg Config) *DefaultMultiTenantManager { engine, queryable, pusher, logger, overrides, reg := testSetup(t, nil) - manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, nil), reg, logger) + metrics := NewRuleEvalMetrics(cfg, nil) + managerFactory := DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, metrics, nil) + manager, err := NewDefaultMultiTenantManager(cfg, managerFactory, metrics, reg, logger) require.NoError(t, err) return manager @@ -221,9 +223,9 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, store rulestore.RuleStore, rulerAddrMap map[string]*Ruler) (*Ruler, *DefaultMultiTenantManager) { engine, queryable, pusher, logger, overrides, reg := testSetup(t, querierTestConfig) - - managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg) - manager, err := NewDefaultMultiTenantManager(rulerConfig, managerFactory, reg, log.NewNopLogger()) + metrics := NewRuleEvalMetrics(rulerConfig, reg) + managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, metrics, reg) + manager, err := NewDefaultMultiTenantManager(rulerConfig, managerFactory, metrics, reg, log.NewNopLogger()) require.NoError(t, err) ruler, err := newRuler(