Skip to content

Federated ruler draft #4520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/alertmanager/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ type UserConfig struct {
func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), am.logger)

userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

cfg, err := am.store.GetAlertConfig(r.Context(), userID)
if err != nil {
Expand Down Expand Up @@ -95,12 +96,13 @@ func (am *MultitenantAlertmanager) GetUserConfig(w http.ResponseWriter, r *http.

func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), am.logger)
userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

var input io.Reader
maxConfigSize := am.limits.AlertmanagerMaxConfigSize(userID)
Expand Down Expand Up @@ -155,12 +157,13 @@ func (am *MultitenantAlertmanager) SetUserConfig(w http.ResponseWriter, r *http.
// Note that if no config exists for a user, StatusOK is returned.
func (am *MultitenantAlertmanager) DeleteUserConfig(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), am.logger)
userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
level.Error(logger).Log("msg", errNoOrgID, "err", err.Error())
http.Error(w, fmt.Sprintf("%s: %s", errNoOrgID, err.Error()), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

err = am.store.DeleteAlertConfig(r.Context(), userID)
if err != nil {
Expand Down
20 changes: 19 additions & 1 deletion pkg/alertmanager/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,24 @@ route:
- '...'
continue: false
receivers:
- name: route1
webhook_configs:
- send_resolved: true
http_config: {}
url: http://alertmanager/api/notifications?orgId=1&rrid=7
max_alerts: 0
`,
},
"user1|user2|user3": {
AlertmanagerConfig: `
global:
resolve_timeout: 5m
route:
receiver: route1
group_by:
- '...'
continue: false
receivers:
- name: route1
webhook_configs:
- send_resolved: true
Expand Down Expand Up @@ -693,7 +711,7 @@ receivers:

err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
require.NoError(t, err)
require.Len(t, am.alertmanagers, 2)
require.Len(t, am.alertmanagers, 3)

router := mux.NewRouter()
router.Path("/multitenant_alertmanager/configs").Methods(http.MethodGet).HandlerFunc(am.ListAllConfigs)
Expand Down
3 changes: 2 additions & 1 deletion pkg/alertmanager/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@ func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request)
d.requestsInFlight.Add(1)
defer d.requestsInFlight.Done()

userID, err := tenant.TenantID(r.Context())
tenantIDs, err := tenant.TenantIDs(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)

logger := util_log.WithContext(r.Context(), d.logger)

Expand Down
9 changes: 6 additions & 3 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,11 +1004,12 @@ func (am *MultitenantAlertmanager) HandleRequest(ctx context.Context, in *httpgr

// serveRequest serves the Alertmanager's web UI and API.
func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http.Request) {
userID, err := tenant.TenantID(req.Context())
tenantIDs, err := tenant.TenantIDs(req.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
userID := tenant.JoinTenantIDs(tenantIDs)
am.alertmanagersMtx.Lock()
userAM, ok := am.alertmanagers[userID]
am.alertmanagersMtx.Unlock()
Expand Down Expand Up @@ -1149,10 +1150,11 @@ func (am *MultitenantAlertmanager) ReadFullStateForUser(ctx context.Context, use

// UpdateState implements the Alertmanager service.
func (am *MultitenantAlertmanager) UpdateState(ctx context.Context, part *clusterpb.Part) (*alertmanagerpb.UpdateStateResponse, error) {
userID, err := tenant.TenantID(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
userID := tenant.JoinTenantIDs(tenantIDs)

am.alertmanagersMtx.Lock()
userAM, ok := am.alertmanagers[userID]
Expand Down Expand Up @@ -1255,10 +1257,11 @@ func (am *MultitenantAlertmanager) getPerUserDirectories() map[string]string {

// UpdateState implements the Alertmanager service.
func (am *MultitenantAlertmanager) ReadState(ctx context.Context, req *alertmanagerpb.ReadStateRequest) (*alertmanagerpb.ReadStateResponse, error) {
userID, err := tenant.TenantID(ctx)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
userID := tenant.JoinTenantIDs(tenantIDs)

am.alertmanagersMtx.Lock()
userAM, ok := am.alertmanagers[userID]
Expand Down
2 changes: 1 addition & 1 deletion pkg/configs/userconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"

legacy_promql "github.com/cortexproject/cortex/pkg/configs/legacy_promql"
"github.com/cortexproject/cortex/pkg/ruler/rulefmt"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/configs/userconfig/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/rulefmt"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/rules"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

"github.com/cortexproject/cortex/pkg/ruler/rulefmt"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down
7 changes: 4 additions & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
prom_storage "github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/discovery/dns"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
Expand All @@ -45,6 +44,7 @@ import (
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/ruler/rules"
"github.com/cortexproject/cortex/pkg/scheduler"
"github.com/cortexproject/cortex/pkg/storegateway"
util_log "github.com/cortexproject/cortex/pkg/util/log"
Expand Down Expand Up @@ -238,7 +238,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
// single tenant. This allows for a less impactful enabling of tenant
// federation.
byPassForSingleQuerier := true
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier))
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier, t.Cfg.TenantFederation))
}
return nil, nil
}
Expand Down Expand Up @@ -662,8 +662,9 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger)
mq := tenantfederation.NewQueryable(queryable, false, t.Cfg.TenantFederation)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer)
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, mq, engine, t.Overrides, prometheus.DefaultRegisterer)
manager, err := ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
return nil, err
Expand Down
26 changes: 17 additions & 9 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,23 +964,31 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through

