diff --git a/CHANGELOG.md b/CHANGELOG.md index d7904e5140a..543a3d6ee51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ * [FEATURE] Distributor: Added a new limit `-validation.max-labels-size-bytes` allowing to limit the combined size of labels for each timeseries. #4848 * [FEATURE] Storage/Bucket: Added `-*.s3.bucket-lookup-type` allowing to configure the s3 bucket lookup type. #4794 * [FEATURE] QueryFrontend: Implement experimental vertical sharding at query frontend for range/instant queries. #4863 +* [FEATURE] Querier: Added a new limit `-querier.max-fetched-data-bytes-per-query` allowing to limit the maximum size of all data in bytes that a query can fetch from each ingester and storage. #4854 * [BUGFIX] Memberlist: Add join with no retrying when starting service. #4804 * [BUGFIX] Ruler: Fix /ruler/rule_groups returns YAML with extra fields. #4767 * [BUGFIX] Respecting `-tracing.otel.sample-ratio` configuration when enabling OpenTelemetry tracing with X-ray. #4862 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 26c1c92cb75..1097d00a4dd 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2673,12 +2673,20 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -querier.max-fetched-series-per-query [max_fetched_series_per_query: | default = 0] -# The maximum size of all chunks in bytes that a query can fetch from each -# ingester and storage. This limit is enforced in the querier and ruler only -# when running Cortex with blocks storage. 0 to disable. +# Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size +# of all chunks in bytes that a query can fetch from each ingester and storage. +# This limit is enforced in the querier and ruler only when running Cortex with +# blocks storage. 0 to disable. # CLI flag: -querier.max-fetched-chunk-bytes-per-query [max_fetched_chunk_bytes_per_query: | default = 0] +# The maximum combined size of all data that a query can fetch from each +# ingester and storage. This limit is only applied for `query`, `query_range` +# and `series` APIs. This limit is enforced in the querier and ruler only when +# running Cortex with blocks storage. 0 to disable. +# CLI flag: -querier.max-fetched-data-bytes-per-query +[max_fetched_data_bytes_per_query: | default = 0] + # Limit how long back data (series and metadata) can be queried, up until # duration ago. This limit is enforced in the query-frontend, querier # and ruler.
If the requested time range is outside the allowed range, the diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index a01ab3f3db1..f7577498b4d 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1027,6 +1027,9 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through if err != nil { return nil, err } + if err := queryLimiter.AddDataBytes(resp.Size()); err != nil { + return nil, err + } ms := ingester_client.FromMetricsForLabelMatchersResponse(resp) for _, m := range ms { if err := queryLimiter.AddSeries(cortexpb.FromMetricsToLabelAdapters(m)); err != nil { @@ -1055,6 +1058,9 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t defer stream.CloseSend() //nolint:errcheck for { resp, err := stream.Recv() + if err := queryLimiter.AddDataBytes(resp.Size()); err != nil { + return nil, err + } if err == io.EOF { break diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 9949889d087..1cb60b1ed09 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1032,7 +1032,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac limits: limits, }) - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit)) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit, 0)) // Push a number of series below the max chunks limit. Each series has 1 sample, // so expect 1 chunk per series when querying back. @@ -1077,7 +1077,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac ctx := user.InjectOrgID(context.Background(), "user") limits := &validation.Limits{} flagext.DefaultValues(limits) - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0)) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0)) // Prepare distributors. ds, _, _, _ := prepare(t, prepConfig{ @@ -1161,7 +1161,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs var maxBytesLimit = (seriesToAdd) * responseChunkSize // Update the limiter with the calculated limits. - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0)) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0, 0)) // Push a number of series below the max chunk bytes limit. Subtract one for the series added above. writeReq = makeWriteRequest(0, seriesToAdd-1, 0) @@ -1192,6 +1192,75 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, maxBytesLimit))) } +func TestDistributor_QueryStream_ShouldReturnErrorIfMaxDataBytesPerQueryLimitIsReached(t *testing.T) { + const seriesToAdd = 10 + + ctx := user.InjectOrgID(context.Background(), "user") + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + // Prepare distributors. + // Use replication factor of 2 to always read all the chunks from both ingesters, + // this guarantees us to always read the same chunks and have a stable test. 
+ ds, _, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: limits, + replicationFactor: 2, + }) + + allSeriesMatchers := []*labels.Matcher{ + labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"), + } + // Push a single series to allow us to calculate the response data size used to calculate the limit for the test. + writeReq := &cortexpb.WriteRequest{} + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0), + ) + writeRes, err := ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + dataSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) + + // Use the resulting response size to calculate the limit as (series to add) * the response data size. + var dataSize = dataSizeResponse.Size() + var maxBytesLimit = (seriesToAdd) * dataSize * 2 // Multiplying by RF because the limit is applied before de-duping. + + // Update the limiter with the calculated limits. + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, 0, maxBytesLimit)) + + // Push a number of series below the max data bytes limit. Subtract one for the series added above. + writeReq = makeWriteRequest(0, seriesToAdd-1, 0) + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + + // Since the fetched data size is equal to the limit (but doesn't + // exceed it), we expect a query running on all series to succeed. + queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...) + require.NoError(t, err) + assert.Len(t, queryRes.Chunkseries, seriesToAdd) + + // Push another series to exceed the data bytes limit once we query back all series. + writeReq = &cortexpb.WriteRequest{} + writeReq.Timeseries = append(writeReq.Timeseries, + makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series_1"}}, 0, 0), + ) + + writeRes, err = ds[0].Push(ctx, writeReq) + assert.Equal(t, &cortexpb.WriteResponse{}, writeRes) + assert.Nil(t, err) + + // Since the aggregated data size exceeds the limit, we expect + // a query running on all series to fail. + _, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
+ require.Error(t, err) + assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, maxBytesLimit))) +} + func TestDistributor_Push_LabelRemoval(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") @@ -1930,7 +1999,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { }, expectedResult: []metric.Metric{}, expectedIngesters: numIngesters, - queryLimiter: limiter.NewQueryLimiter(0, 0, 0), + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0), expectedErr: nil, }, "should filter metrics by single matcher": { @@ -1942,7 +2011,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { {Metric: util.LabelsToMetric(fixtures[1].lbls)}, }, expectedIngesters: numIngesters, - queryLimiter: limiter.NewQueryLimiter(0, 0, 0), + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0), expectedErr: nil, }, "should filter metrics by multiple matchers": { @@ -1954,7 +2023,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { {Metric: util.LabelsToMetric(fixtures[0].lbls)}, }, expectedIngesters: numIngesters, - queryLimiter: limiter.NewQueryLimiter(0, 0, 0), + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0), expectedErr: nil, }, "should return all matching metrics even if their FastFingerprint collide": { @@ -1966,7 +2035,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { {Metric: util.LabelsToMetric(fixtures[4].lbls)}, }, expectedIngesters: numIngesters, - queryLimiter: limiter.NewQueryLimiter(0, 0, 0), + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0), expectedErr: nil, }, "should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": { @@ -1980,7 +2049,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { {Metric: util.LabelsToMetric(fixtures[1].lbls)}, }, expectedIngesters: 3, - queryLimiter: limiter.NewQueryLimiter(0, 0, 0), + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0), expectedErr: nil, }, "should query all ingesters if shuffle sharding is enabled but shard size is 0": { @@ -1994,7 +2063,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { {Metric: util.LabelsToMetric(fixtures[1].lbls)}, }, expectedIngesters: numIngesters, - queryLimiter: limiter.NewQueryLimiter(0, 0, 0), + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0), expectedErr: nil, }, "should return err if series limit is exhausted": { @@ -2005,9 +2074,20 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { }, expectedResult: nil, expectedIngesters: numIngesters, - queryLimiter: limiter.NewQueryLimiter(1, 0, 0), + queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0), expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)), }, + "should return err if data bytes limit is exhausted": { + shuffleShardEnabled: true, + shuffleShardSize: 0, + matchers: []*labels.Matcher{ + mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"), + }, + expectedResult: nil, + expectedIngesters: numIngesters, + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 1), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, 1)), + }, "should not exhaust series limit when only one series is fetched": { matchers: []*labels.Matcher{ mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2"), @@ -2016,7 +2096,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { {Metric: util.LabelsToMetric(fixtures[2].lbls)}, }, expectedIngesters: numIngesters, - queryLimiter: limiter.NewQueryLimiter(1, 0, 0), + queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0), expectedErr: 
nil, }, } @@ -2116,7 +2196,7 @@ func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) { matchers: []*labels.Matcher{ mustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "foo.+"), }, - queryLimiter: limiter.NewQueryLimiter(100, 0, 0), + queryLimiter: limiter.NewQueryLimiter(100, 0, 0, 0), expectedErr: nil, }, } diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 0c2145cef8d..d41556e283c 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -335,6 +335,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri return nil, validation.LimitError(chunkBytesLimitErr.Error()) } + if dataBytesLimitErr := queryLimiter.AddDataBytes(resp.Size()); dataBytesLimitErr != nil { + return nil, validation.LimitError(dataBytesLimitErr.Error()) + } + for _, series := range resp.Timeseries { if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil { return nil, validation.LimitError(limitErr.Error()) @@ -392,6 +396,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries))) reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize())) + reqStats.AddFetchedDataBytes(uint64(resp.Size())) return resp, nil } diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 37ed3a351d9..5eb406571f7 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -60,10 +60,11 @@ type Handler struct { roundTripper http.RoundTripper // Metrics. - querySeconds *prometheus.CounterVec - querySeries *prometheus.CounterVec - queryBytes *prometheus.CounterVec - activeUsers *util.ActiveUsersCleanupService + querySeconds *prometheus.CounterVec + querySeries *prometheus.CounterVec + queryBytes *prometheus.CounterVec + queryDataBytes *prometheus.CounterVec + activeUsers *util.ActiveUsersCleanupService } // NewHandler creates a new frontend handler. @@ -90,10 +91,16 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge Help: "Size of all chunks fetched to execute a query in bytes.", }, []string{"user"}) + h.queryDataBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_query_fetched_data_bytes_total", + Help: "Size of all data fetched to execute a query in bytes.", + }, []string{"user"}) + h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { h.querySeconds.DeleteLabelValues(user) h.querySeries.DeleteLabelValues(user) h.queryBytes.DeleteLabelValues(user) + h.queryDataBytes.DeleteLabelValues(user) }) // If cleaner stops or fail, we will simply not clean the metrics for inactive users. _ = h.activeUsers.StartAsync(context.Background()) @@ -186,11 +193,13 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer wallTime := stats.LoadWallTime() numSeries := stats.LoadFetchedSeries() numBytes := stats.LoadFetchedChunkBytes() + numDataBytes := stats.LoadFetchedDataBytes() // Track stats. f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds()) f.querySeries.WithLabelValues(userID).Add(float64(numSeries)) f.queryBytes.WithLabelValues(userID).Add(float64(numBytes)) + f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes)) f.activeUsers.UpdateUserTimestamp(userID, time.Now()) // Log stats. 
@@ -203,6 +212,7 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer "query_wall_time_seconds", wallTime.Seconds(), "fetched_series_count", numSeries, "fetched_chunks_bytes", numBytes, + "fetched_data_bytes", numDataBytes, }, formatQueryString(queryString)...) level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index b7e7a954fca..95f69bda97b 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -658,12 +658,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } } chunksSize := countChunkBytes(s) + dataSize := countDataBytes(s) if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil { return validation.LimitError(chunkBytesLimitErr.Error()) } if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil { return validation.LimitError(chunkLimitErr.Error()) } + if dataBytesLimitErr := queryLimiter.AddDataBytes(dataSize); dataBytesLimitErr != nil { + return validation.LimitError(dataBytesLimitErr.Error()) + } } if w := resp.GetWarning(); w != "" { @@ -687,14 +691,17 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( numSeries := len(mySeries) chunkBytes := countChunkBytes(mySeries...) + dataBytes := countDataBytes(mySeries...) reqStats.AddFetchedSeries(uint64(numSeries)) reqStats.AddFetchedChunkBytes(uint64(chunkBytes)) + reqStats.AddFetchedDataBytes(uint64(dataBytes)) level.Debug(spanLog).Log("msg", "received series from store-gateway", "instance", c.RemoteAddress(), "fetched series", numSeries, "fetched chunk bytes", chunkBytes, + "fetched data bytes", dataBytes, "requested blocks", strings.Join(convertULIDsToString(blockIDs), " "), "queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " ")) @@ -1000,6 +1007,15 @@ func countChunkBytes(series ...*storepb.Series) (count int) { return count } +// countDataBytes returns the combined size of all the given series +func countDataBytes(series ...*storepb.Series) (count int) { + for _, s := range series { + count += s.Size() + } + + return count +} + // only retry connection issues func isRetryableError(err error) bool { switch status.Code(err) { diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index dbf90d28b67..5d7f3923528 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -54,7 +54,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} series1Label = labels.Label{Name: "series", Value: "1"} series2Label = labels.Label{Name: "series", Value: "2"} - noOpQueryLimiter = limiter.NewQueryLimiter(0, 0, 0) + noOpQueryLimiter = limiter.NewQueryLimiter(0, 0, 0, 0) ) type valueResult struct { @@ -472,7 +472,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, - queryLimiter: limiter.NewQueryLimiter(0, 0, 1), + queryLimiter: limiter.NewQueryLimiter(0, 0, 1, 0), expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 1)), }, "max chunks per query limit hit while fetching chunks during subsequent attempts": { @@ -548,7 +548,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, - queryLimiter: limiter.NewQueryLimiter(0, 0, 3), + queryLimiter: limiter.NewQueryLimiter(0, 0, 3, 0), expectedErr:
validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunksPerQueryLimit, 3)), }, "max series per query limit hit while fetching chunks": { @@ -566,7 +566,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{}, - queryLimiter: limiter.NewQueryLimiter(1, 0, 0), + queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0), expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)), }, "max chunk bytes per query limit hit while fetching chunks": { @@ -584,9 +584,27 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, - queryLimiter: limiter.NewQueryLimiter(0, 8, 0), + queryLimiter: limiter.NewQueryLimiter(0, 8, 0, 0), expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, 8)), }, + "max data bytes per query limit hit while fetching chunks": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT+1, 2), + mockHintsResponse(block1, block2), + }}: {block1, block2}, + }, + }, + limits: &blocksStoreLimitsMock{maxChunksPerQuery: 1}, + queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 1), + expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, 1)), + }, "multiple store-gateways has the block, but one of them fails to return": { finderResult: bucketindex.Blocks{ {ID: block1}, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 750ea69b007..399f162ee56 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -221,7 +221,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, return nil, err } - ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID))) + ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(limits.MaxFetchedSeriesPerQuery(userID), limits.MaxFetchedChunkBytesPerQuery(userID), limits.MaxChunksPerQuery(userID), limits.MaxFetchedDataBytesPerQuery(userID))) mint, maxt, err = validateQueryTimeRange(ctx, userID, mint, maxt, limits, cfg.MaxQueryIntoFuture) if err == errEmptyTimeRange { diff --git a/pkg/querier/stats/stats.go b/pkg/querier/stats/stats.go index 1a39b320696..57c0fc23f56 100644 --- a/pkg/querier/stats/stats.go +++ b/pkg/querier/stats/stats.go @@ -86,6 +86,22 @@ func (s *Stats) LoadFetchedChunkBytes() uint64 { return atomic.LoadUint64(&s.FetchedChunkBytes) } +func (s *Stats) AddFetchedDataBytes(bytes uint64) { + if s == nil { + return + } + + atomic.AddUint64(&s.FetchedDataBytes, bytes) +} + +func (s *Stats) LoadFetchedDataBytes() uint64 { + if s == nil { + return 0 + } + + return atomic.LoadUint64(&s.FetchedDataBytes) +} + // Merge the provide Stats into this one. 
func (s *Stats) Merge(other *Stats) { if s == nil || other == nil { @@ -95,6 +111,7 @@ func (s *Stats) Merge(other *Stats) { s.AddWallTime(other.LoadWallTime()) s.AddFetchedSeries(other.LoadFetchedSeries()) s.AddFetchedChunkBytes(other.LoadFetchedChunkBytes()) + s.AddFetchedDataBytes(other.LoadFetchedDataBytes()) } func ShouldTrackHTTPGRPCResponse(r *httpgrpc.HTTPResponse) bool { diff --git a/pkg/querier/stats/stats.pb.go b/pkg/querier/stats/stats.pb.go index 9fd4affc1f4..0f6ad5377bb 100644 --- a/pkg/querier/stats/stats.pb.go +++ b/pkg/querier/stats/stats.pb.go @@ -36,6 +36,8 @@ type Stats struct { FetchedSeriesCount uint64 `protobuf:"varint,2,opt,name=fetched_series_count,json=fetchedSeriesCount,proto3" json:"fetched_series_count,omitempty"` // The number of bytes of the chunks fetched for the query FetchedChunkBytes uint64 `protobuf:"varint,3,opt,name=fetched_chunk_bytes,json=fetchedChunkBytes,proto3" json:"fetched_chunk_bytes,omitempty"` + // The number of bytes of data fetched for the query + FetchedDataBytes uint64 `protobuf:"varint,4,opt,name=fetched_data_bytes,json=fetchedDataBytes,proto3" json:"fetched_data_bytes,omitempty"` } func (m *Stats) Reset() { *m = Stats{} } @@ -91,6 +93,13 @@ func (m *Stats) GetFetchedChunkBytes() uint64 { return 0 } +func (m *Stats) GetFetchedDataBytes() uint64 { + if m != nil { + return m.FetchedDataBytes + } + return 0 +} + func init() { proto.RegisterType((*Stats)(nil), "stats.Stats") } @@ -98,25 +107,26 @@ func init() { func init() { proto.RegisterFile("stats.proto", fileDescriptor_b4756a0aec8b9d44) } var fileDescriptor_b4756a0aec8b9d44 = []byte{ - // 281 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0xd0, 0xb1, 0x4e, 0x83, 0x40, - 0x1c, 0xc7, 0xf1, 0xfb, 0xab, 0x35, 0x95, 0x4e, 0xa2, 0x03, 0x76, 0xf8, 0xb7, 0x71, 0xea, 0xe2, - 0xd5, 0xe8, 0xe8, 0x62, 0xa8, 0x4f, 0xd0, 0x3a, 0xb9, 0x10, 0xa0, 0x57, 0x20, 0x02, 0x67, 0xe0, - 0x2e, 0xc6, 0xcd, 0x47, 0x70, 0xf4, 0x11, 0x4c, 0x7c, 0x91, 0x8e, 0x8c, 0x9d, 0x54, 0x8e, 0xc5, - 0xb1, 0x8f, 0x60, 0xee, 0xa0, 0x71, 0xe3, 0x97, 0x0f, 0xdf, 0x4b, 0xee, 0xac, 0x41, 0x29, 0x7c, - 0x51, 0xd2, 0xa7, 0x82, 0x0b, 0x6e, 0xf7, 0xcc, 0x18, 0x5e, 0x44, 0x89, 0x88, 0x65, 0x40, 0x43, - 0x9e, 0x4d, 0x23, 0x1e, 0xf1, 0xa9, 0xd1, 0x40, 0xae, 0xcc, 0x32, 0xc3, 0x7c, 0xb5, 0xd5, 0x10, - 0x23, 0xce, 0xa3, 0x94, 0xfd, 0xff, 0xb5, 0x94, 0x85, 0x2f, 0x12, 0x9e, 0xb7, 0x7e, 0xfe, 0x09, - 0x56, 0x6f, 0xa1, 0x0f, 0xb6, 0x6f, 0xad, 0xa3, 0x67, 0x3f, 0x4d, 0x3d, 0x91, 0x64, 0xcc, 0x81, - 0x31, 0x4c, 0x06, 0x57, 0x67, 0xb4, 0xad, 0xe9, 0xae, 0xa6, 0x77, 0x5d, 0xed, 0xf6, 0xd7, 0x5f, - 0x23, 0xf2, 0xfe, 0x3d, 0x82, 0x79, 0x5f, 0x57, 0xf7, 0x49, 0xc6, 0xec, 0x4b, 0xeb, 0x74, 0xc5, - 0x44, 0x18, 0xb3, 0xa5, 0x57, 0xb2, 0x22, 0x61, 0xa5, 0x17, 0x72, 0x99, 0x0b, 0x67, 0x6f, 0x0c, - 0x93, 0x83, 0xb9, 0xdd, 0xd9, 0xc2, 0xd0, 0x4c, 0x8b, 0x4d, 0xad, 0x93, 0x5d, 0x11, 0xc6, 0x32, - 0x7f, 0xf4, 0x82, 0x17, 0xc1, 0x4a, 0x67, 0xdf, 0x04, 0xc7, 0x1d, 0xcd, 0xb4, 0xb8, 0x1a, 0xdc, - 0x9b, 0xaa, 0x46, 0xb2, 0xa9, 0x91, 0x6c, 0x6b, 0x84, 0x57, 0x85, 0xf0, 0xa1, 0x10, 0xd6, 0x0a, - 0xa1, 0x52, 0x08, 0x3f, 0x0a, 0xe1, 0x57, 0x21, 0xd9, 0x2a, 0x84, 0xb7, 0x06, 0x49, 0xd5, 0x20, - 0xd9, 0x34, 0x48, 0x1e, 0xda, 0x97, 0x0b, 0x0e, 0xcd, 0x2d, 0xae, 0xff, 0x02, 0x00, 0x00, 0xff, - 0xff, 0x9d, 0xf1, 0x86, 0xb8, 0x56, 0x01, 0x00, 0x00, + // 300 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0x90, 0x31, 0x4e, 0xc3, 0x30, + 0x14, 0x86, 0xfd, 0xa0, 
0x45, 0xc5, 0x5d, 0xc0, 0x30, 0x94, 0x0e, 0xaf, 0x15, 0x53, 0x07, 0x70, + 0x11, 0x8c, 0x2c, 0xa8, 0xed, 0x09, 0x5a, 0x26, 0x96, 0xc8, 0x49, 0xdd, 0x34, 0xa2, 0x8d, 0x51, + 0xe2, 0x08, 0xb1, 0x71, 0x04, 0x46, 0x8e, 0xc0, 0x51, 0x3a, 0x66, 0x2c, 0x0b, 0x10, 0x67, 0x61, + 0xec, 0x11, 0x50, 0x9c, 0x44, 0x6c, 0xfe, 0xf5, 0xfd, 0xdf, 0x2f, 0xf9, 0xd1, 0x76, 0xac, 0x85, + 0x8e, 0xf9, 0x53, 0xa4, 0xb4, 0x62, 0x4d, 0x1b, 0xba, 0x97, 0x7e, 0xa0, 0x97, 0x89, 0xcb, 0x3d, + 0xb5, 0x1e, 0xfa, 0xca, 0x57, 0x43, 0x4b, 0xdd, 0x64, 0x61, 0x93, 0x0d, 0xf6, 0x55, 0x5a, 0x5d, + 0xf4, 0x95, 0xf2, 0x57, 0xf2, 0xbf, 0x35, 0x4f, 0x22, 0xa1, 0x03, 0x15, 0x96, 0xfc, 0xfc, 0x13, + 0x68, 0x73, 0x56, 0x0c, 0xb3, 0x3b, 0x7a, 0xf8, 0x2c, 0x56, 0x2b, 0x47, 0x07, 0x6b, 0xd9, 0x81, + 0x3e, 0x0c, 0xda, 0xd7, 0x67, 0xbc, 0xb4, 0x79, 0x6d, 0xf3, 0x49, 0x65, 0x8f, 0x5a, 0x9b, 0xaf, + 0x1e, 0x79, 0xff, 0xee, 0xc1, 0xb4, 0x55, 0x58, 0xf7, 0xc1, 0x5a, 0xb2, 0x2b, 0x7a, 0xba, 0x90, + 0xda, 0x5b, 0xca, 0xb9, 0x13, 0xcb, 0x28, 0x90, 0xb1, 0xe3, 0xa9, 0x24, 0xd4, 0x9d, 0xbd, 0x3e, + 0x0c, 0x1a, 0x53, 0x56, 0xb1, 0x99, 0x45, 0xe3, 0x82, 0x30, 0x4e, 0x4f, 0x6a, 0xc3, 0x5b, 0x26, + 0xe1, 0xa3, 0xe3, 0xbe, 0x68, 0x19, 0x77, 0xf6, 0xad, 0x70, 0x5c, 0xa1, 0x71, 0x41, 0x46, 0x05, + 0x60, 0x17, 0xb4, 0x5e, 0x71, 0xe6, 0x42, 0x8b, 0xaa, 0xde, 0xb0, 0xf5, 0xa3, 0x8a, 0x4c, 0x84, + 0x16, 0xb6, 0x3d, 0xba, 0x4d, 0x33, 0x24, 0xdb, 0x0c, 0xc9, 0x2e, 0x43, 0x78, 0x35, 0x08, 0x1f, + 0x06, 0x61, 0x63, 0x10, 0x52, 0x83, 0xf0, 0x63, 0x10, 0x7e, 0x0d, 0x92, 0x9d, 0x41, 0x78, 0xcb, + 0x91, 0xa4, 0x39, 0x92, 0x6d, 0x8e, 0xe4, 0xa1, 0xbc, 0xb3, 0x7b, 0x60, 0xff, 0x7c, 0xf3, 0x17, + 0x00, 0x00, 0xff, 0xff, 0xb7, 0x24, 0x3a, 0xa7, 0x84, 0x01, 0x00, 0x00, } func (this *Stats) Equal(that interface{}) bool { @@ -147,17 +157,21 @@ func (this *Stats) Equal(that interface{}) bool { if this.FetchedChunkBytes != that1.FetchedChunkBytes { return false } + if this.FetchedDataBytes != that1.FetchedDataBytes { + return false + } return true } func (this *Stats) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&stats.Stats{") s = append(s, "WallTime: "+fmt.Sprintf("%#v", this.WallTime)+",\n") s = append(s, "FetchedSeriesCount: "+fmt.Sprintf("%#v", this.FetchedSeriesCount)+",\n") s = append(s, "FetchedChunkBytes: "+fmt.Sprintf("%#v", this.FetchedChunkBytes)+",\n") + s = append(s, "FetchedDataBytes: "+fmt.Sprintf("%#v", this.FetchedDataBytes)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -189,6 +203,11 @@ func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.FetchedDataBytes != 0 { + i = encodeVarintStats(dAtA, i, uint64(m.FetchedDataBytes)) + i-- + dAtA[i] = 0x20 + } if m.FetchedChunkBytes != 0 { i = encodeVarintStats(dAtA, i, uint64(m.FetchedChunkBytes)) i-- @@ -235,6 +254,9 @@ func (m *Stats) Size() (n int) { if m.FetchedChunkBytes != 0 { n += 1 + sovStats(uint64(m.FetchedChunkBytes)) } + if m.FetchedDataBytes != 0 { + n += 1 + sovStats(uint64(m.FetchedDataBytes)) + } return n } @@ -252,6 +274,7 @@ func (this *Stats) String() string { `WallTime:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.WallTime), "Duration", "duration.Duration", 1), `&`, ``, 1) + `,`, `FetchedSeriesCount:` + fmt.Sprintf("%v", this.FetchedSeriesCount) + `,`, `FetchedChunkBytes:` + fmt.Sprintf("%v", this.FetchedChunkBytes) + `,`, + `FetchedDataBytes:` + fmt.Sprintf("%v", this.FetchedDataBytes) + `,`, `}`, }, "") return s @@ -364,6 +387,25 @@ func (m *Stats) 
Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field FetchedDataBytes", wireType) + } + m.FetchedDataBytes = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStats + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.FetchedDataBytes |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipStats(dAtA[iNdEx:]) diff --git a/pkg/querier/stats/stats.proto b/pkg/querier/stats/stats.proto index 765dd99582c..4cbc4e5b7e2 100644 --- a/pkg/querier/stats/stats.proto +++ b/pkg/querier/stats/stats.proto @@ -17,4 +17,6 @@ message Stats { uint64 fetched_series_count = 2; // The number of bytes of the chunks fetched for the query uint64 fetched_chunk_bytes = 3; + // The number of bytes of data fetched for the query + uint64 fetched_data_bytes = 4; } diff --git a/pkg/querier/stats/stats_test.go b/pkg/querier/stats/stats_test.go index f8a23c96c30..add83dfaf58 100644 --- a/pkg/querier/stats/stats_test.go +++ b/pkg/querier/stats/stats_test.go @@ -59,23 +59,43 @@ func TestStats_AddFetchedChunkBytes(t *testing.T) { }) } +func TestStats_AddFetchedDataBytes(t *testing.T) { + t.Run("add and load bytes", func(t *testing.T) { + stats, _ := ContextWithEmptyStats(context.Background()) + stats.AddFetchedDataBytes(4096) + stats.AddFetchedDataBytes(4096) + + assert.Equal(t, uint64(8192), stats.LoadFetchedDataBytes()) + }) + + t.Run("add and load bytes nil receiver", func(t *testing.T) { + var stats *Stats + stats.AddFetchedDataBytes(1024) + + assert.Equal(t, uint64(0), stats.LoadFetchedDataBytes()) + }) +} + func TestStats_Merge(t *testing.T) { t.Run("merge two stats objects", func(t *testing.T) { stats1 := &Stats{} stats1.AddWallTime(time.Millisecond) stats1.AddFetchedSeries(50) stats1.AddFetchedChunkBytes(42) + stats1.AddFetchedDataBytes(100) stats2 := &Stats{} stats2.AddWallTime(time.Second) stats2.AddFetchedSeries(60) stats2.AddFetchedChunkBytes(100) + stats2.AddFetchedDataBytes(101) stats1.Merge(stats2) assert.Equal(t, 1001*time.Millisecond, stats1.LoadWallTime()) assert.Equal(t, uint64(110), stats1.LoadFetchedSeries()) assert.Equal(t, uint64(142), stats1.LoadFetchedChunkBytes()) + assert.Equal(t, uint64(201), stats1.LoadFetchedDataBytes()) }) t.Run("merge two nil stats objects", func(t *testing.T) { @@ -87,5 +107,6 @@ func TestStats_Merge(t *testing.T) { assert.Equal(t, time.Duration(0), stats1.LoadWallTime()) assert.Equal(t, uint64(0), stats1.LoadFetchedSeries()) assert.Equal(t, uint64(0), stats1.LoadFetchedChunkBytes()) + assert.Equal(t, uint64(0), stats1.LoadFetchedDataBytes()) }) } diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index b5c33c62efb..13da54dc74d 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -18,6 +18,7 @@ var ( ctxKey = &queryLimiterCtxKey{} ErrMaxSeriesHit = "the query hit the max number of series limit (limit: %d series)" ErrMaxChunkBytesHit = "the query hit the aggregated chunks size limit (limit: %d bytes)" + ErrMaxDataBytesHit = "the query hit the aggregated data size limit (limit: %d bytes)" ErrMaxChunksPerQueryLimit = "the query hit the max number of chunks limit (limit: %d chunks)" ) @@ -26,16 +27,18 @@ type QueryLimiter struct { uniqueSeries map[model.Fingerprint]struct{} chunkBytesCount atomic.Int64 + dataBytesCount atomic.Int64 chunkCount atomic.Int64 maxSeriesPerQuery int maxChunkBytesPerQuery int + 
maxDataBytesPerQuery int maxChunksPerQuery int } // NewQueryLimiter makes a new per-query limiter. Each query limiter // is configured using the `maxSeriesPerQuery` limit. -func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery int, maxChunksPerQuery int) *QueryLimiter { +func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery, maxChunksPerQuery, maxDataBytesPerQuery int) *QueryLimiter { return &QueryLimiter{ uniqueSeriesMx: sync.Mutex{}, uniqueSeries: map[model.Fingerprint]struct{}{}, @@ -43,6 +46,7 @@ func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery int, maxChunksPerQ maxSeriesPerQuery: maxSeriesPerQuery, maxChunkBytesPerQuery: maxChunkBytesPerQuery, maxChunksPerQuery: maxChunksPerQuery, + maxDataBytesPerQuery: maxDataBytesPerQuery, } } @@ -56,7 +60,7 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { ql, ok := ctx.Value(ctxKey).(*QueryLimiter) if !ok { // If there's no limiter return a new unlimited limiter as a fallback - ql = NewQueryLimiter(0, 0, 0) + ql = NewQueryLimiter(0, 0, 0, 0) } return ql } @@ -98,6 +102,17 @@ func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error { return nil } +// AddDataBytes adds the queried data bytes and returns an error if the limit is reached. +func (ql *QueryLimiter) AddDataBytes(dataSizeInBytes int) error { + if ql.maxDataBytesPerQuery == 0 { + return nil + } + if ql.dataBytesCount.Add(int64(dataSizeInBytes)) > int64(ql.maxDataBytesPerQuery) { + return fmt.Errorf(ErrMaxDataBytesHit, ql.maxDataBytesPerQuery) + } + return nil +} + func (ql *QueryLimiter) AddChunks(count int) error { if ql.maxChunksPerQuery == 0 { return nil diff --git a/pkg/util/limiter/query_limiter_test.go b/pkg/util/limiter/query_limiter_test.go index 8ee93e205d6..02b1fc9f73c 100644 --- a/pkg/util/limiter/query_limiter_test.go +++ b/pkg/util/limiter/query_limiter_test.go @@ -25,7 +25,7 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing labels.MetricName: metricName + "_2", "series2": "1", }) - limiter = NewQueryLimiter(100, 0, 0) + limiter = NewQueryLimiter(100, 0, 0, 0) ) err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(series1)) assert.NoError(t, err) @@ -75,7 +75,7 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) labels.MetricName: metricName + "_2", "series2": "1", }) - limiter = NewQueryLimiter(1, 0, 0) + limiter = NewQueryLimiter(1, 0, 0, 0) ) err := limiter.AddSeries(series1) require.NoError(t, err) @@ -88,7 +88,7 @@ func TestQueryLimiter_AddSeriers_ShouldReturnErrorOnLimitExceeded(t *testing.T) } func TestQueryLimiter_AddChunkBytes(t *testing.T) { - var limiter = NewQueryLimiter(0, 100, 0) + var limiter = NewQueryLimiter(0, 100, 0, 0) err := limiter.AddChunkBytes(100) require.NoError(t, err) @@ -96,6 +96,15 @@ func TestQueryLimiter_AddChunkBytes(t *testing.T) { require.Error(t, err) } +func TestQueryLimiter_AddDataBytes(t *testing.T) { + var limiter = NewQueryLimiter(0, 0, 0, 100) + + err := limiter.AddDataBytes(100) + require.NoError(t, err) + err = limiter.AddDataBytes(1) + require.Error(t, err) +} + func BenchmarkQueryLimiter_AddSeries(b *testing.B) { const ( metricName = "test_metric" @@ -110,7 +119,7 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) { } b.ResetTimer() - limiter := NewQueryLimiter(b.N+1, 0, 0) + limiter := NewQueryLimiter(b.N+1, 0, 0, 0) for _, s := range series { err := limiter.AddSeries(cortexpb.FromLabelsToLabelAdapters(s)) assert.NoError(b, err) diff --git a/pkg/util/validation/limits.go 
b/pkg/util/validation/limits.go index e568f71de42..c3a3b3c886e 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -73,6 +73,7 @@ type Limits struct { MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"` MaxFetchedSeriesPerQuery int `yaml:"max_fetched_series_per_query" json:"max_fetched_series_per_query"` MaxFetchedChunkBytesPerQuery int `yaml:"max_fetched_chunk_bytes_per_query" json:"max_fetched_chunk_bytes_per_query"` + MaxFetchedDataBytesPerQuery int `yaml:"max_fetched_data_bytes_per_query" json:"max_fetched_data_bytes_per_query"` MaxQueryLookback model.Duration `yaml:"max_query_lookback" json:"max_query_lookback"` MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"` MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"` @@ -150,7 +151,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.") f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 2000000, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier only when running Cortex with blocks storage. 0 to disable") - f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler only when running Cortex with blocks storage. 0 to disable.") + f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler only when running Cortex with blocks storage. 0 to disable.") + f.IntVar(&l.MaxFetchedDataBytesPerQuery, "querier.max-fetched-data-bytes-per-query", 0, "The maximum combined size of all data that a query can fetch from each ingester and storage. This limit is only applied for `query`, `query_range` and `series` APIs. This limit is enforced in the querier and ruler only when running Cortex with blocks storage. 0 to disable.") f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query) and in the querier (on the query possibly split by the query-frontend). 0 to disable.") f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range.
0 to disable.") f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.") @@ -406,6 +408,12 @@ func (o *Overrides) MaxFetchedChunkBytesPerQuery(userID string) int { return o.getOverridesForUser(userID).MaxFetchedChunkBytesPerQuery } +// MaxFetchedDataBytesPerQuery returns the maximum number of bytes for all data allowed per query when fetching +// from ingesters and blocks storage. +func (o *Overrides) MaxFetchedDataBytesPerQuery(userID string) int { + return o.getOverridesForUser(userID).MaxFetchedDataBytesPerQuery +} + // MaxQueryLookback returns the max lookback period of queries. func (o *Overrides) MaxQueryLookback(userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MaxQueryLookback)