Skip to content

Commit 988c506

Browse files
authored
Returning Prometheus Warnings (#5916)
* Returning Prometheus Warnings Signed-off-by: alanprot <[email protected]> * changelog Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]>
1 parent e7f05d4 commit 988c506

12 files changed

+311
-99
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## master / unreleased
44
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
5+
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
56
* [CHANGE] Upgrade Dockerfile Node version from 14x to 18x. #5906
67

78
## 1.17.0 2024-04-30

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/prometheus/prometheus/model/labels"
1919
"github.com/prometheus/prometheus/model/timestamp"
2020
promqlparser "github.com/prometheus/prometheus/promql/parser"
21+
"github.com/thanos-io/thanos/pkg/strutil"
2122
"github.com/weaveworks/common/httpgrpc"
2223
"google.golang.org/grpc/status"
2324

@@ -260,8 +261,12 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ
260261
}
261262

262263
promResponses := make([]*PrometheusInstantQueryResponse, 0, len(responses))
264+
warnings := make([][]string, 0, len(responses))
263265
for _, resp := range responses {
264266
promResponses = append(promResponses, resp.(*PrometheusInstantQueryResponse))
267+
if w := resp.(*PrometheusInstantQueryResponse).Warnings; w != nil {
268+
warnings = append(warnings, w)
269+
}
265270
}
266271

267272
var data PrometheusInstantQueryData
@@ -303,8 +308,9 @@ func (instantQueryCodec) MergeResponse(ctx context.Context, req tripperware.Requ
303308
}
304309

305310
res := &PrometheusInstantQueryResponse{
306-
Status: queryrange.StatusSuccess,
307-
Data: data,
311+
Status: queryrange.StatusSuccess,
312+
Data: data,
313+
Warnings: strutil.MergeUnsortedSlices(warnings...),
308314
}
309315
return res, nil
310316
}

pkg/querier/tripperware/instantquery/instant_query_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,15 @@ func TestMergeResponse(t *testing.T) {
305305
},
306306
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]},{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
307307
},
308+
{
309+
name: "merge with warnings.",
310+
req: &PrometheusRequest{Query: "topk(10, up) by(job)"},
311+
resps: []string{
312+
`{"status":"success","warnings":["warning1","warning2"],"data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]}]}}`,
313+
`{"status":"success","warnings":["warning1","warning3"],"data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]}}`,
314+
},
315+
expectedResp: `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"up","job":"foo"},"value":[1,"1"]},{"metric":{"__name__":"up","job":"bar"},"value":[1,"2"]}]},"warnings":["warning1","warning2","warning3"]}`,
316+
},
308317
{
309318
name: "merge two responses with stats",
310319
req: defaultReq,

pkg/querier/tripperware/instantquery/instantquery.pb.go

Lines changed: 110 additions & 43 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/querier/tripperware/instantquery/instantquery.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ message PrometheusInstantQueryResponse {
1818
string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"];
1919
string Error = 4 [(gogoproto.jsontag) = "error,omitempty"];
2020
repeated tripperware.PrometheusResponseHeader Headers = 5 [(gogoproto.jsontag) = "-"];
21+
repeated string Warnings = 6 [(gogoproto.jsontag) = "warnings,omitempty"];
2122
}
2223

2324
message PrometheusInstantQueryData {

pkg/querier/tripperware/queryrange/query_range.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
otlog "github.com/opentracing/opentracing-go/log"
2020
"github.com/prometheus/common/model"
2121
"github.com/prometheus/prometheus/model/timestamp"
22+
"github.com/thanos-io/thanos/pkg/strutil"
2223
"github.com/weaveworks/common/httpgrpc"
2324

2425
"github.com/cortexproject/cortex/pkg/querier/tripperware"
@@ -135,8 +136,12 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques
135136
}
136137

137138
promResponses := make([]*PrometheusResponse, 0, len(responses))
139+
warnings := make([][]string, 0, len(responses))
138140
for _, res := range responses {
139141
promResponses = append(promResponses, res.(*PrometheusResponse))
142+
if w := res.(*PrometheusResponse).Warnings; w != nil {
143+
warnings = append(warnings, w)
144+
}
140145
}
141146

142147
// Merge the responses.
@@ -153,6 +158,7 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, _ tripperware.Reques
153158
Result: sampleStreams,
154159
Stats: statsMerge(c.sharded, promResponses),
155160
},
161+
Warnings: strutil.MergeUnsortedSlices(warnings...),
156162
}
157163

158164
return &response, nil

pkg/querier/tripperware/queryrange/query_range_middlewares_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ func TestRoundTrip(t *testing.T) {
3232
var err error
3333
if r.RequestURI == query {
3434
_, err = w.Write([]byte(responseBody))
35+
} else if r.RequestURI == queryWithWarnings {
36+
_, err = w.Write([]byte(responseBodyWithWarnings))
3537
} else {
3638
_, err = w.Write([]byte("bar"))
3739
}

pkg/querier/tripperware/queryrange/query_range_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,9 @@ func TestRequest(t *testing.T) {
9191
func TestResponse(t *testing.T) {
9292
t.Parallel()
9393
r := *parsedResponse
94+
rWithWarnings := *parsedResponseWithWarnings
9495
r.Headers = respHeaders
96+
rWithWarnings.Headers = respHeaders
9597
for i, tc := range []struct {
9698
body string
9799
expected *PrometheusResponse
@@ -102,6 +104,10 @@ func TestResponse(t *testing.T) {
102104
body: responseBody,
103105
expected: &r,
104106
},
107+
{
108+
body: responseBodyWithWarnings,
109+
expected: &rWithWarnings,
110+
},
105111
{
106112
body: responseBody,
107113
cancelCtxBeforeDecode: true,
@@ -390,6 +396,28 @@ func TestMergeAPIResponses(t *testing.T) {
390396
},
391397
},
392398
},
399+
{
400+
name: "Merge response with warnings.",
401+
input: []tripperware.Response{
402+
mustParse(t, `{"status":"success","warnings":["warning1","warning2"],"data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"]]}]}}`),
403+
mustParse(t, `{"status":"success","warnings":["warning1","warning3"],"data":{"resultType":"matrix","result":[{"metric":{"a":"b","c":"d"},"values":[[1,"1"]]}]}}`),
404+
},
405+
expected: &PrometheusResponse{
406+
Status: StatusSuccess,
407+
Warnings: []string{"warning1", "warning2", "warning3"},
408+
Data: PrometheusData{
409+
ResultType: matrix,
410+
Result: []tripperware.SampleStream{
411+
{
412+
Labels: []cortexpb.LabelAdapter{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}},
413+
Samples: []cortexpb.Sample{
414+
{Value: 1, TimestampMs: 1000},
415+
},
416+
},
417+
},
418+
},
419+
},
420+
},
393421
{
394422
name: "Merging of samples where there is complete overlap.",
395423
input: []tripperware.Response{

0 commit comments

Comments
 (0)