Skip to content

Commit cca990e

Browse files
committed
Skip instant query round tripper if sharding is not applicable.
Signed-off-by: Ben Ye <[email protected]>
1 parent f930c7a commit cca990e

File tree

8 files changed

+102
-28
lines changed

8 files changed

+102
-28
lines changed

pkg/cortex/modules.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/prometheus/prometheus/rules"
1717
prom_storage "github.com/prometheus/prometheus/storage"
1818
"github.com/thanos-io/thanos/pkg/discovery/dns"
19+
"github.com/thanos-io/thanos/pkg/querysharding"
1920
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
2021
"github.com/weaveworks/common/server"
2122

@@ -437,19 +438,21 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
437438
// initQueryFrontendTripperware instantiates the tripperware used by the query frontend
438439
// to optimize Prometheus query requests.
439440
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
441+
queryAnalyzer := querysharding.NewQueryAnalyzer()
440442
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
441443
t.Cfg.QueryRange,
442444
util_log.Logger,
443445
t.Overrides,
444446
queryrange.PrometheusResponseExtractor{},
445447
prometheus.DefaultRegisterer,
446448
t.TombstonesLoader,
449+
queryAnalyzer,
447450
)
448451
if err != nil {
449452
return nil, err
450453
}
451454

452-
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides)
455+
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer)
453456
if err != nil {
454457
return nil, err
455458
}
@@ -461,6 +464,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
461464
instantQueryMiddlewares,
462465
queryrange.PrometheusCodec,
463466
instantquery.InstantQueryCodec,
467+
t.Overrides,
468+
queryAnalyzer,
464469
)
465470

466471
return services.NewIdleService(nil, func(_ error) error {

pkg/querier/tripperware/instantquery/instant_query_middlewares.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@ package instantquery
22

33
import (
44
"github.com/go-kit/log"
5+
"github.com/thanos-io/thanos/pkg/querysharding"
56

67
"github.com/cortexproject/cortex/pkg/querier/tripperware"
78
)
89

910
func Middlewares(
1011
log log.Logger,
1112
limits tripperware.Limits,
13+
queryAnalyzer querysharding.Analyzer,
1214
) ([]tripperware.Middleware, error) {
1315
var m []tripperware.Middleware
1416

15-
m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec))
17+
m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer))
1618
return m, nil
1719
}

pkg/querier/tripperware/queryrange/query_range_middlewares.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/go-kit/log"
2323
"github.com/pkg/errors"
2424
"github.com/prometheus/client_golang/prometheus"
25+
"github.com/thanos-io/thanos/pkg/querysharding"
2526

2627
"github.com/cortexproject/cortex/pkg/chunk/cache"
2728
"github.com/cortexproject/cortex/pkg/querier"
@@ -76,6 +77,7 @@ func Middlewares(
7677
cacheExtractor Extractor,
7778
registerer prometheus.Registerer,
7879
cacheGenNumberLoader CacheGenNumberLoader,
80+
queryAnalyzer querysharding.Analyzer,
7981
) ([]tripperware.Middleware, cache.Cache, error) {
8082
// Metric used to keep track of each middleware execution duration.
8183
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer)
@@ -109,7 +111,7 @@ func Middlewares(
109111
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer)))
110112
}
111113

112-
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, ShardedPrometheusCodec))
114+
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, ShardedPrometheusCodec, queryAnalyzer))
113115

114116
return queryRangeMiddleware, c, nil
115117
}

pkg/querier/tripperware/queryrange/query_range_middlewares_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package queryrange
22

