Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It is automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
* [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607
* [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659
* [ENHANCEMENT] Querier: limit label APIs to query only ingesters if `start` param is not specified. #6618
* [ENHANCEMENT] Alertmanager: Add new limits `-alertmanager.max-silences-count` and `-alertmanager.max-silences-size-bytes` for limiting silences per tenant. #6605
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3568,6 +3568,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -querier.max-query-parallelism
[max_query_parallelism: <int> | default = 14]

# The maximum total uncompressed query response size. If the query was sharded
# the limit is applied to the total response size of all shards. This limit is
# enforced in query-frontend for `query` and `query_range` APIs. 0 to disable.
# CLI flag: -frontend.max-query-response-size
[max_query_response_size: <int> | default = 0]

# Most recent allowed cacheable result per-tenant, to prevent caching very
# recent results that might still be in flux.
# CLI flag: -frontend.max-cache-freshness
Expand Down
65 changes: 65 additions & 0 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -952,3 +953,67 @@ func TestQueryFrontendStatsFromResultsCacheShouldBeSame(t *testing.T) {
// we expect same amount of samples_scanned added to the metric despite the second query hit the cache.
require.Equal(t, numSamplesScannedTotal2, numSamplesScannedTotal*2)
}

// TestQueryFrontendResponseSizeLimit starts a query-frontend configured with
// -frontend.max-query-response-size=4096, pushes one series with very long
// label values and one small series, and checks that a range query over the
// large series is rejected with 422 while the small one still succeeds.
func TestQueryFrontendResponseSizeLimit(t *testing.T) {
	scenario, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer scenario.Close()

	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
		"-frontend.max-query-response-size":  "4096",
		"-querier.split-queries-by-interval": "1m",
	})

	// Backing services must be ready before any Cortex component starts.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, scenario.StartAndWaitReady(consul, minio))

	// The query-frontend is only started here; readiness is awaited further
	// down, after the querier (which needs the frontend address) is running.
	queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
	require.NoError(t, scenario.Start(queryFrontend))

	flags["-querier.frontend-address"] = queryFrontend.NetworkGRPCEndpoint()

	consulEndpoint := consul.NetworkHTTPEndpoint()
	ingester := e2ecortex.NewIngesterWithConfigFile("ingester", e2ecortex.RingStoreConsul, consulEndpoint, "", flags, "")
	distributor := e2ecortex.NewDistributorWithConfigFile("distributor", e2ecortex.RingStoreConsul, consulEndpoint, "", flags, "")
	querier := e2ecortex.NewQuerierWithConfigFile("querier", e2ecortex.RingStoreConsul, consulEndpoint, "", flags, "")

	require.NoError(t, scenario.StartAndWaitReady(distributor, ingester, querier))
	require.NoError(t, scenario.WaitReady(queryFrontend))

	writeClient, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
	require.NoError(t, err)

	// The label values are identical for every sample, so build them once.
	bigLabels := []prompb.Label{
		{Name: "label1", Value: strings.Repeat("long_label_value_1_", 100)},
		{Name: "label2", Value: strings.Repeat("long_label_value_2_", 100)},
	}

	// Ten samples per series, one minute apart, starting an hour ago.
	startTime := time.Now().Add(-1 * time.Hour)
	for offset := time.Duration(0); offset < 10*time.Minute; offset += time.Minute {
		sampleTime := startTime.Add(offset)

		largeSeries, _ := generateSeries("large_series", sampleTime, bigLabels[0], bigLabels[1])
		pushResp, pushErr := writeClient.Push(largeSeries)
		require.NoError(t, pushErr)
		require.Equal(t, http.StatusOK, pushResp.StatusCode)

		smallSeries, _ := generateSeries("small_series", sampleTime)
		pushResp, pushErr = writeClient.Push(smallSeries)
		require.NoError(t, pushErr)
		require.Equal(t, http.StatusOK, pushResp.StatusCode)
	}

	readClient, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	var (
		queryStart = startTime.Add(-1 * time.Minute)
		queryEnd   = startTime.Add(10 * time.Minute)
		step       = 30 * time.Second
	)

	// Expect response size larger than limit (4 KB)
	largeResp, largeBody, err := readClient.QueryRangeRaw(`{__name__="large_series"}`, queryStart, queryEnd, step, nil)
	require.NoError(t, err)
	require.Equal(t, http.StatusUnprocessableEntity, largeResp.StatusCode)
	require.Contains(t, string(largeBody), "the query response size exceeds limit")

	// Expect response size less than limit (4 KB)
	smallResp, _, err := readClient.QueryRangeRaw(`{__name__="small_series"}`, queryStart, queryEnd, step, nil)
	require.NoError(t, err)
	require.Equal(t, http.StatusOK, smallResp.StatusCode)
}
5 changes: 4 additions & 1 deletion pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_api "github.com/cortexproject/cortex/pkg/util/api"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

Expand Down Expand Up @@ -72,6 +73,8 @@ const (
limitBytesStoreGateway = `exceeded bytes limit`
)

