
Implement max_fetched_data_bytes_per_query limit #4854


Merged (3 commits on Sep 29, 2022)

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -57,6 +57,7 @@
* [FEATURE] Distributor: Added a new limit `-validation.max-labels-size-bytes` allowing to limit the combined size of labels for each timeseries. #4848
* [FEATURE] Storage/Bucket: Added `-*.s3.bucket-lookup-type` allowing to configure the s3 bucket lookup type. #4794
* [FEATURE] QueryFrontend: Implement experimental vertical sharding at query frontend for range/instant queries. #4863
* [FEATURE] Querier: Added a new limit `-querier.max-fetched-data-bytes-per-query` allowing to limit the maximum size of all data in bytes that a query can fetch from each ingester and storage. #4854
* [BUGFIX] Memberlist: Add join with no retrying when starting service. #4804
* [BUGFIX] Ruler: Fix /ruler/rule_groups returns YAML with extra fields. #4767
* [BUGFIX] Respecting `-tracing.otel.sample-ratio` configuration when enabling OpenTelemetry tracing with X-ray. #4862
14 changes: 11 additions & 3 deletions docs/configuration/config-file-reference.md
@@ -2673,12 +2673,20 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -querier.max-fetched-series-per-query
[max_fetched_series_per_query: <int> | default = 0]

# The maximum size of all chunks in bytes that a query can fetch from each
# ingester and storage. This limit is enforced in the querier and ruler only
# when running Cortex with blocks storage. 0 to disable.
# Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size
# of all chunks in bytes that a query can fetch from each ingester and storage.
# This limit is enforced in the querier and ruler only when running Cortex with
# blocks storage. 0 to disable.
# CLI flag: -querier.max-fetched-chunk-bytes-per-query
[max_fetched_chunk_bytes_per_query: <int> | default = 0]

# The maximum combined size of all data that a query can fetch from each
# ingester and storage. This limit is only applied for `query`, `query_range`
# and `series` APIs. This limit is enforced in the querier and ruler only when
# running Cortex with blocks storage. 0 to disable.
# CLI flag: -querier.max-fetched-data-bytes-per-query
[max_fetched_data_bytes_per_query: <int> | default = 0]
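
For illustration, a minimal sketch of how this option could be set, both as a cluster-wide default and as a per-tenant runtime override. The byte values and the tenant name are made up for this example, and the layout assumes the standard Cortex limits/overrides configuration blocks:

# Cluster-wide default in the main config file (sketch).
limits:
  max_fetched_data_bytes_per_query: 1073741824   # ~1 GiB per query; 0 disables the limit

# Per-tenant override in the runtime config file (sketch).
overrides:
  heavy-query-tenant:
    max_fetched_data_bytes_per_query: 536870912  # ~512 MiB for this tenant only
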
Contributor:
Does this limit apply only to the Select call? I don't see us limiting label names and label values. If that's the case, we can probably mention it in the doc or give the flag a better name.

Contributor:
That's the only question I have now. Otherwise LGTM.

Contributor Author:
The naming of the limit is consistent with other similar limits. I think we can implement this limit for the LabelNames and LabelValues APIs as well. For now I've updated the doc to say that it is only applied to the query, query_range and series APIs.

Member:
NIT: when running Cortex with blocks storage

Currently we only have block storage, as chunk storage was removed.

Contributor Author:
All the other limits have this line:

This limit is enforced in the querier only when running Cortex with blocks storage

Should we remove it everywhere?

Member:
I think we should...

Contributor Author:
I'll also address this in the PR to handle retryable failures.


# Limit how long back data (series and metadata) can be queried, up until
# <lookback> duration ago. This limit is enforced in the query-frontend, querier
# and ruler. If the requested time range is outside the allowed range, the
6 changes: 6 additions & 0 deletions pkg/distributor/distributor.go
@@ -1027,6 +1027,9 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
if err != nil {
return nil, err
}
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, err
}
ms := ingester_client.FromMetricsForLabelMatchersResponse(resp)
for _, m := range ms {
if err := queryLimiter.AddSeries(cortexpb.FromMetricsToLabelAdapters(m)); err != nil {
@@ -1055,6 +1058,9 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
defer stream.CloseSend() //nolint:errcheck
for {
resp, err := stream.Recv()
if err := queryLimiter.AddDataBytes(resp.Size()); err != nil {
return nil, err
}

if err == io.EOF {
break
104 changes: 92 additions & 12 deletions pkg/distributor/distributor_test.go
@@ -1032,7 +1032,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
limits: limits,
})

ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit, 0))

// Push a number of series below the max chunks limit. Each series has 1 sample,
// so expect 1 chunk per series when querying back.
@@ -1077,7 +1077,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
ctx := user.InjectOrgID(context.Background(), "user")
limits := &validation.Limits{}
flagext.DefaultValues(limits)
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0))

// Prepare distributors.
ds, _, _, _ := prepare(t, prepConfig{
@@ -1161,7 +1161,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
var maxBytesLimit = (seriesToAdd) * responseChunkSize

// Update the limiter with the calculated limits.
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0))
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, maxBytesLimit, 0, 0))

// Push a number of series below the max chunk bytes limit. Subtract one for the series added above.
writeReq = makeWriteRequest(0, seriesToAdd-1, 0)
@@ -1192,6 +1192,75 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, maxBytesLimit)))
}

func TestDistributor_QueryStream_ShouldReturnErrorIfMaxDataBytesPerQueryLimitIsReached(t *testing.T) {
const seriesToAdd = 10

ctx := user.InjectOrgID(context.Background(), "user")
limits := &validation.Limits{}
flagext.DefaultValues(limits)

// Prepare distributors.
// Use replication factor of 2 to always read all the chunks from both ingesters,
// this guarantees us to always read the same chunks and have a stable test.
ds, _, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: limits,
replicationFactor: 2,
})

allSeriesMatchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
}
// Push a single series so we can measure its response size and derive the data bytes limit for the test.
writeReq := &cortexpb.WriteRequest{}
writeReq.Timeseries = append(writeReq.Timeseries,
makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series"}}, 0, 0),
)
writeRes, err := ds[0].Push(ctx, writeReq)
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
assert.Nil(t, err)
dataSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.NoError(t, err)

// Use the resulting response size to calculate the limit as (series to add) * the per-series data size * the replication factor.
var dataSize = dataSizeResponse.Size()
var maxBytesLimit = (seriesToAdd) * dataSize * 2 // Multiplying by RF because the limit is applied before de-duping.

// Update the limiter with the calculated limits.
ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, 0, maxBytesLimit))

// Push a number of series below the max data bytes limit. Subtract one for the series added above.
writeReq = makeWriteRequest(0, seriesToAdd-1, 0)
writeRes, err = ds[0].Push(ctx, writeReq)
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
assert.Nil(t, err)

// Since the total fetched data size is equal to the limit (but doesn't
// exceed it), we expect a query running on all series to succeed.
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.NoError(t, err)
assert.Len(t, queryRes.Chunkseries, seriesToAdd)

// Push another series to exceed the data bytes limit once we query back all series.
writeReq = &cortexpb.WriteRequest{}
writeReq.Timeseries = append(writeReq.Timeseries,
makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: "another_series_1"}}, 0, 0),
)

writeRes, err = ds[0].Push(ctx, writeReq)
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
assert.Nil(t, err)

// Since the aggregated data size exceeds the limit, we expect
// a query running on all series to fail.
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
require.Error(t, err)
assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, maxBytesLimit)))
}

func TestDistributor_Push_LabelRemoval(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")

@@ -1930,7 +1999,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
},
expectedResult: []metric.Metric{},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
expectedErr: nil,
},
"should filter metrics by single matcher": {
@@ -1942,7 +2011,7 @@
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
expectedErr: nil,
},
"should filter metrics by multiple matchers": {
@@ -1954,7 +2023,7 @@
{Metric: util.LabelsToMetric(fixtures[0].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
expectedErr: nil,
},
"should return all matching metrics even if their FastFingerprint collide": {
Expand All @@ -1966,7 +2035,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[4].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
expectedErr: nil,
},
"should query only ingesters belonging to tenant's subring if shuffle sharding is enabled": {
Expand All @@ -1980,7 +2049,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
},
expectedIngesters: 3,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
expectedErr: nil,
},
"should query all ingesters if shuffle sharding is enabled but shard size is 0": {
Expand All @@ -1994,7 +2063,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[1].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0),
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 0),
expectedErr: nil,
},
"should return err if series limit is exhausted": {
Expand All @@ -2005,9 +2074,20 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
},
expectedResult: nil,
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, 1)),
},
"should return err if data bytes limit is exhausted": {
shuffleShardEnabled: true,
shuffleShardSize: 0,
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_1"),
},
expectedResult: nil,
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(0, 0, 0, 1),
expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, 1)),
},
"should not exhaust series limit when only one series is fetched": {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test_2"),
Expand All @@ -2016,7 +2096,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
{Metric: util.LabelsToMetric(fixtures[2].lbls)},
},
expectedIngesters: numIngesters,
queryLimiter: limiter.NewQueryLimiter(1, 0, 0),
queryLimiter: limiter.NewQueryLimiter(1, 0, 0, 0),
expectedErr: nil,
},
}
@@ -2116,7 +2196,7 @@ func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
matchers: []*labels.Matcher{
mustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, "foo.+"),
},
queryLimiter: limiter.NewQueryLimiter(100, 0, 0),
queryLimiter: limiter.NewQueryLimiter(100, 0, 0, 0),
expectedErr: nil,
},
}
5 changes: 5 additions & 0 deletions pkg/distributor/query.go
@@ -335,6 +335,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
return nil, validation.LimitError(chunkBytesLimitErr.Error())
}

if dataBytesLimitErr := queryLimiter.AddDataBytes(resp.Size()); dataBytesLimitErr != nil {
return nil, validation.LimitError(dataBytesLimitErr.Error())
}

for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
@@ -392,6 +396,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri

reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries)))
reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize()))
reqStats.AddFetchedDataBytes(uint64(resp.Size()))

return resp, nil
}
18 changes: 14 additions & 4 deletions pkg/frontend/transport/handler.go
@@ -60,10 +60,11 @@ type Handler struct {
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryBytes *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
querySeconds *prometheus.CounterVec
querySeries *prometheus.CounterVec
queryBytes *prometheus.CounterVec
queryDataBytes *prometheus.CounterVec
activeUsers *util.ActiveUsersCleanupService
}

// NewHandler creates a new frontend handler.
@@ -90,10 +91,16 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge
Help: "Size of all chunks fetched to execute a query in bytes.",
}, []string{"user"})

h.queryDataBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_fetched_data_bytes_total",
Help: "Size of all data fetched to execute a query in bytes.",
}, []string{"user"})

h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
h.querySeconds.DeleteLabelValues(user)
h.querySeries.DeleteLabelValues(user)
h.queryBytes.DeleteLabelValues(user)
h.queryDataBytes.DeleteLabelValues(user)
})
// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
_ = h.activeUsers.StartAsync(context.Background())
@@ -186,11 +193,13 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
wallTime := stats.LoadWallTime()
numSeries := stats.LoadFetchedSeries()
numBytes := stats.LoadFetchedChunkBytes()
numDataBytes := stats.LoadFetchedDataBytes()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
f.queryDataBytes.WithLabelValues(userID).Add(float64(numDataBytes))
f.activeUsers.UpdateUserTimestamp(userID, time.Now())

// Log stats.
@@ -203,6 +212,7 @@
"query_wall_time_seconds", wallTime.Seconds(),
"fetched_series_count", numSeries,
"fetched_chunks_bytes", numBytes,
"fetched_data_bytes", numDataBytes,
}, formatQueryString(queryString)...)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
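
As a usage note, the new cortex_query_fetched_data_bytes_total counter is labelled by user, so an expression such as sum by (user) (rate(cortex_query_fetched_data_bytes_total[5m])) (illustrative PromQL) can show how much data each tenant's queries fetch and help pick a sensible value for the new limit.
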
16 changes: 16 additions & 0 deletions pkg/querier/blocks_store_queryable.go
@@ -658,12 +658,16 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
}
}
chunksSize := countChunkBytes(s)
dataSize := countDataBytes(s)
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(chunksSize); chunkBytesLimitErr != nil {
return validation.LimitError(chunkBytesLimitErr.Error())
}
if chunkLimitErr := queryLimiter.AddChunks(len(s.Chunks)); chunkLimitErr != nil {
return validation.LimitError(chunkLimitErr.Error())
}
if dataBytesLimitErr := queryLimiter.AddDataBytes(dataSize); dataBytesLimitErr != nil {
Contributor (@yeya24, Sep 28, 2022):
One question here. The query limiter we are using has the same lifecycle as each query. Say we first query a store-gateway and count some data bytes against the limit; then, maybe due to a network issue or a store-gateway restart, the stream errors and we need to retry another store-gateway.
In this case, do you think it makes more sense to release the bytes we consumed before retrying another store-gateway? Otherwise it is probably easy to hit the limit.

But it is also okay to hit the limit, and the query frontend will retry. WDYT?

Contributor Author:
This is a valid point. I also want to note that we are not doing this for other existing limits like fetched chunks and fetched series limits.

The query-frontend only retries 5XXs. In this case, we'll be returning a 422 error which will not be retried.

Contributor Author:
We could do something like:

if isRetryableError(err) {
  level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to receive series from %s due to retryable error", c.RemoteAddress()))
  queryLimiter.RemoveSeries(seriesCnt)
  queryLimiter.RemoveDataBytes(databytesCnt)
  queryLimiter.RemoveChunkBytes(chunkbytesCnt)
  return nil
}

If others agree with this approach, I can implement this as an enhancement in another PR.
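
To make the sketch above concrete, here is a minimal illustrative version of such a per-query byte limiter with the proposed RemoveDataBytes counterpart. This is not the actual Cortex limiter package; the type, field names and error text are assumptions for illustration only.

package limiter

import (
	"fmt"
	"sync/atomic"
)

// queryLimiterSketch is an illustrative per-query limiter, not the real Cortex QueryLimiter.
type queryLimiterSketch struct {
	dataBytes    atomic.Int64 // data bytes counted so far for this query
	maxDataBytes int64        // 0 means the limit is disabled
}

// AddDataBytes accounts for bytes received from an ingester or store-gateway
// and returns an error once the per-query limit is exceeded.
func (l *queryLimiterSketch) AddDataBytes(n int) error {
	if l.maxDataBytes == 0 {
		return nil
	}
	if l.dataBytes.Add(int64(n)) > l.maxDataBytes {
		return fmt.Errorf("the query exceeded the maximum data size (limit: %d bytes)", l.maxDataBytes)
	}
	return nil
}

// RemoveDataBytes releases bytes that were counted for a store-gateway stream
// that later failed with a retryable error, so the retry against another
// store-gateway does not double-count the same data.
func (l *queryLimiterSketch) RemoveDataBytes(n int) {
	l.dataBytes.Add(int64(-n))
}
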

Contributor:
SGTM

Member:
That's a good point. We should follow up with another PR to fix this, I think!

return validation.LimitError(dataBytesLimitErr.Error())
}
}

if w := resp.GetWarning(); w != "" {
Expand All @@ -687,14 +691,17 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(

numSeries := len(mySeries)
chunkBytes := countChunkBytes(mySeries...)
dataBytes := countDataBytes(mySeries...)

reqStats.AddFetchedSeries(uint64(numSeries))
reqStats.AddFetchedChunkBytes(uint64(chunkBytes))
reqStats.AddFetchedDataBytes(uint64(dataBytes))

level.Debug(spanLog).Log("msg", "received series from store-gateway",
"instance", c.RemoteAddress(),
"fetched series", numSeries,
"fetched chunk bytes", chunkBytes,
"fetched data bytes", dataBytes,
"requested blocks", strings.Join(convertULIDsToString(blockIDs), " "),
"queried blocks", strings.Join(convertULIDsToString(myQueriedBlocks), " "))

@@ -1000,6 +1007,15 @@ func countChunkBytes(series ...*storepb.Series) (count int) {
return count
}

// countDataBytes returns the combined size of all the given series
func countDataBytes(series ...*storepb.Series) (count int) {
for _, s := range series {
count += s.Size()
}

return count
}

// only retry connection issues
func isRetryableError(err error) bool {
switch status.Code(err) {