Skip to content

Commit 57d4a0a

Browse files
committed
refactor query frontend to return prometheus error response
Signed-off-by: Ben Ye <[email protected]>
1 parent e961770 commit 57d4a0a

File tree

14 files changed

+274
-131
lines changed

14 files changed

+274
-131
lines changed

pkg/frontend/transport/handler.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@ import (
1616

1717
"github.com/go-kit/log"
1818
"github.com/go-kit/log/level"
19+
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
1920
"github.com/prometheus/client_golang/prometheus"
2021
"github.com/prometheus/client_golang/prometheus/promauto"
2122
"github.com/weaveworks/common/httpgrpc"
22-
"github.com/weaveworks/common/httpgrpc/server"
23+
"google.golang.org/grpc/codes"
2324
"google.golang.org/grpc/status"
2425

2526
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
2627
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2728
"github.com/cortexproject/cortex/pkg/tenant"
2829
"github.com/cortexproject/cortex/pkg/util"
30+
util_api "github.com/cortexproject/cortex/pkg/util/api"
2931
util_log "github.com/cortexproject/cortex/pkg/util/log"
3032
)
3133

@@ -239,8 +241,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
239241
writeServiceTimingHeader(queryResponseTime, hs, stats)
240242
}
241243

244+
logger := util_log.WithContext(r.Context(), f.log)
242245
if err != nil {
243-
writeError(w, err, hs)
246+
writeError(logger, w, err, hs)
244247
return
245248
}
246249

@@ -252,7 +255,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
252255
// Log any error from copying the response body so that it is visible even though a success status code has already been returned.
253256
bytesCopied, err := io.Copy(w, resp.Body)
254257
if err != nil && !errors.Is(err, syscall.EPIPE) {
255-
level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
258+
level.Error(logger).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
256259
}
257260
}
258261

@@ -441,7 +444,7 @@ func formatQueryString(queryString url.Values) (fields []interface{}) {
441444
return fields
442445
}
443446