var noopResponseSizeLimiter = limiter.NewResponseSizeLimiter(0)

// Config for a Handler.
type HandlerConfig struct {
LogQueriesLongerThan time.Duration `yaml:"log_queries_longer_than"`
Expand Down Expand Up @@ -277,7 +280,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// If the response status code is not 2xx, try to get the
// error message from response body.
if resp.StatusCode/100 != 2 {
body, err2 := tripperware.BodyBuffer(resp, f.log)
body, err2 := tripperware.BodyBytes(resp, noopResponseSizeLimiter, f.log)
if err2 == nil {
err = httpgrpc.Errorf(resp.StatusCode, "%s", string(body))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/transport/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)
// This is not that efficient as we might decode the body multiple
// times. But error response should not be too large so we should be fine.
// TODO: investigate ways to decode only once.
body, err := tripperware.BodyBufferFromHTTPGRPCResponse(resp, nil)
body, err := tripperware.BodyBytesFromHTTPGRPCResponse(resp, nil)
if err != nil {
return nil, err
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -92,25 +93,27 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
return &result, nil
}

func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) {
func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) {
log, ctx := spanlogger.New(ctx, "DecodeQueryInstantResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

if err := ctx.Err(); err != nil {
return nil, err
}

buf, err := tripperware.BodyBuffer(r, log)
responseSizeLimiter := limiter.ResponseSizeLimiterFromContextWithFallback(ctx)
body, err := tripperware.BodyBytes(r, responseSizeLimiter, log)
if err != nil {
log.Error(err)
return nil, err
}

if r.StatusCode/100 != 2 {
return nil, httpgrpc.Errorf(r.StatusCode, "%s", string(buf))
return nil, httpgrpc.Errorf(r.StatusCode, "%s", string(body))
}

var resp tripperware.PrometheusResponse
err = tripperware.UnmarshalResponse(r, buf, &resp)
err = tripperware.UnmarshalResponse(r, body, &resp)

if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
Expand Down
22 changes: 14 additions & 8 deletions pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
)

var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType))

var jsonHttpReq = &http.Request{
Header: map[string][]string{
"Accept": {"application/json"},
Expand Down Expand Up @@ -190,7 +189,9 @@ func TestCompressedResponse(t *testing.T) {
Header: h,
Body: io.NopCloser(responseBody),
}
resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)

ctx := user.InjectOrgID(context.Background(), "1")
resp, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
require.Equal(t, tc.err, err)

if err == nil {
Expand Down Expand Up @@ -454,7 +455,8 @@ func TestResponse(t *testing.T) {
}
}

resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
ctx := user.InjectOrgID(context.Background(), "1")
resp, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
require.NoError(t, err)

// Reset response, as the above call will have consumed the body reader.
Expand All @@ -464,7 +466,7 @@ func TestResponse(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))),
ContentLength: int64(len(tc.jsonBody)),
}
resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), jsonHttpReq, resp)
resp2, err := testInstantQueryCodec.EncodeResponse(ctx, jsonHttpReq, resp)
require.NoError(t, err)
assert.Equal(t, response, resp2)
})
Expand Down Expand Up @@ -710,7 +712,7 @@ func TestMergeResponse(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx, cancelCtx := context.WithCancel(context.Background())
ctx, cancelCtx := context.WithCancel(user.InjectOrgID(context.Background(), "1"))

var resps []tripperware.Response
for _, r := range tc.resps {
Expand Down Expand Up @@ -1723,7 +1725,7 @@ func TestMergeResponseProtobuf(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx, cancelCtx := context.WithCancel(context.Background())
ctx, cancelCtx := context.WithCancel(user.InjectOrgID(context.Background(), "1"))

var resps []tripperware.Response
for _, r := range tc.resps {
Expand Down Expand Up @@ -1870,7 +1872,9 @@ func Benchmark_Decode(b *testing.B) {
StatusCode: 200,
Body: io.NopCloser(bytes.NewBuffer(body)),
}
_, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)

ctx := user.InjectOrgID(context.Background(), "1")
_, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
require.NoError(b, err)
}
})
Expand Down Expand Up @@ -1933,7 +1937,9 @@ func Benchmark_Decode_Protobuf(b *testing.B) {
Header: http.Header{"Content-Type": []string{"application/x-protobuf"}},
Body: io.NopCloser(bytes.NewBuffer(body)),
}
_, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)

