Skip to content

Commit d2a0b02

Browse files
authored
Add new query inflight request on ingester (#6081)
* Add new query infligh request on ingester Signed-off-by: Daniel Deluiggi <[email protected]> * Fix test naming Signed-off-by: Daniel Deluiggi <[email protected]> * Add test for specif err interface Signed-off-by: Daniel Deluiggi <[email protected]> --------- Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent 1ba4bca commit d2a0b02

File tree

6 files changed

+138
-26
lines changed

6 files changed

+138
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* [FEATURE] Ingester: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986
99
* [FEATURE] Query Frontend: Added a query rejection mechanism to block resource-intensive queries. #6005
1010
* [FEATURE] OTLP: Support ingesting OTLP exponential metrics as native histograms. #6071
11+
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
1112
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
1213
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
1314
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2961,6 +2961,11 @@ instance_limits:
29612961
# CLI flag: -ingester.instance-limits.max-inflight-push-requests
29622962
[max_inflight_push_requests: <int> | default = 0]
29632963
2964+
# Max inflight query requests that this ingester can handle (across all
2965+
# tenants). Additional requests will be rejected. 0 = unlimited.
2966+
# CLI flag: -ingester.instance-limits.max-inflight-query-requests
2967+
[max_inflight_query_requests: <int> | default = 0]
2968+
29642969
# Comma-separated list of metric names, for which
29652970
# -ingester.max-series-per-metric and -ingester.max-global-series-per-metric
29662971
# limits will be ignored. Does not affect max-series-per-user or

pkg/ingester/ingester.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
155155
f.Int64Var(&cfg.DefaultLimits.MaxInMemoryTenants, "ingester.instance-limits.max-tenants", 0, "Max users that this ingester can hold. Requests from additional users will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
156156
f.Int64Var(&cfg.DefaultLimits.MaxInMemorySeries, "ingester.instance-limits.max-series", 0, "Max series that this ingester can hold (across all tenants). Requests to create additional series will be rejected. This limit only works when using blocks engine. 0 = unlimited.")
157157
f.Int64Var(&cfg.DefaultLimits.MaxInflightPushRequests, "ingester.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.")
158+
f.Int64Var(&cfg.DefaultLimits.MaxInflightQueryRequests, "ingester.instance-limits.max-inflight-query-requests", 0, "Max inflight query requests that this ingester can handle (across all tenants). Additional requests will be rejected. 0 = unlimited.")
158159

159160
f.StringVar(&cfg.IgnoreSeriesLimitForMetricNames, "ingester.ignore-series-limit-for-metric-names", "", "Comma-separated list of metric names, for which -ingester.max-series-per-metric and -ingester.max-global-series-per-metric limits will be ignored. Does not affect max-series-per-user or max-global-series-per-metric limits.")
160161

@@ -1401,9 +1402,6 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
14011402
return nil, err
14021403
}
14031404

1404-
c := i.trackInflightQueryRequest()
1405-
defer c()
1406-
14071405
userID, err := tenant.TenantID(ctx)
14081406
if err != nil {
14091407
return nil, err
@@ -1426,8 +1424,15 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
14261424
return nil, err
14271425
}
14281426

1427+
// We will report *this* request in the error too.
1428+
c, err := i.trackInflightQueryRequest()
1429+
if err != nil {
1430+
return nil, err
1431+
}
1432+
14291433
// It's not required to sort series from a single ingester because series are sorted by the Exemplar Storage before returning from Select.
14301434
res, err := q.Select(from, through, matchers...)
1435+
c()
14311436
if err != nil {
14321437
return nil, err
14331438
}
@@ -1452,17 +1457,13 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
14521457

14531458
// LabelValues returns all label values that are associated with a given label name.
14541459
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
1455-
c := i.trackInflightQueryRequest()
1456-
defer c()
14571460
resp, cleanup, err := i.labelsValuesCommon(ctx, req)
14581461
defer cleanup()
14591462
return resp, err
14601463
}
14611464

14621465
// LabelValuesStream returns all label values that are associated with a given label name.
14631466
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
1464-
c := i.trackInflightQueryRequest()
1465-
defer c()
14661467
resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
14671468
defer cleanup()
14681469

@@ -1525,6 +1526,11 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
15251526
q.Close()
15261527
}
15271528

1529+
c, err := i.trackInflightQueryRequest()
1530+
if err != nil {
1531+
return nil, cleanup, err
1532+
}
1533+
defer c()
15281534
vals, _, err := q.LabelValues(ctx, labelName, matchers...)
15291535
if err != nil {
15301536
return nil, cleanup, err
@@ -1537,17 +1543,13 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
15371543

15381544
// LabelNames return all the label names.
15391545
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
1540-
c := i.trackInflightQueryRequest()
1541-
defer c()
15421546
resp, cleanup, err := i.labelNamesCommon(ctx, req)
15431547
defer cleanup()
15441548
return resp, err
15451549
}
15461550

