Skip to content

Commit fb55863

Browse files
committed
Add max tenant config to tenant federation
Signed-off-by: SungJin1212 <[email protected]>
1 parent 2e1700e commit fb55863

File tree

8 files changed

+138
-8
lines changed

8 files changed

+138
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
* [FEATURE] Query Frontend: Support a metadata federated query when `-tenant-federation.enabled=true`. #6461
2323
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
2424
* [FEATURE] Ingester/StoreGateway: Add support for cache regex query matchers via `-ingester.matchers-cache-max-items` and `-blocks-storage.bucket-store.matchers-cache-max-items`. #6477 #6491
25+
* [ENHANCEMENT] Query Frontend: Add a flag `-tenant-federation.max-tenant` to limit the number of tenants for federated query. #6493
2526
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
2627
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
2728
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ tenant_federation:
161161
# CLI flag: -tenant-federation.max-concurrent
162162
[max_concurrent: <int> | default = 16]
163163

164+
# A maximum number of tenants to query at once. 0 means no limit.
165+
# CLI flag: -tenant-federation.max-tenant
166+
[max_tenant: <int> | default = 0]
167+
164168
# The ruler_config configures the Cortex ruler.
165169
[ruler: <ruler_config>]
166170

pkg/api/api.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/cortexproject/cortex/pkg/ingester/client"
3333
"github.com/cortexproject/cortex/pkg/purger"
3434
"github.com/cortexproject/cortex/pkg/querier"
35+
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
3536
"github.com/cortexproject/cortex/pkg/ring"
3637
"github.com/cortexproject/cortex/pkg/ruler"
3738
"github.com/cortexproject/cortex/pkg/scheduler"
@@ -136,7 +137,7 @@ type API struct {
136137
corsOrigin *regexp.Regexp
137138
}
138139

