Skip to content

Commit 887f111

Browse files
committed
Move priority, mint and maxt to query stats
Signed-off-by: Justin Jung <[email protected]>
1 parent 7835190 commit 887f111

File tree

12 files changed

+101
-84
lines changed

12 files changed

+101
-84
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* [ENHANCEMENT] Index Cache: Multi level cache adds config `max_backfill_items` to cap max items to backfill per async operation. #5686
1717
* [ENHANCEMENT] Query Frontend: Log number of split queries in `query stats` log. #5703
1818
* [ENHANCEMENT] Logging: Added new options for logging HTTP request headers: `-server.log-request-headers` enables logging HTTP request headers, `-server.log-request-headers-exclude-list` allows users to specify headers which should not be logged. #5744
19-
* [ENHANCEMENT] Query Frontend/Scheduler: Time check in query priority now considers overall data fetched time window (including range selectors, modifiers and lookback delta). #5758
19+
* [ENHANCEMENT] Query Frontend/Scheduler: Time check in query priority now considers overall data select time window (including range selectors, modifiers and lookback delta). #5758
2020
* [ENHANCEMENT] Querier: Added `querier.store-gateway-query-stats-enabled` to enable or disable store gateway query stats log. #5749
2121
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
2222
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719

docs/configuration/config-file-reference.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5095,16 +5095,16 @@ otel:
50955095
# Regex that the query string should match. If not set, it won't be checked.
50965096
[regex: <string> | default = ""]
50975097
5098-
# Overall data fetch time window (including range selectors, modifiers and
5098+
# Overall data select time window (including range selectors, modifiers and
50995099
# lookback delta) that the query should be within. If not set, it won't be
51005100
# checked.
51015101
time_window:
5102-
# Start of the data fetch time window (including range selectors, modifiers
5102+
# Start of the data select time window (including range selectors, modifiers
51035103
# and lookback delta) that the query should be within. If set to 0, it won't
51045104
# be checked.
51055105
[start: <int> | default = 0]
51065106
5107-
# End of the data fetch time window (including range selectors, modifiers and
5107+
# End of the data select time window (including range selectors, modifiers and
51085108
# lookback delta) that the query should be within. If set to 0, it won't be
51095109
# checked.
51105110
[end: <int> | default = 0]

pkg/frontend/transport/handler.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,8 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
295295
numChunkBytes := stats.LoadFetchedChunkBytes()
296296
numDataBytes := stats.LoadFetchedDataBytes()
297297
splitQueries := stats.LoadSplitQueries()
298+
dataSelectMaxTime := stats.LoadDataSelectMaxTime()
299+
dataSelectMinTime := stats.LoadDataSelectMinTime()
298300

299301
// Track stats.
300302
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
@@ -339,20 +341,20 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
339341
logMessage = append(logMessage, "content_encoding", encoding)
340342
}
341343

