Skip to content

Commit 14cb7ef

Browse files
committed
Adding cortex_ingester_inflight_query_requests metric to track the number of queries being executed on ingesters
Signed-off-by: Alan Protasio <[email protected]>
1 parent eafc37a commit 14cb7ef

File tree

4 files changed

+193
-3
lines changed

4 files changed

+193
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
* [ENHANCEMENT] Query Frontend/Scheduler: Time check in query priority now considers overall data select time window (including range selectors, modifiers and lookback delta). #5758
2424
* [ENHANCEMENT] Querier: Added `querier.store-gateway-query-stats-enabled` to enable or disable store gateway query stats log. #5749
2525
* [ENHANCEMENT] Upgrade to go 1.21.6. #5765
26+
* [ENHANCEMENT] Ingester: Add new ingester metric `cortex_ingester_inflight_query_requests`. #5798
2627
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
2728
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
2829
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734

pkg/ingester/ingester.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@ type Ingester struct {
200200
// Rate of pushed samples. Only used by V2-ingester to limit global samples push rate.
201201
ingestionRate *util_math.EwmaRate
202202
inflightPushRequests atomic.Int64
203+
204+
inflightQueryRequests atomic.Int64
205+
maxInflightQueryRequests atomic.Int64
203206
}
204207

205208
// Shipper interface is used to have an easy way to mock it in tests.
@@ -627,7 +630,13 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
627630
logger: logger,
628631
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
629632
}
630-
i.metrics = newIngesterMetrics(registerer, false, cfg.ActiveSeriesMetricsEnabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests)
633+
i.metrics = newIngesterMetrics(registerer,
634+
false,
635+
cfg.ActiveSeriesMetricsEnabled,
636+
i.getInstanceLimits,
637+
i.ingestionRate,
638+
&i.inflightPushRequests,
639+
&i.maxInflightQueryRequests)
631640

632641
// Replace specific metrics which we can't directly track but we need to read
633642
// them from the underlying system (ie. TSDB).
@@ -692,7 +701,14 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
692701
TSDBState: newTSDBState(bucketClient, registerer),
693702
logger: logger,
694703
}
695-
i.metrics = newIngesterMetrics(registerer, false, false, i.getInstanceLimits, nil, &i.inflightPushRequests)
704+
i.metrics = newIngesterMetrics(registerer,
705+
false,
706+
false,
707+
i.getInstanceLimits,
708+
nil,
709+
&i.inflightPushRequests,
710+
&i.maxInflightQueryRequests,
711+
)
696712

697713
i.TSDBState.shipperIngesterID = "flusher"
698714

@@ -1250,6 +1266,9 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
12501266
return nil, err
12511267
}
12521268

1269+
c := i.trackInflightQueryRequest()
1270+
defer c()
1271+
12531272
userID, err := tenant.TenantID(ctx)
12541273
if err != nil {
12551274
return nil, err
@@ -1322,6 +1341,9 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
13221341
return nil, err
13231342
}
13241343

1344+
c := i.trackInflightQueryRequest()
1345+
defer c()
1346+
13251347
userID, err := tenant.TenantID(ctx)
13261348
if err != nil {
13271349
return nil, err
@@ -1370,13 +1392,17 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
13701392

13711393
// LabelValues returns all label values that are associated with a given label name.
13721394
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
1395+
c := i.trackInflightQueryRequest()
1396+
defer c()
13731397
resp, cleanup, err := i.labelsValuesCommon(ctx, req)
13741398
defer cleanup()
13751399
return resp, err
13761400
}
13771401

13781402
// LabelValuesStream returns all label values that are associated with a given label name.
13791403
func (i *Ingester) LabelValuesStream(req *client.LabelValuesRequest, stream client.Ingester_LabelValuesStreamServer) error {
1404+
c := i.trackInflightQueryRequest()
1405+
defer c()
13801406
resp, cleanup, err := i.labelsValuesCommon(stream.Context(), req)
13811407
defer cleanup()
13821408

@@ -1451,13 +1477,17 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
14511477

14521478
// LabelNames return all the label names.
14531479
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
1480+
c := i.trackInflightQueryRequest()
1481+
defer c()
14541482
resp, cleanup, err := i.labelNamesCommon(ctx, req)
14551483
defer cleanup()
14561484
return resp, err
14571485
}
14581486

