diff --git a/pkg/alertmanager/api.go b/pkg/alertmanager/api.go index 385d3e2fad8..e5cfe91c1be 100644 --- a/pkg/alertmanager/api.go +++ b/pkg/alertmanager/api.go @@ -58,12 +58,13 @@ type UserConfig struct { func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), am.logger) - userID, err := tenant.TenantID(r.Context()) + tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { level.Error(logger).Log("msg", errNoOrgID, "err", err.Error()) http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) cfg, err := am.store.GetAlertConfig(r.Context(), userID) if err != nil { @@ -95,12 +96,13 @@ func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http. func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), am.logger) - userID, err := tenant.TenantID(r.Context()) + tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { level.Error(logger).Log("msg", errNoOrgID, "err", err.Error()) http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) var input io.Reader maxConfigSize := am.limits.AlertmanagerMaxConfigSize(userID) @@ -155,12 +157,13 @@ func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http. // Note that if no config exists for a user, StatusOK is returned. func (am *MultitenantAlertmanager) DeleteUserConfig(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), am.logger) - userID, err := tenant.TenantID(r.Context()) + tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { level.Error(logger).Log("msg", errNoOrgID, "err", err.Error()) http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) err = am.store.DeleteAlertConfig(r.Context(), userID) if err != nil { diff --git a/pkg/alertmanager/api_test.go b/pkg/alertmanager/api_test.go index 494120dc3d7..daf31baf381 100644 --- a/pkg/alertmanager/api_test.go +++ b/pkg/alertmanager/api_test.go @@ -640,6 +640,24 @@ route: - '...' continue: false receivers: +- name: route1 + webhook_configs: + - send_resolved: true + http_config: {} + url: http://alertmanager/api/notifications?orgId=1&rrid=7 + max_alerts: 0 +`, + }, + "user1|user2|user3": { + AlertmanagerConfig: ` +global: + resolve_timeout: 5m +route: + receiver: route1 + group_by: + - '...' 
+ continue: false +receivers: - name: route1 webhook_configs: - send_resolved: true @@ -693,7 +711,7 @@ receivers: err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic) require.NoError(t, err) - require.Len(t, am.alertmanagers, 2) + require.Len(t, am.alertmanagers, 3) router := mux.NewRouter() router.Path("/multitenant_alertmanager/configs").Methods(http.MethodGet).HandlerFunc(am.ListAllConfigs) diff --git a/pkg/alertmanager/distributor.go b/pkg/alertmanager/distributor.go index 53112160a1d..607c32877bc 100644 --- a/pkg/alertmanager/distributor.go +++ b/pkg/alertmanager/distributor.go @@ -122,11 +122,12 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request) d.requestsInFlight.Add(1) defer d.requestsInFlight.Done() - userID, err := tenant.TenantID(r.Context()) + tenantIDs, err := tenant.TenantIDs(r.Context()) if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) logger := util_log.WithContext(r.Context(), d.logger) diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index d42b5a8e551..ae1be5ba891 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -1004,11 +1004,12 @@ func (am *MultitenantAlertmanager) HandleRequest(ctx context.Context, in *httpgr // serveRequest serves the Alertmanager's web UI and API. func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http.Request) { - userID, err := tenant.TenantID(req.Context()) + tenantIDs, err := tenant.TenantIDs(req.Context()) if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) am.alertmanagersMtx.Lock() userAM, ok := am.alertmanagers[userID] am.alertmanagersMtx.Unlock() @@ -1149,10 +1150,11 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use // UpdateState implements the Alertmanager service. func (am *MultitenantAlertmanager) UpdateState(ctx context.Context, part *clusterpb.Part) (*alertmanagerpb.UpdateStateResponse, error) { - userID, err := tenant.TenantID(ctx) + tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, err } + userID := tenant.JoinTenantIDs(tenantIDs) am.alertmanagersMtx.Lock() userAM, ok := am.alertmanagers[userID] @@ -1255,10 +1257,11 @@ func (am *MultitenantAlertmanager) getPerUserDirectories() map[string]string { // UpdateState implements the Alertmanager service. 
func (am *MultitenantAlertmanager) ReadState(ctx context.Context, req *alertmanagerpb.ReadStateRequest) (*alertmanagerpb.ReadStateResponse, error) { - userID, err := tenant.TenantID(ctx) + tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, err } + userID := tenant.JoinTenantIDs(tenantIDs) am.alertmanagersMtx.Lock() userAM, ok := am.alertmanagers[userID] diff --git a/pkg/configs/userconfig/config.go b/pkg/configs/userconfig/config.go index eb8bff4fb3e..7db3d7bb0fd 100644 --- a/pkg/configs/userconfig/config.go +++ b/pkg/configs/userconfig/config.go @@ -12,11 +12,11 @@ import ( "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" legacy_promql "github.com/cortexproject/cortex/pkg/configs/legacy_promql" + "github.com/cortexproject/cortex/pkg/ruler/rulefmt" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/configs/userconfig/config_test.go b/pkg/configs/userconfig/config_test.go index f0eb76fb1fb..93438f88b8f 100644 --- a/pkg/configs/userconfig/config_test.go +++ b/pkg/configs/userconfig/config_test.go @@ -10,13 +10,13 @@ import ( "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" + "github.com/cortexproject/cortex/pkg/ruler/rulefmt" util_log "github.com/cortexproject/cortex/pkg/util/log" ) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index a6aa9171ba1..cc34c83b863 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -19,7 +19,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/rules" prom_storage "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/discovery/dns" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" @@ -45,6 +44,7 @@ import ( querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ruler" + "github.com/cortexproject/cortex/pkg/ruler/rules" "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/storegateway" util_log "github.com/cortexproject/cortex/pkg/util/log" @@ -238,7 +238,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) { // single tenant. This allows for a less impactful enabling of tenant // federation. 
byPassForSingleQuerier := true - t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier)) + t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier, t.Cfg.TenantFederation)) } return nil, nil } @@ -662,8 +662,9 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer) // TODO: Consider wrapping logger to differentiate from querier module logger queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger) + mq := tenantfederation.NewQueryable(queryable, false, t.Cfg.TenantFederation) - managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer) + managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, mq, engine, t.Overrides, prometheus.DefaultRegisterer) manager, err := ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger) if err != nil { return nil, err diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index a3a62eb2093..532133c193e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -964,23 +964,31 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through // MetricsMetadata returns all metric metadata of a user. func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { - replicationSet, err := d.GetIngestersForMetadata(ctx) + userIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, err } - req := &ingester_client.MetricsMetadataRequest{} - // TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled. - resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { - return client.MetricsMetadata(ctx, req) - }) - if err != nil { - return nil, err + var userResps []interface{} + for _, userID := range userIDs { + ctx = user.InjectOrgID(ctx, userID) + replicationSet, err := d.GetIngestersForMetadata(ctx) + if err != nil { + return nil, err + } + req := &ingester_client.MetricsMetadataRequest{} + resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return client.MetricsMetadata(ctx, req) + }) + if err != nil { + return nil, err + } + userResps = append(userResps, resps...) } result := []scrape.MetricMetadata{} dedupTracker := map[cortexpb.MetricMetadata]struct{}{} - for _, resp := range resps { + for _, resp := range userResps { r := resp.(*ingester_client.MetricsMetadataResponse) for _, m := range r.Metadata { // Given we look across all ingesters - dedup the metadata. 
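The MetricsMetadata change above fans the request out once per tenant: it resolves every tenant ID carried by the request context, re-scopes the context to a single org ID, and merges the per-tenant responses. A minimal sketch of that pattern, assuming the `tenant` and `user` packages used in this diff; the `forEachTenant` helper itself is hypothetical and not part of the patch:

```go
package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/tenant"
	"github.com/weaveworks/common/user"
)

// forEachTenant runs do once per tenant ID found in ctx, each time with a
// context scoped to that single tenant.
func forEachTenant(ctx context.Context, do func(context.Context) error) error {
	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return err
	}
	for _, id := range tenantIDs {
		// Derive a fresh per-tenant context instead of reassigning ctx, so the
		// multi-tenant context is left untouched between iterations.
		if err := do(user.InjectOrgID(ctx, id)); err != nil {
			return err
		}
	}
	return nil
}
```

Note that the hunk above reassigns `ctx` inside its loop; that works because `user.InjectOrgID` overwrites any existing org ID, but deriving a fresh variable per iteration, as sketched here, avoids surprising later reads of `ctx`.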
diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 9efba31d135..ef566e9ae54 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -21,7 +21,6 @@ import ( const ( defaultTenantLabel = "__tenant_id__" retainExistingPrefix = "original_" - maxConcurrency = 16 ) // NewQueryable returns a queryable that iterates through all the tenant IDs @@ -36,8 +35,8 @@ const ( // If the label "__tenant_id__" is already existing, its value is overwritten // by the tenant ID and the previous value is exposed through a new label // prefixed with "original_". This behaviour is not implemented recursively. -func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool) storage.Queryable { - return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier) +func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool, cfg Config) storage.Queryable { + return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier, cfg) } func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback { @@ -80,11 +79,12 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids // If the label `idLabelName` is already existing, its value is overwritten and // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively. -func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool) storage.Queryable { +func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool, cfg Config) storage.Queryable { return &mergeQueryable{ idLabelName: idLabelName, callback: callback, byPassWithSingleQuerier: byPassWithSingleQuerier, + cfg: cfg, } } @@ -92,6 +92,7 @@ type mergeQueryable struct { idLabelName string byPassWithSingleQuerier bool callback MergeQuerierCallback + cfg Config } // Querier returns a new mergeQuerier, which aggregates results from multiple @@ -111,10 +112,11 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s } return &mergeQuerier{ - ctx: ctx, - idLabelName: m.idLabelName, - queriers: queriers, - ids: ids, + ctx: ctx, + idLabelName: m.idLabelName, + queriers: queriers, + ids: ids, + maxConcurrency: m.cfg.MaxConcurrency, }, nil } @@ -125,10 +127,11 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s // the previous value is exposed through a new label prefixed with "original_". // This behaviour is not implemented recursively type mergeQuerier struct { - ctx context.Context - queriers []storage.Querier - idLabelName string - ids []string + ctx context.Context + queriers []storage.Querier + idLabelName string + ids []string + maxConcurrency int } // LabelValues returns all potential values for a label name. 
It is not safe
@@ -248,7 +251,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te
 		return nil
 	}

-	err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
+	err := concurrency.ForEach(m.ctx, jobs, m.maxConcurrency, run)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -334,7 +337,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
 		return nil
 	}

-	err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
+	err := concurrency.ForEach(ctx, jobs, m.maxConcurrency, run)
 	if err != nil {
 		return storage.ErrSeriesSet(err)
 	}
diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go
index fd1441daa07..d493736d454 100644
--- a/pkg/querier/tenantfederation/merge_queryable_test.go
+++ b/pkg/querier/tenantfederation/merge_queryable_test.go
@@ -257,7 +257,9 @@ type mergeQueryableScenario struct {

 func (s *mergeQueryableScenario) init() (storage.Querier, error) {
 	// initialize with default tenant label
-	q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier)
+	q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier, Config{
+		MaxConcurrency: 16,
+	})

 	// inject tenants into context
 	ctx := context.Background()
@@ -334,7 +336,9 @@ type labelValuesScenario struct {
 func TestMergeQueryable_Querier(t *testing.T) {
 	t.Run("querying without a tenant specified should error", func(t *testing.T) {
 		queryable := &mockTenantQueryableWithFilter{}
-		q := NewQueryable(queryable, false /* byPassWithSingleQuerier */)
+		q := NewQueryable(queryable, false /* byPassWithSingleQuerier */, Config{
+			MaxConcurrency: 16,
+		})

 		// Create a context with no tenant specified.
 		ctx := context.Background()
@@ -873,7 +877,9 @@ func TestTracingMergeQueryable(t *testing.T) {
 	// set a multi tenant resolver
 	tenant.WithDefaultResolver(tenant.NewMultiResolver())
 	filter := mockTenantQueryableWithFilter{}
-	q := NewQueryable(&filter, false)
+	q := NewQueryable(&filter, false, Config{
+		MaxConcurrency: 16,
+	})

 	// retrieve querier if set
 	querier, err := q.Querier(ctx, mint, maxt)
 	require.NoError(t, err)
diff --git a/pkg/querier/tenantfederation/tenant_federation.go b/pkg/querier/tenantfederation/tenant_federation.go
index af5bd7b929e..f7e915002e9 100644
--- a/pkg/querier/tenantfederation/tenant_federation.go
+++ b/pkg/querier/tenantfederation/tenant_federation.go
@@ -6,9 +6,11 @@ import (

 type Config struct {
 	// Enabled switches on support for multi tenant query federation
-	Enabled bool `yaml:"enabled"`
+	Enabled        bool `yaml:"enabled"`
+	MaxConcurrency int  `yaml:"max_concurrency"`
 }

 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).")
+	f.IntVar(&cfg.MaxConcurrency, "tenant-federation.max-concurrency", 16, "Maximum number of concurrent federated sub-queries used when evaluating a federated query.")
 }
diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go
index 00f3148c54f..8e7907fdf3c 100644
--- a/pkg/ruler/api.go
+++ b/pkg/ruler/api.go
@@ -16,11 +16,11 @@ import (
 	"github.com/pkg/errors"
 	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/pkg/rulefmt"
 	"github.com/weaveworks/common/user"
 	"gopkg.in/yaml.v3"

 	"github.com/cortexproject/cortex/pkg/cortexpb"
+	"github.com/cortexproject/cortex/pkg/ruler/rulefmt"
 	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
 	"github.com/cortexproject/cortex/pkg/ruler/rulestore"
 	"github.com/cortexproject/cortex/pkg/tenant"
@@ -77,6 +77,8 @@ type alertingRule struct {
 	State       string        `json:"state"`
 	Name        string        `json:"name"`
 	Query       string        `json:"query"`
+	SrcTenants  string        `json:"srcTenants,omitempty"`
+	DestTenant  string        `json:"destTenant,omitempty"`
 	Duration    float64       `json:"duration"`
 	Labels      labels.Labels `json:"labels"`
 	Annotations labels.Labels `json:"annotations"`
@@ -91,6 +93,8 @@ type alertingRule struct {
 type recordingRule struct {
 	Name       string        `json:"name"`
 	Query      string        `json:"query"`
+	SrcTenants string        `json:"srcTenants,omitempty"`
+	DestTenant string        `json:"destTenant,omitempty"`
 	Labels     labels.Labels `json:"labels"`
 	Health     string        `json:"health"`
 	LastError  string        `json:"lastError"`
@@ -138,8 +142,8 @@ func NewAPI(r *Ruler, s rulestore.RuleStore, logger log.Logger) *API {
 func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
 	logger := util_log.WithContext(req.Context(), a.logger)
-	userID, err := tenant.TenantID(req.Context())
-	if err != nil || userID == "" {
+	tenantIDs, err := tenant.TenantIDs(req.Context())
+	if err != nil || len(tenantIDs) == 0 {
 		level.Error(logger).Log("msg", "error extracting org id from context", "err", err)
 		respondError(logger, w, "no valid org id found")
 		return
@@ -190,6 +194,8 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
 				LastEvaluation: rl.GetEvaluationTimestamp(),
 				EvaluationTime: rl.GetEvaluationDuration().Seconds(),
 				Type:           v1.RuleTypeAlerting,
+				SrcTenants:     rl.Rule.GetSrcTenants(),
+				DestTenant:     rl.Rule.GetDestTenant(),
 			}
 		} else {
 			grp.Rules[i] = recordingRule{
@@ -201,6 +207,8 @@
 				LastEvaluation: rl.GetEvaluationTimestamp(),
 				EvaluationTime: rl.GetEvaluationDuration().Seconds(),
 				Type:           v1.RuleTypeRecording,
+				SrcTenants:     rl.Rule.GetSrcTenants(),
+				DestTenant:     rl.Rule.GetDestTenant(),
 			}
 		}
 	}
@@ -230,8 +238,8 @@
 func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {
 	logger := util_log.WithContext(req.Context(), a.logger)
-	userID, err := tenant.TenantID(req.Context())
-	if err != nil || userID == "" {
+	tenantIDs, err := tenant.TenantIDs(req.Context())
+	if err != nil || len(tenantIDs) == 0 {
 		level.Error(logger).Log("msg", "error extracting org id from context", "err", err)
 		respondError(logger, w, "no valid org id found")
 		return
@@ -359,10 +367,11 @@ func parseGroupName(params map[string]string) (string, error) {
 // and returns them in that order. It also allows users to require a namespace or group name and return
 // an error if it they can not be parsed.
 func parseRequest(req *http.Request, requireNamespace, requireGroup bool) (string, string, string, error) {
-	userID, err := tenant.TenantID(req.Context())
+	tenantIDs, err := tenant.TenantIDs(req.Context())
 	if err != nil {
 		return "", "", "", user.ErrNoOrgID
 	}
+	userID := tenant.JoinTenantIDs(tenantIDs)

 	vars := mux.Vars(req)

@@ -495,6 +504,12 @@ func (a *API) CreateRuleGroup(w http.ResponseWriter, req *http.Request) {
 		return
 	}

+	if err := a.ruler.AssertValidFederation(userID, rg.Rules); err != nil {
+		level.Error(logger).Log("msg", "rule federation failure", "err", err.Error(), "rules", rg.Rules)
+		http.Error(w, err.Error(), http.StatusBadRequest)
+		return
+	}
+
 	rgProto := rulespb.ToProto(userID, namespace, rg)

 	level.Debug(logger).Log("msg", "attempting to store rulegroup", "userID", userID, "group", rgProto.String())
diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go
index 2d631078380..d188ad48437 100644
--- a/pkg/ruler/api_test.go
+++ b/pkg/ruler/api_test.go
@@ -171,6 +171,59 @@ func TestRuler_alerts(t *testing.T) {
 	require.Equal(t, string(expectedResponse), string(body))
 }

+func TestRuler_federated_rules(t *testing.T) {
+	cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(mockFederatedRules))
+	// Not necessary for listing, but matches expected behavior
+	cfg.EnableFederatedRules = true
+	defer cleanup()
+
+	r, rcleanup := newTestRuler(t, cfg, nil)
+	defer rcleanup()
+	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
+
+	a := NewAPI(r, r.store, log.NewNopLogger())
+
+	req := requestFor(t, "GET", "https://localhost:8080/api/prom/api/v1/rules", nil, "federated_user")
+	w := httptest.NewRecorder()
+	a.PrometheusRules(w, req)
+
+	resp := w.Result()
+	body, _ := ioutil.ReadAll(resp.Body)
+
+	// Check status code and status response
+	responseJSON := response{}
+	err := json.Unmarshal(body, &responseJSON)
+	require.NoError(t, err)
+	require.Equal(t, http.StatusOK, resp.StatusCode)
+	require.Equal(t, responseJSON.Status, "success")
+
+	// Testing the running rules for federated_user in the mock store
+	expectedResponse, _ := json.Marshal(response{
+		Status: "success",
+		Data: &RuleDiscovery{
+			RuleGroups: []*RuleGroup{
+				{
+					Name: "group1",
+					File: "namespace1",
+					Rules: []rule{
+						&recordingRule{
+							Name:       "UP_RULE",
+							Query:      "up",
+							Health:     "unknown",
+							Type:       "recording",
+							SrcTenants: "src1|src2",
+							DestTenant: "dstTenant",
+						},
+					},
+					Interval: 60,
+				},
+			},
+		},
+	})
+
+	require.Equal(t, string(expectedResponse), string(body))
+}
+
 func TestRuler_Create(t *testing.T) {
 	cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(make(map[string]rulespb.RuleGroupList)))
 	defer cleanup()
@@ -214,6 +267,20 @@ interval: 15s
 			status: 400,
 			err:    errors.New("invalid rules config: rule group 'rg_name' has no rules"),
 		},
+		{
+			name:   "with a federated rule without federation enabled",
+			status: 400,
+			input: `
+name: test
+interval: 15s
+rules:
+- record: up_rule
+  expr: up{}
+  src_tenants: foo
+  dest_tenant: bar
+`,
+			err: errors.New("ruler federation not enabled and federated rule detected (src_tenant: foo dest_tenant: bar)"),
+		},
 		{
 			name:   "with a valid rules file",
 			status: 202,
@@ -262,6 +329,85 @@ rules:
 	}
 }

+func TestRuler_CreateFederated(t *testing.T) {
+	cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(make(map[string]rulespb.RuleGroupList)))
+	defer cleanup()
+	cfg.EnableFederatedRules = true
+	cfg.AllowedFederatedTenants = []string{"user1"}
+
+	r, rcleanup := newTestRuler(t, cfg, nil)
+	defer rcleanup()
+	defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
+
+	a := NewAPI(r, r.store, log.NewNopLogger())
+
+	tc := []struct {
+		name   string
+		input  string
+		user   string
+		output string
+		err    error
+		status int
+	}{
+		{
+			name:   "with a user not in the allowed list",
+			status: 400,
+			user:   "notallowed",
+			input: `
+name: federated_test
+interval: 15s
+rules:
+- record: up_rule
+  expr: up{}
+  src_tenants: foo
+  dest_tenant: bar
+`,
+			err: errors.New("ruler federation not allowed for user (userID: notallowed)"),
+		},
+		{
+			name:   "with a valid federated rules file",
+			status: 202,
+			user:   "user1",
+			input: `
+name: federated_test
+interval: 15s
+rules:
+- record: up_rule
+  expr: up{}
+  src_tenants: foo
+  dest_tenant: bar
+`,
+			output: "name: federated_test\ninterval: 15s\nrules:\n    - record: up_rule\n      expr: up{}\n      src_tenants: foo\n      dest_tenant: bar\n",
+		},
+	}
+
+	for _, tt := range tc {
+		t.Run(tt.name, func(t *testing.T) {
+			router := mux.NewRouter()
+			router.Path("/api/v1/rules/{namespace}").Methods("POST").HandlerFunc(a.CreateRuleGroup)
+			router.Path("/api/v1/rules/{namespace}/{groupName}").Methods("GET").HandlerFunc(a.GetRuleGroup)
+			// POST
+			req := requestFor(t, http.MethodPost, "https://localhost:8080/api/v1/rules/namespace", strings.NewReader(tt.input), tt.user)
+			w := httptest.NewRecorder()
+
+			router.ServeHTTP(w, req)
+			require.Equal(t, tt.status, w.Code)
+
+			if tt.err == nil {
+				// GET
+				req = requestFor(t, http.MethodGet, "https://localhost:8080/api/v1/rules/namespace/federated_test", nil, tt.user)
+				w = httptest.NewRecorder()
+
+				router.ServeHTTP(w, req)
+				require.Equal(t, 200, w.Code)
+				require.Equal(t, tt.output, w.Body.String())
+			} else {
+				require.Equal(t, tt.err.Error()+"\n", w.Body.String())
+			}
+		})
+	}
+}
+
 func TestRuler_DeleteNamespace(t *testing.T) {
 	cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRulesNamespaces))
 	defer cleanup()
diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go
index 34758920184..24c13309409 100644
--- a/pkg/ruler/compat.go
+++ b/pkg/ruler/compat.go
@@ -14,13 +14,13 @@ import (
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/value"
 	"github.com/prometheus/prometheus/promql"
-	"github.com/prometheus/prometheus/rules"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/weaveworks/common/httpgrpc"
 	"github.com/weaveworks/common/user"

 	"github.com/cortexproject/cortex/pkg/cortexpb"
 	"github.com/cortexproject/cortex/pkg/querier"
+	"github.com/cortexproject/cortex/pkg/ruler/rules"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 )

@@ -72,7 +72,9 @@ func (a *PusherAppender) Commit() error {

 	// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
 	// We shouldn't call client.ReuseSlice here.
-	_, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), cortexpb.ToWriteRequest(a.labels, a.samples, nil, cortexpb.RULE))
+	// We can rely on the context provided to the constructor here, since rule
+	// group evaluation now injects the tenant org ID before building the appender.
+	_, err := a.pusher.Push(a.ctx, cortexpb.ToWriteRequest(a.labels, a.samples, nil, cortexpb.RULE))
 	if err != nil {
 		// Don't report errors that ended with 4xx HTTP status code (series limits, duplicate samples, out of order, etc.)
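The compat.go change above shifts responsibility for tenant scoping to the caller: `PusherAppender.Commit` now pushes with the context it was constructed with, instead of re-injecting the org ID on every push. A small sketch of the resulting contract, assuming the ruler's narrow `Push` signature; the `pusher` interface and `evalAndPush` helper are illustrative, not part of the patch:

```go
package example

import (
	"context"

	"github.com/cortexproject/cortex/pkg/cortexpb"
	"github.com/weaveworks/common/user"
)

// pusher mirrors the single method the ruler appender needs from a distributor.
type pusher interface {
	Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
}

// evalAndPush injects the org ID exactly once, before evaluation; everything
// downstream (including the appender's Commit) reuses that scoped context.
func evalAndPush(ctx context.Context, p pusher, userID string, req *cortexpb.WriteRequest) error {
	_, err := p.Push(user.InjectOrgID(ctx, userID), req)
	return err
}
```

With federated rules able to read from several source tenants, keeping a single, caller-owned injection point makes it unambiguous which tenant the evaluated samples are written to.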
diff --git a/pkg/ruler/manager.go b/pkg/ruler/manager.go
index 7af5f079c54..464fdd74558 100644
--- a/pkg/ruler/manager.go
+++ b/pkg/ruler/manager.go
@@ -14,11 +14,11 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/notifier"
-	"github.com/prometheus/prometheus/pkg/rulefmt"
-	promRules "github.com/prometheus/prometheus/rules"
 	"github.com/weaveworks/common/user"
 	"golang.org/x/net/context/ctxhttp"

+	"github.com/cortexproject/cortex/pkg/ruler/rulefmt"
+	"github.com/cortexproject/cortex/pkg/ruler/rules"
 	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
 )

@@ -218,8 +218,8 @@ func (r *DefaultMultiTenantManager) getOrCreateNotifier(userID string) (*notifie
 	return n.notifier, nil
 }

-func (r *DefaultMultiTenantManager) GetRules(userID string) []*promRules.Group {
-	var groups []*promRules.Group
+func (r *DefaultMultiTenantManager) GetRules(userID string) []*rules.Group {
+	var groups []*rules.Group
 	r.userManagerMtx.Lock()
 	if mngr, exists := r.userManagers[userID]; exists {
 		groups = mngr.RuleGroups()
diff --git a/pkg/ruler/manager_metrics_test.go b/pkg/ruler/manager_metrics_test.go
index 518d74c0cba..0cac778b1f9 100644
--- a/pkg/ruler/manager_metrics_test.go
+++ b/pkg/ruler/manager_metrics_test.go
@@ -166,7 +166,8 @@ func populateManager(base float64) *prometheus.Registry {
 	return r
 }

-// Copied from github.com/prometheus/rules/manager.go
+// TODO import the one from pkg/ruler/rules to use here
+// Copied originally from github.com/prometheus/rules/manager.go
 type groupMetrics struct {
 	evalDuration      prometheus.Summary
 	iterationDuration prometheus.Summary
diff --git a/pkg/ruler/manager_test.go b/pkg/ruler/manager_test.go
index afc30a0c55e..841f5b0b3e9 100644
--- a/pkg/ruler/manager_test.go
+++ b/pkg/ruler/manager_test.go
@@ -11,10 +11,10 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/notifier"
 	"github.com/prometheus/prometheus/pkg/labels"
-	promRules "github.com/prometheus/prometheus/rules"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"

+	"github.com/cortexproject/cortex/pkg/ruler/rules"
 	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
 	"github.com/cortexproject/cortex/pkg/util/test"
 )
@@ -128,6 +128,6 @@ func (m *mockRulesManager) Update(_ time.Duration, _ []string, _ labels.Labels,
 	return nil
 }

-func (m *mockRulesManager) RuleGroups() []*promRules.Group {
+func (m *mockRulesManager) RuleGroups() []*rules.Group {
 	return nil
 }
diff --git a/pkg/ruler/mapper.go b/pkg/ruler/mapper.go
index 4b5ab60e26f..5becb2f3a63 100644
--- a/pkg/ruler/mapper.go
+++ b/pkg/ruler/mapper.go
@@ -8,9 +8,10 @@ import (

 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
-	"github.com/prometheus/prometheus/pkg/rulefmt"
 	"github.com/spf13/afero"
 	"gopkg.in/yaml.v3"
+
+	"github.com/cortexproject/cortex/pkg/ruler/rulefmt"
 )

 // mapper is designed to enusre the provided rule sets are identical
diff --git a/pkg/ruler/mapper_test.go b/pkg/ruler/mapper_test.go
index 4cee84e82ac..68157d42eb6 100644
--- a/pkg/ruler/mapper_test.go
+++ b/pkg/ruler/mapper_test.go
@@ -5,12 +5,13 @@ import (
 	"os"
 	"testing"

 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
-	"github.com/prometheus/prometheus/pkg/rulefmt"
 	"github.com/spf13/afero"
 	"github.com/stretchr/testify/require"
 	"gopkg.in/yaml.v3"
+
+	"github.com/cortexproject/cortex/pkg/ruler/rulefmt"
 )

 var (
diff --git a/pkg/ruler/rulefmt/rulefmt.go
b/pkg/ruler/rulefmt/rulefmt.go new file mode 100644 index 00000000000..b80853fbe3a --- /dev/null +++ b/pkg/ruler/rulefmt/rulefmt.go @@ -0,0 +1,298 @@ +package rulefmt + +import ( + "bytes" + "context" + "io" + "io/ioutil" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/common/model" + yaml "gopkg.in/yaml.v3" + + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/template" +) + +// Error represents semantic errors on parsing rule groups. +type Error struct { + Group string + Rule int + RuleName string + Err WrappedError +} + +// WrappedError wraps error with the yaml node which can be used to represent +// the line and column numbers of the error. +type WrappedError struct { + err error + node *yaml.Node + nodeAlt *yaml.Node +} + +func (err *Error) Error() string { + if err.Err.nodeAlt != nil { + return errors.Wrapf(err.Err.err, "%d:%d: %d:%d: group %q, rule %d, %q", err.Err.node.Line, err.Err.node.Column, err.Err.nodeAlt.Line, err.Err.nodeAlt.Column, err.Group, err.Rule, err.RuleName).Error() + } else if err.Err.node != nil { + return errors.Wrapf(err.Err.err, "%d:%d: group %q, rule %d, %q", err.Err.node.Line, err.Err.node.Column, err.Group, err.Rule, err.RuleName).Error() + } + return errors.Wrapf(err.Err.err, "group %q, rule %d, %q", err.Group, err.Rule, err.RuleName).Error() +} + +// RuleGroups is a set of rule groups that are typically exposed in a file. +type RuleGroups struct { + Groups []RuleGroup `yaml:"groups"` +} + +type ruleGroups struct { + Groups []yaml.Node `yaml:"groups"` +} + +// Validate validates all rules in the rule groups. +func (g *RuleGroups) Validate(node ruleGroups) (errs []error) { + set := map[string]struct{}{} + + for j, g := range g.Groups { + if g.Name == "" { + errs = append(errs, errors.Errorf("%d:%d: Groupname must not be empty", node.Groups[j].Line, node.Groups[j].Column)) + } + + if _, ok := set[g.Name]; ok { + errs = append( + errs, + errors.Errorf("%d:%d: groupname: \"%s\" is repeated in the same file", node.Groups[j].Line, node.Groups[j].Column, g.Name), + ) + } + + set[g.Name] = struct{}{} + + for i, r := range g.Rules { + for _, node := range r.Validate() { + var ruleName yaml.Node + if r.Alert.Value != "" { + ruleName = r.Alert + } else { + ruleName = r.Record + } + errs = append(errs, &Error{ + Group: g.Name, + Rule: i + 1, + RuleName: ruleName.Value, + Err: node, + }) + } + } + } + + return errs +} + +// RuleGroup is a list of sequentially evaluated recording and alerting rules. +type RuleGroup struct { + Name string `yaml:"name"` + Interval model.Duration `yaml:"interval,omitempty"` + Rules []RuleNode `yaml:"rules"` +} + +// Rule describes an alerting or recording rule. +type Rule struct { + Record string `yaml:"record,omitempty"` + Alert string `yaml:"alert,omitempty"` + Expr string `yaml:"expr"` + For model.Duration `yaml:"for,omitempty"` + Labels map[string]string `yaml:"labels,omitempty"` + Annotations map[string]string `yaml:"annotations,omitempty"` + SrcTenants string `yaml:"src_tenants,omitempty"` + DestTenant string `yaml:"dest_tenant,omitempty"` +} + +// RuleNode adds yaml.v3 layer to support line and column outputs for invalid rules. 
+type RuleNode struct { + Record yaml.Node `yaml:"record,omitempty"` + Alert yaml.Node `yaml:"alert,omitempty"` + Expr yaml.Node `yaml:"expr"` + For model.Duration `yaml:"for,omitempty"` + Labels map[string]string `yaml:"labels,omitempty"` + Annotations map[string]string `yaml:"annotations,omitempty"` + SrcTenants yaml.Node `yaml:"src_tenants,omitempty"` + DestTenant yaml.Node `yaml:"dest_tenant,omitempty"` +} + +func (r *RuleNode) IsFederated() bool { + return r.SrcTenants.Value != "" || r.DestTenant.Value != "" +} + +// Validate the rule and return a list of encountered errors. +func (r *RuleNode) Validate() (nodes []WrappedError) { + if r.Record.Value != "" && r.Alert.Value != "" { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("only one of 'record' and 'alert' must be set"), + node: &r.Record, + nodeAlt: &r.Alert, + }) + } + if r.Record.Value == "" && r.Alert.Value == "" { + if r.Record.Value == "0" { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("one of 'record' or 'alert' must be set"), + node: &r.Alert, + }) + } else { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("one of 'record' or 'alert' must be set"), + node: &r.Record, + }) + } + } + + if r.Expr.Value == "" { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("field 'expr' must be set in rule"), + node: &r.Expr, + }) + } else if _, err := parser.ParseExpr(r.Expr.Value); err != nil { + nodes = append(nodes, WrappedError{ + err: errors.Wrapf(err, "could not parse expression"), + node: &r.Expr, + }) + } + if r.Record.Value != "" { + if len(r.Annotations) > 0 { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid field 'annotations' in recording rule"), + node: &r.Record, + }) + } + if r.For != 0 { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid field 'for' in recording rule"), + node: &r.Record, + }) + } + if !model.IsValidMetricName(model.LabelValue(r.Record.Value)) { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid recording rule name: %s", r.Record.Value), + node: &r.Record, + }) + } + } + + for k, v := range r.Labels { + if !model.LabelName(k).IsValid() || k == model.MetricNameLabel { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid label name: %s", k), + }) + } + + if !model.LabelValue(v).IsValid() { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid label value: %s", v), + }) + } + } + + for k := range r.Annotations { + if !model.LabelName(k).IsValid() { + nodes = append(nodes, WrappedError{ + err: errors.Errorf("invalid annotation name: %s", k), + }) + } + } + + for _, err := range testTemplateParsing(r) { + nodes = append(nodes, WrappedError{err: err}) + } + + return +} + +// testTemplateParsing checks if the templates used in labels and annotations +// of the alerting rules are parsed correctly. +func testTemplateParsing(rl *RuleNode) (errs []error) { + if rl.Alert.Value == "" { + // Not an alerting rule. + return errs + } + + // Trying to parse templates. 
+ tmplData := template.AlertTemplateData(map[string]string{}, map[string]string{}, "", 0) + defs := []string{ + "{{$labels := .Labels}}", + "{{$externalLabels := .ExternalLabels}}", + "{{$externalURL := .ExternalURL}}", + "{{$value := .Value}}", + } + parseTest := func(text string) error { + tmpl := template.NewTemplateExpander( + context.TODO(), + strings.Join(append(defs, text), ""), + "__alert_"+rl.Alert.Value, + tmplData, + model.Time(timestamp.FromTime(time.Now())), + nil, + nil, + ) + return tmpl.ParseTest() + } + + // Parsing Labels. + for k, val := range rl.Labels { + err := parseTest(val) + if err != nil { + errs = append(errs, errors.Wrapf(err, "label %q", k)) + } + } + + // Parsing Annotations. + for k, val := range rl.Annotations { + err := parseTest(val) + if err != nil { + errs = append(errs, errors.Wrapf(err, "annotation %q", k)) + } + } + + return errs +} + +// Parse parses and validates a set of rules. +func Parse(content []byte) (*RuleGroups, []error) { + var ( + groups RuleGroups + node ruleGroups + errs []error + ) + + decoder := yaml.NewDecoder(bytes.NewReader(content)) + decoder.KnownFields(true) + err := decoder.Decode(&groups) + // Ignore io.EOF which happens with empty input. + if err != nil && err != io.EOF { + errs = append(errs, err) + } + err = yaml.Unmarshal(content, &node) + if err != nil { + errs = append(errs, err) + } + + if len(errs) > 0 { + return nil, errs + } + + return &groups, groups.Validate(node) +} + +// ParseFile reads and parses rules from a file. +func ParseFile(file string) (*RuleGroups, []error) { + b, err := ioutil.ReadFile(file) + if err != nil { + return nil, []error{errors.Wrap(err, file)} + } + rgs, errs := Parse(b) + for i := range errs { + errs[i] = errors.Wrap(errs[i], file) + } + return rgs, errs +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index f92b376f75a..29403d7e38c 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -21,14 +21,14 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/notifier" - "github.com/prometheus/prometheus/pkg/rulefmt" - promRules "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/util/strutil" "github.com/weaveworks/common/user" "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ruler/rulefmt" + "github.com/cortexproject/cortex/pkg/ruler/rules" "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" "github.com/cortexproject/cortex/pkg/tenant" @@ -60,6 +60,10 @@ const ( errMaxRuleGroupsPerUserLimitExceeded = "per-user rule groups limit (limit: %d actual: %d) exceeded" errMaxRulesPerRuleGroupPerUserLimitExceeded = "per-user rules per rule group limit (limit: %d actual: %d) exceeded" + // Federation errors + errRulerFederationNotEnabled = "ruler federation not enabled and federated rule detected (src_tenant: %s dest_tenant: %s)" + errFederatedRulesNotAllowed = "ruler federation not allowed for user (userID: %s)" + // errors errListAllUser = "unable to list the ruler users" ) @@ -116,6 +120,10 @@ type Config struct { RingCheckPeriod time.Duration `yaml:"-"` EnableQueryStats bool `yaml:"query_stats_enabled"` + + EnableFederatedRules bool `yaml:"enable_federated_rules"` + AllowedFederatedTenants flagext.StringSliceCSV `yaml:"allowed_federated_tenants"` + DisallowedFederatedTenants flagext.StringSliceCSV 
`yaml:"disallowed_federated_tenants"` } // Validate config and returns error on failure @@ -180,6 +188,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableQueryStats, "ruler.query-stats-enabled", false, "Report the wall time for ruler queries to complete as a per user metric and as an info level log message.") + f.BoolVar(&cfg.EnableFederatedRules, "ruler.enable-federated-rules", false, "Allow federated rules for ruler.") + f.Var(&cfg.AllowedFederatedTenants, "ruler.allowed-federated-tenants", "Comma separated list of tenants that are allowed to have federated rules which query data from multiple tenants.") + f.Var(&cfg.DisallowedFederatedTenants, "ruler.disallowed-federated-tenants", "Comma separated list of tenants that are not allowed to have federated rules which query data from multiple tenants.") + cfg.RingCheckPeriod = 5 * time.Second } @@ -189,7 +201,7 @@ type MultiTenantManager interface { // If existing user is missing in the ruleGroups map, its ruler manager will be stopped. SyncRuleGroups(ctx context.Context, ruleGroups map[string]rulespb.RuleGroupList) // GetRules fetches rules for a particular tenant (userID). - GetRules(userID string) []*promRules.Group + GetRules(userID string) []*rules.Group // Stop stops all Manager components. Stop() // ValidateRuleGroup validates a rulegroup @@ -241,7 +253,8 @@ type Ruler struct { ringCheckErrors prometheus.Counter rulerSync *prometheus.CounterVec - allowedTenants *util.AllowedTenants + allowedTenants *util.AllowedTenants + federatedTenants *util.AllowedTenants registry prometheus.Registerer logger log.Logger @@ -254,14 +267,15 @@ func NewRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer, logger log.Logger, ruleStore rulestore.RuleStore, limits RulesLimits, clientPool ClientsPool) (*Ruler, error) { ruler := &Ruler{ - cfg: cfg, - store: ruleStore, - manager: manager, - registry: reg, - logger: logger, - limits: limits, - clientsPool: clientPool, - allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants), + cfg: cfg, + store: ruleStore, + manager: manager, + registry: reg, + logger: logger, + limits: limits, + clientsPool: clientPool, + allowedTenants: util.NewAllowedTenants(cfg.EnabledTenants, cfg.DisabledTenants), + federatedTenants: util.NewAllowedTenants(cfg.AllowedFederatedTenants, cfg.DisallowedFederatedTenants), ringCheckErrors: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_ruler_ring_check_errors_total", @@ -371,8 +385,8 @@ type sender interface { // It filters any non-firing alerts from the input. // // Copied from Prometheus's main.go. 
-func SendAlerts(n sender, externalURL string) promRules.NotifyFunc {
-	return func(ctx context.Context, expr string, alerts ...*promRules.Alert) {
+func SendAlerts(n sender, externalURL string) rules.NotifyFunc {
+	return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
 		var res []*notifier.Alert

 		for _, alert := range alerts {
@@ -645,10 +659,11 @@ func filterRuleGroups(userID string, ruleGroups []*rulespb.RuleGroupDesc, ring r
 // GetRules retrieves the running rules from this ruler and all running rulers in the ring if
 // sharding is enabled
 func (r *Ruler) GetRules(ctx context.Context) ([]*GroupStateDesc, error) {
-	userID, err := tenant.TenantID(ctx)
+	tenantIDs, err := tenant.TenantIDs(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("no user id found in context")
 	}
+	userID := tenant.JoinTenantIDs(tenantIDs)

 	if r.cfg.EnableSharding {
 		return r.getShardedRules(ctx, userID)
@@ -691,7 +706,7 @@ func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) {
 			var ruleDesc *RuleStateDesc
 			switch rule := r.(type) {
-			case *promRules.AlertingRule:
+			case *rules.AlertingRule:
 				rule.ActiveAlerts()
 				alerts := []*AlertStateDesc{}
 				for _, a := range rule.ActiveAlerts() {
@@ -714,6 +729,8 @@ func (r *Ruler) getLocalRules(userID string) ([]*GroupStateDesc, error) {
 						For:         rule.HoldDuration(),
 						Labels:      cortexpb.FromLabelsToLabelAdapters(rule.Labels()),
 						Annotations: cortexpb.FromLabelsToLabelAdapters(rule.Annotations()),
+						SrcTenants:  rule.GetSrcTenants(),
+						DestTenant:  rule.GetDestTenant(),
 					},
 					State:               rule.State().String(),
 					Health:              string(rule.Health()),
@@ -722,12 +739,14 @@
 					EvaluationTimestamp: rule.GetEvaluationTimestamp(),
 					EvaluationDuration:  rule.GetEvaluationDuration(),
 				}
-			case *promRules.RecordingRule:
+			case *rules.RecordingRule:
 				ruleDesc = &RuleStateDesc{
 					Rule: &rulespb.RuleDesc{
-						Record: rule.Name(),
-						Expr:   rule.Query().String(),
-						Labels: cortexpb.FromLabelsToLabelAdapters(rule.Labels()),
+						Record:     rule.Name(),
+						Expr:       rule.Query().String(),
+						Labels:     cortexpb.FromLabelsToLabelAdapters(rule.Labels()),
+						SrcTenants: rule.GetSrcTenants(),
+						DestTenant: rule.GetDestTenant(),
 					},
 					Health:    string(rule.Health()),
 					LastError: lastError,
@@ -794,10 +813,11 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string) ([]*GroupSta

 // Rules implements the rules service
 func (r *Ruler) Rules(ctx context.Context, in *RulesRequest) (*RulesResponse, error) {
-	userID, err := tenant.TenantID(ctx)
+	tenantIDs, err := tenant.TenantIDs(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("no user id found in context")
 	}
+	userID := tenant.JoinTenantIDs(tenantIDs)

 	groupDescs, err := r.getLocalRules(userID)
 	if err != nil {
@@ -838,16 +858,33 @@ func (r *Ruler) AssertMaxRulesPerRuleGroup(userID string, rules int) error {
 	return fmt.Errorf(errMaxRulesPerRuleGroupPerUserLimitExceeded, limit, rules)
 }

+// AssertValidFederation checks whether the rules are federated, and if so
+// whether federation is enabled for the ruler and the given tenant
+func (r *Ruler) AssertValidFederation(userID string, rules []rulefmt.RuleNode) error {
+	for _, rule := range rules {
+		if rule.IsFederated() {
+			if !r.cfg.EnableFederatedRules {
+				return fmt.Errorf(errRulerFederationNotEnabled, rule.SrcTenants.Value, rule.DestTenant.Value)
+			}
+			if !r.federatedTenants.IsAllowed(userID) {
+				return fmt.Errorf(errFederatedRulesNotAllowed, userID)
+			}
+		}
+	}
+	return nil
+}
+
 func (r *Ruler) DeleteTenantConfiguration(w http.ResponseWriter, req *http.Request)
{ logger := util_log.WithContext(req.Context(), r.logger) - userID, err := tenant.TenantID(req.Context()) + tenantIDs, err := tenant.TenantIDs(req.Context()) if err != nil { // When Cortex is running, it uses Auth Middleware for checking X-Scope-OrgID and injecting tenant into context. // Auth Middleware sends http.StatusUnauthorized if X-Scope-OrgID is missing, so we do too here, for consistency. http.Error(w, err.Error(), http.StatusUnauthorized) return } + userID := tenant.JoinTenantIDs(tenantIDs) err = r.store.DeleteNamespace(req.Context(), userID, "") // Empty namespace = delete all rule groups. if err != nil && !errors.Is(err, rulestore.ErrGroupNamespaceNotFound) { diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 11c1b85feb4..1c04b8077c1 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -39,9 +39,7 @@ import ( prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/prometheus/prometheus/promql" - promRules "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -51,6 +49,8 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ruler/rulefmt" + "github.com/cortexproject/cortex/pkg/ruler/rules" "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" "github.com/cortexproject/cortex/pkg/ruler/rulestore/objectclient" @@ -222,7 +222,7 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, rulerAddrMap map[string]*Ruler) (*Ruler, func()) { engine, queryable, pusher, logger, overrides, reg, cleanup := testSetup(t, querierTestConfig) - storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, promRules.FileLoader{}, log.NewNopLogger()) + storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, rules.FileLoader{}, log.NewNopLogger()) require.NoError(t, err) managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg) @@ -1153,11 +1153,11 @@ func (s senderFunc) Send(alerts ...*notifier.Alert) { func TestSendAlerts(t *testing.T) { testCases := []struct { - in []*promRules.Alert + in []*rules.Alert exp []*notifier.Alert }{ { - in: []*promRules.Alert{ + in: []*rules.Alert{ { Labels: []labels.Label{{Name: "l1", Value: "v1"}}, Annotations: []labels.Label{{Name: "a2", Value: "v2"}}, @@ -1177,7 +1177,7 @@ func TestSendAlerts(t *testing.T) { }, }, { - in: []*promRules.Alert{ + in: []*rules.Alert{ { Labels: []labels.Label{{Name: "l1", Value: "v1"}}, Annotations: []labels.Label{{Name: "a2", Value: "v2"}}, @@ -1197,7 +1197,7 @@ func TestSendAlerts(t *testing.T) { }, }, { - in: []*promRules.Alert{}, + in: []*rules.Alert{}, }, } @@ -1290,7 +1290,7 @@ func TestRecoverAlertsPostOutage(t *testing.T) { alertRule := ruleGroup.Rules()[0] require.Equal(t, time.Time{}, alertRule.GetEvaluationTimestamp()) require.Equal(t, "UP_ALERT", alertRule.Name()) - require.Equal(t, promRules.HealthUnknown, alertRule.Health()) + require.Equal(t, rules.HealthUnknown, alertRule.Health()) // NEXT, evaluate the rule group the first time and assert ctx := 
user.InjectOrgID(context.Background(), "user1") @@ -1298,7 +1298,7 @@ func TestRecoverAlertsPostOutage(t *testing.T) { // since the eval is done at the current timestamp, the activeAt timestamp of alert should equal current timestamp require.Equal(t, "UP_ALERT", alertRule.Name()) - require.Equal(t, promRules.HealthGood, alertRule.Health()) + require.Equal(t, rules.HealthGood, alertRule.Health()) activeMapRaw := reflect.ValueOf(alertRule).Elem().FieldByName("active") activeMapKeys := activeMapRaw.MapKeys() @@ -1307,15 +1307,15 @@ func TestRecoverAlertsPostOutage(t *testing.T) { activeAlertRuleRaw := activeMapRaw.MapIndex(activeMapKeys[0]).Elem() activeAtTimeRaw := activeAlertRuleRaw.FieldByName("ActiveAt") - require.Equal(t, promRules.StatePending, promRules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) + require.Equal(t, rules.StatePending, rules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) require.Equal(t, reflect.NewAt(activeAtTimeRaw.Type(), unsafe.Pointer(activeAtTimeRaw.UnsafeAddr())).Elem().Interface().(time.Time), currentTime) // NEXT, restore the FOR state and assert ruleGroup.RestoreForState(currentTime) require.Equal(t, "UP_ALERT", alertRule.Name()) - require.Equal(t, promRules.HealthGood, alertRule.Health()) - require.Equal(t, promRules.StatePending, promRules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) + require.Equal(t, rules.HealthGood, alertRule.Health()) + require.Equal(t, rules.StatePending, rules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) require.Equal(t, reflect.NewAt(activeAtTimeRaw.Type(), unsafe.Pointer(activeAtTimeRaw.UnsafeAddr())).Elem().Interface().(time.Time), downAtActiveAtTime.Add(currentTime.Sub(downAtTime))) // NEXT, 20 minutes is expected to be left, eval timestamp at currentTimestamp +20m @@ -1327,5 +1327,5 @@ func TestRecoverAlertsPostOutage(t *testing.T) { firedAtTime := reflect.NewAt(firedAtRaw.Type(), unsafe.Pointer(firedAtRaw.UnsafeAddr())).Elem().Interface().(time.Time) require.Equal(t, firedAtTime, currentTime) - require.Equal(t, promRules.StateFiring, promRules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) + require.Equal(t, rules.StateFiring, rules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) } diff --git a/pkg/ruler/rules/alerting.go b/pkg/ruler/rules/alerting.go new file mode 100644 index 00000000000..cdd1b71e394 --- /dev/null +++ b/pkg/ruler/rules/alerting.go @@ -0,0 +1,516 @@ +package rules + +import ( + "context" + "fmt" + "net/url" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/template" + yaml "gopkg.in/yaml.v2" + + "github.com/cortexproject/cortex/pkg/ruler/rulefmt" +) + +const ( + // AlertMetricName is the metric name for synthetic alert timeseries. + alertMetricName = "ALERTS" + // AlertForStateMetricName is the metric name for 'for' state of alert. + alertForStateMetricName = "ALERTS_FOR_STATE" + + // AlertStateLabel is the label name indicating the state of an alert. + alertStateLabel = "alertstate" +) + +// AlertState denotes the state of an active alert. +type AlertState int + +const ( + // StateInactive is the state of an alert that is neither firing nor pending. 
+ StateInactive AlertState = iota + // StatePending is the state of an alert that has been active for less than + // the configured threshold duration. + StatePending + // StateFiring is the state of an alert that has been active for longer than + // the configured threshold duration. + StateFiring +) + +func (s AlertState) String() string { + switch s { + case StateInactive: + return "inactive" + case StatePending: + return "pending" + case StateFiring: + return "firing" + } + panic(errors.Errorf("unknown alert state: %d", s)) +} + +// Alert is the user-level representation of a single instance of an alerting rule. +type Alert struct { + State AlertState + + Labels labels.Labels + Annotations labels.Labels + + // The value at the last evaluation of the alerting expression. + Value float64 + // The interval during which the condition of this alert held true. + // ResolvedAt will be 0 to indicate a still active alert. + ActiveAt time.Time + FiredAt time.Time + ResolvedAt time.Time + LastSentAt time.Time + ValidUntil time.Time +} + +func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool { + if a.State == StatePending { + return false + } + + // if an alert has been resolved since the last send, resend it + if a.ResolvedAt.After(a.LastSentAt) { + return true + } + + return a.LastSentAt.Add(resendDelay).Before(ts) +} + +// An AlertingRule generates alerts from its vector expression. +type AlertingRule struct { + // The name of the alert. + name string + // The vector expression from which to generate alerts. + vector parser.Expr + // The duration for which a labelset needs to persist in the expression + // output vector before an alert transitions from Pending to Firing state. + holdDuration time.Duration + // Extra labels to attach to the resulting alert sample vectors. + labels labels.Labels + // Non-identifying key/value pairs. + annotations labels.Labels + // External labels from the global config. + externalLabels map[string]string + // The external URL from the --web.external-url flag. + externalURL string + // true if old state has been restored. We start persisting samples for ALERT_FOR_STATE + // only after the restoration. + restored bool + // Protects the below. + mtx sync.Mutex + // Time in seconds taken to evaluate rule. + evaluationDuration time.Duration + // Timestamp of last evaluation of rule. + evaluationTimestamp time.Time + // The health of the alerting rule. + health RuleHealth + // The last error seen by the alerting rule. + lastError error + // A map of alerts which are currently active (Pending or Firing), keyed by + // the fingerprint of the labelset they correspond to. + active map[uint64]*Alert + + srcTenants string + destTenant string + + logger log.Logger +} + +// NewAlertingRule constructs a new AlertingRule. +func NewAlertingRule( + name string, vec parser.Expr, hold time.Duration, + labels, annotations, externalLabels labels.Labels, externalURL string, + restored bool, logger log.Logger, srcTenants, destTenant string, +) *AlertingRule { + el := make(map[string]string, len(externalLabels)) + for _, lbl := range externalLabels { + el[lbl.Name] = lbl.Value + } + + return &AlertingRule{ + name: name, + vector: vec, + holdDuration: hold, + labels: labels, + annotations: annotations, + externalLabels: el, + externalURL: externalURL, + health: HealthUnknown, + active: map[uint64]*Alert{}, + logger: logger, + restored: restored, + srcTenants: srcTenants, + destTenant: destTenant, + } +} + +// Name returns the name of the alerting rule. 
+func (r *AlertingRule) Name() string { + return r.name +} + +// SetLastError sets the current error seen by the alerting rule. +func (r *AlertingRule) SetLastError(err error) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.lastError = err +} + +// LastError returns the last error seen by the alerting rule. +func (r *AlertingRule) LastError() error { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.lastError +} + +// SetHealth sets the current health of the alerting rule. +func (r *AlertingRule) SetHealth(health RuleHealth) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.health = health +} + +// Health returns the current health of the alerting rule. +func (r *AlertingRule) Health() RuleHealth { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.health +} + +// Query returns the query expression of the alerting rule. +func (r *AlertingRule) Query() parser.Expr { + return r.vector +} + +// HoldDuration returns the hold duration of the alerting rule. +func (r *AlertingRule) HoldDuration() time.Duration { + return r.holdDuration +} + +// Labels returns the labels of the alerting rule. +func (r *AlertingRule) Labels() labels.Labels { + return r.labels +} + +// Annotations returns the annotations of the alerting rule. +func (r *AlertingRule) Annotations() labels.Labels { + return r.annotations +} + +func (r *AlertingRule) GetDestTenant() string { + return r.destTenant +} + +func (r *AlertingRule) GetSrcTenants() string { + return r.srcTenants +} + +func (r *AlertingRule) sample(alert *Alert, ts time.Time) promql.Sample { + lb := labels.NewBuilder(r.labels) + + for _, l := range alert.Labels { + lb.Set(l.Name, l.Value) + } + + lb.Set(labels.MetricName, alertMetricName) + lb.Set(labels.AlertName, r.name) + lb.Set(alertStateLabel, alert.State.String()) + + s := promql.Sample{ + Metric: lb.Labels(), + Point: promql.Point{T: timestamp.FromTime(ts), V: 1}, + } + return s +} + +// forStateSample returns the sample for ALERTS_FOR_STATE. +func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) promql.Sample { + lb := labels.NewBuilder(r.labels) + + for _, l := range alert.Labels { + lb.Set(l.Name, l.Value) + } + + lb.Set(labels.MetricName, alertForStateMetricName) + lb.Set(labels.AlertName, r.name) + + s := promql.Sample{ + Metric: lb.Labels(), + Point: promql.Point{T: timestamp.FromTime(ts), V: v}, + } + return s +} + +// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation. +func (r *AlertingRule) SetEvaluationDuration(dur time.Duration) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.evaluationDuration = dur +} + +// GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule. +func (r *AlertingRule) GetEvaluationDuration() time.Duration { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.evaluationDuration +} + +// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated. +func (r *AlertingRule) SetEvaluationTimestamp(ts time.Time) { + r.mtx.Lock() + defer r.mtx.Unlock() + r.evaluationTimestamp = ts +} + +// GetEvaluationTimestamp returns the time the evaluation took place. +func (r *AlertingRule) GetEvaluationTimestamp() time.Time { + r.mtx.Lock() + defer r.mtx.Unlock() + return r.evaluationTimestamp +} + +// SetRestored updates the restoration state of the alerting rule. 
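+// Once a rule is restored, Eval also starts emitting ALERTS and
+// ALERTS_FOR_STATE samples for its active alerts (see the r.restored check there).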
+func (r *AlertingRule) SetRestored(restored bool) {
+ r.restored = restored
+}
+
+// resolvedRetention is the duration for which a resolved alert instance
+// is kept in memory state and consequently repeatedly sent to the AlertManager.
+const resolvedRetention = 15 * time.Minute
+
+// Eval evaluates the rule expression and then creates pending alerts and fires
+// or removes previously pending alerts accordingly.
+func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
+ res, err := query(ctx, r.vector.String(), ts)
+ if err != nil {
+ r.SetHealth(HealthBad)
+ r.SetLastError(err)
+ return nil, err
+ }
+
+ r.mtx.Lock()
+ defer r.mtx.Unlock()
+
+ // Create pending alerts for any new vector elements in the alert expression
+ // or update the expression value for existing elements.
+ resultFPs := map[uint64]struct{}{}
+
+ var vec promql.Vector
+ var alerts = make(map[uint64]*Alert, len(res))
+ for _, smpl := range res {
+ // Provide the alert information to the template.
+ l := make(map[string]string, len(smpl.Metric))
+ for _, lbl := range smpl.Metric {
+ l[lbl.Name] = lbl.Value
+ }
+
+ tmplData := template.AlertTemplateData(l, r.externalLabels, r.externalURL, smpl.V)
+ // Inject some convenience variables that are easier to remember for users
+ // who are not used to Go's templating system.
+ defs := []string{
+ "{{$labels := .Labels}}",
+ "{{$externalLabels := .ExternalLabels}}",
+ "{{$externalURL := .ExternalURL}}",
+ "{{$value := .Value}}",
+ }
+
+ expand := func(text string) string {
+ tmpl := template.NewTemplateExpander(
+ ctx,
+ strings.Join(append(defs, text), ""),
+ "__alert_"+r.Name(),
+ tmplData,
+ model.Time(timestamp.FromTime(ts)),
+ template.QueryFunc(query),
+ externalURL,
+ )
+ result, err := tmpl.Expand()
+ if err != nil {
+ result = fmt.Sprintf("<error expanding template: %s>", err)
+ level.Warn(r.logger).Log("msg", "Expanding alert template failed", "err", err, "data", tmplData)
+ }
+ return result
+ }
+
+ lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricName)
+
+ for _, l := range r.labels {
+ lb.Set(l.Name, expand(l.Value))
+ }
+ lb.Set(labels.AlertName, r.Name())
+
+ annotations := make(labels.Labels, 0, len(r.annotations))
+ for _, a := range r.annotations {
+ annotations = append(annotations, labels.Label{Name: a.Name, Value: expand(a.Value)})
+ }
+
+ lbs := lb.Labels()
+ h := lbs.Hash()
+ resultFPs[h] = struct{}{}
+
+ if _, ok := alerts[h]; ok {
+ err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
+ // We have already acquired the lock above hence using SetHealth and
+ // SetLastError will deadlock.
+ r.health = HealthBad
+ r.lastError = err
+ return nil, err
+ }
+
+ alerts[h] = &Alert{
+ Labels: lbs,
+ Annotations: annotations,
+ ActiveAt: ts,
+ State: StatePending,
+ Value: smpl.V,
+ }
+ }
+
+ for h, a := range alerts {
+ // Check whether we already have alerting state for the identifying label set.
+ // Update the last value and annotations if so, create a new alert entry otherwise.
+ if alert, ok := r.active[h]; ok && alert.State != StateInactive {
+ alert.Value = a.Value
+ alert.Annotations = a.Annotations
+ continue
+ }
+
+ r.active[h] = a
+ }
+
+ // Check if any pending alerts should be removed or fire now. Write out alert timeseries.
+ for fp, a := range r.active {
+ if _, ok := resultFPs[fp]; !ok {
+ // If the alert was previously firing, keep it around for a given
+ // retention time so it is reported as resolved to the AlertManager.
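+ // Pending alerts that vanish from the result are dropped immediately, while
+ // a resolved alert is only dropped once resolvedRetention (15m) has elapsed
+ // since its ResolvedAt timestamp, so its resolution still reaches the Alertmanager.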
+ if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) { + delete(r.active, fp) + } + if a.State != StateInactive { + a.State = StateInactive + a.ResolvedAt = ts + } + continue + } + + if a.State == StatePending && ts.Sub(a.ActiveAt) >= r.holdDuration { + a.State = StateFiring + a.FiredAt = ts + } + + if r.restored { + vec = append(vec, r.sample(a, ts)) + vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix()))) + } + } + + // We have already acquired the lock above hence using SetHealth and + // SetLastError will deadlock. + r.health = HealthGood + r.lastError = err + return vec, nil +} + +// State returns the maximum state of alert instances for this rule. +// StateFiring > StatePending > StateInactive +func (r *AlertingRule) State() AlertState { + r.mtx.Lock() + defer r.mtx.Unlock() + + maxState := StateInactive + for _, a := range r.active { + if a.State > maxState { + maxState = a.State + } + } + return maxState +} + +// ActiveAlerts returns a slice of active alerts. +func (r *AlertingRule) ActiveAlerts() []*Alert { + var res []*Alert + for _, a := range r.currentAlerts() { + if a.ResolvedAt.IsZero() { + res = append(res, a) + } + } + return res +} + +// currentAlerts returns all instances of alerts for this rule. This may include +// inactive alerts that were previously firing. +func (r *AlertingRule) currentAlerts() []*Alert { + r.mtx.Lock() + defer r.mtx.Unlock() + + alerts := make([]*Alert, 0, len(r.active)) + + for _, a := range r.active { + anew := *a + alerts = append(alerts, &anew) + } + return alerts +} + +// ForEachActiveAlert runs the given function on each alert. +// This should be used when you want to use the actual alerts from the AlertingRule +// and not on its copy. +// If you want to run on a copy of alerts then don't use this, get the alerts from 'ActiveAlerts()'. +func (r *AlertingRule) ForEachActiveAlert(f func(*Alert)) { + r.mtx.Lock() + defer r.mtx.Unlock() + + for _, a := range r.active { + f(a) + } +} + +func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) { + alerts := []*Alert{} + r.ForEachActiveAlert(func(alert *Alert) { + if alert.needsSending(ts, resendDelay) { + alert.LastSentAt = ts + // Allow for two Eval or Alertmanager send failures. + delta := resendDelay + if interval > resendDelay { + delta = interval + } + alert.ValidUntil = ts.Add(4 * delta) + anew := *alert + alerts = append(alerts, &anew) + } + }) + notifyFunc(ctx, r.vector.String(), alerts...) 
+} + +func (r *AlertingRule) String() string { + ar := rulefmt.Rule{ + Alert: r.name, + Expr: r.vector.String(), + For: model.Duration(r.holdDuration), + Labels: r.labels.Map(), + Annotations: r.annotations.Map(), + SrcTenants: r.srcTenants, + DestTenant: r.destTenant, + } + + byt, err := yaml.Marshal(ar) + if err != nil { + return fmt.Sprintf("error marshaling alerting rule: %s", err.Error()) + } + + return string(byt) +} diff --git a/pkg/ruler/rules/manager.go b/pkg/ruler/rules/manager.go new file mode 100644 index 00000000000..fe48cf7ac1b --- /dev/null +++ b/pkg/ruler/rules/manager.go @@ -0,0 +1,1164 @@ +package rules + +import ( + "context" + "math" + "net/url" + "sort" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + opentracing "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/pkg/value" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/prometheus/prometheus/storage" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/ruler/rulefmt" + "github.com/cortexproject/cortex/pkg/tenant" +) + +// RuleHealth describes the health state of a rule. +type RuleHealth string + +// The possible health states of a rule based on the last execution. +const ( + HealthUnknown RuleHealth = "unknown" + HealthGood RuleHealth = "ok" + HealthBad RuleHealth = "err" +) + +// Constants for instrumentation. +const namespace = "prometheus" + +// Metrics for rule evaluation. +type Metrics struct { + EvalDuration prometheus.Summary + IterationDuration prometheus.Summary + IterationsMissed *prometheus.CounterVec + IterationsScheduled *prometheus.CounterVec + EvalTotal *prometheus.CounterVec + EvalFailures *prometheus.CounterVec + GroupInterval *prometheus.GaugeVec + GroupLastEvalTime *prometheus.GaugeVec + GroupLastDuration *prometheus.GaugeVec + GroupRules *prometheus.GaugeVec + GroupSamples *prometheus.GaugeVec +} + +// NewGroupMetrics creates a new instance of Metrics and registers it with the provided registerer, +// if not nil. 
+func NewGroupMetrics(reg prometheus.Registerer) *Metrics { + m := &Metrics{ + EvalDuration: prometheus.NewSummary( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "rule_evaluation_duration_seconds", + Help: "The duration for a rule to execute.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }), + IterationDuration: prometheus.NewSummary(prometheus.SummaryOpts{ + Namespace: namespace, + Name: "rule_group_duration_seconds", + Help: "The duration of rule group evaluations.", + Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001}, + }), + IterationsMissed: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "rule_group_iterations_missed_total", + Help: "The total number of rule group evaluations missed due to slow rule group evaluation.", + }, + []string{"rule_group"}, + ), + IterationsScheduled: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "rule_group_iterations_total", + Help: "The total number of scheduled rule group evaluations, whether executed or missed.", + }, + []string{"rule_group"}, + ), + EvalTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "rule_evaluations_total", + Help: "The total number of rule evaluations.", + }, + []string{"rule_group"}, + ), + EvalFailures: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Name: "rule_evaluation_failures_total", + Help: "The total number of rule evaluation failures.", + }, + []string{"rule_group"}, + ), + GroupInterval: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "rule_group_interval_seconds", + Help: "The interval of a rule group.", + }, + []string{"rule_group"}, + ), + GroupLastEvalTime: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "rule_group_last_evaluation_timestamp_seconds", + Help: "The timestamp of the last rule group evaluation in seconds.", + }, + []string{"rule_group"}, + ), + GroupLastDuration: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "rule_group_last_duration_seconds", + Help: "The duration of the last rule group evaluation.", + }, + []string{"rule_group"}, + ), + GroupRules: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "rule_group_rules", + Help: "The number of rules.", + }, + []string{"rule_group"}, + ), + GroupSamples: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "rule_group_last_evaluation_samples", + Help: "The number of samples returned during the last rule group evaluation.", + }, + []string{"rule_group"}, + ), + } + + if reg != nil { + reg.MustRegister( + m.EvalDuration, + m.IterationDuration, + m.IterationsMissed, + m.IterationsScheduled, + m.EvalTotal, + m.EvalFailures, + m.GroupInterval, + m.GroupLastEvalTime, + m.GroupLastDuration, + m.GroupRules, + m.GroupSamples, + ) + } + + return m +} + +// QueryFunc processes PromQL queries. +type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, error) + +// EngineQueryFunc returns a new query function that executes instant queries against +// the given engine. +// It converts scalar into vector results. 
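+// A hypothetical caller, for illustration only (engine and queryable are
+// assumed to exist):
+//
+//	qf := EngineQueryFunc(engine, queryable)
+//	vec, err := qf(ctx, "up == 0", time.Now())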
+func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
+ return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
+ q, err := engine.NewInstantQuery(q, qs, t)
+ if err != nil {
+ return nil, err
+ }
+ res := q.Exec(ctx)
+ if res.Err != nil {
+ return nil, res.Err
+ }
+ switch v := res.Value.(type) {
+ case promql.Vector:
+ return v, nil
+ case promql.Scalar:
+ return promql.Vector{promql.Sample{
+ Point: promql.Point(v),
+ Metric: labels.Labels{},
+ }}, nil
+ default:
+ return nil, errors.New("rule result is not a vector or scalar")
+ }
+ }
+}
+
+// A Rule encapsulates a vector expression which is evaluated at a specified
+// interval and acted upon (currently either recorded or used for alerting).
+type Rule interface {
+ Name() string
+ // Labels of the rule.
+ Labels() labels.Labels
+ // Eval evaluates the rule, including any associated recording or alerting actions.
+ Eval(context.Context, time.Time, QueryFunc, *url.URL) (promql.Vector, error)
+ // String returns a human-readable string representation of the rule.
+ String() string
+ // Query returns the rule query expression.
+ Query() parser.Expr
+ // SetLastError sets the current error experienced by the rule.
+ SetLastError(error)
+ // LastError returns the last error experienced by the rule.
+ LastError() error
+ // SetHealth sets the current health of the rule.
+ SetHealth(RuleHealth)
+ // Health returns the current health of the rule.
+ Health() RuleHealth
+ SetEvaluationDuration(time.Duration)
+ // GetEvaluationDuration returns last evaluation duration.
+ // NOTE: Used dynamically by rules.html template.
+ GetEvaluationDuration() time.Duration
+ SetEvaluationTimestamp(time.Time)
+ // GetEvaluationTimestamp returns last evaluation timestamp.
+ // NOTE: Used dynamically by rules.html template.
+ GetEvaluationTimestamp() time.Time
+
+ // GetSrcTenants returns the source tenants whose data is queried when evaluating the rule.
+ GetSrcTenants() string
+ // GetDestTenant returns the destination tenant in which series produced by the rule are stored.
+ GetDestTenant() string
+}
+
+// Group is a set of rules that have a logical relation.
+type Group struct {
+ name string
+ file string
+ interval time.Duration
+ rules []Rule
+ seriesInPreviousEval []map[string]labels.Labels // One per Rule.
+ staleSeries []labels.Labels
+ opts *ManagerOptions
+ mtx sync.Mutex
+ evaluationTime time.Duration
+ lastEvaluation time.Time
+
+ shouldRestore bool
+
+ markStale bool
+ done chan struct{}
+ terminated chan struct{}
+ managerDone chan struct{}
+
+ logger log.Logger
+
+ metrics *Metrics
+}
+
+type GroupOptions struct {
+ Name, File string
+ Interval time.Duration
+ Rules []Rule
+ ShouldRestore bool
+ Opts *ManagerOptions
+ done chan struct{}
+}
+
+// NewGroup makes a new Group with the given name, options, and rules.
+func NewGroup(o GroupOptions) *Group { + metrics := o.Opts.Metrics + if metrics == nil { + metrics = NewGroupMetrics(o.Opts.Registerer) + } + + key := GroupKey(o.File, o.Name) + metrics.IterationsMissed.WithLabelValues(key) + metrics.IterationsScheduled.WithLabelValues(key) + metrics.EvalTotal.WithLabelValues(key) + metrics.EvalFailures.WithLabelValues(key) + metrics.GroupLastEvalTime.WithLabelValues(key) + metrics.GroupLastDuration.WithLabelValues(key) + metrics.GroupRules.WithLabelValues(key).Set(float64(len(o.Rules))) + metrics.GroupSamples.WithLabelValues(key) + metrics.GroupInterval.WithLabelValues(key).Set(o.Interval.Seconds()) + + return &Group{ + name: o.Name, + file: o.File, + interval: o.Interval, + rules: o.Rules, + shouldRestore: o.ShouldRestore, + opts: o.Opts, + seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)), + done: make(chan struct{}), + managerDone: o.done, + terminated: make(chan struct{}), + logger: log.With(o.Opts.Logger, "group", o.Name), + metrics: metrics, + } +} + +// Name returns the group name. +func (g *Group) Name() string { return g.name } + +// File returns the group's file. +func (g *Group) File() string { return g.file } + +// Rules returns the group's rules. +func (g *Group) Rules() []Rule { return g.rules } + +// Interval returns the group's interval. +func (g *Group) Interval() time.Duration { return g.interval } + +func (g *Group) run(ctx context.Context) { + defer close(g.terminated) + + // Wait an initial amount to have consistently slotted intervals. + evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval) + select { + case <-time.After(time.Until(evalTimestamp)): + case <-g.done: + return + } + + ctx = promql.NewOriginContext(ctx, map[string]interface{}{ + "ruleGroup": map[string]string{ + "file": g.File(), + "name": g.Name(), + }, + }) + + iter := func() { + g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc() + + start := time.Now() + g.Eval(ctx, evalTimestamp) + timeSinceStart := time.Since(start) + + g.metrics.IterationDuration.Observe(timeSinceStart.Seconds()) + g.setEvaluationTime(timeSinceStart) + g.setLastEvaluation(start) + } + + // The assumption here is that since the ticker was started after having + // waited for `evalTimestamp` to pass, the ticks will trigger soon + // after each `evalTimestamp + N * g.interval` occurrence. + tick := time.NewTicker(g.interval) + defer tick.Stop() + + defer func() { + if !g.markStale { + return + } + go func(now time.Time) { + for _, rule := range g.seriesInPreviousEval { + for _, r := range rule { + g.staleSeries = append(g.staleSeries, r) + } + } + // That can be garbage collected at this point. + g.seriesInPreviousEval = nil + // Wait for 2 intervals to give the opportunity to renamed rules + // to insert new series in the tsdb. At this point if there is a + // renamed rule, it should already be started. + select { + case <-g.managerDone: + case <-time.After(2 * g.interval): + g.cleanupStaleSeries(ctx, now) + } + }(time.Now()) + }() + + iter() + if g.shouldRestore { + // If we have to restore, we wait for another Eval to finish. + // The reason behind this is, during first eval (or before it) + // we might not have enough data scraped, and recording rules would not + // have updated the latest values, on which some alerts might depend. 
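+ // So we wait for one more tick (accounting for any missed intervals) and
+ // run a second Eval before calling RestoreForState below.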
+ select { + case <-g.done: + return + case <-tick.C: + missed := (time.Since(evalTimestamp) / g.interval) - 1 + if missed > 0 { + g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) + g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) + } + evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) + iter() + } + + g.RestoreForState(time.Now()) + g.shouldRestore = false + } + + for { + select { + case <-g.done: + return + default: + select { + case <-g.done: + return + case <-tick.C: + missed := (time.Since(evalTimestamp) / g.interval) - 1 + if missed > 0 { + g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) + g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed)) + } + evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval) + iter() + } + } + } +} + +func (g *Group) stop() { + close(g.done) + <-g.terminated +} + +func (g *Group) hash() uint64 { + l := labels.New( + labels.Label{Name: "name", Value: g.name}, + labels.Label{Name: "file", Value: g.file}, + ) + return l.Hash() +} + +// AlertingRules returns the list of the group's alerting rules. +func (g *Group) AlertingRules() []*AlertingRule { + g.mtx.Lock() + defer g.mtx.Unlock() + + var alerts []*AlertingRule + for _, rule := range g.rules { + if alertingRule, ok := rule.(*AlertingRule); ok { + alerts = append(alerts, alertingRule) + } + } + sort.Slice(alerts, func(i, j int) bool { + return alerts[i].State() > alerts[j].State() || + (alerts[i].State() == alerts[j].State() && + alerts[i].Name() < alerts[j].Name()) + }) + return alerts +} + +// HasAlertingRules returns true if the group contains at least one AlertingRule. +func (g *Group) HasAlertingRules() bool { + g.mtx.Lock() + defer g.mtx.Unlock() + + for _, rule := range g.rules { + if _, ok := rule.(*AlertingRule); ok { + return true + } + } + return false +} + +// GetEvaluationTime returns the time in seconds it took to evaluate the rule group. +func (g *Group) GetEvaluationTime() time.Duration { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.evaluationTime +} + +// setEvaluationTime sets the time in seconds the last evaluation took. +func (g *Group) setEvaluationTime(dur time.Duration) { + g.metrics.GroupLastDuration.WithLabelValues(GroupKey(g.file, g.name)).Set(dur.Seconds()) + + g.mtx.Lock() + defer g.mtx.Unlock() + g.evaluationTime = dur +} + +// GetLastEvaluation returns the time the last evaluation of the rule group took place. +func (g *Group) GetLastEvaluation() time.Time { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.lastEvaluation +} + +// setLastEvaluation updates evaluationTimestamp to the timestamp of when the rule group was last evaluated. +func (g *Group) setLastEvaluation(ts time.Time) { + g.metrics.GroupLastEvalTime.WithLabelValues(GroupKey(g.file, g.name)).Set(float64(ts.UnixNano()) / 1e9) + + g.mtx.Lock() + defer g.mtx.Unlock() + g.lastEvaluation = ts +} + +// EvalTimestamp returns the immediately preceding consistently slotted evaluation time. +func (g *Group) EvalTimestamp(startTime int64) time.Time { + var ( + offset = int64(g.hash() % uint64(g.interval)) + adjNow = startTime - offset + base = adjNow - (adjNow % int64(g.interval)) + ) + + return time.Unix(0, base+offset).UTC() +} + +func nameAndLabels(rule Rule) string { + return rule.Name() + rule.Labels().String() +} + +// CopyState copies the alerting rule and staleness related state from the given group. 
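+// Update relies on this to carry active alert state across config reloads
+// without resetting 'for' progress.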
+// +// Rules are matched based on their name and labels. If there are duplicates, the +// first is matched with the first, second with the second etc. +func (g *Group) CopyState(from *Group) { + g.evaluationTime = from.evaluationTime + g.lastEvaluation = from.lastEvaluation + + ruleMap := make(map[string][]int, len(from.rules)) + + for fi, fromRule := range from.rules { + nameAndLabels := nameAndLabels(fromRule) + l := ruleMap[nameAndLabels] + ruleMap[nameAndLabels] = append(l, fi) + } + + for i, rule := range g.rules { + nameAndLabels := nameAndLabels(rule) + indexes := ruleMap[nameAndLabels] + if len(indexes) == 0 { + continue + } + fi := indexes[0] + g.seriesInPreviousEval[i] = from.seriesInPreviousEval[fi] + ruleMap[nameAndLabels] = indexes[1:] + + ar, ok := rule.(*AlertingRule) + if !ok { + continue + } + far, ok := from.rules[fi].(*AlertingRule) + if !ok { + continue + } + + for fp, a := range far.active { + ar.active[fp] = a + } + } + + // Handle deleted and unmatched duplicate rules. + g.staleSeries = from.staleSeries + for fi, fromRule := range from.rules { + nameAndLabels := nameAndLabels(fromRule) + l := ruleMap[nameAndLabels] + if len(l) != 0 { + for _, series := range from.seriesInPreviousEval[fi] { + g.staleSeries = append(g.staleSeries, series) + } + } + } +} + +// Eval runs a single evaluation cycle in which all rules are evaluated sequentially. +func (g *Group) Eval(ctx context.Context, ts time.Time) { + var samplesTotal float64 + for i, rule := range g.rules { + select { + case <-g.done: + return + default: + } + + func(i int, rule Rule) { + sp, ctx := opentracing.StartSpanFromContext(ctx, "rule") + sp.SetTag("name", rule.Name()) + defer func(t time.Time) { + sp.Finish() + + since := time.Since(t) + g.metrics.EvalDuration.Observe(since.Seconds()) + rule.SetEvaluationDuration(since) + rule.SetEvaluationTimestamp(t) + }(time.Now()) + + g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + // Set the OrgID from src tenant for this ctx + queryCtx := ctx + srcTenants := rule.GetSrcTenants() + if srcTenants != "" { + queryCtx = user.InjectOrgID(ctx, srcTenants) + } + vector, err := rule.Eval(queryCtx, ts, g.opts.QueryFunc, g.opts.ExternalURL) + if err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + // Canceled queries are intentional termination of queries. This normally + // happens on shutdown and thus we skip logging of any errors here. + if _, ok := err.(promql.ErrQueryCanceled); !ok { + level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err) + } + return + } + samplesTotal += float64(len(vector)) + + if ar, ok := rule.(*AlertingRule); ok { + ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) + } + var ( + numOutOfOrder = 0 + numDuplicates = 0 + ) + + // determine the appender here based on the rule dest tenant, again by ctx injection + appCtx := ctx + dst := rule.GetDestTenant() + if dst != "" { + appCtx = user.InjectOrgID(ctx, dst) + } + // Handle when dest tenant (or owning tenant) is a composite tenant e.g. 
`a|b|...|z` + tenantIDs, err := tenant.TenantIDs(appCtx) + if err == nil && len(tenantIDs) > 1 { + // Mod the hash of the series so same series always goes to same subtenant + h, _ := rule.Labels().Copy().HashForLabels(make([]byte, 0, 1024), labels.MetricName) + i := int(h) % len(tenantIDs) + if i < 0 { + i = i * -1 + } + tenantID := tenantIDs[i] + appCtx = user.InjectOrgID(ctx, tenantID) + } + app := g.opts.Appendable.Appender(appCtx) + seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) + defer func() { + if err := app.Commit(); err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err) + return + } + g.seriesInPreviousEval[i] = seriesReturned + }() + + for _, s := range vector { + if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + + switch errors.Cause(err) { + case storage.ErrOutOfOrderSample: + numOutOfOrder++ + level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + case storage.ErrDuplicateSampleForTimestamp: + numDuplicates++ + level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + default: + level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + } + } else { + seriesReturned[s.Metric.String()] = s.Metric + } + } + if numOutOfOrder > 0 { + level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder) + } + if numDuplicates > 0 { + level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates) + } + + for metric, lset := range g.seriesInPreviousEval[i] { + if _, ok := seriesReturned[metric]; !ok { + // Series no longer exposed, mark it stale. + _, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) + switch errors.Cause(err) { + case nil: + case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: + // Do not count these in logging, as this is expected if series + // is exposed from a different rule. + default: + level.Warn(g.logger).Log("msg", "Adding stale sample failed", "sample", metric, "err", err) + } + } + } + }(i, rule) + } + if g.metrics != nil { + g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal) + } + g.cleanupStaleSeries(ctx, ts) +} + +func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) { + if len(g.staleSeries) == 0 { + return + } + app := g.opts.Appendable.Appender(ctx) + for _, s := range g.staleSeries { + // Rule that produced series no longer configured, mark it stale. + _, err := app.Append(0, s, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN)) + switch errors.Cause(err) { + case nil: + case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: + // Do not count these in logging, as this is expected if series + // is exposed from a different rule. 
+ default:
+ level.Warn(g.logger).Log("msg", "Adding stale sample for previous configuration failed", "sample", s, "err", err)
+ }
+ }
+ if err := app.Commit(); err != nil {
+ level.Warn(g.logger).Log("msg", "Stale sample appending for previous configuration failed", "err", err)
+ } else {
+ g.staleSeries = nil
+ }
+}
+
+// RestoreForState restores the 'for' state of the alerts
+// by looking up last ActiveAt from storage.
+func (g *Group) RestoreForState(ts time.Time) {
+ maxtMS := int64(model.TimeFromUnixNano(ts.UnixNano()))
+ // We allow restoration only if the alert was active within the outage tolerance window.
+ mint := ts.Add(-g.opts.OutageTolerance)
+ mintMS := int64(model.TimeFromUnixNano(mint.UnixNano()))
+ q, err := g.opts.Queryable.Querier(g.opts.Context, mintMS, maxtMS)
+ if err != nil {
+ level.Error(g.logger).Log("msg", "Failed to get Querier", "err", err)
+ return
+ }
+ defer func() {
+ if err := q.Close(); err != nil {
+ level.Error(g.logger).Log("msg", "Failed to close Querier", "err", err)
+ }
+ }()
+
+ for _, rule := range g.Rules() {
+ alertRule, ok := rule.(*AlertingRule)
+ if !ok {
+ continue
+ }
+
+ alertHoldDuration := alertRule.HoldDuration()
+ if alertHoldDuration < g.opts.ForGracePeriod {
+ // If alertHoldDuration is already less than grace period, we would not
+ // like to make it wait for `g.opts.ForGracePeriod` time before firing.
+ // Hence we skip restoration, which will make it wait for alertHoldDuration.
+ alertRule.SetRestored(true)
+ continue
+ }
+
+ alertRule.ForEachActiveAlert(func(a *Alert) {
+ smpl := alertRule.forStateSample(a, time.Now(), 0)
+ var matchers []*labels.Matcher
+ for _, l := range smpl.Metric {
+ mt, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value)
+ if err != nil {
+ panic(err)
+ }
+ matchers = append(matchers, mt)
+ }
+
+ sset := q.Select(false, nil, matchers...)
+
+ seriesFound := false
+ var s storage.Series
+ for sset.Next() {
+ // Query assures that smpl.Metric is included in sset.At().Labels(),
+ // hence just checking the length would act like equality.
+ // (This is faster than calling labels.Compare again as we already have some info).
+ if len(sset.At().Labels()) == len(smpl.Metric) {
+ s = sset.At()
+ seriesFound = true
+ break
+ }
+ }
+
+ if err := sset.Err(); err != nil {
+ // Querier Warnings are ignored. We do not care unless we have an error.
+ level.Error(g.logger).Log(
+ "msg", "Failed to restore 'for' state",
+ labels.AlertName, alertRule.Name(),
+ "stage", "Select",
+ "err", err,
+ )
+ return
+ }
+
+ if !seriesFound {
+ return
+ }
+
+ // Series found for the 'for' state.
+ var t int64
+ var v float64
+ it := s.Iterator()
+ for it.Next() {
+ t, v = it.At()
+ }
+ if it.Err() != nil {
+ level.Error(g.logger).Log("msg", "Failed to restore 'for' state",
+ labels.AlertName, alertRule.Name(), "stage", "Iterator", "err", it.Err())
+ return
+ }
+ if value.IsStaleNaN(v) { // Alert was not active.
+ return
+ }
+
+ downAt := time.Unix(t/1000, 0).UTC()
+ restoredActiveAt := time.Unix(int64(v), 0).UTC()
+ timeSpentPending := downAt.Sub(restoredActiveAt)
+ timeRemainingPending := alertHoldDuration - timeSpentPending
+
+ if timeRemainingPending <= 0 {
+ // It means that alert was firing when Prometheus went down.
+ // In the next Eval, the state of this alert will be set back to
+ // firing again if it's still firing in that Eval.
+ // Nothing to be done in this case.
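+ // (restoredActiveAt keeps the value read back from storage, so the alert
+ // re-fires on the next Eval once the hold duration check passes.)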
+ } else if timeRemainingPending < g.opts.ForGracePeriod { + // (new) restoredActiveAt = (ts + m.opts.ForGracePeriod) - alertHoldDuration + // /* new firing time */ /* moving back by hold duration */ + // + // Proof of correctness: + // firingTime = restoredActiveAt.Add(alertHoldDuration) + // = ts + m.opts.ForGracePeriod - alertHoldDuration + alertHoldDuration + // = ts + m.opts.ForGracePeriod + // + // Time remaining to fire = firingTime.Sub(ts) + // = (ts + m.opts.ForGracePeriod) - ts + // = m.opts.ForGracePeriod + restoredActiveAt = ts.Add(g.opts.ForGracePeriod).Add(-alertHoldDuration) + } else { + // By shifting ActiveAt to the future (ActiveAt + some_duration), + // the total pending time from the original ActiveAt + // would be `alertHoldDuration + some_duration`. + // Here, some_duration = downDuration. + downDuration := ts.Sub(downAt) + restoredActiveAt = restoredActiveAt.Add(downDuration) + } + + a.ActiveAt = restoredActiveAt + level.Debug(g.logger).Log("msg", "'for' state restored", + labels.AlertName, alertRule.Name(), "restored_time", a.ActiveAt.Format(time.RFC850), + "labels", a.Labels.String()) + + }) + + alertRule.SetRestored(true) + } + +} + +// Equals return if two groups are the same. +func (g *Group) Equals(ng *Group) bool { + if g.name != ng.name { + return false + } + + if g.file != ng.file { + return false + } + + if g.interval != ng.interval { + return false + } + + if len(g.rules) != len(ng.rules) { + return false + } + + for i, gr := range g.rules { + if gr.String() != ng.rules[i].String() { + return false + } + } + + return true +} + +// The Manager manages recording and alerting rules. +type Manager struct { + opts *ManagerOptions + groups map[string]*Group + mtx sync.RWMutex + block chan struct{} + done chan struct{} + restored bool + + logger log.Logger +} + +// NotifyFunc sends notifications about a set of alerts generated by the given expression. +type NotifyFunc func(ctx context.Context, expr string, alerts ...*Alert) + +// ManagerOptions bundles options for the Manager. +type ManagerOptions struct { + ExternalURL *url.URL + QueryFunc QueryFunc + NotifyFunc NotifyFunc + Context context.Context + Appendable storage.Appendable + Queryable storage.Queryable + Logger log.Logger + Registerer prometheus.Registerer + OutageTolerance time.Duration + ForGracePeriod time.Duration + ResendDelay time.Duration + GroupLoader GroupLoader + + Metrics *Metrics +} + +// NewManager returns an implementation of Manager, ready to be started +// by calling the Run method. +func NewManager(o *ManagerOptions) *Manager { + if o.Metrics == nil { + o.Metrics = NewGroupMetrics(o.Registerer) + } + + if o.GroupLoader == nil { + o.GroupLoader = FileLoader{} + } + + m := &Manager{ + groups: map[string]*Group{}, + opts: o, + block: make(chan struct{}), + done: make(chan struct{}), + logger: o.Logger, + } + + return m +} + +// Run starts processing of the rule manager. It is blocking. +func (m *Manager) Run() { + m.start() + <-m.done +} + +func (m *Manager) start() { + close(m.block) +} + +// Stop the rule manager's rule evaluation cycles. +func (m *Manager) Stop() { + m.mtx.Lock() + defer m.mtx.Unlock() + + level.Info(m.logger).Log("msg", "Stopping rule manager...") + + for _, eg := range m.groups { + eg.stop() + } + + // Shut down the groups waiting multiple evaluation intervals to write + // staleness markers. + close(m.done) + + level.Info(m.logger).Log("msg", "Rule manager stopped") +} + +// Update the rule manager's state as the config requires. 
If +// loading the new rules failed the old rule set is restored. +func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error { + m.mtx.Lock() + defer m.mtx.Unlock() + + groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...) + if errs != nil { + for _, e := range errs { + level.Error(m.logger).Log("msg", "loading groups failed", "err", e) + } + return errors.New("error loading rules, previous rule set restored") + } + m.restored = true + + var wg sync.WaitGroup + for _, newg := range groups { + // If there is an old group with the same identifier, + // check if new group equals with the old group, if yes then skip it. + // If not equals, stop it and wait for it to finish the current iteration. + // Then copy it into the new group. + gn := GroupKey(newg.file, newg.name) + oldg, ok := m.groups[gn] + delete(m.groups, gn) + + if ok && oldg.Equals(newg) { + groups[gn] = oldg + continue + } + + wg.Add(1) + go func(newg *Group) { + if ok { + oldg.stop() + newg.CopyState(oldg) + } + wg.Done() + // Wait with starting evaluation until the rule manager + // is told to run. This is necessary to avoid running + // queries against a bootstrapping storage. + <-m.block + newg.run(m.opts.Context) + }(newg) + } + + // Stop remaining old groups. + wg.Add(len(m.groups)) + for n, oldg := range m.groups { + go func(n string, g *Group) { + g.markStale = true + g.stop() + if m := g.metrics; m != nil { + m.IterationsMissed.DeleteLabelValues(n) + m.IterationsScheduled.DeleteLabelValues(n) + m.EvalTotal.DeleteLabelValues(n) + m.EvalFailures.DeleteLabelValues(n) + m.GroupInterval.DeleteLabelValues(n) + m.GroupLastEvalTime.DeleteLabelValues(n) + m.GroupLastDuration.DeleteLabelValues(n) + m.GroupRules.DeleteLabelValues(n) + m.GroupSamples.DeleteLabelValues((n)) + } + wg.Done() + }(n, oldg) + } + + wg.Wait() + m.groups = groups + + return nil +} + +// GroupLoader is responsible for loading rule groups from arbitrary sources and parsing them. +type GroupLoader interface { + Load(identifier string) (*rulefmt.RuleGroups, []error) + Parse(query string) (parser.Expr, error) +} + +// FileLoader is the default GroupLoader implementation. It defers to rulefmt.ParseFile +// and parser.ParseExpr +type FileLoader struct{} + +func (FileLoader) Load(identifier string) (*rulefmt.RuleGroups, []error) { + return rulefmt.ParseFile(identifier) +} + +func (FileLoader) Parse(query string) (parser.Expr, error) { return parser.ParseExpr(query) } + +// LoadGroups reads groups from a list of files. 
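+// The federated-ruler src_tenants and dest_tenant fields of each rule are
+// passed through to NewAlertingRule and NewRecordingRule.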
+func (m *Manager) LoadGroups( + interval time.Duration, externalLabels labels.Labels, externalURL string, filenames ...string, +) (map[string]*Group, []error) { + groups := make(map[string]*Group) + + shouldRestore := !m.restored + + for _, fn := range filenames { + rgs, errs := m.opts.GroupLoader.Load(fn) + if errs != nil { + return nil, errs + } + + for _, rg := range rgs.Groups { + itv := interval + if rg.Interval != 0 { + itv = time.Duration(rg.Interval) + } + + rules := make([]Rule, 0, len(rg.Rules)) + for _, r := range rg.Rules { + expr, err := m.opts.GroupLoader.Parse(r.Expr.Value) + if err != nil { + return nil, []error{errors.Wrap(err, fn)} + } + + if r.Alert.Value != "" { + rules = append(rules, NewAlertingRule( + r.Alert.Value, + expr, + time.Duration(r.For), + labels.FromMap(r.Labels), + labels.FromMap(r.Annotations), + externalLabels, + externalURL, + m.restored, + log.With(m.logger, "alert", r.Alert), + r.SrcTenants.Value, + r.DestTenant.Value, + )) + continue + } + rules = append(rules, NewRecordingRule( + r.Record.Value, + expr, + labels.FromMap(r.Labels), + r.SrcTenants.Value, + r.DestTenant.Value, + )) + } + + groups[GroupKey(fn, rg.Name)] = NewGroup(GroupOptions{ + Name: rg.Name, + File: fn, + Interval: itv, + Rules: rules, + ShouldRestore: shouldRestore, + Opts: m.opts, + done: m.done, + }) + } + } + + return groups, nil +} + +// GroupKey group names need not be unique across filenames. +func GroupKey(file, name string) string { + return file + ";" + name +} + +// RuleGroups returns the list of manager's rule groups. +func (m *Manager) RuleGroups() []*Group { + m.mtx.RLock() + defer m.mtx.RUnlock() + + rgs := make([]*Group, 0, len(m.groups)) + for _, g := range m.groups { + rgs = append(rgs, g) + } + + sort.Slice(rgs, func(i, j int) bool { + if rgs[i].file != rgs[j].file { + return rgs[i].file < rgs[j].file + } + return rgs[i].name < rgs[j].name + }) + + return rgs +} + +// Rules returns the list of the manager's rules. +func (m *Manager) Rules() []Rule { + m.mtx.RLock() + defer m.mtx.RUnlock() + + var rules []Rule + for _, g := range m.groups { + rules = append(rules, g.rules...) + } + + return rules +} + +// AlertingRules returns the list of the manager's alerting rules. +func (m *Manager) AlertingRules() []*AlertingRule { + alerts := []*AlertingRule{} + for _, rule := range m.Rules() { + if alertingRule, ok := rule.(*AlertingRule); ok { + alerts = append(alerts, alertingRule) + } + } + + return alerts +} diff --git a/pkg/ruler/rules/recording.go b/pkg/ruler/rules/recording.go new file mode 100644 index 00000000000..2b23e49cef9 --- /dev/null +++ b/pkg/ruler/rules/recording.go @@ -0,0 +1,180 @@ +package rules + +import ( + "context" + "fmt" + "net/url" + "sync" + "time" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + yaml "gopkg.in/yaml.v2" + + "github.com/cortexproject/cortex/pkg/ruler/rulefmt" +) + +// A RecordingRule records its vector expression into new timeseries. +type RecordingRule struct { + name string + vector parser.Expr + labels labels.Labels + // Protects the below. + mtx sync.Mutex + // The health of the recording rule. + health RuleHealth + // Timestamp of last evaluation of the recording rule. + evaluationTimestamp time.Time + // The last error seen by the recording rule. + lastError error + // Duration of how long it took to evaluate the recording rule. 
+ evaluationDuration time.Duration + // Used by cortex federated ruler to determine src and dest for rules + srcTenants string + destTenant string +} + +// NewRecordingRule returns a new recording rule. +func NewRecordingRule(name string, vector parser.Expr, lset labels.Labels, srcTenants, destTenant string) *RecordingRule { + return &RecordingRule{ + name: name, + vector: vector, + health: HealthUnknown, + labels: lset, + srcTenants: srcTenants, + destTenant: destTenant, + } +} + +// Name returns the rule name. +func (rule *RecordingRule) Name() string { + return rule.name +} + +// Query returns the rule query expression. +func (rule *RecordingRule) Query() parser.Expr { + return rule.vector +} + +// Labels returns the rule labels. +func (rule *RecordingRule) Labels() labels.Labels { + return rule.labels +} + +// Eval evaluates the rule and then overrides the metric names and labels accordingly. +func (rule *RecordingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, _ *url.URL) (promql.Vector, error) { + // Set the context based on the src tenants for the query func + vector, err := query(ctx, rule.vector.String(), ts) + if err != nil { + return nil, err + } + // Override the metric name and labels. + for i := range vector { + sample := &vector[i] + + lb := labels.NewBuilder(sample.Metric) + + lb.Set(labels.MetricName, rule.name) + + for _, l := range rule.labels { + lb.Set(l.Name, l.Value) + } + + sample.Metric = lb.Labels() + } + + // Check that the rule does not produce identical metrics after applying + // labels. + if vector.ContainsSameLabelset() { + err = fmt.Errorf("vector contains metrics with the same labelset after applying rule labels") + rule.SetHealth(HealthBad) + rule.SetLastError(err) + return nil, err + } + + rule.SetHealth(HealthGood) + rule.SetLastError(err) + return vector, nil +} + +func (rule *RecordingRule) String() string { + r := rulefmt.Rule{ + Record: rule.name, + Expr: rule.vector.String(), + Labels: rule.labels.Map(), + SrcTenants: rule.srcTenants, + DestTenant: rule.destTenant, + } + + byt, err := yaml.Marshal(r) + if err != nil { + return fmt.Sprintf("error marshaling recording rule: %q", err.Error()) + } + + return string(byt) +} + +// SetEvaluationDuration updates evaluationDuration to the time in seconds it took to evaluate the rule on its last evaluation. +func (rule *RecordingRule) SetEvaluationDuration(dur time.Duration) { + rule.mtx.Lock() + defer rule.mtx.Unlock() + rule.evaluationDuration = dur +} + +// SetLastError sets the current error seen by the recording rule. +func (rule *RecordingRule) SetLastError(err error) { + rule.mtx.Lock() + defer rule.mtx.Unlock() + rule.lastError = err +} + +// LastError returns the last error seen by the recording rule. +func (rule *RecordingRule) LastError() error { + rule.mtx.Lock() + defer rule.mtx.Unlock() + return rule.lastError +} + +// SetHealth sets the current health of the recording rule. +func (rule *RecordingRule) SetHealth(health RuleHealth) { + rule.mtx.Lock() + defer rule.mtx.Unlock() + rule.health = health +} + +// Health returns the current health of the recording rule. +func (rule *RecordingRule) Health() RuleHealth { + rule.mtx.Lock() + defer rule.mtx.Unlock() + return rule.health +} + +// GetEvaluationDuration returns the time in seconds it took to evaluate the recording rule. 
+func (rule *RecordingRule) GetEvaluationDuration() time.Duration { + rule.mtx.Lock() + defer rule.mtx.Unlock() + return rule.evaluationDuration +} + +// SetEvaluationTimestamp updates evaluationTimestamp to the timestamp of when the rule was last evaluated. +func (rule *RecordingRule) SetEvaluationTimestamp(ts time.Time) { + rule.mtx.Lock() + defer rule.mtx.Unlock() + rule.evaluationTimestamp = ts +} + +// GetEvaluationTimestamp returns the time the evaluation took place. +func (rule *RecordingRule) GetEvaluationTimestamp() time.Time { + rule.mtx.Lock() + defer rule.mtx.Unlock() + return rule.evaluationTimestamp +} + +func (rule *RecordingRule) GetDestTenant() string { + return rule.destTenant +} + +func (rule *RecordingRule) GetSrcTenants() string { + return rule.srcTenants +} diff --git a/pkg/ruler/rulespb/compat.go b/pkg/ruler/rulespb/compat.go index f181c01843b..997f05829a9 100644 --- a/pkg/ruler/rulespb/compat.go +++ b/pkg/ruler/rulespb/compat.go @@ -5,10 +5,10 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/rulefmt" "gopkg.in/yaml.v3" "github.com/cortexproject/cortex/pkg/cortexpb" //lint:ignore faillint allowed to import other protobuf + "github.com/cortexproject/cortex/pkg/ruler/rulefmt" ) // ToProto transforms a formatted prometheus rulegroup to a rule group protobuf @@ -33,6 +33,8 @@ func formattedRuleToProto(rls []rulefmt.RuleNode) []*RuleDesc { For: time.Duration(rls[i].For), Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(rls[i].Labels)), Annotations: cortexpb.FromLabelsToLabelAdapters(labels.FromMap(rls[i].Annotations)), + SrcTenants: rls[i].SrcTenants.Value, + DestTenant: rls[i].DestTenant.Value, } } @@ -67,6 +69,16 @@ func FromProto(rg *RuleGroupDesc) rulefmt.RuleGroup { alertNode.SetString(rl.GetAlert()) newRule.Alert = alertNode } + if rl.GetSrcTenants() != "" { + srcNode := yaml.Node{} + srcNode.SetString(rl.GetSrcTenants()) + newRule.SrcTenants = srcNode + } + if rl.GetDestTenant() != "" { + destNode := yaml.Node{} + destNode.SetString(rl.GetDestTenant()) + newRule.DestTenant = destNode + } formattedRuleGroup.Rules[i] = newRule } diff --git a/pkg/ruler/rulespb/custom.go b/pkg/ruler/rulespb/custom.go index b0043092829..c55f9eaa222 100644 --- a/pkg/ruler/rulespb/custom.go +++ b/pkg/ruler/rulespb/custom.go @@ -1,6 +1,6 @@ package rulespb -import "github.com/prometheus/prometheus/pkg/rulefmt" +import "github.com/cortexproject/cortex/pkg/ruler/rulefmt" // RuleGroupList contains a set of rule groups type RuleGroupList []*RuleGroupDesc diff --git a/pkg/ruler/rulespb/rules.pb.go b/pkg/ruler/rulespb/rules.pb.go index 8f76eaa471b..ed38c5908db 100644 --- a/pkg/ruler/rulespb/rules.pb.go +++ b/pkg/ruler/rulespb/rules.pb.go @@ -125,6 +125,8 @@ type RuleDesc struct { Expr string `protobuf:"bytes,1,opt,name=expr,proto3" json:"expr,omitempty"` Record string `protobuf:"bytes,2,opt,name=record,proto3" json:"record,omitempty"` Alert string `protobuf:"bytes,3,opt,name=alert,proto3" json:"alert,omitempty"` + SrcTenants string `protobuf:"bytes,13,opt,name=src_tenants,json=srcTenants,proto3" json:"src_tenants,omitempty"` + DestTenant string `protobuf:"bytes,14,opt,name=dest_tenant,json=destTenant,proto3" json:"dest_tenant,omitempty"` For time.Duration `protobuf:"bytes,4,opt,name=for,proto3,stdduration" json:"for"` Labels []github_com_cortexproject_cortex_pkg_cortexpb.LabelAdapter `protobuf:"bytes,5,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/cortexpb.LabelAdapter" 
json:"labels"` Annotations []github_com_cortexproject_cortex_pkg_cortexpb.LabelAdapter `protobuf:"bytes,6,rep,name=annotations,proto3,customtype=github.com/cortexproject/cortex/pkg/cortexpb.LabelAdapter" json:"annotations"` @@ -183,6 +185,20 @@ func (m *RuleDesc) GetAlert() string { return "" } +func (m *RuleDesc) GetSrcTenants() string { + if m != nil { + return m.SrcTenants + } + return "" +} + +func (m *RuleDesc) GetDestTenant() string { + if m != nil { + return m.DestTenant + } + return "" +} + func (m *RuleDesc) GetFor() time.Duration { if m != nil { return m.For @@ -198,37 +214,39 @@ func init() { func init() { proto.RegisterFile("rules.proto", fileDescriptor_8e722d3e922f0937) } var fileDescriptor_8e722d3e922f0937 = []byte{ - // 476 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x52, 0x3f, 0x6f, 0xd3, 0x40, - 0x1c, 0xf5, 0x35, 0x8e, 0x63, 0x5f, 0x54, 0x11, 0x1d, 0x15, 0x72, 0x2b, 0x74, 0x89, 0x2a, 0x21, - 0x65, 0xe1, 0x22, 0x15, 0x31, 0x30, 0x20, 0x94, 0xa8, 0x12, 0x52, 0xc4, 0x80, 0x3c, 0xb2, 0x9d, - 0x9d, 0xab, 0x31, 0xb8, 0xbe, 0xd3, 0xf9, 0x8c, 0xda, 0x8d, 0x8f, 0xc0, 0xc8, 0x47, 0xe0, 0xa3, - 0x74, 0xcc, 0x58, 0x31, 0x14, 0xe2, 0x2c, 0x8c, 0x95, 0xf8, 0x00, 0xa0, 0xfb, 0x63, 0x5a, 0xc1, - 0x02, 0x03, 0x53, 0x7e, 0xef, 0xde, 0xbd, 0xbc, 0xf7, 0x7b, 0x67, 0x38, 0x94, 0x4d, 0xc9, 0x6a, - 0x22, 0x24, 0x57, 0x1c, 0xf5, 0x0d, 0x38, 0x78, 0x98, 0x17, 0xea, 0x75, 0x93, 0x92, 0x8c, 0x9f, - 0xce, 0x72, 0x9e, 0xf3, 0x99, 0x61, 0xd3, 0xe6, 0xc4, 0x20, 0x03, 0xcc, 0x64, 0x55, 0x07, 0x38, - 0xe7, 0x3c, 0x2f, 0xd9, 0xcd, 0xad, 0x55, 0x23, 0xa9, 0x2a, 0x78, 0xe5, 0xf8, 0xfd, 0xdf, 0x79, - 0x5a, 0x9d, 0x3b, 0xea, 0xc9, 0x2d, 0xa7, 0x8c, 0x4b, 0xc5, 0xce, 0x84, 0xe4, 0x6f, 0x58, 0xa6, - 0x1c, 0x9a, 0x89, 0xb7, 0x79, 0x47, 0xa4, 0x6e, 0xb0, 0xd2, 0xc3, 0x1f, 0x00, 0xee, 0x26, 0x4d, - 0xc9, 0x9e, 0x4b, 0xde, 0x88, 0x63, 0x56, 0x67, 0x08, 0x41, 0xbf, 0xa2, 0xa7, 0x2c, 0x06, 0x13, - 0x30, 0x8d, 0x12, 0x33, 0xa3, 0xfb, 0x30, 0xd2, 0xbf, 0xb5, 0xa0, 0x19, 0x8b, 0x77, 0x0c, 0x71, - 0x73, 0x80, 0x9e, 0xc1, 0xb0, 0xa8, 0x14, 0x93, 0xef, 0x68, 0x19, 0xf7, 0x26, 0x60, 0x3a, 0x3c, - 0xda, 0x27, 0x36, 0x2c, 0xe9, 0xc2, 0x92, 0x63, 0xb7, 0xcc, 0x22, 0xbc, 0xb8, 0x1a, 0x7b, 0x1f, - 0xbf, 0x8c, 0x41, 0xf2, 0x4b, 0x84, 0x1e, 0x40, 0x5b, 0x59, 0xec, 0x4f, 0x7a, 0xd3, 0xe1, 0xd1, - 0x1d, 0x62, 0xdb, 0xd4, 0xb9, 0x74, 0xa4, 0xc4, 0xb2, 0x3a, 0x59, 0x53, 0x33, 0x19, 0x07, 0x36, - 0x99, 0x9e, 0x11, 0x81, 0x03, 0x2e, 0xf4, 0x1f, 0xd7, 0x71, 0x64, 0xc4, 0x7b, 0x7f, 0x58, 0xcf, - 0xab, 0xf3, 0xa4, 0xbb, 0xb4, 0xf4, 0xc3, 0xfe, 0x28, 0x58, 0xfa, 0xe1, 0x60, 0x14, 0x2e, 0xfd, - 0x30, 0x1c, 0x45, 0x87, 0xdf, 0x77, 0x60, 0xd8, 0x39, 0x69, 0x0b, 0x5d, 0x5e, 0xb7, 0xbc, 0x9e, - 0xd1, 0x3d, 0x18, 0x48, 0x96, 0x71, 0xb9, 0x72, 0x9b, 0x3b, 0x84, 0xf6, 0x60, 0x9f, 0x96, 0x4c, - 0x2a, 0xb3, 0x73, 0x94, 0x58, 0x80, 0x1e, 0xc3, 0xde, 0x09, 0x97, 0xb1, 0xff, 0xf7, 0x3d, 0xe8, - 0xfb, 0xa8, 0x82, 0x41, 0x49, 0x53, 0x56, 0xd6, 0x71, 0xdf, 0xac, 0x71, 0x97, 0x74, 0xef, 0x45, - 0x5e, 0xe8, 0xf3, 0x97, 0xb4, 0x90, 0x8b, 0xb9, 0xd6, 0x7c, 0xbe, 0x1a, 0xff, 0xd3, 0x7b, 0x5b, - 0xfd, 0x7c, 0x45, 0x85, 0x62, 0x32, 0x71, 0x2e, 0xe8, 0x0c, 0x0e, 0x69, 0x55, 0x71, 0x45, 0x6d, - 0x77, 0xc1, 0x7f, 0x35, 0xbd, 0x6d, 0x65, 0xba, 0xdf, 0x5d, 0x3c, 0x5d, 0x6f, 0xb0, 0x77, 0xb9, - 0xc1, 0xde, 0xf5, 0x06, 0x83, 0xf7, 0x2d, 0x06, 0x9f, 0x5a, 0x0c, 0x2e, 0x5a, 0x0c, 0xd6, 0x2d, - 0x06, 0x5f, 0x5b, 0x0c, 0xbe, 0xb5, 0xd8, 0xbb, 0x6e, 0x31, 0xf8, 0xb0, 0xc5, 0xde, 0x7a, 0x8b, - 0xbd, 0xcb, 0x2d, 
0xf6, 0x5e, 0x0d, 0xcc, 0x87, 0x20, 0xd2, 0x34, 0x30, 0x85, 0x3e, 0xfa, 0x19, - 0x00, 0x00, 0xff, 0xff, 0xa0, 0xd3, 0x9a, 0x1a, 0x78, 0x03, 0x00, 0x00, + // 511 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x52, 0xb1, 0x6e, 0xd3, 0x40, + 0x18, 0xf6, 0x11, 0xc7, 0x71, 0xce, 0x0a, 0x44, 0x47, 0x85, 0xae, 0x15, 0xba, 0x44, 0x95, 0x90, + 0xb2, 0xe0, 0x48, 0x45, 0x0c, 0x0c, 0x08, 0x25, 0xaa, 0x84, 0x14, 0x31, 0x20, 0x8b, 0x89, 0x05, + 0x9d, 0x9d, 0xab, 0x09, 0xb8, 0x77, 0xd6, 0xdd, 0x19, 0xb5, 0x1b, 0x8f, 0xc0, 0xc8, 0x23, 0xf0, + 0x02, 0xbc, 0x43, 0xc7, 0x8c, 0x15, 0x43, 0x21, 0xce, 0xc2, 0xd8, 0x37, 0x00, 0xdd, 0x9d, 0x4d, + 0x2b, 0x58, 0x60, 0x60, 0xca, 0xff, 0xff, 0xdf, 0xf7, 0xe7, 0xfb, 0xfe, 0xcf, 0x07, 0x23, 0x59, + 0x15, 0x4c, 0xc5, 0xa5, 0x14, 0x5a, 0xa0, 0xae, 0x6d, 0xf6, 0xee, 0xe7, 0x2b, 0xfd, 0xba, 0x4a, + 0xe3, 0x4c, 0x1c, 0x4f, 0x73, 0x91, 0x8b, 0xa9, 0x45, 0xd3, 0xea, 0xc8, 0x76, 0xb6, 0xb1, 0x95, + 0xdb, 0xda, 0x23, 0xb9, 0x10, 0x79, 0xc1, 0xae, 0x58, 0xcb, 0x4a, 0x52, 0xbd, 0x12, 0xbc, 0xc1, + 0x77, 0x7f, 0xc7, 0x29, 0x3f, 0x6d, 0xa0, 0x47, 0xd7, 0x94, 0x32, 0x21, 0x35, 0x3b, 0x29, 0xa5, + 0x78, 0xc3, 0x32, 0xdd, 0x74, 0xd3, 0xf2, 0x6d, 0xde, 0x02, 0x69, 0x53, 0xb8, 0xd5, 0xfd, 0x1f, + 0x00, 0x0e, 0x92, 0xaa, 0x60, 0x4f, 0xa5, 0xa8, 0xca, 0x43, 0xa6, 0x32, 0x84, 0xa0, 0xcf, 0xe9, + 0x31, 0xc3, 0x60, 0x0c, 0x26, 0xfd, 0xc4, 0xd6, 0xe8, 0x2e, 0xec, 0x9b, 0x5f, 0x55, 0xd2, 0x8c, + 0xe1, 0x1b, 0x16, 0xb8, 0x1a, 0xa0, 0x27, 0x30, 0x5c, 0x71, 0xcd, 0xe4, 0x3b, 0x5a, 0xe0, 0xce, + 0x18, 0x4c, 0xa2, 0x83, 0xdd, 0xd8, 0x99, 0x8d, 0x5b, 0xb3, 0xf1, 0x61, 0x73, 0xcc, 0x3c, 0x3c, + 0xbb, 0x18, 0x79, 0x1f, 0xbf, 0x8e, 0x40, 0xf2, 0x6b, 0x09, 0xdd, 0x83, 0x2e, 0x32, 0xec, 0x8f, + 0x3b, 0x93, 0xe8, 0xe0, 0x56, 0xec, 0xd2, 0x34, 0xbe, 0x8c, 0xa5, 0xc4, 0xa1, 0xc6, 0x59, 0xa5, + 0x98, 0xc4, 0x81, 0x73, 0x66, 0x6a, 0x14, 0xc3, 0x9e, 0x28, 0xcd, 0x1f, 0x2b, 0xdc, 0xb7, 0xcb, + 0x3b, 0x7f, 0x48, 0xcf, 0xf8, 0x69, 0xd2, 0x92, 0x16, 0x7e, 0xd8, 0x1d, 0x06, 0x0b, 0x3f, 0xec, + 0x0d, 0xc3, 0x85, 0x1f, 0x86, 0xc3, 0xfe, 0xfe, 0xe7, 0x0e, 0x0c, 0x5b, 0x25, 0x23, 0x61, 0xc2, + 0x6b, 0x8f, 0x37, 0x35, 0xba, 0x03, 0x03, 0xc9, 0x32, 0x21, 0x97, 0xcd, 0xe5, 0x4d, 0x87, 0x76, + 0x60, 0x97, 0x16, 0x4c, 0x6a, 0x7b, 0x73, 0x3f, 0x71, 0x0d, 0x1a, 0xc1, 0x48, 0xc9, 0xec, 0x95, + 0x66, 0x9c, 0x72, 0xad, 0xf0, 0xc0, 0x62, 0x50, 0xc9, 0xec, 0x85, 0x9b, 0x18, 0xc2, 0x92, 0x29, + 0xdd, 0x30, 0xf0, 0x4d, 0x47, 0x30, 0x23, 0xc7, 0x40, 0x0f, 0x61, 0xe7, 0x48, 0x48, 0xec, 0xff, + 0x7d, 0x92, 0x86, 0x8f, 0x38, 0x0c, 0x0a, 0x9a, 0xb2, 0x42, 0xe1, 0xae, 0x0d, 0xe2, 0x76, 0xdc, + 0x7e, 0xf1, 0xf8, 0x99, 0x99, 0x3f, 0xa7, 0x2b, 0x39, 0x9f, 0x99, 0x9d, 0x2f, 0x17, 0xa3, 0x7f, + 0x7a, 0x31, 0x6e, 0x7f, 0xb6, 0xa4, 0xa5, 0x66, 0x32, 0x69, 0x54, 0xd0, 0x09, 0x8c, 0x28, 0xe7, + 0x42, 0x53, 0x97, 0x7e, 0xf0, 0x5f, 0x45, 0xaf, 0x4b, 0xd9, 0xaf, 0x37, 0x98, 0x3f, 0x5e, 0x6f, + 0x88, 0x77, 0xbe, 0x21, 0xde, 0xe5, 0x86, 0x80, 0xf7, 0x35, 0x01, 0x9f, 0x6a, 0x02, 0xce, 0x6a, + 0x02, 0xd6, 0x35, 0x01, 0xdf, 0x6a, 0x02, 0xbe, 0xd7, 0xc4, 0xbb, 0xac, 0x09, 0xf8, 0xb0, 0x25, + 0xde, 0x7a, 0x4b, 0xbc, 0xf3, 0x2d, 0xf1, 0x5e, 0xf6, 0xec, 0x53, 0x2a, 0xd3, 0x34, 0xb0, 0x81, + 0x3e, 0xf8, 0x19, 0x00, 0x00, 0xff, 0xff, 0x30, 0x4e, 0xd9, 0xef, 0xba, 0x03, 0x00, 0x00, } func (this *RuleGroupDesc) Equal(that interface{}) bool { @@ -308,6 +326,12 @@ func (this *RuleDesc) Equal(that interface{}) bool { if this.Alert != that1.Alert { return false } + if this.SrcTenants != that1.SrcTenants { + 
@@ -308,6 +326,12 @@ func (this *RuleDesc) Equal(that interface{}) bool {
 	if this.Alert != that1.Alert {
 		return false
 	}
+	if this.SrcTenants != that1.SrcTenants {
+		return false
+	}
+	if this.DestTenant != that1.DestTenant {
+		return false
+	}
 	if this.For != that1.For {
 		return false
 	}
@@ -352,11 +376,13 @@ func (this *RuleDesc) GoString() string {
 	if this == nil {
 		return "nil"
 	}
-	s := make([]string, 0, 10)
+	s := make([]string, 0, 12)
 	s = append(s, "&rulespb.RuleDesc{")
 	s = append(s, "Expr: "+fmt.Sprintf("%#v", this.Expr)+",\n")
 	s = append(s, "Record: "+fmt.Sprintf("%#v", this.Record)+",\n")
 	s = append(s, "Alert: "+fmt.Sprintf("%#v", this.Alert)+",\n")
+	s = append(s, "SrcTenants: "+fmt.Sprintf("%#v", this.SrcTenants)+",\n")
+	s = append(s, "DestTenant: "+fmt.Sprintf("%#v", this.DestTenant)+",\n")
 	s = append(s, "For: "+fmt.Sprintf("%#v", this.For)+",\n")
 	s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n")
 	s = append(s, "Annotations: "+fmt.Sprintf("%#v", this.Annotations)+",\n")
@@ -471,6 +497,20 @@ func (m *RuleDesc) MarshalToSizedBuffer(dAtA []byte) (int, error) {
 	_ = i
 	var l int
 	_ = l
+	if len(m.DestTenant) > 0 {
+		i -= len(m.DestTenant)
+		copy(dAtA[i:], m.DestTenant)
+		i = encodeVarintRules(dAtA, i, uint64(len(m.DestTenant)))
+		i--
+		dAtA[i] = 0x72
+	}
+	if len(m.SrcTenants) > 0 {
+		i -= len(m.SrcTenants)
+		copy(dAtA[i:], m.SrcTenants)
+		i = encodeVarintRules(dAtA, i, uint64(len(m.SrcTenants)))
+		i--
+		dAtA[i] = 0x6a
+	}
 	if len(m.Annotations) > 0 {
 		for iNdEx := len(m.Annotations) - 1; iNdEx >= 0; iNdEx-- {
 			{
@@ -609,6 +649,14 @@ func (m *RuleDesc) Size() (n int) {
 			n += 1 + l + sovRules(uint64(l))
 		}
 	}
+	l = len(m.SrcTenants)
+	if l > 0 {
+		n += 1 + l + sovRules(uint64(l))
+	}
+	l = len(m.DestTenant)
+	if l > 0 {
+		n += 1 + l + sovRules(uint64(l))
+	}
 	return n
 }

@@ -654,6 +702,8 @@ func (this *RuleDesc) String() string {
 		`For:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.For), "Duration", "duration.Duration", 1), `&`, ``, 1) + `,`,
 		`Labels:` + fmt.Sprintf("%v", this.Labels) + `,`,
 		`Annotations:` + fmt.Sprintf("%v", this.Annotations) + `,`,
+		`SrcTenants:` + fmt.Sprintf("%v", this.SrcTenants) + `,`,
+		`DestTenant:` + fmt.Sprintf("%v", this.DestTenant) + `,`,
 		`}`,
 	}, "")
 	return s
@@ -1142,6 +1192,70 @@ func (m *RuleDesc) Unmarshal(dAtA []byte) error {
 				return err
 			}
 			iNdEx = postIndex
+		case 13:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field SrcTenants", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowRules
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= uint64(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthRules
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex < 0 {
+				return ErrInvalidLengthRules
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.SrcTenants = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
+		case 14:
+			if wireType != 2 {
+				return fmt.Errorf("proto: wrong wireType = %d for field DestTenant", wireType)
+			}
+			var stringLen uint64
+			for shift := uint(0); ; shift += 7 {
+				if shift >= 64 {
+					return ErrIntOverflowRules
+				}
+				if iNdEx >= l {
+					return io.ErrUnexpectedEOF
+				}
+				b := dAtA[iNdEx]
+				iNdEx++
+				stringLen |= uint64(b&0x7F) << shift
+				if b < 0x80 {
+					break
+				}
+			}
+			intStringLen := int(stringLen)
+			if intStringLen < 0 {
+				return ErrInvalidLengthRules
+			}
+			postIndex := iNdEx + intStringLen
+			if postIndex < 0 {
+				return ErrInvalidLengthRules
+			}
+			if postIndex > l {
+				return io.ErrUnexpectedEOF
+			}
+			m.DestTenant = string(dAtA[iNdEx:postIndex])
+			iNdEx = postIndex
 		default:
 			iNdEx = preIndex
 			skippy, err := skipRules(dAtA[iNdEx:])
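The 0x6a and 0x72 bytes written by the Marshal hunk above are the protobuf field keys: key = (field number << 3) | wire type, and strings use wire type 2 (length-delimited). A quick, runnable sanity check of the generated constants:

	package main

	import "fmt"

	func main() {
		const lengthDelimited = 2 // protobuf wire type for strings
		fmt.Printf("src_tenants (field 13): %#x\n", 13<<3|lengthDelimited) // 0x6a
		fmt.Printf("dest_tenant (field 14): %#x\n", 14<<3|lengthDelimited) // 0x72
	}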
diff --git a/pkg/ruler/rulespb/rules.proto b/pkg/ruler/rulespb/rules.proto
index 16274cec013..77d6bcd5ee1 100644
--- a/pkg/ruler/rulespb/rules.proto
+++ b/pkg/ruler/rulespb/rules.proto
@@ -35,6 +35,8 @@ message RuleDesc {
   string expr = 1;
   string record = 2;
   string alert = 3;
+  string src_tenants = 13;
+  string dest_tenant = 14;
   google.protobuf.Duration for = 4 [(gogoproto.nullable) = false,(gogoproto.stdduration) = true];
   repeated cortexpb.LabelPair labels = 5 [
     (gogoproto.nullable) = false,
@@ -44,4 +46,4 @@ message RuleDesc {
     (gogoproto.nullable) = false,
     (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/cortexpb.LabelAdapter"
   ];
-}
\ No newline at end of file
+}
diff --git a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go
index 4395ad9a404..8fe9bc6ec87 100644
--- a/pkg/ruler/rulestore/bucketclient/bucket_client_test.go
+++ b/pkg/ruler/rulestore/bucketclient/bucket_client_test.go
@@ -11,13 +11,13 @@ import (
 	"github.com/go-kit/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
-	"github.com/prometheus/prometheus/pkg/rulefmt"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/thanos/pkg/objstore"

 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/cortexpb"
+	"github.com/cortexproject/cortex/pkg/ruler/rulefmt"
 	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
 	"github.com/cortexproject/cortex/pkg/ruler/rulestore"
 	"github.com/cortexproject/cortex/pkg/ruler/rulestore/objectclient"
diff --git a/pkg/ruler/rulestore/local/local.go b/pkg/ruler/rulestore/local/local.go
index 080e2941476..c42cc3e9bd9 100644
--- a/pkg/ruler/rulestore/local/local.go
+++ b/pkg/ruler/rulestore/local/local.go
@@ -8,8 +8,8 @@ import (
 	"path/filepath"

 	"github.com/pkg/errors"
-	promRules "github.com/prometheus/prometheus/rules"

+	"github.com/cortexproject/cortex/pkg/ruler/rules"
 	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
 )

@@ -30,10 +30,10 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 // cfg.Directory / userID / namespace
 type Client struct {
 	cfg    Config
-	loader promRules.GroupLoader
+	loader rules.GroupLoader
 }

-func NewLocalRulesClient(cfg Config, loader promRules.GroupLoader) (*Client, error) {
+func NewLocalRulesClient(cfg Config, loader rules.GroupLoader) (*Client, error) {
 	if cfg.Directory == "" {
 		return nil, errors.New("directory required for local rules config")
 	}
diff --git a/pkg/ruler/rulestore/local/local_test.go b/pkg/ruler/rulestore/local/local_test.go
index ce4899c3883..6a38003693f 100644
--- a/pkg/ruler/rulestore/local/local_test.go
+++ b/pkg/ruler/rulestore/local/local_test.go
@@ -9,11 +9,11 @@ import (
 	"time"

 	"github.com/prometheus/common/model"
-	"github.com/prometheus/prometheus/pkg/rulefmt"
-	promRules "github.com/prometheus/prometheus/rules"
 	"github.com/stretchr/testify/require"
 	"gopkg.in/yaml.v3"

+	"github.com/cortexproject/cortex/pkg/ruler/rulefmt"
+	"github.com/cortexproject/cortex/pkg/ruler/rules"
 	"github.com/cortexproject/cortex/pkg/ruler/rulespb"
 )

@@ -69,7 +69,7 @@ func TestClient_LoadAllRuleGroups(t *testing.T) {

 	client, err := NewLocalRulesClient(Config{
 		Directory: dir,
-	}, promRules.FileLoader{})
+	}, rules.FileLoader{})
 	require.NoError(t, err)

 	ctx := context.Background()
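The rulestore changes above swap Prometheus's promRules.GroupLoader for an in-repo rules.GroupLoader, so rule groups are parsed with the forked rulefmt that understands the new federation fields. A sketch of the interface shape, assuming pkg/ruler/rules mirrors the upstream prometheus/rules definition (the exact signatures are not shown in this diff):

	package rules

	import (
		"github.com/prometheus/prometheus/promql/parser"

		"github.com/cortexproject/cortex/pkg/ruler/rulefmt"
	)

	// GroupLoader is assumed to match upstream prometheus/rules.GroupLoader,
	// except that Load returns the forked rulefmt types.
	type GroupLoader interface {
		Load(identifier string) (*rulefmt.RuleGroups, []error)
		Parse(query string) (parser.Expr, error)
	}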
diff --git a/pkg/ruler/storage.go b/pkg/ruler/storage.go
index f66d689866a..9355a61aa37 100644
--- a/pkg/ruler/storage.go
+++ b/pkg/ruler/storage.go
@@ -8,7 +8,6 @@ import (
 	"github.com/go-kit/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	promRules "github.com/prometheus/prometheus/rules"

 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/chunk/aws"
@@ -17,6 +16,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/chunk/openstack"
 	"github.com/cortexproject/cortex/pkg/configs/client"
 	configClient "github.com/cortexproject/cortex/pkg/configs/client"
+	"github.com/cortexproject/cortex/pkg/ruler/rules"
 	"github.com/cortexproject/cortex/pkg/ruler/rulestore"
 	"github.com/cortexproject/cortex/pkg/ruler/rulestore/bucketclient"
 	"github.com/cortexproject/cortex/pkg/ruler/rulestore/configdb"
@@ -75,13 +75,13 @@ func (cfg *RuleStoreConfig) IsDefaults() bool {
 // NewLegacyRuleStore returns a rule store backend client based on the provided cfg.
 // The client used by the function is based a legacy object store clients that shouldn't
 // be used anymore.
-func NewLegacyRuleStore(cfg RuleStoreConfig, loader promRules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
+func NewLegacyRuleStore(cfg RuleStoreConfig, loader rules.GroupLoader, logger log.Logger) (rulestore.RuleStore, error) {
 	if cfg.mock != nil {
 		return cfg.mock, nil
 	}

 	if loader == nil {
-		loader = promRules.FileLoader{}
+		loader = rules.FileLoader{}
 	}

 	var err error
@@ -116,7 +116,7 @@ func NewLegacyRuleStore(cfg RuleStoreConfig, loader promRules.GroupLoader, logge
 }

 // NewRuleStore returns a rule store backend client based on the provided cfg.
-func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, loader promRules.GroupLoader, logger log.Logger, reg prometheus.Registerer) (rulestore.RuleStore, error) {
+func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.TenantConfigProvider, loader rules.GroupLoader, logger log.Logger, reg prometheus.Registerer) (rulestore.RuleStore, error) {
 	if cfg.Backend == configdb.Name {
 		c, err := client.New(cfg.ConfigDB)

diff --git a/pkg/ruler/store_mock_test.go b/pkg/ruler/store_mock_test.go
index 965e072fab0..9cd7095afe0 100644
--- a/pkg/ruler/store_mock_test.go
+++ b/pkg/ruler/store_mock_test.go
@@ -110,6 +110,25 @@ var (
 			},
 		},
 	}
+
+	mockFederatedRules = map[string]rulespb.RuleGroupList{
+		"federated_user": {
+			&rulespb.RuleGroupDesc{
+				Name:      "group1",
+				Namespace: "namespace1",
+				User:      "federated_user",
+				Rules: []*rulespb.RuleDesc{
+					{
+						Record:     "UP_RULE",
+						Expr:       "up",
+						SrcTenants: "src1|src2",
+						DestTenant: "dstTenant",
+					},
+				},
+				Interval: interval,
+			},
+		},
+	}
 )

 func newMockRuleStore(rules map[string]rulespb.RuleGroupList) *mockRuleStore {
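mockFederatedRules above shows the shape of a federated rule: SrcTenants is a pipe-separated list of source tenants and DestTenant names the tenant that receives the results. A hedged sketch of building one programmatically, following the same "src1|src2" convention from the mock:

	package main

	import (
		"fmt"
		"strings"

		"github.com/cortexproject/cortex/pkg/ruler/rulespb"
	)

	func main() {
		rule := &rulespb.RuleDesc{
			Record: "UP_RULE",
			Expr:   "up",
			// Join matches the pipe-separated convention used in the mock.
			SrcTenants: strings.Join([]string{"src1", "src2"}, "|"),
			DestTenant: "dstTenant",
		}
		fmt.Printf("eval %q against %s, write to %s\n", rule.Expr, rule.SrcTenants, rule.DestTenant)
	}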