15471551
// LabelNamesStream return all the label names.
15481552
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
1549-
c := i.trackInflightQueryRequest()
1550-
defer c()
15511553
resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
15521554
defer cleanup()
15531555

@@ -1605,6 +1607,11 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
16051607
q.Close()
16061608
}
16071609

1610+
c, err := i.trackInflightQueryRequest()
1611+
if err != nil {
1612+
return nil, cleanup, err
1613+
}
1614+
defer c()
16081615
names, _, err := q.LabelNames(ctx)
16091616
if err != nil {
16101617
return nil, cleanup, err
@@ -1831,9 +1838,6 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
18311838
return err
18321839
}
18331840

1834-
c := i.trackInflightQueryRequest()
1835-
defer c()
1836-
18371841
spanlog, ctx := spanlogger.New(stream.Context(), "QueryStream")
18381842
defer spanlog.Finish()
18391843

@@ -1879,11 +1883,18 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
18791883
return nil
18801884
}
18811885

1882-
func (i *Ingester) trackInflightQueryRequest() func() {
1886+
func (i *Ingester) trackInflightQueryRequest() (func(), error) {
1887+
gl := i.getInstanceLimits()
1888+
if gl != nil && gl.MaxInflightQueryRequests > 0 {
1889+
if i.inflightQueryRequests.Load() >= gl.MaxInflightQueryRequests {
1890+
return nil, errTooManyInflightQueryRequests
1891+
}
1892+
}
1893+
18831894
i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc())
18841895
return func() {
18851896
i.inflightQueryRequests.Dec()
1886-
}
1897+
}, nil
18871898
}
18881899

18891900
// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
@@ -1894,8 +1905,13 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
18941905
}
18951906
defer q.Close()
18961907

1908+
c, err := i.trackInflightQueryRequest()
1909+
if err != nil {
1910+
return 0, 0, 0, err
1911+
}
18971912
// It's not required to return sorted series because series are sorted by the Cortex querier.
18981913
ss := q.Select(ctx, false, nil, matchers...)
1914+
c()
18991915
if ss.Err() != nil {
19001916
return 0, 0, 0, ss.Err()
19011917
}

pkg/ingester/ingester_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2279,6 +2279,34 @@ func Test_Ingester_LabelValues(t *testing.T) {
22792279
}
22802280
}
22812281

2282+
func Test_Ingester_LabelValue_MaxInflightQueryRequest(t *testing.T) {
2283+
cfg := defaultIngesterTestConfig(t)
2284+
cfg.DefaultLimits.MaxInflightQueryRequests = 1
2285+
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
2286+
require.NoError(t, err)
2287+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
2288+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
2289+
2290+
// Wait until it's ACTIVE
2291+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
2292+
return i.lifecycler.GetState()
2293+
})
2294+
2295+
i.inflightQueryRequests.Add(1)
2296+
2297+
// Mock request
2298+
ctx := user.InjectOrgID(context.Background(), "test")
2299+
2300+
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
2301+
_, err = i.Push(ctx, wreq)
2302+
require.NoError(t, err)
2303+
2304+
rreq := &client.LabelValuesRequest{}
2305+
_, err = i.LabelValues(ctx, rreq)
2306+
require.Error(t, err)
2307+
require.Equal(t, err, errTooManyInflightQueryRequests)
2308+
}
2309+
22822310
func Test_Ingester_Query(t *testing.T) {
22832311
series := []struct {
22842312
lbls labels.Labels
@@ -2409,6 +2437,36 @@ func Test_Ingester_Query(t *testing.T) {
24092437
})
24102438
}
24112439
}
2440+
2441+
func Test_Ingester_Query_MaxInflightQueryRequest(t *testing.T) {
2442+
cfg := defaultIngesterTestConfig(t)
2443+
cfg.DefaultLimits.MaxInflightQueryRequests = 1
2444+
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
2445+
require.NoError(t, err)
2446+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
2447+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
2448+
2449+
// Wait until it's ACTIVE
2450+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
2451+
return i.lifecycler.GetState()
2452+
})
2453+
2454+
i.inflightQueryRequests.Add(1)
2455+
2456+
// Mock request
2457+
ctx := user.InjectOrgID(context.Background(), "test")
2458+
2459+
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
2460+
_, err = i.Push(ctx, wreq)
2461+
require.NoError(t, err)
2462+
2463+
rreq := &client.QueryRequest{}
2464+
s := &mockQueryStreamServer{ctx: ctx}
2465+
err = i.QueryStream(rreq, s)
2466+
require.Error(t, err)
2467+
require.Equal(t, err, errTooManyInflightQueryRequests)
2468+
}
2469+
24122470
func TestIngester_Query_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
24132471
i, err := prepareIngesterWithBlocksStorage(t, defaultIngesterTestConfig(t), prometheus.NewRegistry())
24142472
require.NoError(t, err)
@@ -4949,6 +5007,34 @@ func TestIngester_MaxExemplarsFallBack(t *testing.T) {
49495007
require.Equal(t, maxExemplars, int64(5))
49505008
}
49515009

