Skip to content

Commit 54a899f

Browse files
committed
Add retries for instant query
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent b49cf5a commit 54a899f

10 files changed

+158
-55
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
* [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532
6060
* [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517
6161
* [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542
62+
* [ENHANCEMENT] Query Frontend: Add retries for instant query. #5560
6263
* [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265
6364
* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286
6465
* [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293

pkg/cortex/modules.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
455455
prometheusCodec := queryrange.NewPrometheusCodec(false)
456456
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
457457
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true)
458+
retryMiddlewareMetrics := queryrange.NewRetryMiddlewareMetrics(prometheus.DefaultRegisterer)
458459

459460
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
460461
t.Cfg.QueryRange,
@@ -466,12 +467,13 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
466467
queryAnalyzer,
467468
prometheusCodec,
468469
shardedPrometheusCodec,
470+
retryMiddlewareMetrics,
469471
)
470472
if err != nil {
471473
return nil, err
472474
}
473475

474-
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer)
476+
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, retryMiddlewareMetrics, t.Cfg.QueryRange.MaxRetries, queryAnalyzer)
475477
if err != nil {
476478
return nil, err
477479
}

pkg/querier/tripperware/instantquery/instant_query_middlewares.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,21 @@ import (
55
"github.com/thanos-io/thanos/pkg/querysharding"
66

77
"github.com/cortexproject/cortex/pkg/querier/tripperware"
8+
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
89
)
910

1011
func Middlewares(
1112
log log.Logger,
1213
limits tripperware.Limits,
14+
retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics,
15+
maxRetries int,
1316
queryAnalyzer querysharding.Analyzer,
1417
) ([]tripperware.Middleware, error) {
1518
var m []tripperware.Middleware
1619

20+
if maxRetries > 0 {
21+
m = append(m, queryrange.NewRetryMiddleware(log, maxRetries, retryMiddlewareMetrics))
22+
}
1723
m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer))
1824
return m, nil
1925
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package instantquery
2+
3+
import (
4+
"context"
5+
"io"
6+
"net/http"
7+
"net/http/httptest"
8+
"net/url"
9+
"strconv"
10+
"testing"
11+
"time"
12+
13+
"github.com/go-kit/log"
14+
"github.com/stretchr/testify/require"
15+
"github.com/thanos-io/thanos/pkg/querysharding"
16+
"github.com/weaveworks/common/middleware"
17+
"github.com/weaveworks/common/user"
18+
"go.uber.org/atomic"
19+
20+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
21+
)
22+
23+
var (
24+
query = "/api/v1/query?time=1536716898&query=sum by (label) (up)&stats=all"
25+
responseBody = `{"status":"success","data":{"resultType":"vector","result":[]}}`
26+
)
27+
28+
func TestRoundTrip(t *testing.T) {
29+
t.Parallel()
30+
var try atomic.Int32
31+
s := httptest.NewServer(
32+
middleware.AuthenticateUser.Wrap(
33+
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
34+
var err error
35+
if try.Inc() > 2 {
36+
_, err = w.Write([]byte(responseBody))
37+
} else {
38+
http.Error(w, `{"status":"error"}`, http.StatusInternalServerError)
39+
}
40+
if err != nil {
41+
t.Fatal(err)
42+
}
43+
}),
44+
),
45+
)
46+
defer s.Close()
47+
48+
u, err := url.Parse(s.URL)
49+
require.NoError(t, err)
50+
51+
downstream := singleHostRoundTripper{
52+
host: u.Host,
53+
next: http.DefaultTransport,
54+
}
55+
limits := tripperware.MockLimits{
56+
ShardSize: 2,
57+
}
58+
qa := querysharding.NewQueryAnalyzer()
59+
instantQueryMiddlewares, err := Middlewares(
60+
log.NewNopLogger(),
61+
limits,
62+
nil,
63+
3,
64+
qa)
65+
require.NoError(t, err)
66+
67+
tw := tripperware.NewQueryTripperware(
68+
log.NewNopLogger(),
69+
nil,
70+
nil,
71+
nil,
72+
instantQueryMiddlewares,
73+
nil,
74+
InstantQueryCodec,
75+
limits,
76+
qa,
77+
time.Minute,
78+
)
79+
80+
for i, tc := range []struct {
81+
path, expectedBody string
82+
}{
83+
{query, responseBody},
84+
} {
85+
t.Run(strconv.Itoa(i), func(t *testing.T) {
86+
//parallel testing causes data race
87+
req, err := http.NewRequest("GET", tc.path, http.NoBody)
88+
require.NoError(t, err)
89+
90+
// query-frontend doesn't actually authenticate requests, we rely on
91+
// the queriers to do this. Hence we ensure the request doesn't have a
92+
// org ID in the ctx, but does have the header.
93+
ctx := user.InjectOrgID(context.Background(), "1")
94+
req = req.WithContext(ctx)
95+
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
96+
require.NoError(t, err)
97+
98+
resp, err := tw(downstream).RoundTrip(req)
99+
require.NoError(t, err)
100+
require.Equal(t, 200, resp.StatusCode)
101+
102+
bs, err := io.ReadAll(resp.Body)
103+
require.NoError(t, err)
104+
require.Equal(t, tc.expectedBody, string(bs))
105+
})
106+
}
107+
}
108+
109+
type singleHostRoundTripper struct {
110+
host string
111+
next http.RoundTripper
112+
}
113+
114+
func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
115+
r.URL.Scheme = "http"
116+
r.URL.Host = s.host
117+
return s.next.RoundTrip(r)
118+
}