139-
func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logger) (*API, error) {
140+
func New(cfg Config, tenantFederationCfg tenantfederation.Config, serverCfg server.Config, s *server.Server, logger log.Logger) (*API, error) {
140141
// Ensure the encoded path is used. Required for the rules API
141142
s.HTTP.UseEncodedPath()
142143

@@ -169,6 +170,11 @@ func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logge
169170
if cfg.HTTPAuthMiddleware == nil {
170171
api.AuthMiddleware = middleware.AuthenticateUser
171172
}
173+
174+
if tenantFederationCfg.Enabled {
175+
api.AuthMiddleware = middleware.Merge(api.AuthMiddleware, tenantFederationMiddleWare(tenantFederationCfg.MaxTenant))
176+
}
177+
172178
if len(cfg.HTTPRequestHeadersToLog) > 0 {
173179
api.HTTPHeaderMiddleware = &HTTPHeaderMiddleware{TargetHeaders: cfg.HTTPRequestHeadersToLog}
174180
}

pkg/api/api_test.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/prometheus/prometheus/model/labels"
1313
"github.com/stretchr/testify/require"
1414
"github.com/weaveworks/common/server"
15+
16+
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
1517
)
1618

1719
const (
@@ -34,7 +36,9 @@ func TestNewApiWithoutSourceIPExtractor(t *testing.T) {
3436
server, err := server.New(serverCfg)
3537
require.NoError(t, err)
3638

37-
api, err := New(cfg, serverCfg, server, &FakeLogger{})
39+
tenantFederationCfg := tenantfederation.Config{}
40+
41+
api, err := New(cfg, tenantFederationCfg, serverCfg, server, &FakeLogger{})
3842
require.NoError(t, err)
3943
require.Nil(t, api.sourceIPs)
4044
}
@@ -49,7 +53,9 @@ func TestNewApiWithSourceIPExtractor(t *testing.T) {
4953
server, err := server.New(serverCfg)
5054
require.NoError(t, err)
5155

52-
api, err := New(cfg, serverCfg, server, &FakeLogger{})
56+
tenantFederationCfg := tenantfederation.Config{}
57+
58+
api, err := New(cfg, tenantFederationCfg, serverCfg, server, &FakeLogger{})
5359
require.NoError(t, err)
5460
require.NotNil(t, api.sourceIPs)
5561
}
@@ -67,7 +73,9 @@ func TestNewApiWithInvalidSourceIPExtractor(t *testing.T) {
6773
MetricsNamespace: "with_invalid_source_ip_extractor",
6874
}
6975

70-
api, err := New(cfg, serverCfg, &s, &FakeLogger{})
76+
tenantFederationCfg := tenantfederation.Config{}
77+
78+
api, err := New(cfg, tenantFederationCfg, serverCfg, &s, &FakeLogger{})
7179
require.Error(t, err)
7280
require.Nil(t, api)
7381
}
@@ -83,7 +91,9 @@ func TestNewApiWithHeaderLogging(t *testing.T) {
8391
server, err := server.New(serverCfg)
8492
require.NoError(t, err)
8593

86-
api, err := New(cfg, serverCfg, server, &FakeLogger{})
94+
tenantFederationCfg := tenantfederation.Config{}
95+
96+
api, err := New(cfg, tenantFederationCfg, serverCfg, server, &FakeLogger{})
8797
require.NoError(t, err)
8898
require.NotNil(t, api.HTTPHeaderMiddleware)
8999

@@ -100,7 +110,9 @@ func TestNewApiWithoutHeaderLogging(t *testing.T) {
100110
server, err := server.New(serverCfg)
101111
require.NoError(t, err)
102112

103-
api, err := New(cfg, serverCfg, server, &FakeLogger{})
113+
tenantFederationCfg := tenantfederation.Config{}
114+
115+
api, err := New(cfg, tenantFederationCfg, serverCfg, server, &FakeLogger{})
104116
require.NoError(t, err)
105117
require.Nil(t, api.HTTPHeaderMiddleware)
106118

@@ -157,7 +169,10 @@ func Benchmark_Compression(b *testing.B) {
157169

158170
server, err := server.New(serverCfg)
159171
require.NoError(b, err)
160-
api, err := New(cfg, serverCfg, server, &FakeLogger{})
172+
173+
tenantFederationCfg := tenantfederation.Config{}
174+
175+
api, err := New(cfg, tenantFederationCfg, serverCfg, server, &FakeLogger{})
161176
require.NoError(b, err)
162177

163178
labels := labels.ScratchBuilder{}

pkg/api/middlewares.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,19 @@ package api
22

33
import (
44
"context"
5+
"fmt"
56
"net/http"
67

8+
"github.com/weaveworks/common/middleware"
9+
10+
"github.com/cortexproject/cortex/pkg/tenant"
711
util_log "github.com/cortexproject/cortex/pkg/util/log"
812
)
913

14+
const (
15+
errTooManyTenants = "too many tenants, max: %d, actual: %d"
16+
)
17+
1018
// HTTPHeaderMiddleware adds specified HTTPHeaders to the request context
1119
type HTTPHeaderMiddleware struct {
1220
TargetHeaders []string
@@ -38,3 +46,24 @@ func (h HTTPHeaderMiddleware) Wrap(next http.Handler) http.Handler {
3846
next.ServeHTTP(w, r.WithContext(ctx))
3947
})
4048
}
49+
50+
func tenantFederationMiddleWare(maxTenant int) middleware.Interface {
51+
return middleware.Func(func(next http.Handler) http.Handler {
52+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
53+
ctx := r.Context()
54+
55+
ids, err := tenant.TenantIDs(ctx)
56+
if err != nil {
57+
http.Error(w, err.Error(), http.StatusUnauthorized)
58+
return
59+
}
60+
61+
if maxTenant > 0 && len(ids) > maxTenant {
62+
http.Error(w, fmt.Errorf(errTooManyTenants, maxTenant, len(ids)).Error(), http.StatusBadRequest)
63+
return
64+
}
65+
66+
next.ServeHTTP(w, r.WithContext(ctx))
67+
})
68+
})
69+
}

pkg/api/middlewares_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@ package api
22

33
import (
44
"context"
5+
"io"
56
"net/http"
7+
"net/http/httptest"
68
"testing"
79

810
"github.com/stretchr/testify/require"
11+
"github.com/weaveworks/common/middleware"
912

13+
"github.com/cortexproject/cortex/pkg/tenant"
1014
util_log "github.com/cortexproject/cortex/pkg/util/log"
1115
)
1216

@@ -72,3 +76,71 @@ func TestExistingHeaderInContextIsNotOverridden(t *testing.T) {
7276
require.Equal(t, contentsMap, util_log.HeaderMapFromContext(ctx))
7377

7478
}
79+
80+
func TestTenantFederationMiddleWare(t *testing.T) {
81+
// set a multi tenant resolver
82+
tenant.WithDefaultResolver(tenant.NewMultiResolver())
83+
84+
tests := []struct {
85+
name string
86+
maxTenant int
87+
orgId string
88+
expectedStatusCode int
89+
expectedErrMsg string
90+
}{
91+
{
92+
name: "less than max tenant",
93+
maxTenant: 3,
94+
orgId: "org1|org2",
95+
expectedStatusCode: http.StatusOK,
96+
},
97+
{
98+
name: "equal to max tenant",
99+
maxTenant: 2,
100+
orgId: "org1|org2",
101+
expectedStatusCode: http.StatusOK,
102+
},
103+
{
104+
name: "exceeds max tenant",
105+
maxTenant: 2,
106+
orgId: "org1|org2|org3",
107+
expectedStatusCode: http.StatusBadRequest,
108+
expectedErrMsg: "too many tenants, max: 2, actual: 3",
109+
},
110+
{
111+
name: "no org Id",
112+
maxTenant: 0,
113+
orgId: "",
114+
expectedStatusCode: http.StatusUnauthorized,
115+
expectedErrMsg: "no org id",
116+
},
117+
{
118+
name: "no limit",
119+
maxTenant: 0,
120+
orgId: "org1|org2|org3",
121+
expectedStatusCode: http.StatusOK,
122+
},
123+
}
124+
125+
for _, test := range tests {
126+
t.Run(test.name, func(t *testing.T) {
127+
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})
128+
129+
middlerwares := middleware.Merge(middleware.AuthenticateUser, tenantFederationMiddleWare(test.maxTenant)).Wrap(handler)
130+
131+
req := httptest.NewRequest(http.MethodGet, "/", nil)
132+
req.Header.Set("X-Scope-OrgId", test.orgId)
133+
resp := httptest.NewRecorder()
134+
135+
middlerwares.ServeHTTP(resp, req)
136+
137+
body, err := io.ReadAll(resp.Body)
138+
require.NoError(t, err)
139+
require.Equal(t, test.expectedStatusCode, resp.Code)
140+
141+
if test.expectedErrMsg != "" {
142+
require.Contains(t, string(body), test.expectedErrMsg)
143+
}
144+
})
145+
}
146+
}

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func (t *Cortex) initAPI() (services.Service, error) {
100100
t.Cfg.API.ServerPrefix = t.Cfg.Server.PathPrefix
101101
t.Cfg.API.LegacyHTTPPrefix = t.Cfg.HTTPPrefix
102102

103-
a, err := api.New(t.Cfg.API, t.Cfg.Server, t.Server, util_log.Logger)
103+
a, err := api.New(t.Cfg.API, t.Cfg.TenantFederation, t.Cfg.Server, t.Server, util_log.Logger)
104104
if err != nil {
105105
return nil, err
106106
}

pkg/querier/tenantfederation/tenant_federation.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@ type Config struct {
99
Enabled bool `yaml:"enabled"`
1010
// MaxConcurrent The number of workers used for processing federated query.
1111
MaxConcurrent int `yaml:"max_concurrent"`
12+
// MaxTenant A maximum number of tenants to query at once.
13+
MaxTenant int `yaml:"max_tenant"`
1214
}
1315

1416
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
1517
f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).")
1618
f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used to process each federated query.")
19+
f.IntVar(&cfg.MaxTenant, "tenant-federation.max-tenant", 0, "A maximum number of tenants to query at once. 0 means no limit.")
1720
}

0 commit comments

Comments
 (0)