344+
if dataSelectMaxTime > 0 {
345+
logMessage = append(logMessage, "data_select_max_time", util.FormatMillisToSeconds(dataSelectMaxTime))
346+
}
347+
if dataSelectMinTime > 0 {
348+
logMessage = append(logMessage, "data_select_min_time", util.FormatMillisToSeconds(dataSelectMinTime))
349+
}
342350
if query := queryString.Get("query"); len(query) > 0 {
343351
logMessage = append(logMessage, "query_length", len(query))
344352
}
345353
if ua := r.Header.Get("User-Agent"); len(ua) > 0 {
346354
logMessage = append(logMessage, "user_agent", ua)
347355
}
348-
if queryPriority, ok := r.Context().Value(tripperware.QueryPriorityCtxKey).(int64); ok {
349-
logMessage = append(logMessage, "priority", queryPriority)
350-
}
351-
if maxT, ok := r.Context().Value(tripperware.DataFetchedMaxTimeCtxKey).(int64); ok {
352-
logMessage = append(logMessage, "data_fetched_max_time", util.FormatMillisToSeconds(maxT))
353-
}
354-
if minT, ok := r.Context().Value(tripperware.DataFetchedMinTimeCtxKey).(int64); ok {
355-
logMessage = append(logMessage, "data_fetched_min_time", util.FormatMillisToSeconds(minT))
356+
if priority, ok := stats.LoadPriority(); ok {
357+
logMessage = append(logMessage, "priority", priority)
356358
}
357359

358360
if error != nil {

pkg/frontend/transport/handler_test.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"github.com/weaveworks/common/user"
2222

2323
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
24-
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2524
)
2625

2726
type roundTripperFunc func(*http.Request) (*http.Response, error)
@@ -351,29 +350,25 @@ func TestReportQueryStatsFormat(t *testing.T) {
351350
},
352351
"should include query priority": {
353352
queryString: url.Values(map[string][]string{"query": {"up"}}),
354-
priority: 99,
353+
queryStats: &querier_stats.QueryStats{
354+
Priority: 99,
355+
PriorityAssigned: true,
356+
},
355357
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`,
356358
},
357359
"should include data fetch min and max time": {
358360
queryString: url.Values(map[string][]string{"query": {"up"}}),
359-
minT: 1704067200000,
360-
maxT: 1704153600000,
361-
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 maxt=1704153600 mint=1704067200 param_query=up`,
361+
queryStats: &querier_stats.QueryStats{
362+
DataSelectMaxTime: 1704153600000,
363+
DataSelectMinTime: 1704067200000,
364+
},
365+
expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`,
362366
},
363367
}
364368

365369
for testName, testData := range tests {
366370
t.Run(testName, func(t *testing.T) {
367371
req.Header = testData.header
368-
if testData.priority > 0 {
369-
*req = *req.WithContext(context.WithValue(req.Context(), tripperware.QueryPriorityCtxKey, testData.priority))
370-
}
371-
if testData.minT > 0 {
372-
*req = *req.WithContext(context.WithValue(req.Context(), tripperware.DataFetchedMinTimeCtxKey, testData.minT))
373-
}
374-
if testData.maxT > 0 {
375-
*req = *req.WithContext(context.WithValue(req.Context(), tripperware.DataFetchedMaxTimeCtxKey, testData.maxT))
376-
}
377372
handler.reportQueryStats(req, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp)
378373
data, err := io.ReadAll(outputBuf)
379374
require.NoError(t, err)

pkg/frontend/v1/frontend.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/cortexproject/cortex/pkg/frontend/transport"
1919
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
2020
"github.com/cortexproject/cortex/pkg/querier/stats"
21-
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2221
"github.com/cortexproject/cortex/pkg/scheduler/queue"
2322
"github.com/cortexproject/cortex/pkg/tenant"
2423
"github.com/cortexproject/cortex/pkg/util"
@@ -100,7 +99,7 @@ type request struct {
10099
}
101100

102101
func (r request) Priority() int64 {
103-
priority, ok := r.originalCtx.Value(tripperware.QueryPriorityCtxKey).(int64)
102+
priority, ok := stats.FromContext(r.originalCtx).LoadPriority()
104103
if !ok {
105104
return 0
106105
}

pkg/querier/stats/stats.go

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ var ctxKey = contextKey(0)
1616

1717
type QueryStats struct {
1818
Stats
19-
m sync.Mutex
19+
PriorityAssigned bool
20+
Priority int64
21+
DataSelectMaxTime int64
22+
DataSelectMinTime int64
23+
m sync.Mutex
2024
}
2125

2226
// ContextWithEmptyStats returns a context with empty stats.
@@ -196,6 +200,58 @@ func (s *QueryStats) LoadSplitQueries() uint64 {
196200
return atomic.LoadUint64(&s.SplitQueries)
197201
}
198202

203+
func (s *QueryStats) SetPriority(priority int64) {
204+
if s == nil {
205+
return
206+
}
207+
208+
if !s.PriorityAssigned {
209+
s.PriorityAssigned = true
210+
}
211+
212+
atomic.StoreInt64(&s.Priority, priority)
213+
}
214+
215+
func (s *QueryStats) LoadPriority() (int64, bool) {
216+
if s == nil {
217+
return 0, false
218+
}
219+
220+
return atomic.LoadInt64(&s.Priority), s.PriorityAssigned
221+
}
222+
223+
func (s *QueryStats) SetDataSelectMaxTime(dataSelectMaxTime int64) {
224+
if s == nil {
225+
return
226+
}
227+
228+
atomic.StoreInt64(&s.DataSelectMaxTime, dataSelectMaxTime)
229+
}
230+
231+
func (s *QueryStats) LoadDataSelectMaxTime() int64 {
232+
if s == nil {
233+
return 0
234+
}
235+
236+
return atomic.LoadInt64(&s.DataSelectMaxTime)
237+
}
238+
239+
func (s *QueryStats) SetDataSelectMinTime(dataSelectMinTime int64) {
240+
if s == nil {
241+
return
242+
}
243+
244+
atomic.StoreInt64(&s.DataSelectMinTime, dataSelectMinTime)
245+
}
246+
247+
func (s *QueryStats) LoadDataSelectMinTime() int64 {
248+
if s == nil {
249+
return 0
250+
}
251+
252+
return atomic.LoadInt64(&s.DataSelectMinTime)
253+
}
254+
199255
// Merge the provided Stats into this one.
200256
func (s *QueryStats) Merge(other *QueryStats) {
201257
if s == nil || other == nil {

pkg/querier/tripperware/priority.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,14 @@
11
package tripperware
22

33
import (
4-
"errors"
54
"time"
65

76
"github.com/cortexproject/cortex/pkg/util/validation"
87
)
98

10-
var (
11-
errQueryPriorityDisabled = errors.New("query priority disabled")
12-
errEmptyQueryString = errors.New("empty query string")
13-
)
14-
15-
func GetPriority(query string, minTime, maxTime int64, now time.Time, queryPriority validation.QueryPriority) (int64, error) {
16-
if !queryPriority.Enabled {
17-
return 0, errQueryPriorityDisabled
18-
}
19-
20-
if query == "" {
21-
return 0, errEmptyQueryString
22-
}
23-
24-
if len(queryPriority.Priorities) == 0 {
25-
return queryPriority.DefaultPriority, nil
9+
func GetPriority(query string, minTime, maxTime int64, now time.Time, queryPriority validation.QueryPriority) int64 {
10+
if !queryPriority.Enabled || query == "" || len(queryPriority.Priorities) == 0 {
11+
return queryPriority.DefaultPriority
2612
}
2713

2814
for _, priority := range queryPriority.Priorities {
@@ -34,12 +20,12 @@ func GetPriority(query string, minTime, maxTime int64, now time.Time, queryPrior
3420
}
3521

3622
if isWithinTimeAttributes(attribute.TimeWindow, now, minTime, maxTime) {
37-
return priority.Priority, nil
23+
return priority.Priority
3824
}
3925
}
4026
}
4127

42-
return queryPriority.DefaultPriority, nil
28+
return queryPriority.DefaultPriority
4329
}
4430

4531
func isWithinTimeAttributes(timeWindow validation.TimeWindow, now time.Time, startTime, endTime int64) bool {

pkg/querier/tripperware/priority_test.go

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,31 +29,23 @@ func Test_GetPriorityShouldReturnDefaultPriorityIfNotEnabledOrInvalidQueryString
2929

3030
type testCase struct {
3131
query string
32-
err error
3332
queryPriorityEnabled bool
3433
}
3534

3635
tests := map[string]testCase{
3736
"should miss if query priority not enabled": {
3837
query: "up",
39-
err: errQueryPriorityDisabled,
4038
},
4139
"should miss if query string empty": {
4240
query: "",
43-
err: errEmptyQueryString,
4441
queryPriorityEnabled: true,
4542
},
4643
}
4744

4845
for testName, testData := range tests {
4946
t.Run(testName, func(t *testing.T) {
5047
limits.queryPriority.Enabled = testData.queryPriorityEnabled
51-
priority, err := GetPriority(testData.query, 0, 0, now, limits.queryPriority)
52-
if err != nil {
53-
assert.Equal(t, testData.err, err)
54-
} else {
55-
assert.NoError(t, err)
56-
}
48+
priority := GetPriority(testData.query, 0, 0, now, limits.queryPriority)
5749
assert.Equal(t, int64(0), priority)
5850
})
5951
}
@@ -111,8 +103,7 @@ func Test_GetPriorityShouldConsiderRegex(t *testing.T) {
111103
t.Run(testName, func(t *testing.T) {
112104
limits.queryPriority.Priorities[0].QueryAttributes[0].Regex = testData.regex
113105
limits.queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile(testData.regex)
114-
priority, err := GetPriority(testData.query, 0, 0, now, limits.queryPriority)
115-
assert.NoError(t, err)
106+
priority := GetPriority(testData.query, 0, 0, now, limits.queryPriority)
116107
assert.Equal(t, int64(testData.expectedPriority), priority)
117108
})
118109
}
@@ -180,8 +171,7 @@ func Test_GetPriorityShouldConsiderStartAndEndTime(t *testing.T) {
180171

181172
for testName, testData := range tests {
182173
t.Run(testName, func(t *testing.T) {
183-
priority, err := GetPriority("sum(up)", testData.start.UnixMilli(), testData.end.UnixMilli(), now, limits.queryPriority)
184-
assert.NoError(t, err)
174+
priority := GetPriority("sum(up)", testData.start.UnixMilli(), testData.end.UnixMilli(), now, limits.queryPriority)
185175
assert.Equal(t, int64(testData.expectedPriority), priority)
186176
})
187177
}
@@ -225,8 +215,7 @@ func Test_GetPriorityShouldNotConsiderStartAndEndTimeIfEmpty(t *testing.T) {
225215

226216
for testName, testData := range tests {
227217
t.Run(testName, func(t *testing.T) {
228-
priority, err := GetPriority("sum(up)", testData.start.Unix(), testData.end.Unix(), now, limits.queryPriority)
229-
assert.NoError(t, err)
218+
priority := GetPriority("sum(up)", testData.start.Unix(), testData.end.Unix(), now, limits.queryPriority)
230219
assert.Equal(t, int64(1), priority)
231220
})
232221
}

pkg/querier/tripperware/roundtrip.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/weaveworks/common/httpgrpc"
3333
"github.com/weaveworks/common/user"
3434

35+
"github.com/cortexproject/cortex/pkg/querier/stats"
3536
"github.com/cortexproject/cortex/pkg/tenant"
3637
"github.com/cortexproject/cortex/pkg/util"
3738
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -165,17 +166,14 @@ func NewQueryTripperware(
165166
return next.RoundTrip(r)
166167
}
167168

169+
reqStats := stats.FromContext(r.Context())
168170
minTime, maxTime := util.FindMinMaxTime(r, expr, lookbackDelta, now)
169-
*r = *r.WithContext(context.WithValue(r.Context(), DataFetchedMinTimeCtxKey, minTime))
170-
*r = *r.WithContext(context.WithValue(r.Context(), DataFetchedMaxTimeCtxKey, maxTime))
171+
reqStats.SetDataSelectMaxTime(maxTime)
172+
reqStats.SetDataSelectMinTime(minTime)
171173

172174
if limits != nil && limits.QueryPriority(userStr).Enabled {
173-
priority, err := GetPriority(query, minTime, maxTime, now, limits.QueryPriority(userStr))
174-
if err != nil {
175-
level.Debug(log).Log("msg", "failed to get query priority for user", "user", userStr, "err", err.Error())
176-
} else {
177-
*r = *r.WithContext(context.WithValue(r.Context(), QueryPriorityCtxKey, priority))
178-
}
175+
priority := GetPriority(query, minTime, maxTime, now, limits.QueryPriority(userStr))
176+
reqStats.SetPriority(priority)
179177
}
180178
}
181179

pkg/querier/tripperware/util.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,6 @@ import (
1010
"github.com/cortexproject/cortex/pkg/util/validation"
1111
)
1212

13-
type contextKey string
14-
15-
const (
16-
DataFetchedMinTimeCtxKey = contextKey("dataFetchedMinTime")
17-
DataFetchedMaxTimeCtxKey = contextKey("dataFetchedMaxTime")
18-
QueryPriorityCtxKey = contextKey("queryPriority")
19-
)
20-
2113
// RequestResponse contains a request response and the respective request that was used.
2214
type RequestResponse struct {
2315
Request Request

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"google.golang.org/grpc"
2222

2323
"github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb"
24-
"github.com/cortexproject/cortex/pkg/querier/tripperware"
24+
"github.com/cortexproject/cortex/pkg/querier/stats"
2525
"github.com/cortexproject/cortex/pkg/scheduler/queue"
2626
"github.com/cortexproject/cortex/pkg/scheduler/schedulerpb"
2727
"github.com/cortexproject/cortex/pkg/tenant"
@@ -167,7 +167,7 @@ type schedulerRequest struct {
167167
}
168168

169169
func (s schedulerRequest) Priority() int64 {
170-
priority, ok := s.ctx.Value(tripperware.QueryPriorityCtxKey).(int64)
170+
priority, ok := stats.FromContext(s.ctx).LoadPriority()
171171
if !ok {
172172
return 0
173173
}

pkg/util/validation/limits.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,13 @@ type PriorityDef struct {
6464

6565
type QueryAttribute struct {
6666
Regex string `yaml:"regex" json:"regex" doc:"nocli|description=Regex that the query string should match. If not set, it won't be checked."`
67-
TimeWindow TimeWindow `yaml:"time_window" json:"time_window" doc:"nocli|description=Overall data fetch time window (including range selectors, modifiers and lookback delta) that the query should be within. If not set, it won't be checked."`
67+
TimeWindow TimeWindow `yaml:"time_window" json:"time_window" doc:"nocli|description=Overall data select time window (including range selectors, modifiers and lookback delta) that the query should be within. If not set, it won't be checked."`
6868
CompiledRegex *regexp.Regexp
6969
}
7070

7171
type TimeWindow struct {
72-
Start model.Duration `yaml:"start" json:"start" doc:"nocli|description=Start of the data fetch time window (including range selectors, modifiers and lookback delta) that the query should be within. If set to 0, it won't be checked.|default=0"`
73-
End model.Duration `yaml:"end" json:"end" doc:"nocli|description=End of the data fetch time window (including range selectors, modifiers and lookback delta) that the query should be within. If set to 0, it won't be checked.|default=0"`
72+
Start model.Duration `yaml:"start" json:"start" doc:"nocli|description=Start of the data select time window (including range selectors, modifiers and lookback delta) that the query should be within. If set to 0, it won't be checked.|default=0"`
73+
End model.Duration `yaml:"end" json:"end" doc:"nocli|description=End of the data select time window (including range selectors, modifiers and lookback delta) that the query should be within. If set to 0, it won't be checked.|default=0"`
7474
}
7575

7676
// Limits describe all the limits for users; can be used to describe global default

0 commit comments

Comments
 (0)