pkg/querier/tripperware/queryrange/limits_test.go

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func TestLimitsMiddleware_MaxQueryLookback(t *testing.T) {
7575
End: util.TimeToMillis(testData.reqEndTime),
7676
}
7777

78-
limits := mockLimits{maxQueryLookback: testData.maxQueryLookback}
78+
limits := tripperware.MockLimits{QueryLookback: testData.maxQueryLookback}
7979
middleware := NewLimitsMiddleware(limits)
8080

8181
innerRes := NewEmptyPrometheusResponse()
@@ -163,7 +163,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
163163
End: util.TimeToMillis(testData.reqEndTime),
164164
}
165165

166-
limits := mockLimits{maxQueryLength: testData.maxQueryLength}
166+
limits := tripperware.MockLimits{QueryLength: testData.maxQueryLength}
167167
middleware := NewLimitsMiddleware(limits)
168168

169169
innerRes := NewEmptyPrometheusResponse()
@@ -193,32 +193,6 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
193193
}
194194
}
195195

196-
type mockLimits struct {
197-
maxQueryLookback time.Duration
198-
maxQueryLength time.Duration
199-
maxCacheFreshness time.Duration
200-
}
201-
202-
func (m mockLimits) MaxQueryLookback(string) time.Duration {
203-
return m.maxQueryLookback
204-
}
205-
206-
func (m mockLimits) MaxQueryLength(string) time.Duration {
207-
return m.maxQueryLength
208-
}
209-
210-
func (mockLimits) MaxQueryParallelism(string) int {
211-
return 14 // Flag default.
212-
}
213-
214-
func (m mockLimits) MaxCacheFreshness(string) time.Duration {
215-
return m.maxCacheFreshness
216-
}
217-
218-
func (m mockLimits) QueryVerticalShardSize(userID string) int {
219-
return 0
220-
}
221-
222196
type mockHandler struct {
223197
mock.Mock
224198
}

pkg/querier/tripperware/queryrange/query_range_middlewares.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func Middlewares(
8080
queryAnalyzer querysharding.Analyzer,
8181
prometheusCodec tripperware.Codec,
8282
shardedPrometheusCodec tripperware.Codec,
83+
retryMiddlewareMetrics *RetryMiddlewareMetrics,
8384
) ([]tripperware.Middleware, cache.Cache, error) {
8485
// Metric used to keep track of each middleware execution duration.
8586
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
@@ -110,7 +111,7 @@ func Middlewares(
110111
}
111112

112113
if cfg.MaxRetries > 0 {
113-
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer)))
114+
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics))
114115
}
115116

116117
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer))