14591487
// LabelNamesStream return all the label names.
14601488
func (i *Ingester) LabelNamesStream(req *client.LabelNamesRequest, stream client.Ingester_LabelNamesStreamServer) error {
1489+
c := i.trackInflightQueryRequest()
1490+
defer c()
14611491
resp, cleanup, err := i.labelNamesCommon(stream.Context(), req)
14621492
defer cleanup()
14631493

@@ -1741,6 +1771,9 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
17411771
return err
17421772
}
17431773

1774+
c := i.trackInflightQueryRequest()
1775+
defer c()
1776+
17441777
spanlog, ctx := spanlogger.New(stream.Context(), "QueryStream")
17451778
defer spanlog.Finish()
17461779

@@ -1786,6 +1819,16 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
17861819
return nil
17871820
}
17881821

1822+
func (i *Ingester) trackInflightQueryRequest() func() {
1823+
max := i.inflightQueryRequests.Inc()
1824+
if m := i.maxInflightQueryRequests.Load(); max > m {
1825+
i.maxInflightQueryRequests.CompareAndSwap(m, max)
1826+
}
1827+
return func() {
1828+
i.inflightQueryRequests.Dec()
1829+
}
1830+
}
1831+
17891832
// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
17901833
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes int, _ error) {
17911834
q, err := db.ChunkQuerier(from, through)

pkg/ingester/metrics.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,17 @@ type ingesterMetrics struct {
4646
ingestionRate prometheus.GaugeFunc
4747
maxInflightPushRequests prometheus.GaugeFunc
4848
inflightRequests prometheus.GaugeFunc
49+
inflightQueryRequests prometheus.GaugeFunc
4950
}
5051

51-
func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSDB bool, activeSeriesEnabled bool, instanceLimitsFn func() *InstanceLimits, ingestionRate *util_math.EwmaRate, inflightRequests *atomic.Int64) *ingesterMetrics {
52+
func newIngesterMetrics(r prometheus.Registerer,
53+
createMetricsConflictingWithTSDB bool,
54+
activeSeriesEnabled bool,
55+
instanceLimitsFn func() *InstanceLimits,
56+
ingestionRate *util_math.EwmaRate,
57+
inflightRequests *atomic.Int64,
58+
maxInflightQueryRequests *atomic.Int64,
59+
) *ingesterMetrics {
5260
const (
5361
instanceLimits = "cortex_ingester_instance_limits"
5462
instanceLimitsHelp = "Instance limits used by this ingester." // Must be same for all registrations.
@@ -193,6 +201,18 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD
193201
return 0
194202
}),
195203

204+
inflightQueryRequests: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
205+
Name: "cortex_ingester_inflight_query_requests",
206+
Help: "Max number of inflight query requests in ingester.",
207+
}, func() float64 {
208+
if maxInflightQueryRequests != nil {
209+
r := maxInflightQueryRequests.Load()
210+
maxInflightQueryRequests.Store(0)
211+
return float64(r)
212+
}
213+
return 0
214+
}),
215+
196216
// Not registered automatically, but only if activeSeriesEnabled is true.
197217
activeSeriesPerUser: prometheus.NewGaugeVec(prometheus.GaugeOpts{
198218
Name: "cortex_ingester_active_series",

pkg/ingester/metrics_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,134 @@ import (
88
"github.com/prometheus/client_golang/prometheus/promauto"
99
"github.com/prometheus/client_golang/prometheus/testutil"
1010
"github.com/stretchr/testify/require"
11+
"go.uber.org/atomic"
12+
13+
util_math "github.com/cortexproject/cortex/pkg/util/math"
1114
)
1215

16+
// TestIngesterMetrics builds the ingester metrics on a pedantic registry with
// preset values for the inflight push counter (14) and the max inflight query
// counter (98), then compares the gathered output against the expected
// exposition text. It finally checks that the max inflight query counter reads
// 0 after the gather.
func TestIngesterMetrics(t *testing.T) {
17+
mainReg := prometheus.NewPedanticRegistry()
18+
ingestionRate := util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval)
19+
inflightPushRequests := &atomic.Int64{}
20+
maxInflightQueryRequests := &atomic.Int64{}
21+
maxInflightQueryRequests.Store(98)
22+
inflightPushRequests.Store(14)
23+
24+
m := newIngesterMetrics(mainReg,
25+
false,
26+
true,
27+
func() *InstanceLimits {
28+
return &InstanceLimits{
29+
MaxIngestionRate: 12,
30+
MaxInMemoryTenants: 1,
31+
MaxInMemorySeries: 11,
32+
MaxInflightPushRequests: 6,
33+
}
34+
},
35+
ingestionRate,
36+
inflightPushRequests,
37+
maxInflightQueryRequests)
38+
39+
require.NotNil(t, m)
40+
41+
// The expected text below carries the preset values: 14 inflight push
// requests, 98 inflight query requests, and the four instance limits.
err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
42+
# HELP cortex_ingester_inflight_push_requests Current number of inflight push requests in ingester.
43+
# TYPE cortex_ingester_inflight_push_requests gauge
44+
cortex_ingester_inflight_push_requests 14
45+
# HELP cortex_ingester_inflight_query_requests Max number of inflight query requests in ingester.
46+
# TYPE cortex_ingester_inflight_query_requests gauge
47+
cortex_ingester_inflight_query_requests 98
48+
# HELP cortex_ingester_ingested_exemplars_failures_total The total number of exemplars that errored on ingestion.
49+
# TYPE cortex_ingester_ingested_exemplars_failures_total counter
50+
cortex_ingester_ingested_exemplars_failures_total 0
51+
# HELP cortex_ingester_ingested_exemplars_total The total number of exemplars ingested.
52+
# TYPE cortex_ingester_ingested_exemplars_total counter
53+
cortex_ingester_ingested_exemplars_total 0
54+
# HELP cortex_ingester_ingested_metadata_failures_total The total number of metadata that errored on ingestion.
55+
# TYPE cortex_ingester_ingested_metadata_failures_total counter
56+
cortex_ingester_ingested_metadata_failures_total 0
57+
# HELP cortex_ingester_ingested_metadata_total The total number of metadata ingested.
58+
# TYPE cortex_ingester_ingested_metadata_total counter
59+
cortex_ingester_ingested_metadata_total 0
60+
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
61+
# TYPE cortex_ingester_ingested_samples_failures_total counter
62+
cortex_ingester_ingested_samples_failures_total 0
63+
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
64+
# TYPE cortex_ingester_ingested_samples_total counter
65+
cortex_ingester_ingested_samples_total 0
66+
# HELP cortex_ingester_ingestion_rate_samples_per_second Current ingestion rate in samples/sec that ingester is using to limit access.
67+
# TYPE cortex_ingester_ingestion_rate_samples_per_second gauge
68+
cortex_ingester_ingestion_rate_samples_per_second 0
69+
# HELP cortex_ingester_instance_limits Instance limits used by this ingester.
70+
# TYPE cortex_ingester_instance_limits gauge
71+
cortex_ingester_instance_limits{limit="max_inflight_push_requests"} 6
72+
cortex_ingester_instance_limits{limit="max_ingestion_rate"} 12
73+
cortex_ingester_instance_limits{limit="max_series"} 11
74+
cortex_ingester_instance_limits{limit="max_tenants"} 1
75+
# HELP cortex_ingester_memory_metadata The current number of metadata in memory.
76+
# TYPE cortex_ingester_memory_metadata gauge
77+
cortex_ingester_memory_metadata 0
78+
# HELP cortex_ingester_memory_series The current number of series in memory.
79+
# TYPE cortex_ingester_memory_series gauge
80+
cortex_ingester_memory_series 0
81+
# HELP cortex_ingester_memory_users The current number of users in memory.
82+
# TYPE cortex_ingester_memory_users gauge
83+
cortex_ingester_memory_users 0
84+
# HELP cortex_ingester_queried_chunks The total number of chunks returned from queries.
85+
# TYPE cortex_ingester_queried_chunks histogram
86+
cortex_ingester_queried_chunks_bucket{le="10"} 0
87+
cortex_ingester_queried_chunks_bucket{le="80"} 0
88+
cortex_ingester_queried_chunks_bucket{le="640"} 0
89+
cortex_ingester_queried_chunks_bucket{le="5120"} 0
90+
cortex_ingester_queried_chunks_bucket{le="40960"} 0
91+
cortex_ingester_queried_chunks_bucket{le="327680"} 0
92+
cortex_ingester_queried_chunks_bucket{le="2.62144e+06"} 0
93+
cortex_ingester_queried_chunks_bucket{le="+Inf"} 0
94+
cortex_ingester_queried_chunks_sum 0
95+
cortex_ingester_queried_chunks_count 0
96+
# HELP cortex_ingester_queried_exemplars The total number of exemplars returned from queries.
97+
# TYPE cortex_ingester_queried_exemplars histogram
98+
cortex_ingester_queried_exemplars_bucket{le="10"} 0
99+
cortex_ingester_queried_exemplars_bucket{le="50"} 0
100+
cortex_ingester_queried_exemplars_bucket{le="250"} 0
101+
cortex_ingester_queried_exemplars_bucket{le="1250"} 0
102+
cortex_ingester_queried_exemplars_bucket{le="6250"} 0
103+
cortex_ingester_queried_exemplars_bucket{le="+Inf"} 0
104+
cortex_ingester_queried_exemplars_sum 0
105+
cortex_ingester_queried_exemplars_count 0
106+
# HELP cortex_ingester_queried_samples The total number of samples returned from queries.
107+
# TYPE cortex_ingester_queried_samples histogram
108+
cortex_ingester_queried_samples_bucket{le="10"} 0
109+
cortex_ingester_queried_samples_bucket{le="80"} 0
110+
cortex_ingester_queried_samples_bucket{le="640"} 0
111+
cortex_ingester_queried_samples_bucket{le="5120"} 0
112+
cortex_ingester_queried_samples_bucket{le="40960"} 0
113+
cortex_ingester_queried_samples_bucket{le="327680"} 0
114+
cortex_ingester_queried_samples_bucket{le="2.62144e+06"} 0
115+
cortex_ingester_queried_samples_bucket{le="2.097152e+07"} 0
116+
cortex_ingester_queried_samples_bucket{le="+Inf"} 0
117+
cortex_ingester_queried_samples_sum 0
118+
cortex_ingester_queried_samples_count 0
119+
# HELP cortex_ingester_queried_series The total number of series returned from queries.
120+
# TYPE cortex_ingester_queried_series histogram
121+
cortex_ingester_queried_series_bucket{le="10"} 0
122+
cortex_ingester_queried_series_bucket{le="80"} 0
123+
cortex_ingester_queried_series_bucket{le="640"} 0
124+
cortex_ingester_queried_series_bucket{le="5120"} 0
125+
cortex_ingester_queried_series_bucket{le="40960"} 0
126+
cortex_ingester_queried_series_bucket{le="327680"} 0
127+
cortex_ingester_queried_series_bucket{le="+Inf"} 0
128+
cortex_ingester_queried_series_sum 0
129+
cortex_ingester_queried_series_count 0
130+
# HELP cortex_ingester_queries_total The total number of queries the ingester has handled.
131+
# TYPE cortex_ingester_queries_total counter
132+
cortex_ingester_queries_total 0
133+
`))
134+
require.NoError(t, err)
135+
136+
// The inflight query gauge callback loads the max and then stores 0, so the
// counter is expected to be reset once the registry has been gathered.
require.Equal(t, int64(0), maxInflightQueryRequests.Load())
137+
}
138+
13139
func TestTSDBMetrics(t *testing.T) {
14140
mainReg := prometheus.NewPedanticRegistry()
15141

0 commit comments

Comments
 (0)