From f3952a9fd5f7e4a78a795025601ecde85bbb0d1e Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 5 May 2023 00:51:54 -0700 Subject: [PATCH 1/7] check subquery step size in query frontend Signed-off-by: Ben Ye --- integration/query_frontend_test.go | 26 ++++++++ pkg/cortex/modules.go | 15 ++++- .../tripperware/instantquery/instant_query.go | 16 ++--- .../instantquery/instant_query_middlewares.go | 3 +- .../instantquery/instant_query_test.go | 12 +++- .../instantquery/shard_by_query_test.go | 3 +- .../tripperware/queryrange/query_range.go | 20 +++++-- .../queryrange/query_range_middlewares.go | 8 ++- .../query_range_middlewares_test.go | 8 +++ .../queryrange/query_range_test.go | 6 +- .../tripperware/queryrange/step_align.go | 2 +- pkg/querier/tripperware/subquery.go | 43 ++++++++++++++ pkg/querier/tripperware/subquery_test.go | 59 +++++++++++++++++++ 13 files changed, 195 insertions(+), 26 deletions(-) create mode 100644 pkg/querier/tripperware/subquery.go create mode 100644 pkg/querier/tripperware/subquery_test.go diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index cd93908cd6f..8f71382e6fd 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -33,6 +33,7 @@ type queryFrontendTestConfig struct { querySchedulerEnabled bool queryStatsEnabled bool remoteReadEnabled bool + testSubQueryStepSize bool setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) } @@ -209,6 +210,19 @@ func TestQueryFrontendRemoteRead(t *testing.T) { }) } +func TestQueryFrontendSubQueryStepSize(t *testing.T) { + runQueryFrontendTest(t, queryFrontendTestConfig{ + testSubQueryStepSize: true, + setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) + + minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + return cortexConfigFile, flags + }, + }) +} + func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { const numUsers = 10 const numQueriesPerUser = 10 @@ -334,6 +348,14 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { require.True(t, len(res.Results[0].Timeseries[0].Labels) > 0) } + // No need to repeat the test on subquery step size. + if userID == 0 && cfg.testSubQueryStepSize { + _, err := c.Query(`up[30d:1m]`, now) + apiErr, ok := err.(*v1.Error) + require.True(t, ok) + require.Equal(t, apiErr.Type, v1.ErrBadData) + } + // In this test we do ensure that the /series start/end time is ignored and Cortex // always returns series in ingesters memory. No need to repeat it for each user. if userID == 0 { @@ -386,6 +408,10 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { extra++ } + if cfg.testSubQueryStepSize { + extra++ + } + require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total")) // The number of received request is greater than the query requests because include diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index ab03b711439..f67bfd386be 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -450,6 +450,13 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { // to optimize Prometheus query requests. func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) { queryAnalyzer := querysharding.NewQueryAnalyzer() + defaultSubQueryInterval := t.Cfg.Querier.DefaultEvaluationInterval + // PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses. + prometheusCodec := queryrange.NewPrometheusCodec(false, defaultSubQueryInterval) + // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) + shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, defaultSubQueryInterval) + instantQueryCodec := instantquery.NewInstantQueryCodec(defaultSubQueryInterval) + queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, util_log.Logger, @@ -458,12 +465,14 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro prometheus.DefaultRegisterer, t.TombstonesLoader, queryAnalyzer, + prometheusCodec, + shardedPrometheusCodec, ) if err != nil { return nil, err } - instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer) + instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer, instantQueryCodec) if err != nil { return nil, err } @@ -473,8 +482,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro t.Cfg.QueryRange.ForwardHeaders, queryRangeMiddlewares, instantQueryMiddlewares, - queryrange.PrometheusCodec, - instantquery.InstantQueryCodec, + prometheusCodec, + instantQueryCodec, t.Overrides, queryAnalyzer, ) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 2e8666545a6..e83e84ff4f2 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -19,11 +19,10 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" + promqlparser "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc/status" - promqlparser "github.com/prometheus/prometheus/promql/parser" - "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange" @@ -32,8 +31,6 @@ import ( ) var ( - InstantQueryCodec tripperware.Codec = newInstantQueryCodec() - json = jsoniter.Config{ EscapeHTML: false, // No HTML in our responses. SortMapKeys: true, @@ -109,11 +106,12 @@ func (r *PrometheusRequest) WithStats(stats string) tripperware.Request { type instantQueryCodec struct { tripperware.Codec - now func() time.Time + now func() time.Time + noStepSubQueryInterval time.Duration } -func newInstantQueryCodec() instantQueryCodec { - return instantQueryCodec{now: time.Now} +func NewInstantQueryCodec(noStepSubQueryInterval time.Duration) instantQueryCodec { + return instantQueryCodec{now: time.Now, noStepSubQueryInterval: noStepSubQueryInterval} } func (resp *PrometheusInstantQueryResponse) HTTPHeaders() map[string][]string { @@ -139,6 +137,10 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for } result.Query = r.FormValue("query") + if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil { + return nil, err + } + result.Stats = r.FormValue("stats") result.Path = r.URL.Path diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go index b88515e6be0..af8b659f796 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go @@ -11,9 +11,10 @@ func Middlewares( log log.Logger, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, + codec instantQueryCodec, ) ([]tripperware.Middleware, error) { var m []tripperware.Middleware - m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer)) + m = append(m, tripperware.ShardByMiddleware(log, limits, codec, queryAnalyzer)) return m, nil } diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 3d5313a443a..12bbdc41d77 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -19,12 +19,14 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tripperware" ) +var ( + InstantQueryCodec = NewInstantQueryCodec(time.Minute) +) + func TestRequest(t *testing.T) { t.Parallel() now := time.Now() - codec := instantQueryCodec{now: func() time.Time { - return now - }} + codec := InstantQueryCodec for _, tc := range []struct { url string @@ -71,6 +73,10 @@ func TestRequest(t *testing.T) { }, }, }, + { + url: "/api/v1/query?query=up%5B30d%3A%5D", + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tripperware.ErrSubQueryStepTooSmall, 11000), + }, } { tc := tc t.Run(tc.url, func(t *testing.T) { diff --git a/pkg/querier/tripperware/instantquery/shard_by_query_test.go b/pkg/querier/tripperware/instantquery/shard_by_query_test.go index d4b326dc16a..50c83cb5eeb 100644 --- a/pkg/querier/tripperware/instantquery/shard_by_query_test.go +++ b/pkg/querier/tripperware/instantquery/shard_by_query_test.go @@ -2,6 +2,7 @@ package instantquery import ( "testing" + "time" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange" @@ -9,5 +10,5 @@ import ( func Test_shardQuery(t *testing.T) { t.Parallel() - tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.ShardedPrometheusCodec) + tripperware.TestQueryShardQuery(t, InstantQueryCodec, queryrange.NewPrometheusCodec(true, time.Minute)) } diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 101170ebb43..4be3f4c7748 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -40,17 +40,21 @@ var ( errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer") errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") - // PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses. - PrometheusCodec tripperware.Codec = &prometheusCodec{sharded: false} - // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) - ShardedPrometheusCodec tripperware.Codec = &prometheusCodec{sharded: true} - // Name of the cache control header. cacheControlHeader = "Cache-Control" ) type prometheusCodec struct { sharded bool + + noStepSubQueryInterval time.Duration +} + +func NewPrometheusCodec(sharded bool, noStepSubQueryInterval time.Duration) *prometheusCodec { + return &prometheusCodec{ + sharded: sharded, + noStepSubQueryInterval: noStepSubQueryInterval, + } } // WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp. @@ -166,7 +170,7 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques return &response, nil } -func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) { +func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) { var result PrometheusRequest var err error result.Start, err = util.ParseTime(r.FormValue("start")) @@ -199,6 +203,10 @@ func (prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forward } result.Query = r.FormValue("query") + if err := tripperware.SubQueryStepSizeCheck(result.Query, c.noStepSubQueryInterval, tripperware.MaxStep); err != nil { + return nil, err + } + result.Stats = r.FormValue("stats") result.Path = r.URL.Path diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index 24714c54e1b..5966e599881 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -78,6 +78,8 @@ func Middlewares( registerer prometheus.Registerer, cacheGenNumberLoader CacheGenNumberLoader, queryAnalyzer querysharding.Analyzer, + prometheusCodec *prometheusCodec, + shardedPrometheusCodec *prometheusCodec, ) ([]tripperware.Middleware, cache.Cache, error) { // Metric used to keep track of each middleware execution duration. metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer) @@ -88,7 +90,7 @@ func Middlewares( } if cfg.SplitQueriesByInterval != 0 { staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval } - queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, PrometheusCodec, registerer)) + queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer)) } var c cache.Cache @@ -99,7 +101,7 @@ func Middlewares( } return false } - queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, PrometheusCodec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer) + queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, cacheGenNumberLoader, shouldCache, registerer) if err != nil { return nil, nil, err } @@ -111,7 +113,7 @@ func Middlewares( queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer))) } - queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, ShardedPrometheusCodec, queryAnalyzer)) + queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer)) return queryRangeMiddleware, c, nil } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index a3db501dff0..d4ff07160eb 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -8,6 +8,7 @@ import ( "net/url" "strconv" "testing" + "time" "github.com/go-kit/log" "github.com/stretchr/testify/require" @@ -18,6 +19,11 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tripperware" ) +var ( + PrometheusCodec = NewPrometheusCodec(false, time.Minute) + ShardedPrometheusCodec = NewPrometheusCodec(false, time.Minute) +) + func TestRoundTrip(t *testing.T) { t.Parallel() s := httptest.NewServer( @@ -53,6 +59,8 @@ func TestRoundTrip(t *testing.T) { nil, nil, qa, + PrometheusCodec, + ShardedPrometheusCodec, ) require.NoError(t, err) diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index f66a43f1c29..8951015f00f 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -5,7 +5,7 @@ import ( "compress/gzip" "context" "fmt" - io "io" + "io" "net/http" "strconv" "testing" @@ -61,6 +61,10 @@ func TestRequest(t *testing.T) { url: "api/v1/query_range?start=0&end=11001&step=1", expectedErr: errStepTooSmall, }, + { + url: "/api/v1/query?query=up%5B30d%3A%5D&start=123&end=456&step=10", + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tripperware.ErrSubQueryStepTooSmall, 11000), + }, } { tc := tc t.Run(tc.url, func(t *testing.T) { diff --git a/pkg/querier/tripperware/queryrange/step_align.go b/pkg/querier/tripperware/queryrange/step_align.go index f5a4108f626..469e5162b53 100644 --- a/pkg/querier/tripperware/queryrange/step_align.go +++ b/pkg/querier/tripperware/queryrange/step_align.go @@ -7,7 +7,7 @@ import ( ) // StepAlignMiddleware aligns the start and end of request to the step to -// improved the cacheability of the query results. +// improve the cacheability of the query results. var StepAlignMiddleware = tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler { return stepAlign{ next: next, diff --git a/pkg/querier/tripperware/subquery.go b/pkg/querier/tripperware/subquery.go new file mode 100644 index 00000000000..83443e2da51 --- /dev/null +++ b/pkg/querier/tripperware/subquery.go @@ -0,0 +1,43 @@ +package tripperware + +import ( + "net/http" + "time" + + "github.com/prometheus/prometheus/promql/parser" + "github.com/weaveworks/common/httpgrpc" +) + +var ( + ErrSubQueryStepTooSmall = "exceeded maximum resolution of %d points per timeseries in subquery. Try decreasing the step size of your subquery" +) + +const ( + MaxStep = 11000 +) + +// SubQueryStepSizeCheck ensures the query doesn't contain too small step size in subqueries. +func SubQueryStepSizeCheck(query string, defaultSubQueryInterval time.Duration, maxStep int64) error { + expr, err := parser.ParseExpr(query) + if err != nil { + // If query fails to parse, we don't throw step size error + // but fail query later on querier. + return nil + } + parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error { + e, ok := node.(*parser.SubqueryExpr) + if !ok { + return nil + } + step := e.Step + if e.Step == 0 { + step = defaultSubQueryInterval + } + + if e.Range/step > time.Duration(maxStep) { + err = httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, maxStep) + } + return err + }) + return err +} diff --git a/pkg/querier/tripperware/subquery_test.go b/pkg/querier/tripperware/subquery_test.go new file mode 100644 index 00000000000..d72b30db035 --- /dev/null +++ b/pkg/querier/tripperware/subquery_test.go @@ -0,0 +1,59 @@ +package tripperware + +import ( + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" +) + +func TestSubQueryStepSizeCheck(t *testing.T) { + t.Parallel() + for _, tc := range []struct { + name string + query string + defaultStep time.Duration + err error + maxStep int64 + }{ + { + name: "invalid query", + query: "sum(up", + }, + { + name: "no subquery", + query: "up", + }, + { + name: "valid subquery and within step limit", + query: "up[60m:1m]", + maxStep: 100, + }, + { + name: "valid subquery, not within step limit", + query: "up[60m:1m]", + maxStep: 10, + err: httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, 10), + }, + { + name: "subquery with no step size defined, use default step and pass", + query: "up[60m:]", + maxStep: 100, + defaultStep: time.Minute, + }, + { + name: "subquery with no step size defined, use default step and fail", + query: "up[60m:]", + maxStep: 100, + defaultStep: time.Second, + err: httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, 100), + }, + } { + t.Run(tc.name, func(t *testing.T) { + err := SubQueryStepSizeCheck(tc.query, tc.defaultStep, tc.maxStep) + require.Equal(t, tc.err, err) + }) + } +} From ed358bd752b0ef513b3deaa84c5f0ff14db18853 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 5 May 2023 00:59:49 -0700 Subject: [PATCH 2/7] more test cases Signed-off-by: Ben Ye --- pkg/querier/tripperware/subquery_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/querier/tripperware/subquery_test.go b/pkg/querier/tripperware/subquery_test.go index d72b30db035..301d36deaf0 100644 --- a/pkg/querier/tripperware/subquery_test.go +++ b/pkg/querier/tripperware/subquery_test.go @@ -50,6 +50,13 @@ func TestSubQueryStepSizeCheck(t *testing.T) { defaultStep: time.Second, err: httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, 100), }, + { + name: "two subqueries within functions", + query: "sum_over_time(up[60m:]) + avg_over_time(test[5m:1m])", + maxStep: 10, + defaultStep: time.Second, + err: httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, 10), + }, } { t.Run(tc.name, func(t *testing.T) { err := SubQueryStepSizeCheck(tc.query, tc.defaultStep, tc.maxStep) From d9bac8b7f39f3c80ef2643eb8324b1222b729b67 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 5 May 2023 01:01:02 -0700 Subject: [PATCH 3/7] changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b5aab2113c..4221e514008 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [CHANGE] Ingester: Creating label `native-histogram-sample` on the `cortex_discarded_samples_total` to keep track of discarded native histogram samples. #5289 * [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request. * [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292 +* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 From 087ad53a425ab28a71a738a2b441a91fe2e7f6dc Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 5 May 2023 12:18:01 -0700 Subject: [PATCH 4/7] try again Signed-off-by: Ben Ye --- pkg/cortex/modules.go | 3 +- .../tripperware/instantquery/instant_query.go | 6 ++- .../instantquery/instant_query_middlewares.go | 2 +- .../instantquery/instant_query_test.go | 8 ---- .../tripperware/queryrange/query_range.go | 2 +- .../queryrange/query_range_middlewares.go | 4 +- .../query_range_middlewares_test.go | 1 + pkg/querier/tripperware/roundtrip.go | 10 ++++- pkg/querier/tripperware/roundtrip_test.go | 40 ++++++++++++++----- 9 files changed, 48 insertions(+), 28 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index f67bfd386be..8e525419931 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -455,7 +455,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro prometheusCodec := queryrange.NewPrometheusCodec(false, defaultSubQueryInterval) // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, defaultSubQueryInterval) - instantQueryCodec := instantquery.NewInstantQueryCodec(defaultSubQueryInterval) + instantQueryCodec := instantquery.NewInstantQueryCodec() queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, @@ -486,6 +486,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro instantQueryCodec, t.Overrides, queryAnalyzer, + defaultSubQueryInterval, ) return services.NewIdleService(nil, func(_ error) error { diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index e83e84ff4f2..a45da6dbbb3 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -31,6 +31,8 @@ import ( ) var ( + InstantQueryCodec tripperware.Codec = newInstantQueryCodec() + json = jsoniter.Config{ EscapeHTML: false, // No HTML in our responses. SortMapKeys: true, @@ -110,8 +112,8 @@ type instantQueryCodec struct { noStepSubQueryInterval time.Duration } -func NewInstantQueryCodec(noStepSubQueryInterval time.Duration) instantQueryCodec { - return instantQueryCodec{now: time.Now, noStepSubQueryInterval: noStepSubQueryInterval} +func newInstantQueryCodec() instantQueryCodec { + return instantQueryCodec{now: time.Now} } func (resp *PrometheusInstantQueryResponse) HTTPHeaders() map[string][]string { diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go index af8b659f796..01726278988 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go @@ -11,7 +11,7 @@ func Middlewares( log log.Logger, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, - codec instantQueryCodec, + codec tripperware.Codec, ) ([]tripperware.Middleware, error) { var m []tripperware.Middleware diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 12bbdc41d77..9dbc80d405c 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -19,10 +19,6 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tripperware" ) -var ( - InstantQueryCodec = NewInstantQueryCodec(time.Minute) -) - func TestRequest(t *testing.T) { t.Parallel() now := time.Now() @@ -73,10 +69,6 @@ func TestRequest(t *testing.T) { }, }, }, - { - url: "/api/v1/query?query=up%5B30d%3A%5D", - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tripperware.ErrSubQueryStepTooSmall, 11000), - }, } { tc := tc t.Run(tc.url, func(t *testing.T) { diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 4be3f4c7748..704049e66a2 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -50,7 +50,7 @@ type prometheusCodec struct { noStepSubQueryInterval time.Duration } -func NewPrometheusCodec(sharded bool, noStepSubQueryInterval time.Duration) *prometheusCodec { +func NewPrometheusCodec(sharded bool, noStepSubQueryInterval time.Duration) *prometheusCodec { //nolint:revive return &prometheusCodec{ sharded: sharded, noStepSubQueryInterval: noStepSubQueryInterval, diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index 5966e599881..3cc14cbb54e 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -78,8 +78,8 @@ func Middlewares( registerer prometheus.Registerer, cacheGenNumberLoader CacheGenNumberLoader, queryAnalyzer querysharding.Analyzer, - prometheusCodec *prometheusCodec, - shardedPrometheusCodec *prometheusCodec, + prometheusCodec tripperware.Codec, + shardedPrometheusCodec tripperware.Codec, ) ([]tripperware.Middleware, cache.Cache, error) { // Metric used to keep track of each middleware execution duration. metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer) diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index d4ff07160eb..41821695230 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -73,6 +73,7 @@ func TestRoundTrip(t *testing.T) { nil, nil, qa, + time.Minute, ) for i, tc := range []struct { diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index 39f30a116b9..ecc6ae52215 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -102,6 +102,7 @@ func NewQueryTripperware( instantQueryCodec Codec, limits Limits, queryAnalyzer querysharding.Analyzer, + defaultSubQueryInterval time.Duration, ) Tripperware { // Per tenant query metrics. queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ @@ -144,13 +145,18 @@ func NewQueryTripperware( if isQueryRange { return queryrange.RoundTrip(r) } else if isQuery { + // If the given query is not shardable, use downstream roundtripper. + query := r.FormValue("query") + // Check subquery step size. + if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, MaxStep); err != nil { + return nil, err + } + // If vertical sharding is not enabled for the tenant, use downstream roundtripper. numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize) if numShards <= 1 { return next.RoundTrip(r) } - // If the given query is not shardable, use downstream roundtripper. - query := r.FormValue("query") analysis, err := queryAnalyzer.Analyze(query) if err != nil || !analysis.IsShardable() { return next.RoundTrip(r) diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 07a7bbf906c..9b787ad0cf8 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -3,11 +3,13 @@ package tripperware import ( "bytes" "context" + "github.com/weaveworks/common/httpgrpc" "io" "net/http" "net/http/httptest" "net/url" "testing" + "time" "github.com/go-kit/log" "github.com/stretchr/testify/require" @@ -19,11 +21,13 @@ import ( ) const ( - queryRange = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120" - query = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680" - queryNonShardable = "/api/v1/query?time=1536716898&query=container_memory_rss&start=1536673680" - queryExemplar = "/api/v1/query_exemplars?query=test_exemplar_metric_total&start=2020-09-14T15:22:25.479Z&end=2020-09-14T15:23:25.479Z'" - responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}` + queryRange = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120" + query = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680" + queryNonShardable = "/api/v1/query?time=1536716898&query=container_memory_rss&start=1536673680" + queryExemplar = "/api/v1/query_exemplars?query=test_exemplar_metric_total&start=2020-09-14T15:22:25.479Z&end=2020-09-14T15:23:25.479Z'" + querySubqueryStepSizeTooSmall = "/api/v1/query?query=up%5B30d%3A%5D" + + responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}` ) type mockRequest struct { @@ -103,6 +107,7 @@ func TestRoundTrip(t *testing.T) { require.NoError(t, err) for _, tc := range []struct { path, expectedBody string + expectedErr error limits Limits }{ { @@ -140,8 +145,16 @@ func TestRoundTrip(t *testing.T) { expectedBody: responseBody, limits: shardingOverrides, }, + { + path: querySubqueryStepSizeTooSmall, + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, 11000), + limits: defaultOverrides, + }, } { t.Run(tc.path, func(t *testing.T) { + if tc.path != querySubqueryStepSizeTooSmall { + return + } //parallel testing causes data race req, err := http.NewRequest("GET", tc.path, http.NoBody) require.NoError(t, err) @@ -163,14 +176,19 @@ func TestRoundTrip(t *testing.T) { mockCodec{}, tc.limits, querysharding.NewQueryAnalyzer(), + time.Minute, ) resp, err := tw(downstream).RoundTrip(req) - require.NoError(t, err) - require.Equal(t, 200, resp.StatusCode) - - bs, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.Equal(t, tc.expectedBody, string(bs)) + if tc.expectedErr == nil { + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + bs, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedBody, string(bs)) + } else { + require.Equal(t, tc.expectedErr, err) + } }) } } From 16e4c84930ca0b08f2669610f02f8e4f7170b64f Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 5 May 2023 12:24:45 -0700 Subject: [PATCH 5/7] update again Signed-off-by: Ben Ye --- pkg/cortex/modules.go | 5 ++--- .../tripperware/instantquery/instant_query_middlewares.go | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 8e525419931..772af21d156 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -455,7 +455,6 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro prometheusCodec := queryrange.NewPrometheusCodec(false, defaultSubQueryInterval) // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, defaultSubQueryInterval) - instantQueryCodec := instantquery.NewInstantQueryCodec() queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, @@ -472,7 +471,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro return nil, err } - instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer, instantQueryCodec) + instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer) if err != nil { return nil, err } @@ -483,7 +482,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryRangeMiddlewares, instantQueryMiddlewares, prometheusCodec, - instantQueryCodec, + instantquery.InstantQueryCodec, t.Overrides, queryAnalyzer, defaultSubQueryInterval, diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go index 01726278988..b88515e6be0 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go @@ -11,10 +11,9 @@ func Middlewares( log log.Logger, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, - codec tripperware.Codec, ) ([]tripperware.Middleware, error) { var m []tripperware.Middleware - m = append(m, tripperware.ShardByMiddleware(log, limits, codec, queryAnalyzer)) + m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer)) return m, nil } From 5fb45926b2c5c9cdd9ff5c38be273f4acd533682 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 5 May 2023 13:31:55 -0700 Subject: [PATCH 6/7] fix test Signed-off-by: Ben Ye --- integration/query_frontend_test.go | 7 +++---- pkg/querier/tripperware/roundtrip.go | 2 +- pkg/querier/tripperware/roundtrip_test.go | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 8f71382e6fd..67d6b5f8c5c 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -7,6 +7,7 @@ import ( "crypto/x509" "crypto/x509/pkix" "fmt" + "net/http" "os" "path/filepath" "strconv" @@ -350,10 +351,8 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { // No need to repeat the test on subquery step size. if userID == 0 && cfg.testSubQueryStepSize { - _, err := c.Query(`up[30d:1m]`, now) - apiErr, ok := err.(*v1.Error) - require.True(t, ok) - require.Equal(t, apiErr.Type, v1.ErrBadData) + resp, _, _ := c.QueryRaw(`up[30d:1m]`, now) + require.Equal(t, http.StatusBadRequest, resp.StatusCode) } // In this test we do ensure that the /series start/end time is ignored and Cortex diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index ecc6ae52215..3aa6b1313f1 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -151,7 +151,7 @@ func NewQueryTripperware( if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, MaxStep); err != nil { return nil, err } - + // If vertical sharding is not enabled for the tenant, use downstream roundtripper. numShards := validation.SmallestPositiveIntPerTenant(tenantIDs, limits.QueryVerticalShardSize) if numShards <= 1 { diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 9b787ad0cf8..8497491ba6f 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -3,7 +3,6 @@ package tripperware import ( "bytes" "context" - "github.com/weaveworks/common/httpgrpc" "io" "net/http" "net/http/httptest" @@ -14,6 +13,7 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/querysharding" + "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/util/flagext" From d4c7a4649be264e91e476a2ecce8ebee3bb364c7 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 8 May 2023 15:24:46 -0700 Subject: [PATCH 7/7] address review comments Signed-off-by: Ben Ye --- pkg/querier/tripperware/subquery.go | 2 +- pkg/querier/tripperware/subquery_test.go | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/querier/tripperware/subquery.go b/pkg/querier/tripperware/subquery.go index 83443e2da51..cebce45f261 100644 --- a/pkg/querier/tripperware/subquery.go +++ b/pkg/querier/tripperware/subquery.go @@ -9,7 +9,7 @@ import ( ) var ( - ErrSubQueryStepTooSmall = "exceeded maximum resolution of %d points per timeseries in subquery. Try decreasing the step size of your subquery" + ErrSubQueryStepTooSmall = "exceeded maximum resolution of %d points per timeseries in subquery. Try increasing the step size of your subquery" ) const ( diff --git a/pkg/querier/tripperware/subquery_test.go b/pkg/querier/tripperware/subquery_test.go index 301d36deaf0..c0ccece43ff 100644 --- a/pkg/querier/tripperware/subquery_test.go +++ b/pkg/querier/tripperware/subquery_test.go @@ -51,12 +51,18 @@ func TestSubQueryStepSizeCheck(t *testing.T) { err: httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, 100), }, { - name: "two subqueries within functions", + name: "two subqueries within functions, one exceeds the limit while another is not", query: "sum_over_time(up[60m:]) + avg_over_time(test[5m:1m])", maxStep: 10, defaultStep: time.Second, err: httpgrpc.Errorf(http.StatusBadRequest, ErrSubQueryStepTooSmall, 10), }, + { + name: "two subqueries within functions, all within the limit", + query: "sum_over_time(up[60m:]) + avg_over_time(test[5m:1m])", + maxStep: 100, + defaultStep: time.Minute, + }, } { t.Run(tc.name, func(t *testing.T) { err := SubQueryStepSizeCheck(tc.query, tc.defaultStep, tc.maxStep)