pkg/querier/tripperware/queryrange/query_range_middlewares_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,14 @@ func TestRoundTrip(t *testing.T) {
5454
qa := querysharding.NewQueryAnalyzer()
5555
queyrangemiddlewares, _, err := Middlewares(Config{},
5656
log.NewNopLogger(),
57-
mockLimits{},
57+
tripperware.MockLimits{},
5858
nil,
5959
nil,
6060
nil,
6161
qa,
6262
PrometheusCodec,
6363
ShardedPrometheusCodec,
64+
NewRetryMiddlewareMetrics(nil),
6465
)
6566
require.NoError(t, err)
6667

pkg/querier/tripperware/queryrange/results_cache_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func TestStatsCacheQuerySamples(t *testing.T) {
209209
log.NewNopLogger(),
210210
cfg,
211211
constSplitter(day),
212-
mockLimits{},
212+
tripperware.MockLimits{},
213213
PrometheusCodec,
214214
PrometheusResponseExtractor{},
215215
nil,
@@ -974,7 +974,7 @@ func TestHandleHit(t *testing.T) {
974974
sut := resultsCache{
975975
extractor: PrometheusResponseExtractor{},
976976
minCacheExtent: 10,
977-
limits: mockLimits{},
977+
limits: tripperware.MockLimits{},
978978
merger: PrometheusCodec,
979979
next: tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) {
980980
return mkAPIResponse(req.GetStart(), req.GetEnd(), req.GetStep()), nil
@@ -1004,7 +1004,7 @@ func TestResultsCache(t *testing.T) {
10041004
log.NewNopLogger(),
10051005
cfg,
10061006
constSplitter(day),
1007-
mockLimits{},
1007+
tripperware.MockLimits{},
10081008
PrometheusCodec,
10091009
PrometheusResponseExtractor{},
10101010
nil,
@@ -1046,7 +1046,7 @@ func TestResultsCacheRecent(t *testing.T) {
10461046
log.NewNopLogger(),
10471047
cfg,
10481048
constSplitter(day),
1049-
mockLimits{maxCacheFreshness: 10 * time.Minute},
1049+
tripperware.MockLimits{CacheFreshness: 10 * time.Minute},
10501050
PrometheusCodec,
10511051
PrometheusResponseExtractor{},
10521052
nil,
@@ -1087,13 +1087,13 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
10871087
expectedResponse *PrometheusResponse
10881088
}{
10891089
{
1090-
fakeLimits: mockLimits{maxCacheFreshness: 5 * time.Second},
1090+
fakeLimits: tripperware.MockLimits{CacheFreshness: 5 * time.Second},
10911091
Handler: nil,
10921092
expectedResponse: mkAPIResponse(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3), 10),
10931093
},
10941094
{
10951095
// should not lookup cache because per-tenant override will be applied
1096-
fakeLimits: mockLimits{maxCacheFreshness: 10 * time.Minute},
1096+
fakeLimits: tripperware.MockLimits{CacheFreshness: 10 * time.Minute},
10971097
Handler: tripperware.HandlerFunc(func(_ context.Context, _ tripperware.Request) (tripperware.Response, error) {
10981098
return parsedResponse, nil
10991099
}),
@@ -1150,7 +1150,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
11501150
log.NewNopLogger(),
11511151
cfg,
11521152
constSplitter(day),
1153-
mockLimits{},
1153+
tripperware.MockLimits{},
11541154
PrometheusCodec,
11551155
PrometheusResponseExtractor{},
11561156
nil,
@@ -1263,7 +1263,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
12631263
log.NewNopLogger(),
12641264
cfg,
12651265
constSplitter(day),
1266-
mockLimits{maxCacheFreshness: 10 * time.Minute},
1266+
tripperware.MockLimits{CacheFreshness: 10 * time.Minute},
12671267
PrometheusCodec,
12681268
PrometheusResponseExtractor{},
12691269
nil,

pkg/querier/tripperware/queryrange/split_by_interval_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ func TestSplitByDay(t *testing.T) {
309309
roundtripper := tripperware.NewRoundTripper(singleHostRoundTripper{
310310
host: u.Host,
311311
next: http.DefaultTransport,
312-
}, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}), SplitByIntervalMiddleware(interval, mockLimits{}, PrometheusCodec, nil))
312+
}, PrometheusCodec, nil, NewLimitsMiddleware(tripperware.MockLimits{}), SplitByIntervalMiddleware(interval, tripperware.MockLimits{}, PrometheusCodec, nil))
313313

314314
req, err := http.NewRequest("GET", tc.path, http.NoBody)
315315
require.NoError(t, err)

pkg/querier/tripperware/test_shard_by_query_utils.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ http_requests_total`,
441441
}
442442

443443
qa := thanosquerysharding.NewQueryAnalyzer()
444-
roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), mockLimits{shardSize: tt.shardSize}, tt.codec, qa))
444+
roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), MockLimits{ShardSize: tt.shardSize}, tt.codec, qa))
445445

446446
ctx := user.InjectOrgID(context.Background(), "1")
447447

@@ -461,31 +461,31 @@ http_requests_total`,
461461
}
462462
}
463463

464-
type mockLimits struct {
465-
maxQueryLookback time.Duration
466-
maxQueryLength time.Duration
467-
maxCacheFreshness time.Duration
468-
shardSize int
464+
type MockLimits struct {
465+
QueryLookback time.Duration
466+
QueryLength time.Duration
467+
CacheFreshness time.Duration
468+
ShardSize int
469469
}
470470

471-
func (m mockLimits) MaxQueryLookback(string) time.Duration {
472-
return m.maxQueryLookback
471+
func (m MockLimits) MaxQueryLookback(string) time.Duration {
472+
return m.QueryLookback
473473
}
474474

475-
func (m mockLimits) MaxQueryLength(string) time.Duration {
476-
return m.maxQueryLength
475+
func (m MockLimits) MaxQueryLength(string) time.Duration {
476+
return m.QueryLength
477477
}
478478

479-
func (mockLimits) MaxQueryParallelism(string) int {
479+
func (MockLimits) MaxQueryParallelism(string) int {
480480
return 14 // Flag default.
481481
}
482482

483-
func (m mockLimits) MaxCacheFreshness(string) time.Duration {
484-
return m.maxCacheFreshness
483+
func (m MockLimits) MaxCacheFreshness(string) time.Duration {
484+
return m.CacheFreshness
485485
}
486486

487-
func (m mockLimits) QueryVerticalShardSize(userID string) int {
488-
return m.shardSize
487+
func (m MockLimits) QueryVerticalShardSize(userID string) int {
488+
return m.ShardSize
489489
}
490490

491491
type singleHostRoundTripper struct {

0 commit comments

Comments
 (0)