// MetricsMetadata returns all metric metadata of a user.
func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
replicationSet, err := d.GetIngestersForMetadata(ctx)
userIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}

req := &ingester_client.MetricsMetadataRequest{}
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
return nil, err
var userResps []interface{}
for _, userID := range userIDs {
ctx = user.InjectOrgID(ctx, userID)
replicationSet, err := d.GetIngestersForMetadata(ctx)
if err != nil {
return nil, err
}
req := &ingester_client.MetricsMetadataRequest{}
resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
return client.MetricsMetadata(ctx, req)
})
if err != nil {
return nil, err
}
userResps = append(userResps, resps...)
}

result := []scrape.MetricMetadata{}
dedupTracker := map[cortexpb.MetricMetadata]struct{}{}
for _, resp := range resps {
for _, resp := range userResps {
r := resp.(*ingester_client.MetricsMetadataResponse)
for _, m := range r.Metadata {
// Given we look across all ingesters - dedup the metadata.
Expand Down
31 changes: 17 additions & 14 deletions pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
const (
defaultTenantLabel = "__tenant_id__"
retainExistingPrefix = "original_"
maxConcurrency = 16
)

// NewQueryable returns a queryable that iterates through all the tenant IDs
Expand All @@ -36,8 +35,8 @@ const (
// If the label "__tenant_id__" is already existing, its value is overwritten
// by the tenant ID and the previous value is exposed through a new label
// prefixed with "original_". This behaviour is not implemented recursively.
func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool) storage.Queryable {
return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier)
func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool, cfg Config) storage.Queryable {
return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier, cfg)
}

func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback {
Expand Down Expand Up @@ -80,18 +79,20 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids
// If the label `idLabelName` is already existing, its value is overwritten and
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively.
func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool) storage.Queryable {
func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool, cfg Config) storage.Queryable {
return &mergeQueryable{
idLabelName: idLabelName,
callback: callback,
byPassWithSingleQuerier: byPassWithSingleQuerier,
cfg: cfg,
}
}

type mergeQueryable struct {
idLabelName string
byPassWithSingleQuerier bool
callback MergeQuerierCallback
cfg Config
}

// Querier returns a new mergeQuerier, which aggregates results from multiple
Expand All @@ -111,10 +112,11 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
}

return &mergeQuerier{
ctx: ctx,
idLabelName: m.idLabelName,
queriers: queriers,
ids: ids,
ctx: ctx,
idLabelName: m.idLabelName,
queriers: queriers,
ids: ids,
maxConcurrency: m.cfg.MaxConcurrency,
}, nil
}

Expand All @@ -125,10 +127,11 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively
type mergeQuerier struct {
ctx context.Context
queriers []storage.Querier
idLabelName string
ids []string
ctx context.Context
queriers []storage.Querier
idLabelName string
ids []string
maxConcurrency int
}

// LabelValues returns all potential values for a label name. It is not safe
Expand Down Expand Up @@ -248,7 +251,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(f stringSliceFunc, te
return nil
}

err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
err := concurrency.ForEach(m.ctx, jobs, m.maxConcurrency, run)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -334,7 +337,7 @@ func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
return nil
}

err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
err := concurrency.ForEach(ctx, jobs, m.maxConcurrency, run)
if err != nil {
return storage.ErrSeriesSet(err)
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/querier/tenantfederation/merge_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ type mergeQueryableScenario struct {

func (s *mergeQueryableScenario) init() (storage.Querier, error) {
// initialize with default tenant label
q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier)
q := NewQueryable(&s.queryable, !s.doNotByPassSingleQuerier, Config{
MaxConcurrency: 16,
})

// inject tenants into context
ctx := context.Background()
Expand Down Expand Up @@ -334,7 +336,9 @@ type labelValuesScenario struct {
func TestMergeQueryable_Querier(t *testing.T) {
t.Run("querying without a tenant specified should error", func(t *testing.T) {
queryable := &mockTenantQueryableWithFilter{}
q := NewQueryable(queryable, false /* byPassWithSingleQuerier */)
q := NewQueryable(queryable, false /* bypasswithsinglequerier */, Config{
MaxConcurrency: 16,
})
// Create a context with no tenant specified.
ctx := context.Background()

Expand Down Expand Up @@ -873,7 +877,9 @@ func TestTracingMergeQueryable(t *testing.T) {
// set a multi tenant resolver
tenant.WithDefaultResolver(tenant.NewMultiResolver())
filter := mockTenantQueryableWithFilter{}
q := NewQueryable(&filter, false)
q := NewQueryable(&filter, false, Config{
MaxConcurrency: 16,
})
// retrieve querier if set
querier, err := q.Querier(ctx, mint, maxt)
require.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion pkg/querier/tenantfederation/tenant_federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (

type Config struct {
// Enabled switches on support for multi tenant query federation
Enabled bool `yaml:"enabled"`
Enabled bool `yaml:"enabled"`
MaxConcurrency int `yaml:"max-concurrency"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).")
f.IntVar(&cfg.MaxConcurrency, "tenant-federation.max-concurrency", 16, "Maximum concurrent federated sub queries used when evaluating a federated query")
}
Loading