From 9fa849fc414d63f59612d7e9d87efdbfdf21fc65 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 17 Nov 2022 23:23:57 -0800 Subject: [PATCH 1/4] fix response to be gzipped when status code is not 2xx Signed-off-by: Ben Ye --- pkg/querier/tripperware/instantquery/instant_query.go | 9 ++++----- pkg/querier/tripperware/queryrange/query_range.go | 7 +++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index a50b2f81eeb..f925f41cc19 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -152,11 +152,6 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for } func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) { - if r.StatusCode/100 != 2 { - body, _ := io.ReadAll(r.Body) - return nil, httpgrpc.Errorf(r.StatusCode, string(body)) - } - log, ctx := spanlogger.New(ctx, "PrometheusInstantQueryResponse") //nolint:ineffassign,staticcheck defer log.Finish() @@ -165,6 +160,10 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ log.Error(err) return nil, err } + if r.StatusCode/100 != 2 { + return nil, httpgrpc.Errorf(r.StatusCode, string(buf)) + } + var resp PrometheusInstantQueryResponse if err := json.Unmarshal(buf, &resp); err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 5fc5886d9a1..c3ad1839c19 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -258,10 +258,6 @@ func (prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Request) } func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) { - if r.StatusCode/100 != 2 { - body, _ := io.ReadAll(r.Body) - return nil, httpgrpc.Errorf(r.StatusCode, string(body)) - } log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck defer log.Finish() @@ -270,6 +266,9 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ t log.Error(err) return nil, err } + if r.StatusCode/100 != 2 { + return nil, httpgrpc.Errorf(r.StatusCode, string(buf)) + } log.LogFields(otlog.Int("bytes", len(buf))) var resp PrometheusResponse From 4c70fdcf65061fb08f5be03b184020945e71c967 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 18 Nov 2022 00:04:38 -0800 Subject: [PATCH 2/4] adding tests Signed-off-by: Alan Protasio --- .../instantquery/instant_query_test.go | 57 +++++++++++++++++++ .../queryrange/query_range_test.go | 57 +++++++++++++++++++ 2 files changed, 114 insertions(+) diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index fd1ca8e87a0..b750b9d7b9a 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -2,6 +2,7 @@ package instantquery import ( "bytes" + "compress/gzip" "context" "fmt" "io" @@ -12,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/querier/tripperware" @@ -93,6 +95,61 @@ func TestRequest(t *testing.T) { } } +func TestGzippedResponse(t *testing.T) { + for _, tc := range []struct { + body string + status int + err error + }{ + { + body: `{"status":"success","data":{"resultType":"string","result":[1,"foo"]}}`, + status: 200, + }, + { + body: `error generic 400`, + status: 400, + err: httpgrpc.Errorf(400, "error generic 400"), + }, + { + status: 400, + err: httpgrpc.Errorf(400, ""), + }, + } { + for _, c := range []bool{true, false} { + t.Run(fmt.Sprintf("compressed %t [%s]", c, tc.body), func(t *testing.T) { + h := http.Header{ + "Content-Type": []string{"application/json"}, + } + + responseBody := bytes.NewBuffer([]byte(tc.body)) + if c { + h.Set("Content-Encoding", "gzip") + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + w.Write([]byte(tc.body)) + w.Close() + responseBody = &buf + } + + response := &http.Response{ + StatusCode: tc.status, + Header: h, + Body: io.NopCloser(responseBody), + } + r, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) + require.Equal(t, tc.err, err) + + if err == nil { + resp, err := json.Marshal(r) + require.NoError(t, err) + + require.Equal(t, tc.body, string(resp)) + } + }) + } + } +} + func TestResponse(t *testing.T) { for i, tc := range []struct { body string diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index 3df851b3e28..e45b8df7cea 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -2,7 +2,9 @@ package queryrange import ( "bytes" + "compress/gzip" "context" + "fmt" io "io" "net/http" "strconv" @@ -657,6 +659,61 @@ func TestMergeAPIResponses(t *testing.T) { } } +func TestGzippedResponse(t *testing.T) { + for _, tc := range []struct { + body string + status int + err error + }{ + { + body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[2,2],[3,3]]}}}}`, + status: 200, + }, + { + body: `error generic 400`, + status: 400, + err: httpgrpc.Errorf(400, `error generic 400`), + }, + { + status: 400, + err: httpgrpc.Errorf(400, ""), + }, + } { + for _, c := range []bool{true, false} { + t.Run(fmt.Sprintf("compressed %t [%s]", c, tc.body), func(t *testing.T) { + h := http.Header{ + "Content-Type": []string{"application/json"}, + } + + responseBody := bytes.NewBuffer([]byte(tc.body)) + if c { + h.Set("Content-Encoding", "gzip") + var buf bytes.Buffer + w := gzip.NewWriter(&buf) + w.Write([]byte(tc.body)) + w.Close() + responseBody = &buf + } + + response := &http.Response{ + StatusCode: tc.status, + Header: h, + Body: io.NopCloser(responseBody), + } + r, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil) + require.Equal(t, tc.err, err) + + if err == nil { + resp, err := json.Marshal(r) + require.NoError(t, err) + + require.Equal(t, tc.body, string(resp)) + } + }) + } + } +} + func mustParse(t *testing.T, response string) tripperware.Response { var resp PrometheusResponse // Needed as goimports automatically add a json import otherwise. From 67ca0c051503832042a7dc0ca3aac034ec725933 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 18 Nov 2022 00:23:23 -0800 Subject: [PATCH 3/4] lint Signed-off-by: Alan Protasio --- pkg/querier/tripperware/instantquery/instant_query_test.go | 3 ++- pkg/querier/tripperware/queryrange/query_range_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index b750b9d7b9a..a0013a5ab0a 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -126,7 +126,8 @@ func TestGzippedResponse(t *testing.T) { h.Set("Content-Encoding", "gzip") var buf bytes.Buffer w := gzip.NewWriter(&buf) - w.Write([]byte(tc.body)) + _, err := w.Write([]byte(tc.body)) + require.NoError(t, err) w.Close() responseBody = &buf } diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index e45b8df7cea..cfad6e8f999 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -690,7 +690,8 @@ func TestGzippedResponse(t *testing.T) { h.Set("Content-Encoding", "gzip") var buf bytes.Buffer w := gzip.NewWriter(&buf) - w.Write([]byte(tc.body)) + _, err := w.Write([]byte(tc.body)) + require.NoError(t, err) w.Close() responseBody = &buf } From ab8c58ca1d68c6fc1f9946f444fe74b094415633 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 18 Nov 2022 10:10:07 -0800 Subject: [PATCH 4/4] changelog Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c126b1fc0b5..689745008eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ * [BUGFIX] Ingester: fixed incorrect logging at the start of ingester block shipping logic. #4934 * [BUGFIX] Storage/Bucket: fixed global mark missing on deletion. #4949 * [BUGFIX] QueryFrontend/Querier: fixed regression added by #4863 where we stopped compressing the response between querier and query frontend. #4960 +* [BUGFIX] QueryFrontend/Querier: fixed fix response error to be ungzipped when status code is not 2xx. #4975 ## 1.13.0 2022-07-14