33
import (
44
"context"
5-
io "io"
5+
"io"
66
"net/http"
77
"net/http/httptest"
88
"net/url"
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/go-kit/log"
1313
"github.com/stretchr/testify/require"
14+
"github.com/thanos-io/thanos/pkg/querysharding"
1415
"github.com/weaveworks/common/middleware"
1516
"github.com/weaveworks/common/user"
1617

@@ -43,12 +44,14 @@ func TestRoundTrip(t *testing.T) {
4344
next: http.DefaultTransport,
4445
}
4546

47+
qa := querysharding.NewQueryAnalyzer()
4648
queyrangemiddlewares, _, err := Middlewares(Config{},
4749
log.NewNopLogger(),
4850
mockLimits{},
4951
nil,
5052
nil,
5153
nil,
54+
qa,
5255
)
5356
require.NoError(t, err)
5457

@@ -59,6 +62,8 @@ func TestRoundTrip(t *testing.T) {
5962
nil,
6063
PrometheusCodec,
6164
nil,
65+
nil,
66+
qa,
6267
)
6368

6469
for i, tc := range []struct {

pkg/querier/tripperware/roundtrip.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,14 @@ import (
2727
"github.com/opentracing/opentracing-go"
2828
"github.com/prometheus/client_golang/prometheus"
2929
"github.com/prometheus/client_golang/prometheus/promauto"
30+
"github.com/thanos-io/thanos/pkg/querysharding"
3031
"github.com/weaveworks/common/httpgrpc"
3132
"github.com/weaveworks/common/user"
3233

3334
"github.com/cortexproject/cortex/pkg/tenant"
3435
"github.com/cortexproject/cortex/pkg/util"
3536
util_log "github.com/cortexproject/cortex/pkg/util/log"
37+
"github.com/cortexproject/cortex/pkg/util/validation"
3638
)
3739

3840
// HandlerFunc is like http.HandlerFunc, but for Handler.
@@ -98,6 +100,8 @@ func NewQueryTripperware(
98100
instantRangeMiddleware []Middleware,
99101
queryRangeCodec Codec,
100102
instantQueryCodec Codec,
103+
limits Limits,
104+
queryAnalyzer querysharding.Analyzer,
101105
) Tripperware {
102106
// Per tenant query metrics.
103107
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
@@ -139,7 +143,18 @@ func NewQueryTripperware(
139143

140144
if isQueryRange {
141145
return queryrange.RoundTrip(r)
142-
} else if isQuery && len(instantRangeMiddleware) > 0 {
146+
} else if isQuery {
147+
// If vertical sharding is not enabled for the tenant, use downstream roundtripper.
148+
numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize)
149+
if numShards <= 1 {
150+
return next.RoundTrip(r)
151+
}
152+
// If the given query is not shardable, use downstream roundtripper.
153+
query := r.FormValue("query")
154+
analysis, err := queryAnalyzer.Analyze(query)
155+
if err != nil || !analysis.IsShardable() {
156+
return next.RoundTrip(r)
157+
}
143158
return instantQuery.RoundTrip(r)
144159
}
145160
return next.RoundTrip(r)

pkg/querier/tripperware/roundtrip_test.go

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,19 @@ import (
1111

1212
"github.com/go-kit/log"
1313
"github.com/stretchr/testify/require"
14+
"github.com/thanos-io/thanos/pkg/querysharding"
1415
"github.com/weaveworks/common/user"
16+
17+
"github.com/cortexproject/cortex/pkg/util/flagext"
18+
"github.com/cortexproject/cortex/pkg/util/validation"
1519
)
1620

1721
const (
18-
queryRange = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120"
19-
query = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680"
20-
queryExemplar = "/api/v1/query_exemplars?query=test_exemplar_metric_total&start=2020-09-14T15:22:25.479Z&end=2020-09-14T15:23:25.479Z'"
21-
responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}`
22+
queryRange = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120"
23+
query = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680"
24+
queryNonShardable = "/api/v1/query?time=1536716898&query=container_memory_rss&start=1536673680"
25+
queryExemplar = "/api/v1/query_exemplars?query=test_exemplar_metric_total&start=2020-09-14T15:22:25.479Z&end=2020-09-14T15:23:25.479Z'"
26+
responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}`
2227
)
2328

2429
type mockRequest struct {
@@ -86,22 +91,49 @@ func TestRoundTrip(t *testing.T) {
8691
return mockMiddleware{}
8792
}),
8893
}
89-
tw := NewQueryTripperware(log.NewNopLogger(),
90-
nil,
91-
nil,
92-
middlewares,
93-
middlewares,
94-
mockCodec{},
95-
mockCodec{},
96-
)
9794

95+
limits := validation.Limits{}
96+
flagext.DefaultValues(&limits)
97+
defaultOverrides, err := validation.NewOverrides(limits, nil)
98+
require.NoError(t, err)
99+
100+
limitsWithVerticalSharding := validation.Limits{QueryVerticalShardSize: 3}
101+
shardingOverrides, err := validation.NewOverrides(limitsWithVerticalSharding, nil)
102+
require.NoError(t, err)
98103
for _, tc := range []struct {
99104
path, expectedBody string
105+
limits Limits
100106
}{
101-
{"/foo", "bar"},
102-
{queryExemplar, "bar"},
103-
{queryRange, responseBody},
104-
{query, responseBody},
107+
{
108+
path: "/foo",
109+
expectedBody: "bar",
110+
limits: defaultOverrides,
111+
},
112+
{
113+
path: queryExemplar,
114+
expectedBody: "bar",
115+
limits: defaultOverrides,
116+
},
117+
{
118+
path: queryRange,
119+
expectedBody: responseBody,
120+
limits: defaultOverrides,
121+
},
122+
{
123+
path: query,
124+
expectedBody: "bar",
125+
limits: defaultOverrides,
126+
},
127+
{
128+
path: queryNonShardable,
129+
expectedBody: "bar",
130+
limits: defaultOverrides,
131+
},
132+
{
133+
path: query,
134+
expectedBody: responseBody,
135+
limits: shardingOverrides,
136+
},
105137
} {
106138
t.Run(tc.path, func(t *testing.T) {
107139
req, err := http.NewRequest("GET", tc.path, http.NoBody)
@@ -115,6 +147,16 @@ func TestRoundTrip(t *testing.T) {
115147
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
116148
require.NoError(t, err)
117149

150+
tw := NewQueryTripperware(log.NewNopLogger(),
151+
nil,
152+
nil,
153+
middlewares,
154+
middlewares,
155+
mockCodec{},
156+
mockCodec{},
157+
tc.limits,
158+
querysharding.NewQueryAnalyzer(),
159+
)
118160
resp, err := tw(downstream).RoundTrip(req)
119161
require.NoError(t, err)
120162
require.Equal(t, 200, resp.StatusCode)

pkg/querier/tripperware/shard_by.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ import (
1717
"github.com/cortexproject/cortex/pkg/util/validation"
1818
)
1919

20-
func ShardByMiddleware(logger log.Logger, limits Limits, merger Merger) Middleware {
20+
func ShardByMiddleware(logger log.Logger, limits Limits, merger Merger, queryAnalyzer querysharding.Analyzer) Middleware {
2121
return MiddlewareFunc(func(next Handler) Handler {
2222
return shardBy{
23-
next: next,
24-
limits: limits,
25-
merger: merger,
26-
logger: logger,
23+
next: next,
24+
limits: limits,
25+
merger: merger,
26+
logger: logger,
27+
queryAnalyzer: queryAnalyzer,
2728
}
2829
})
2930
}
@@ -33,7 +34,7 @@ type shardBy struct {
3334
limits Limits
3435
logger log.Logger
3536
merger Merger
36-
queryAnalyzer *querysharding.QueryAnalyzer
37+
queryAnalyzer querysharding.Analyzer
3738
}
3839

3940
func (s shardBy) Do(ctx context.Context, r Request) (Response, error) {

pkg/querier/tripperware/test_shard_by_query_utils.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/prometheus/common/model"
1717
"github.com/prometheus/prometheus/promql/parser"
1818
"github.com/stretchr/testify/require"
19+
thanosquerysharding "github.com/thanos-io/thanos/pkg/querysharding"
1920
"github.com/thanos-io/thanos/pkg/store/storepb"
2021
"github.com/weaveworks/common/user"
2122

@@ -406,7 +407,8 @@ http_requests_total`,
406407
next: http.DefaultTransport,
407408
}
408409

409-
roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), mockLimits{shardSize: tt.shardSize}, tt.codec))
410+
qa := thanosquerysharding.NewQueryAnalyzer()
411+
roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), mockLimits{shardSize: tt.shardSize}, tt.codec, qa))
410412

411413
ctx := user.InjectOrgID(context.Background(), "1")
412414

0 commit comments

Comments
 (0)