5010+
func Test_Ingester_QueryExemplar_MaxInflightQueryRequest(t *testing.T) {
5011+
cfg := defaultIngesterTestConfig(t)
5012+
cfg.DefaultLimits.MaxInflightQueryRequests = 1
5013+
i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
5014+
require.NoError(t, err)
5015+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
5016+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
5017+
5018+
// Wait until it's ACTIVE
5019+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
5020+
return i.lifecycler.GetState()
5021+
})
5022+
5023+
i.inflightQueryRequests.Add(1)
5024+
5025+
// Mock request
5026+
ctx := user.InjectOrgID(context.Background(), "test")
5027+
5028+
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
5029+
_, err = i.Push(ctx, wreq)
5030+
require.NoError(t, err)
5031+
5032+
rreq := &client.ExemplarQueryRequest{}
5033+
_, err = i.QueryExemplars(ctx, rreq)
5034+
require.Error(t, err)
5035+
require.Equal(t, err, errTooManyInflightQueryRequests)
5036+
}
5037+
49525038
func generateSamplesForLabel(l labels.Labels, count int) *cortexpb.WriteRequest {
49535039
var lbls = make([]labels.Labels, 0, count)
49545040
var samples = make([]cortexpb.Sample, 0, count)

pkg/ingester/instance_limits.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,17 @@ var (
88
errMaxUsersLimitReached = errors.New("cannot create TSDB: ingesters's max tenants limit reached")
99
errMaxSeriesLimitReached = errors.New("cannot add series: ingesters's max series limit reached")
1010
errTooManyInflightPushRequests = errors.New("cannot push: too many inflight push requests in ingester")
11+
errTooManyInflightQueryRequests = errors.New("cannot push: too many inflight query requests in ingester")
1112
)
1213

1314
// InstanceLimits describes limits used by ingester. Reaching any of these will result in Push method to return
1415
// (internal) error.
1516
type InstanceLimits struct {
16-
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
17-
MaxInMemoryTenants int64 `yaml:"max_tenants"`
18-
MaxInMemorySeries int64 `yaml:"max_series"`
19-
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
17+
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
18+
MaxInMemoryTenants int64 `yaml:"max_tenants"`
19+
MaxInMemorySeries int64 `yaml:"max_series"`
20+
MaxInflightPushRequests int64 `yaml:"max_inflight_push_requests"`
21+
MaxInflightQueryRequests int64 `yaml:"max_inflight_query_requests"`
2022
}
2123

2224
// Sets default limit values for unmarshalling.

pkg/ingester/instance_limits_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ import (
99

1010
func TestInstanceLimitsUnmarshal(t *testing.T) {
1111
defaultInstanceLimits = &InstanceLimits{
12-
MaxIngestionRate: 10,
13-
MaxInMemoryTenants: 20,
14-
MaxInMemorySeries: 30,
15-
MaxInflightPushRequests: 40,
12+
MaxIngestionRate: 10,
13+
MaxInMemoryTenants: 20,
14+
MaxInMemorySeries: 30,
15+
MaxInflightPushRequests: 40,
16+
MaxInflightQueryRequests: 50,
1617
}
1718

1819
l := InstanceLimits{}
@@ -24,6 +25,7 @@ max_tenants: 50000
2425
require.NoError(t, yaml.UnmarshalStrict([]byte(input), &l))
2526
require.Equal(t, float64(125.678), l.MaxIngestionRate)
2627
require.Equal(t, int64(50000), l.MaxInMemoryTenants)
27-
require.Equal(t, int64(30), l.MaxInMemorySeries) // default value
28-
require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
28+
require.Equal(t, int64(30), l.MaxInMemorySeries) // default value
29+
require.Equal(t, int64(40), l.MaxInflightPushRequests) // default value
30+
require.Equal(t, int64(50), l.MaxInflightQueryRequests) // default value
2931
}

0 commit comments

Comments
 (0)