Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
* [BUGFIX] Ingester: fixed incorrect logging at the start of ingester block shipping logic. #4934
* [BUGFIX] Storage/Bucket: fixed global mark missing on deletion. #4949
* [BUGFIX] QueryFrontend/Querier: fixed regression added by #4863 where we stopped compressing the response between querier and query frontend. #4960
* [BUGFIX] QueryFrontend/Querier: fixed fix response error to be ungzipped when status code is not 2xx. #4975

## 1.13.0 2022-07-14

Expand Down
9 changes: 4 additions & 5 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
}

func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) {
if r.StatusCode/100 != 2 {
body, _ := io.ReadAll(r.Body)
return nil, httpgrpc.Errorf(r.StatusCode, string(body))
}

log, ctx := spanlogger.New(ctx, "PrometheusInstantQueryResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

Expand All @@ -165,6 +160,10 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _
log.Error(err)
return nil, err
}
if r.StatusCode/100 != 2 {
return nil, httpgrpc.Errorf(r.StatusCode, string(buf))
}

var resp PrometheusInstantQueryResponse
if err := json.Unmarshal(buf, &resp); err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err)
Expand Down
58 changes: 58 additions & 0 deletions pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package instantquery

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
Expand All @@ -12,6 +13,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
Expand Down Expand Up @@ -93,6 +95,62 @@ func TestRequest(t *testing.T) {
}
}

func TestGzippedResponse(t *testing.T) {
for _, tc := range []struct {
body string
status int
err error
}{
{
body: `{"status":"success","data":{"resultType":"string","result":[1,"foo"]}}`,
status: 200,
},
{
body: `error generic 400`,
status: 400,
err: httpgrpc.Errorf(400, "error generic 400"),
},
{
status: 400,
err: httpgrpc.Errorf(400, ""),
},
} {
for _, c := range []bool{true, false} {
t.Run(fmt.Sprintf("compressed %t [%s]", c, tc.body), func(t *testing.T) {
h := http.Header{
"Content-Type": []string{"application/json"},
}

responseBody := bytes.NewBuffer([]byte(tc.body))
if c {
h.Set("Content-Encoding", "gzip")
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
_, err := w.Write([]byte(tc.body))
require.NoError(t, err)
w.Close()
responseBody = &buf
}

response := &http.Response{
StatusCode: tc.status,
Header: h,
Body: io.NopCloser(responseBody),
}
r, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
require.Equal(t, tc.err, err)

if err == nil {
resp, err := json.Marshal(r)
require.NoError(t, err)

require.Equal(t, tc.body, string(resp))
}
})
}
}
}

func TestResponse(t *testing.T) {
for i, tc := range []struct {
body string
Expand Down
7 changes: 3 additions & 4 deletions pkg/querier/tripperware/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,6 @@ func (prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Request)
}

func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) {
if r.StatusCode/100 != 2 {
body, _ := io.ReadAll(r.Body)
return nil, httpgrpc.Errorf(r.StatusCode, string(body))
}
log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

Expand All @@ -270,6 +266,9 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ t
log.Error(err)
return nil, err
}
if r.StatusCode/100 != 2 {
return nil, httpgrpc.Errorf(r.StatusCode, string(buf))
}
log.LogFields(otlog.Int("bytes", len(buf)))

var resp PrometheusResponse
Expand Down
58 changes: 58 additions & 0 deletions pkg/querier/tripperware/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package queryrange

import (
"bytes"
"compress/gzip"
"context"
"fmt"
io "io"
"net/http"
"strconv"
Expand Down Expand Up @@ -657,6 +659,62 @@ func TestMergeAPIResponses(t *testing.T) {
}
}

func TestGzippedResponse(t *testing.T) {
for _, tc := range []struct {
body string
status int
err error
}{
{
body: `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[2,"2"],[3,"3"]]}],"stats":{"samples":{"totalQueryableSamples":20,"totalQueryableSamplesPerStep":[[2,2],[3,3]]}}}}`,
status: 200,
},
{
body: `error generic 400`,
status: 400,
err: httpgrpc.Errorf(400, `error generic 400`),
},
{
status: 400,
err: httpgrpc.Errorf(400, ""),
},
} {
for _, c := range []bool{true, false} {
t.Run(fmt.Sprintf("compressed %t [%s]", c, tc.body), func(t *testing.T) {
h := http.Header{
"Content-Type": []string{"application/json"},
}

responseBody := bytes.NewBuffer([]byte(tc.body))
if c {
h.Set("Content-Encoding", "gzip")
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
_, err := w.Write([]byte(tc.body))
require.NoError(t, err)
w.Close()
responseBody = &buf
}

response := &http.Response{
StatusCode: tc.status,
Header: h,
Body: io.NopCloser(responseBody),
}
r, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil)
require.Equal(t, tc.err, err)

if err == nil {
resp, err := json.Marshal(r)
require.NoError(t, err)

require.Equal(t, tc.body, string(resp))
}
})
}
}
}

func mustParse(t *testing.T, response string) tripperware.Response {
var resp PrometheusResponse
// Needed as goimports automatically add a json import otherwise.
Expand Down