444-
func writeError(w http.ResponseWriter, err error, additionalHeaders http.Header) {
447+
func writeError(logger log.Logger, w http.ResponseWriter, err error, additionalHeaders http.Header) {
445448
switch err {
446449
case context.Canceled:
447450
err = errCanceled
@@ -453,20 +456,35 @@ func writeError(w http.ResponseWriter, err error, additionalHeaders http.Header)
453456
}
454457
}
455458

459+
headers := w.Header()
460+
for k, values := range additionalHeaders {
461+
for _, value := range values {
462+
headers.Set(k, value)
463+
}
464+
}
456465
resp, ok := httpgrpc.HTTPResponseFromError(err)
457466
if ok {
458-
for k, values := range additionalHeaders {
459-
resp.Headers = append(resp.Headers, &httpgrpc.Header{Key: k, Values: values})
467+
code := int(resp.Code)
468+
var errTyp v1.ErrorType
469+
switch resp.Code {
470+
case http.StatusBadRequest, http.StatusRequestEntityTooLarge:
471+
errTyp = v1.ErrBadData
472+
case StatusClientClosedRequest:
473+
errTyp = v1.ErrCanceled
474+
case http.StatusGatewayTimeout:
475+
errTyp = v1.ErrTimeout
476+
case http.StatusUnprocessableEntity:
477+
errTyp = v1.ErrExec
478+
case int32(codes.PermissionDenied):
479+
// Convert gRPC status code to HTTP status code.
480+
code = http.StatusUnprocessableEntity
481+
errTyp = v1.ErrBadData
482+
default:
483+
errTyp = v1.ErrServer
460484
}
461-
_ = server.WriteResponse(w, resp)
485+
util_api.RespondError(logger, w, errTyp, string(resp.Body), code)
462486
} else {
463-
headers := w.Header()
464-
for k, values := range additionalHeaders {
465-
for _, value := range values {
466-
headers.Set(k, value)
467-
}
468-
}
469-
http.Error(w, err.Error(), http.StatusInternalServerError)
487+
util_api.RespondError(logger, w, v1.ErrServer, err.Error(), http.StatusInternalServerError)
470488
}
471489
}
472490

pkg/frontend/transport/handler_test.go

Lines changed: 118 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package transport
33
import (
44
"bytes"
55
"context"
6+
"encoding/json"
67
"io"
78
"net/http"
89
"net/http/httptest"
@@ -13,14 +14,18 @@ import (
1314

1415
"github.com/go-kit/log"
1516
"github.com/pkg/errors"
17+
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
1618
"github.com/prometheus/client_golang/prometheus"
1719
promtest "github.com/prometheus/client_golang/prometheus/testutil"
1820
"github.com/stretchr/testify/assert"
1921
"github.com/stretchr/testify/require"
2022
"github.com/weaveworks/common/httpgrpc"
2123
"github.com/weaveworks/common/user"
24+
"google.golang.org/grpc/codes"
2225

2326
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
27+
util_api "github.com/cortexproject/cortex/pkg/util/api"
28+
util_log "github.com/cortexproject/cortex/pkg/util/log"
2429
)
2530

2631
type roundTripperFunc func(*http.Request) (*http.Response, error)
@@ -34,19 +39,111 @@ func TestWriteError(t *testing.T) {
3439
status int
3540
err error
3641
additionalHeaders http.Header
42+
expectedErrResp util_api.Response
3743
}{
38-
{http.StatusInternalServerError, errors.New("unknown"), http.Header{"User-Agent": []string{"Golang"}}},
39-
{http.StatusInternalServerError, errors.New("unknown"), nil},
40-
{http.StatusGatewayTimeout, context.DeadlineExceeded, nil},
41-
{StatusClientClosedRequest, context.Canceled, nil},
42-
{StatusClientClosedRequest, context.Canceled, http.Header{"User-Agent": []string{"Golang"}}},
43-
{StatusClientClosedRequest, context.Canceled, http.Header{"User-Agent": []string{"Golang"}, "Content-Type": []string{"application/json"}}},
44-
{http.StatusBadRequest, httpgrpc.Errorf(http.StatusBadRequest, ""), http.Header{}},
45-
{http.StatusRequestEntityTooLarge, errors.New("http: request body too large"), http.Header{}},
44+
{
45+
http.StatusInternalServerError,
46+
errors.New("unknown"),
47+
http.Header{"User-Agent": []string{"Golang"}},
48+
util_api.Response{
49+
Status: "error",
50+
ErrorType: v1.ErrServer,
51+
Error: "unknown",
52+
},
53+
},
54+
{
55+
http.StatusInternalServerError,
56+
errors.New("unknown"),
57+
nil,
58+
util_api.Response{
59+
Status: "error",
60+
ErrorType: v1.ErrServer,
61+
Error: "unknown",
62+
},
63+
},
64+
{
65+
http.StatusGatewayTimeout,
66+
context.DeadlineExceeded,
67+
nil,
68+
util_api.Response{
69+
Status: "error",
70+
ErrorType: v1.ErrTimeout,
71+
Error: "",
72+
},
73+
},
74+
{
75+
StatusClientClosedRequest,
76+
context.Canceled,
77+
nil,
78+
util_api.Response{
79+
Status: "error",
80+
ErrorType: v1.ErrCanceled,
81+
Error: "",
82+
},
83+
},
84+
{
85+
StatusClientClosedRequest,
86+
context.Canceled,
87+
http.Header{"User-Agent": []string{"Golang"}},
88+
util_api.Response{
89+
Status: "error",
90+
ErrorType: v1.ErrCanceled,
91+
Error: "",
92+
},
93+
},
94+
{
95+
StatusClientClosedRequest,
96+
context.Canceled,
97+
http.Header{"User-Agent": []string{"Golang"}, "Content-Type": []string{"application/json"}},
98+
util_api.Response{
99+
Status: "error",
100+
ErrorType: v1.ErrCanceled,
101+
Error: "",
102+
},
103+
},
104+
{http.StatusBadRequest,
105+
httpgrpc.Errorf(http.StatusBadRequest, ""),
106+
http.Header{},
107+
util_api.Response{
108+
Status: "error",
109+
ErrorType: v1.ErrBadData,
110+
Error: "",
111+
},
112+
},
113+
{
114+
http.StatusRequestEntityTooLarge,
115+
errors.New("http: request body too large"),
116+
http.Header{},
117+
util_api.Response{
118+
Status: "error",
119+
ErrorType: v1.ErrBadData,
120+
Error: "http: request body too large",
121+
},
122+
},
123+
{
124+
http.StatusUnprocessableEntity,
125+
httpgrpc.Errorf(http.StatusUnprocessableEntity, "limit hit"),
126+
http.Header{},
127+
util_api.Response{
128+
Status: "error",
129+
ErrorType: v1.ErrExec,
130+
Error: "limit hit",
131+
},
132+
},
133+
{
134+
http.StatusUnprocessableEntity,
135+
httpgrpc.Errorf(int(codes.PermissionDenied), "permission denied"),
136+
http.Header{},
137+
util_api.Response{
138+
Status: "error",
139+
ErrorType: v1.ErrBadData,
140+
Error: "permission denied",
141+
},
142+
},
46143
} {
47144
t.Run(test.err.Error(), func(t *testing.T) {
48145
w := httptest.NewRecorder()
49-
writeError(w, test.err, test.additionalHeaders)
146+
writeError(util_log.Logger, w, test.err, test.additionalHeaders)
50147
require.Equal(t, test.status, w.Result().StatusCode)
51148
expectedAdditionalHeaders := test.additionalHeaders
52149
if expectedAdditionalHeaders != nil {
@@ -56,6 +153,18 @@ func TestWriteError(t *testing.T) {
56153
}
57154
}
58155
}
156+
data, err := io.ReadAll(w.Result().Body)
157+
require.NoError(t, err)
158+
var res util_api.Response
159+
err = json.Unmarshal(data, &res)
160+
require.NoError(t, err)
161+
resp, ok := httpgrpc.HTTPResponseFromError(test.err)
162+
if ok {
163+
require.Equal(t, string(resp.Body), res.Error)
164+
} else {
165+
require.Equal(t, test.err.Error(), res.Error)
166+
167+
}
59168
})
60169
}
61170
}

