Skip to content

Commit 452e144

Browse files
authored
Make the number of workers processing federated query configurable (#6449)
* Add number tenants per query histogram Signed-off-by: SungJin1212 <[email protected]> * Make the number of worker processing federated query configurable Signed-off-by: SungJin1212 <[email protected]> --------- Signed-off-by: SungJin1212 <[email protected]>
1 parent ea848b6 commit 452e144

File tree

6 files changed

+153
-38
lines changed

6 files changed

+153
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
1919
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
2020
* [FEATURE] Ruler: Add support for per-user external labels #6340
21+
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
2122
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
2223
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
2324
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ tenant_federation:
157157
# CLI flag: -tenant-federation.enabled
158158
[enabled: <boolean> | default = false]
159159

160+
# The number of workers used to process each federated query.
161+
# CLI flag: -tenant-federation.max-concurrent
162+
[max_concurrent: <int> | default = 16]
163+
160164
# The ruler_config configures the Cortex ruler.
161165
[ruler: <ruler_config>]
162166

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) {
273273
// single tenant. This allows for a less impactful enabling of tenant
274274
// federation.
275275
byPassForSingleQuerier := true
276-
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier))
276+
t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer))
277277
}
278278
return nil, nil
279279
}

pkg/querier/tenantfederation/merge_queryable.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"strings"
88

99
"github.com/pkg/errors"
10+
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/client_golang/prometheus/promauto"
1012
"github.com/prometheus/prometheus/model/labels"
1113
"github.com/prometheus/prometheus/storage"
1214
"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -19,9 +21,9 @@ import (
1921
)
2022

2123
const (
22-
defaultTenantLabel = "__tenant_id__"
23-
retainExistingPrefix = "original_"
24-
maxConcurrency = 16
24+
defaultTenantLabel = "__tenant_id__"
25+
retainExistingPrefix = "original_"
26+
defaultMaxConcurrency = 16
2527
)
2628

2729
// NewQueryable returns a queryable that iterates through all the tenant IDs
@@ -36,8 +38,8 @@ const (
3638
// If the label "__tenant_id__" is already existing, its value is overwritten
3739
// by the tenant ID and the previous value is exposed through a new label
3840
// prefixed with "original_". This behaviour is not implemented recursively.
39-
func NewQueryable(upstream storage.Queryable, byPassWithSingleQuerier bool) storage.Queryable {
40-
return NewMergeQueryable(defaultTenantLabel, tenantQuerierCallback(upstream), byPassWithSingleQuerier)
41+
func NewQueryable(upstream storage.Queryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
42+
return NewMergeQueryable(defaultTenantLabel, maxConcurrent, tenantQuerierCallback(upstream), byPassWithSingleQuerier, reg)
4143
}
4244

4345
func tenantQuerierCallback(queryable storage.Queryable) MergeQuerierCallback {
@@ -79,29 +81,41 @@ type MergeQuerierCallback func(ctx context.Context, mint int64, maxt int64) (ids
7981
// If the label `idLabelName` is already existing, its value is overwritten and
8082
// the previous value is exposed through a new label prefixed with "original_".
8183
// This behaviour is not implemented recursively.
82-
func NewMergeQueryable(idLabelName string, callback MergeQuerierCallback, byPassWithSingleQuerier bool) storage.Queryable {
84+
func NewMergeQueryable(idLabelName string, maxConcurrent int, callback MergeQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.Queryable {
8385
return &mergeQueryable{
8486
idLabelName: idLabelName,
87+
maxConcurrent: maxConcurrent,
8588
callback: callback,
8689
byPassWithSingleQuerier: byPassWithSingleQuerier,
90+
91+
tenantsPerQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
92+
Namespace: "cortex",
93+
Name: "querier_federated_tenants_per_query",
94+
Help: "Number of tenants per query.",
95+
Buckets: []float64{1, 2, 4, 8, 16, 32, 64},
96+
}),
8797
}
8898
}
8999

90100
type mergeQueryable struct {
91101
idLabelName string
102+
maxConcurrent int
92103
byPassWithSingleQuerier bool
93104
callback MergeQuerierCallback
105+
tenantsPerQuery prometheus.Histogram
94106
}
95107

96108
// Querier returns a new mergeQuerier, which aggregates results from multiple
97109
// underlying queriers into a single result.
98110
func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error) {
99111
return &mergeQuerier{
100112
idLabelName: m.idLabelName,
113+
maxConcurrent: m.maxConcurrent,
101114
mint: mint,
102115
maxt: maxt,
103116
byPassWithSingleQuerier: m.byPassWithSingleQuerier,
104117
callback: m.callback,
118+
tenantsPerQuery: m.tenantsPerQuery,
105119
}, nil
106120
}
107121

@@ -112,11 +126,13 @@ func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error
112126
// the previous value is exposed through a new label prefixed with "original_".
113127
// This behaviour is not implemented recursively
114128
type mergeQuerier struct {
115-
idLabelName string
116-
mint, maxt int64
117-
callback MergeQuerierCallback
129+
idLabelName string
130+
mint, maxt int64
131+
callback MergeQuerierCallback
132+
maxConcurrent int
118133

119134
byPassWithSingleQuerier bool
135+
tenantsPerQuery prometheus.Histogram
120136
}
121137

122138
// LabelValues returns all potential values for a label name. It is not safe
@@ -130,6 +146,8 @@ func (m *mergeQuerier) LabelValues(ctx context.Context, name string, hints *stor
130146
return nil, nil, err
131147
}
132148

149+
m.tenantsPerQuery.Observe(float64(len(ids)))
150+
133151
// by pass when only single querier is returned
134152
if m.byPassWithSingleQuerier && len(queriers) == 1 {
135153
return queriers[0].LabelValues(ctx, name, hints, matchers...)
@@ -169,6 +187,8 @@ func (m *mergeQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints
169187
return nil, nil, err
170188
}
171189

190+
m.tenantsPerQuery.Observe(float64(len(ids)))
191+
172192
// by pass when only single querier is returned
173193
if m.byPassWithSingleQuerier && len(queriers) == 1 {
174194
return queriers[0].LabelNames(ctx, hints, matchers...)
@@ -257,7 +277,7 @@ func (m *mergeQuerier) mergeDistinctStringSliceWithTenants(ctx context.Context,
257277
return nil
258278
}
259279

260-
err := concurrency.ForEach(ctx, jobs, maxConcurrency, run)
280+
err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run)
261281
if err != nil {
262282
return nil, nil, err
263283
}
@@ -309,6 +329,8 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
309329
return storage.ErrSeriesSet(err)
310330
}
311331

332+
m.tenantsPerQuery.Observe(float64(len(ids)))
333+
312334
// by pass when only single querier is returned
313335
if m.byPassWithSingleQuerier && len(queriers) == 1 {
314336
return queriers[0].Select(ctx, sortSeries, hints, matchers...)
@@ -352,7 +374,7 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
352374
return nil
353375
}
354376

355-
if err := concurrency.ForEach(ctx, jobs, maxConcurrency, run); err != nil {
377+
if err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run); err != nil {
356378
return storage.ErrSeriesSet(err)
357379
}
358380

0 commit comments

Comments
 (0)