ctx := user.InjectOrgID(context.Background(), "1")
_, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
require.NoError(b, err)
}
})
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/tripperware/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type Limits interface {
// frontend will process in parallel.
MaxQueryParallelism(string) int

// MaxQueryResponseSize returns the max total response size of a query in bytes.
MaxQueryResponseSize(string) int64

// MaxCacheFreshness returns the period after which results are cacheable,
// to prevent caching of very recent results.
MaxCacheFreshness(string) time.Duration
Expand Down
24 changes: 18 additions & 6 deletions pkg/querier/tripperware/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"fmt"
"io"
"net/http"
Expand All @@ -14,7 +15,6 @@ import (

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
Expand All @@ -26,6 +26,7 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/runutil"
)

Expand Down Expand Up @@ -448,7 +449,7 @@ type Buffer interface {
Bytes() []byte
}

func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
func BodyBytes(res *http.Response, responseSizeLimiter *limiter.ResponseSizeLimiter, logger log.Logger) ([]byte, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Does this method need to be renamed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function does not return body buffer. It returns body bytes array. I made that distinction when I split it into two functions, but even as one function it didn't make sense to me that it is called BodyBuffer() when it does not return a body buffer.

I am okay with keeping the name or changing it to something else, but what is the reason for using BodyBuffer()?

var buf *bytes.Buffer

// Attempt to cast the response body to a Buffer and use it if possible.
Expand All @@ -466,6 +467,11 @@ func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
}
}

responseSize := getResponseSize(res, buf)
if err := responseSizeLimiter.AddResponseBytes(responseSize); err != nil {
return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%s", err.Error())
}

// if the response is gzipped, lets unzip it here
if strings.EqualFold(res.Header.Get("Content-Encoding"), "gzip") {
gReader, err := gzip.NewReader(buf)
Expand All @@ -475,15 +481,12 @@ func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
defer runutil.CloseWithLogOnErr(logger, gReader, "close gzip reader")

return io.ReadAll(gReader)
} else if strings.EqualFold(res.Header.Get("Content-Encoding"), "snappy") {
sReader := snappy.NewReader(buf)
return io.ReadAll(sReader)
Comment on lines -478 to -480
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we double check if this is safe to be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We set the compression headers for instant and range query requests here https://github.com/afhassan/cortex/blob/be0fc7f216589a3e9cb66af85ee25cf192fd533b/pkg/querier/tripperware/query.go#L762

We currently only support gzip compression (default) or no compression. We never set any other Accept-Encoding header.

Use compression for metrics query API or instant and range query APIs.
Supports 'gzip' and '' (disable compression)
CLI flag: -querier.response-compression
[response_compression: <string> | default = "gzip"]

Snappy was added because we planned to potentially support it as well, but I think that can be left for a future PR to do.

}

return buf.Bytes(), nil
}

func BodyBufferFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) {
func BodyBytesFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) {
// if the response is gzipped, lets unzip it here
headers := http.Header{}
for _, h := range res.Headers {
Expand All @@ -502,6 +505,15 @@ func BodyBufferFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logge
return res.Body, nil
}

// getResponseSize returns the size, in bytes, that the body held in buf
// should be accounted as.
//
// For a response whose Content-Encoding header is "gzip" (compared
// case-insensitively) and whose body is at least 4 bytes long, the value is
// read from the gzip trailer rather than measured: a GZIP body contains the
// size of the original (uncompressed) input data modulo 2^32 in its last
// 4 bytes, little-endian (https://www.ietf.org/rfc/rfc1952.txt). In every
// other case the length of buf's contents is returned as-is.
func getResponseSize(res *http.Response, buf *bytes.Buffer) int {
	payload := buf.Bytes()
	total := len(payload)

	encoding := res.Header.Get("Content-Encoding")
	if total < 4 || !strings.EqualFold(encoding, "gzip") {
		return total
	}

	trailer := payload[total-4:]
	return int(binary.LittleEndian.Uint32(trailer))
}

// UnmarshalJSON implements json.Unmarshaler.
func (s *PrometheusData) UnmarshalJSON(data []byte) error {
var queryData struct {
Expand Down
50 changes: 50 additions & 0 deletions pkg/querier/tripperware/query_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package tripperware

import (
"bytes"
"compress/gzip"
"math"
"net/http"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -193,3 +196,50 @@ func generateData(timeseries, datapoints int) (floatMatrix, histogramMatrix []*S
}
return
}

// Test_getResponseSize checks that getResponseSize reports the length of the
// original (uncompressed) body both for plain responses and for responses
// carrying a gzip Content-Encoding header.
func Test_getResponseSize(t *testing.T) {
	tests := []struct {
		body    []byte
		useGzip bool
	}{
		{
			body:    []byte(`foo`),
			useGzip: false,
		},
		{
			body:    []byte(`foo`),
			useGzip: true,
		},
		{
			body:    []byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`),
			useGzip: false,
		},
		{
			body:    []byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`),
			useGzip: true,
		},
	}

	for i, test := range tests {
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			expectedBodyLength := len(test.body)
			buf := &bytes.Buffer{}
			response := &http.Response{}

			if test.useGzip {
				response = &http.Response{
					Header: http.Header{"Content-Encoding": []string{"gzip"}},
				}
				w := gzip.NewWriter(buf)
				_, err := w.Write(test.body)
				require.NoError(t, err)
				// Close flushes pending data and writes the gzip footer that
				// getResponseSize reads, so a failure here must fail the test
				// instead of being silently dropped.
				require.NoError(t, w.Close())
			} else {
				buf = bytes.NewBuffer(test.body)
			}

			bodyLength := getResponseSize(response, buf)
			require.Equal(t, expectedBodyLength, bodyLength)
		})
	}
}
Loading
Loading