From e3f869b17e6beac96103a46be2c6f00c7872fefc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Wed, 13 Sep 2023 10:30:50 -0700 Subject: [PATCH] Add retries for instant query MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- CHANGELOG.md | 3 + docs/blocks-storage/querier.md | 5 + docs/blocks-storage/store-gateway.md | 5 + docs/configuration/config-file-reference.md | 5 + pkg/cortex/modules.go | 4 +- pkg/querier/blocks_store_queryable.go | 6 + pkg/querier/blocks_store_queryable_test.go | 80 ++++++++++++ .../instantquery/instant_query_middlewares.go | 6 + .../instant_query_middlewares_test.go | 118 ++++++++++++++++++ .../tripperware/queryrange/limits_test.go | 30 +---- .../queryrange/query_range_middlewares.go | 3 +- .../query_range_middlewares_test.go | 3 +- .../queryrange/results_cache_test.go | 16 +-- .../queryrange/split_by_interval_test.go | 2 +- .../tripperware/test_shard_by_query_utils.go | 30 ++--- pkg/storage/tsdb/config.go | 2 + pkg/storegateway/bucket_stores.go | 35 ++++++ pkg/storegateway/bucket_stores_test.go | 42 +++++++ pkg/storegateway/gateway.go | 20 +++ 19 files changed, 360 insertions(+), 55 deletions(-) create mode 100644 pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6be45a15e6c..36c03226533 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * [FEATURE] Ruler: Support for filtering rules in the API. #5417 * [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432 * [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496 +* [FEATURE] Store Gateway: Add `-blocks-storage.bucket-store.max-inflight-requests` for store gateways to reject further requests upon reaching the limit. 
#5553 * [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319 * [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292 * [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323 @@ -59,6 +60,8 @@ * [ENHANCEMENT] Store Gateway: add metric `cortex_bucket_store_chunk_refetches_total` for number of chunk refetches. #5532 * [ENHANCEMENT] BasicLifeCycler: allow final-sleep during shutdown #5517 * [ENHANCEMENT] All: Handling CMK Access Denied errors. #5420 #5542 +* [ENHANCEMENT] Querier: Retry store gateway client connection closing gRPC error. #5558 +* [ENHANCEMENT] Query Frontend: Add retries for instant query. #5560 * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 * [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 * [BUGFIX] Alertmanager: Route web-ui requests to the alertmanager distributor when sharding is enabled. #5293 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 01361c8e5c2..76ecf8a1795 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -499,6 +499,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: | default = 100] + # Max number of inflight queries to execute against the long-term storage. + # The limit is shared across all tenants. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.max-inflight-requests + [max_inflight_requests: | default = 0] + # Maximum number of concurrent tenants synching blocks. 
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 10] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index e210238508b..d407806542d 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -602,6 +602,11 @@ blocks_storage: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: | default = 100] + # Max number of inflight queries to execute against the long-term storage. + # The limit is shared across all tenants. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.max-inflight-requests + [max_inflight_requests: | default = 0] + # Maximum number of concurrent tenants synching blocks. # CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 10] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 817d89ed462..0fb7777c300 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1042,6 +1042,11 @@ bucket_store: # CLI flag: -blocks-storage.bucket-store.max-concurrent [max_concurrent: | default = 100] + # Max number of inflight queries to execute against the long-term storage. The + # limit is shared across all tenants. 0 to disable. + # CLI flag: -blocks-storage.bucket-store.max-inflight-requests + [max_inflight_requests: | default = 0] + # Maximum number of concurrent tenants synching blocks. 
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency [tenant_sync_concurrency: | default = 10] diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 02817b6468b..28496167a28 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -455,6 +455,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro prometheusCodec := queryrange.NewPrometheusCodec(false) // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) shardedPrometheusCodec := queryrange.NewPrometheusCodec(true) + retryMiddlewareMetrics := queryrange.NewRetryMiddlewareMetrics(prometheus.DefaultRegisterer) queryRangeMiddlewares, cache, err := queryrange.Middlewares( t.Cfg.QueryRange, @@ -466,12 +467,13 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryAnalyzer, prometheusCodec, shardedPrometheusCodec, + retryMiddlewareMetrics, ) if err != nil { return nil, err } - instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer) + instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, retryMiddlewareMetrics, t.Cfg.QueryRange.MaxRetries, queryAnalyzer) if err != nil { return nil, err } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index c78d8952362..5729a82a176 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -1116,6 +1116,12 @@ func isRetryableError(err error) bool { switch status.Code(err) { case codes.Unavailable: return true + case codes.ResourceExhausted: + return errors.Is(err, storegateway.ErrTooManyInflightRequests) + // Client side connection closing, this error happens during store gateway deployment. 
+ // https://github.com/grpc/grpc-go/blob/03172006f5d168fc646d87928d85cb9c4a480291/clientconn.go#L67 + case codes.Canceled: + return strings.Contains(err.Error(), "grpc: the client connection is closing") default: return false } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 26935a1f397..a01d4f28937 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" @@ -638,6 +639,35 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, }, + "multiple store-gateways has the block, but one of them fails to return due to clientconn closing": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: status.Error(codes.Canceled, "grpc: the client connection is closing"), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 2}, + }, + }, + }, + }, "all store-gateways return PermissionDenied": { finderResult: bucketindex.Blocks{ {ID: block1}, @@ -708,6 +738,56 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, }, + "multiple store-gateways has the block, but one of 
them had too many inflight requests": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: storegateway.ErrTooManyInflightRequests, + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 2}, + }, + }, + }, + }, + "store gateway returns resource exhausted error other than max inflight request": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: status.Error(codes.ResourceExhausted, "some other resource"), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + expectedErr: errors.Wrapf(status.Error(codes.ResourceExhausted, "some other resource"), "failed to fetch series from 1.1.1.1"), + }, } for testName, testData := range tests { diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go index b88515e6be0..f46770c574d 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go @@ -5,15 +5,21 @@ import ( 
"github.com/thanos-io/thanos/pkg/querysharding" "github.com/cortexproject/cortex/pkg/querier/tripperware" + "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange" ) func Middlewares( log log.Logger, limits tripperware.Limits, + retryMiddlewareMetrics *queryrange.RetryMiddlewareMetrics, + maxRetries int, queryAnalyzer querysharding.Analyzer, ) ([]tripperware.Middleware, error) { var m []tripperware.Middleware + if maxRetries > 0 { + m = append(m, queryrange.NewRetryMiddleware(log, maxRetries, retryMiddlewareMetrics)) + } m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer)) return m, nil } diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go new file mode 100644 index 00000000000..54163e8b26a --- /dev/null +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go @@ -0,0 +1,118 @@ +package instantquery + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/querysharding" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + "go.uber.org/atomic" + + "github.com/cortexproject/cortex/pkg/querier/tripperware" +) + +var ( + query = "/api/v1/query?time=1536716898&query=sum by (label) (up)&stats=all" + responseBody = `{"status":"success","data":{"resultType":"vector","result":[]}}` +) + +func TestRoundTrip(t *testing.T) { + t.Parallel() + var try atomic.Int32 + s := httptest.NewServer( + middleware.AuthenticateUser.Wrap( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + if try.Inc() > 2 { + _, err = w.Write([]byte(responseBody)) + } else { + http.Error(w, `{"status":"error"}`, http.StatusInternalServerError) + } + if err != nil { + t.Fatal(err) + } + }), + ), + ) + defer s.Close() + 
+ u, err := url.Parse(s.URL) + require.NoError(t, err) + + downstream := singleHostRoundTripper{ + host: u.Host, + next: http.DefaultTransport, + } + limits := tripperware.MockLimits{ + ShardSize: 2, + } + qa := querysharding.NewQueryAnalyzer() + instantQueryMiddlewares, err := Middlewares( + log.NewNopLogger(), + limits, + nil, + 3, + qa) + require.NoError(t, err) + + tw := tripperware.NewQueryTripperware( + log.NewNopLogger(), + nil, + nil, + nil, + instantQueryMiddlewares, + nil, + InstantQueryCodec, + limits, + qa, + time.Minute, + ) + + for i, tc := range []struct { + path, expectedBody string + }{ + {query, responseBody}, + } { + t.Run(strconv.Itoa(i), func(t *testing.T) { + //parallel testing causes data race + req, err := http.NewRequest("GET", tc.path, http.NoBody) + require.NoError(t, err) + + // query-frontend doesn't actually authenticate requests, we rely on + // the queriers to do this. Hence we ensure the request doesn't have a + // org ID in the ctx, but does have the header. 
+ ctx := user.InjectOrgID(context.Background(), "1") + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + resp, err := tw(downstream).RoundTrip(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + bs, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, tc.expectedBody, string(bs)) + }) + } +} + +type singleHostRoundTripper struct { + host string + next http.RoundTripper +} + +func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + r.URL.Scheme = "http" + r.URL.Host = s.host + return s.next.RoundTrip(r) +} diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index 1569ea2e3af..1ab044010ea 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -75,7 +75,7 @@ func TestLimitsMiddleware_MaxQueryLookback(t *testing.T) { End: util.TimeToMillis(testData.reqEndTime), } - limits := mockLimits{maxQueryLookback: testData.maxQueryLookback} + limits := tripperware.MockLimits{QueryLookback: testData.maxQueryLookback} middleware := NewLimitsMiddleware(limits) innerRes := NewEmptyPrometheusResponse() @@ -163,7 +163,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { End: util.TimeToMillis(testData.reqEndTime), } - limits := mockLimits{maxQueryLength: testData.maxQueryLength} + limits := tripperware.MockLimits{QueryLength: testData.maxQueryLength} middleware := NewLimitsMiddleware(limits) innerRes := NewEmptyPrometheusResponse() @@ -193,32 +193,6 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { } } -type mockLimits struct { - maxQueryLookback time.Duration - maxQueryLength time.Duration - maxCacheFreshness time.Duration -} - -func (m mockLimits) MaxQueryLookback(string) time.Duration { - return m.maxQueryLookback -} - -func (m mockLimits) MaxQueryLength(string) time.Duration { - return m.maxQueryLength -} 
- -func (mockLimits) MaxQueryParallelism(string) int { - return 14 // Flag default. -} - -func (m mockLimits) MaxCacheFreshness(string) time.Duration { - return m.maxCacheFreshness -} - -func (m mockLimits) QueryVerticalShardSize(userID string) int { - return 0 -} - type mockHandler struct { mock.Mock } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index 3cc14cbb54e..cd3f0cd0183 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -80,6 +80,7 @@ func Middlewares( queryAnalyzer querysharding.Analyzer, prometheusCodec tripperware.Codec, shardedPrometheusCodec tripperware.Codec, + retryMiddlewareMetrics *RetryMiddlewareMetrics, ) ([]tripperware.Middleware, cache.Cache, error) { // Metric used to keep track of each middleware execution duration. metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer) @@ -110,7 +111,7 @@ func Middlewares( } if cfg.MaxRetries > 0 { - queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, NewRetryMiddlewareMetrics(registerer))) + queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("retry", metrics), NewRetryMiddleware(log, cfg.MaxRetries, retryMiddlewareMetrics)) } queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer)) diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index 5ade2abd522..0127f9f025e 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -54,13 +54,14 @@ func TestRoundTrip(t 
*testing.T) { qa := querysharding.NewQueryAnalyzer() queyrangemiddlewares, _, err := Middlewares(Config{}, log.NewNopLogger(), - mockLimits{}, + tripperware.MockLimits{}, nil, nil, nil, qa, PrometheusCodec, ShardedPrometheusCodec, + NewRetryMiddlewareMetrics(nil), ) require.NoError(t, err) diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index b7ba0d5f977..d0d5d621c77 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -209,7 +209,7 @@ func TestStatsCacheQuerySamples(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{}, + tripperware.MockLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, nil, @@ -974,7 +974,7 @@ func TestHandleHit(t *testing.T) { sut := resultsCache{ extractor: PrometheusResponseExtractor{}, minCacheExtent: 10, - limits: mockLimits{}, + limits: tripperware.MockLimits{}, merger: PrometheusCodec, next: tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) { return mkAPIResponse(req.GetStart(), req.GetEnd(), req.GetStep()), nil @@ -1004,7 +1004,7 @@ func TestResultsCache(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{}, + tripperware.MockLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, nil, @@ -1046,7 +1046,7 @@ func TestResultsCacheRecent(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{maxCacheFreshness: 10 * time.Minute}, + tripperware.MockLimits{CacheFreshness: 10 * time.Minute}, PrometheusCodec, PrometheusResponseExtractor{}, nil, @@ -1087,13 +1087,13 @@ func TestResultsCacheMaxFreshness(t *testing.T) { expectedResponse *PrometheusResponse }{ { - fakeLimits: mockLimits{maxCacheFreshness: 5 * time.Second}, + fakeLimits: tripperware.MockLimits{CacheFreshness: 5 * time.Second}, Handler: nil, expectedResponse: mkAPIResponse(int64(modelNow)-(50*1e3), 
int64(modelNow)-(10*1e3), 10), }, { // should not lookup cache because per-tenant override will be applied - fakeLimits: mockLimits{maxCacheFreshness: 10 * time.Minute}, + fakeLimits: tripperware.MockLimits{CacheFreshness: 10 * time.Minute}, Handler: tripperware.HandlerFunc(func(_ context.Context, _ tripperware.Request) (tripperware.Response, error) { return parsedResponse, nil }), @@ -1150,7 +1150,7 @@ func Test_resultsCache_MissingData(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{}, + tripperware.MockLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, nil, @@ -1263,7 +1263,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { log.NewNopLogger(), cfg, constSplitter(day), - mockLimits{maxCacheFreshness: 10 * time.Minute}, + tripperware.MockLimits{CacheFreshness: 10 * time.Minute}, PrometheusCodec, PrometheusResponseExtractor{}, nil, diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 564263216a4..bcbe0bb91e9 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -309,7 +309,7 @@ func TestSplitByDay(t *testing.T) { roundtripper := tripperware.NewRoundTripper(singleHostRoundTripper{ host: u.Host, next: http.DefaultTransport, - }, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}), SplitByIntervalMiddleware(interval, mockLimits{}, PrometheusCodec, nil)) + }, PrometheusCodec, nil, NewLimitsMiddleware(tripperware.MockLimits{}), SplitByIntervalMiddleware(interval, tripperware.MockLimits{}, PrometheusCodec, nil)) req, err := http.NewRequest("GET", tc.path, http.NoBody) require.NoError(t, err) diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index 657d7daa3a1..cac16d6b993 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ 
b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -441,7 +441,7 @@ http_requests_total`, } qa := thanosquerysharding.NewQueryAnalyzer() - roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), mockLimits{shardSize: tt.shardSize}, tt.codec, qa)) + roundtripper := NewRoundTripper(downstream, tt.codec, nil, ShardByMiddleware(log.NewNopLogger(), MockLimits{ShardSize: tt.shardSize}, tt.codec, qa)) ctx := user.InjectOrgID(context.Background(), "1") @@ -461,31 +461,31 @@ http_requests_total`, } } -type mockLimits struct { - maxQueryLookback time.Duration - maxQueryLength time.Duration - maxCacheFreshness time.Duration - shardSize int +type MockLimits struct { + QueryLookback time.Duration + QueryLength time.Duration + CacheFreshness time.Duration + ShardSize int } -func (m mockLimits) MaxQueryLookback(string) time.Duration { - return m.maxQueryLookback +func (m MockLimits) MaxQueryLookback(string) time.Duration { + return m.QueryLookback } -func (m mockLimits) MaxQueryLength(string) time.Duration { - return m.maxQueryLength +func (m MockLimits) MaxQueryLength(string) time.Duration { + return m.QueryLength } -func (mockLimits) MaxQueryParallelism(string) int { +func (MockLimits) MaxQueryParallelism(string) int { return 14 // Flag default. 
} -func (m mockLimits) MaxCacheFreshness(string) time.Duration { - return m.maxCacheFreshness +func (m MockLimits) MaxCacheFreshness(string) time.Duration { + return m.CacheFreshness } -func (m mockLimits) QueryVerticalShardSize(userID string) int { - return m.shardSize +func (m MockLimits) QueryVerticalShardSize(userID string) int { + return m.ShardSize } type singleHostRoundTripper struct { diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index d28901d3512..e8af5e1c41f 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -241,6 +241,7 @@ type BucketStoreConfig struct { SyncDir string `yaml:"sync_dir"` SyncInterval time.Duration `yaml:"sync_interval"` MaxConcurrent int `yaml:"max_concurrent"` + MaxInflightRequests int `yaml:"max_inflight_requests"` TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"` BlockSyncConcurrency int `yaml:"block_sync_concurrency"` MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` @@ -294,6 +295,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ChunkPoolMinBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-min-bucket-size-bytes", ChunkPoolDefaultMinBucketSize, "Size - in bytes - of the smallest chunks pool bucket.") f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.") f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.") + f.IntVar(&cfg.MaxInflightRequests, "blocks-storage.bucket-store.max-inflight-requests", 0, "Max number of inflight queries to execute against the long-term storage. The limit is shared across all tenants. 
0 to disable.") f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.") f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks synching per tenant.") f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.") diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index ef493ec65b1..d7c709c4ec1 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -31,6 +31,7 @@ import ( "github.com/weaveworks/common/logging" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" @@ -72,6 +73,10 @@ type BucketStores struct { storesErrorsMu sync.RWMutex storesErrors map[string]error + // Keeps number of inflight requests + inflightRequestCnt int + inflightRequestMu sync.RWMutex + // Metrics. syncTimes prometheus.Histogram syncLastSuccess prometheus.Gauge @@ -79,6 +84,8 @@ type BucketStores struct { tenantsSynced prometheus.Gauge } +var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") + // NewBucketStores makes a new BucketStores. 
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) @@ -313,6 +320,16 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return nil } + maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests + if maxInflightRequests > 0 { + if u.getInflightRequestCnt() >= maxInflightRequests { + return ErrTooManyInflightRequests + } + + u.incrementInflightRequestCnt() + defer u.decrementInflightRequestCnt() + } + err = store.Series(req, spanSeriesServer{ Store_SeriesServer: srv, ctx: spanCtx, @@ -321,6 +338,24 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return err } +func (u *BucketStores) getInflightRequestCnt() int { + u.inflightRequestMu.RLock() + defer u.inflightRequestMu.RUnlock() + return u.inflightRequestCnt +} + +func (u *BucketStores) incrementInflightRequestCnt() { + u.inflightRequestMu.Lock() + u.inflightRequestCnt++ + u.inflightRequestMu.Unlock() +} + +func (u *BucketStores) decrementInflightRequestCnt() { + u.inflightRequestMu.Lock() + u.inflightRequestCnt-- + u.inflightRequestMu.Unlock() +} + // LabelNames implements the Storegateway proto service. 
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 7cb3188e745..1b9b4887687 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -514,6 +514,48 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t } } +func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *testing.T) { + cfg := prepareStorageConfig(t) + cfg.BucketStore.MaxInflightRequests = 10 + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + require.NoError(t, stores.InitialSync(context.Background())) + + stores.inflightRequestMu.Lock() + stores.inflightRequestCnt = 10 + stores.inflightRequestMu.Unlock() + series, warnings, err := querySeries(stores, "user_id", "series_1", 0, 100) + assert.ErrorIs(t, err, ErrTooManyInflightRequests) + assert.Empty(t, series) + assert.Empty(t, warnings) +} + +func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) { + cfg := prepareStorageConfig(t) + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), 
log.NewNopLogger(), reg) + require.NoError(t, err) + require.NoError(t, stores.InitialSync(context.Background())) + + stores.inflightRequestMu.Lock() + stores.inflightRequestCnt = 10 // max_inflight_request is set to 0 by default = disabled + stores.inflightRequestMu.Unlock() + series, _, err := querySeries(stores, "user_id", "series_1", 0, 100) + require.NoError(t, err) + assert.Equal(t, 1, len(series)) +} + func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig { cfg := cortex_tsdb.BlocksStorageConfig{} flagext.DefaultValues(&cfg) diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index cdf4930d8f9..fe99a32fa13 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -39,6 +39,10 @@ const ( // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance // in the ring will be automatically removed. ringAutoForgetUnhealthyPeriods = 10 + + instanceLimitsMetric = "cortex_storegateway_instance_limits" + instanceLimitsMetricHelp = "Instance limits used by this store gateway." 
+ limitLabel = "limit" ) var ( @@ -142,6 +146,22 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf g.bucketSync.WithLabelValues(syncReasonPeriodic) g.bucketSync.WithLabelValues(syncReasonRingChange) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: instanceLimitsMetric, + Help: instanceLimitsMetricHelp, + ConstLabels: map[string]string{limitLabel: "max_inflight_requests"}, + }).Set(float64(storageCfg.BucketStore.MaxInflightRequests)) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: instanceLimitsMetric, + Help: instanceLimitsMetricHelp, + ConstLabels: map[string]string{limitLabel: "max_concurrent"}, + }).Set(float64(storageCfg.BucketStore.MaxConcurrent)) + promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: instanceLimitsMetric, + Help: instanceLimitsMetricHelp, + ConstLabels: map[string]string{limitLabel: "max_chunk_pool_bytes"}, + }).Set(float64(storageCfg.BucketStore.MaxChunkPoolBytes)) + // Init sharding strategy. var shardingStrategy ShardingStrategy