pkg/querier/tripperware/instantquery/limits.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (trippe
4848
if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 {
4949
expr, err := parser.ParseExpr(r.GetQuery())
5050
if err != nil {
51-
// Let Querier propagates the parsing error.
52-
return l.next.Do(ctx, r)
51+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
5352
}
5453

5554
// Enforce query length across all selectors in the query.

pkg/querier/tripperware/instantquery/limits_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package instantquery
22

33
import (
44
"context"
5+
"net/http"
56
"testing"
67
"time"
78

9+
"github.com/prometheus/prometheus/promql/parser"
810
"github.com/stretchr/testify/assert"
911
"github.com/stretchr/testify/mock"
1012
"github.com/stretchr/testify/require"
13+
"github.com/weaveworks/common/httpgrpc"
1114
"github.com/weaveworks/common/user"
1215

1316
"github.com/cortexproject/cortex/pkg/querier/tripperware"
@@ -20,6 +23,9 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
2023
thirtyDays = 30 * 24 * time.Hour
2124
)
2225

26+
wrongQuery := `up[`
27+
_, parserErr := parser.ParseExpr(wrongQuery)
28+
2329
tests := map[string]struct {
2430
maxQueryLength time.Duration
2531
query string
@@ -31,6 +37,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
3137
"even though failed to parse expression, should return no error since request will pass to next middleware": {
3238
query: `up[`,
3339
maxQueryLength: thirtyDays,
40+
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, parserErr.Error()).Error(),
3441
},
3542
"should succeed on a query not exceeding time range": {
3643
query: `up`,

pkg/querier/tripperware/queryrange/limits.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,7 @@ func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (trippe
8484

8585
expr, err := parser.ParseExpr(r.GetQuery())
8686
if err != nil {
87-
// Let Querier propagates the parsing error.
88-
return l.next.Do(ctx, r)
87+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
8988
}
9089

9190
// Enforce query length across all selectors in the query.

pkg/querier/tripperware/queryrange/limits_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package queryrange
22

33
import (
44
"context"
5+
"net/http"
56
"testing"
67
"time"
78

9+
"github.com/prometheus/prometheus/promql/parser"
810
"github.com/stretchr/testify/assert"
911
"github.com/stretchr/testify/mock"
1012
"github.com/stretchr/testify/require"
13+
"github.com/weaveworks/common/httpgrpc"
1114
"github.com/weaveworks/common/user"
1215

1316
"github.com/cortexproject/cortex/pkg/querier/tripperware"
@@ -115,6 +118,9 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
115118

116119
now := time.Now()
117120

121+
wrongQuery := `up[`
122+
_, parserErr := parser.ParseExpr(wrongQuery)
123+
118124
tests := map[string]struct {
119125
maxQueryLength time.Duration
120126
query string
@@ -132,6 +138,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
132138
reqStartTime: now.Add(-time.Hour),
133139
reqEndTime: now,
134140
maxQueryLength: thirtyDays,
141+
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, parserErr.Error()).Error(),
135142
},
136143
"should succeed on a query on short time range, ending now": {
137144
maxQueryLength: thirtyDays,

pkg/querier/tripperware/queryrange/split_by_interval.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,7 @@ func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripper
4747
// to line up the boundaries with step.
4848
reqs, err := splitQuery(r, s.interval(r))
4949
if err != nil {
50-
// If the query itself is bad, we don't return error but send the query
51-
// to querier to return the expected error message. This is not very efficient
52-
// but should be okay for now.
53-
// TODO(yeya24): query frontend can reuse the Prometheus API handler and return
54-
// expected error message locally without passing it to the querier through network.
55-
return s.next.Do(ctx, r)
50+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
5651
}
5752
s.splitByCounter.Add(float64(len(reqs)))
5853

pkg/querier/tripperware/roundtrip.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func NewQueryTripperware(
143143
tenantIDs, err := tenant.TenantIDs(r.Context())
144144
// This should never happen anyway because the auth middleware runs before this.
145145
if err != nil {
146-
return nil, err
146+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
147147
}
148148
now := time.Now()
149149
userStr := tenant.JoinTenantIDs(tenantIDs)
@@ -162,8 +162,7 @@ func NewQueryTripperware(
162162

163163
expr, err := parser.ParseExpr(query)
164164
if err != nil {
165-
// If query is invalid, no need to go through tripperwares for further splitting.
166-
return next.RoundTrip(r)
165+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
167166
}
168167

169168
reqStats := stats.FromContext(r.Context())
@@ -189,7 +188,10 @@ func NewQueryTripperware(
189188
return next.RoundTrip(r)
190189
}
191190
analysis, err := queryAnalyzer.Analyze(query)
192-
if err != nil || !analysis.IsShardable() {
191+
if err != nil {
192+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
193+
}
194+
if !analysis.IsShardable() {
193195
return next.RoundTrip(r)
194196
}
195197
return instantQuery.RoundTrip(r)

0 commit comments

Comments
 (0)