Skip to content

Commit 31f2cce

Browse files
committed
Allow ruler to retrieve proto format query response
Signed-off-by: SungJin1212 <[email protected]>
1 parent b72a536 commit 31f2cce

22 files changed

+708
-89
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [CHANGE] Change all max async concurrency default values `50` to `3` #6268
1010
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
1111
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
12+
* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query response as a proto format. #6345
1213
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
1314
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
1415
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4313,6 +4313,12 @@ The `ruler_config` configures the Cortex ruler.
43134313
# CLI flag: -ruler.frontend-address
43144314
[frontend_address: <string> | default = ""]
43154315
4316+
# [Experimental] Query response format to get query results from Query Frontend
4317+
# when the rule evaluation. It will only take effect when
4318+
# `-ruler.frontend-address` is configured. Supported values: json,protobuf
4319+
# CLI flag: -ruler.query-response-format
4320+
[query_response_format: <string> | default = "protobuf"]
4321+
43164322
frontend_client:
43174323
# gRPC client max receive message size (bytes).
43184324
# CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size

docs/configuration/v1-guarantees.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ Cortex is an actively developed project and we want to encourage the introductio
3535

3636
Currently experimental features are:
3737

38-
- Ruler: Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address` )
38+
- Ruler
39+
- Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address`).
40+
- When `-ruler.frontend-address` is specified, the response format can be specified (via `-ruler.query-response-format`).
3941
- S3 Server Side Encryption (SSE) using KMS (including per-tenant KMS config overrides).
4042
- Azure blob storage.
4143
- Zone awareness based replication.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ require (
8181
github.com/cespare/xxhash/v2 v2.3.0
8282
github.com/google/go-cmp v0.6.0
8383
github.com/hashicorp/golang-lru/v2 v2.0.7
84+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
8485
github.com/sercand/kuberesolver/v5 v5.1.1
8586
github.com/tjhop/slog-gokit v0.1.2
8687
go.opentelemetry.io/collector/pdata v1.21.0
@@ -189,7 +190,6 @@ require (
189190
github.com/mitchellh/mapstructure v1.5.0 // indirect
190191
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
191192
github.com/modern-go/reflect2 v1.0.2 // indirect
192-
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
193193
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
194194
github.com/ncw/swift v1.0.53 // indirect
195195
github.com/oklog/run v1.1.0 // indirect

integration/ruler_test.go

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,42 +1670,64 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
16701670
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
16711671
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
16721672
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1673-
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
1674-
require.NoError(t, s.Start(queryFrontend))
1675-
1676-
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1677-
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1678-
}), "")
1679-
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1680-
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1681-
}), "")
1682-
require.NoError(t, s.StartAndWaitReady(ruler, querier))
1683-
1684-
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
1685-
require.NoError(t, err)
1673+
for _, format := range []string{"protobuf", "json"} {
1674+
t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) {
1675+
queryFrontendFlag := mergeFlags(flags, map[string]string{
1676+
"-ruler.query-response-format": format,
1677+
})
1678+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "")
1679+
require.NoError(t, s.Start(queryFrontend))
16861680

1687-
expression := "metric"
1688-
groupName := "rule_group"
1689-
ruleName := "rule_name"
1690-
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
1681+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(queryFrontendFlag, map[string]string{
1682+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1683+
}), "")
1684+
require.NoError(t, s.StartAndWaitReady(querier))
16911685

1692-
rgMatcher := ruleGroupMatcher(user, namespace, groupName)
1693-
// Wait until ruler has loaded the group.
1694-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1695-
// Wait until rule group has tried to evaluate the rule.
1696-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1686+
rulerFlag := mergeFlags(queryFrontendFlag, map[string]string{
1687+
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1688+
})
1689+
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), rulerFlag, "")
1690+
require.NoError(t, s.StartAndWaitReady(ruler))
16971691

1698-
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
1699-
// Check that cortex_ruler_query_frontend_clients went up
1700-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
1701-
// Check that cortex_ruler_queries_total went up
1702-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1703-
// Check that cortex_ruler_queries_failed_total is zero
1704-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1705-
// Check that cortex_ruler_write_requests_total went up
1706-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1707-
// Check that cortex_ruler_write_requests_failed_total is zero
1708-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1692+
t.Cleanup(func() {
1693+
_ = s.Stop(ruler)
1694+
_ = s.Stop(queryFrontend)
1695+
_ = s.Stop(querier)
1696+
})
1697+
1698+
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
1699+
require.NoError(t, err)
1700+
1701+
expression := "metric" // vector
1702+
//expression := "scalar(count(up == 1)) > bool 1" // scalar
1703+
groupName := "rule_group"
1704+
ruleName := "rule_name"
1705+
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
1706+
1707+
rgMatcher := ruleGroupMatcher(user, namespace, groupName)
1708+
// Wait until ruler has loaded the group.
1709+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1710+
// Wait until rule group has tried to evaluate the rule.
1711+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1712+
// Make sure not to fail
1713+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1714+
1715+
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
1716+
sourceMatcher := labels.MustNewMatcher(labels.MatchEqual, "source", "ruler")
1717+
// Check that cortex_ruler_query_frontend_clients went up
1718+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
1719+
// Check that cortex_ruler_queries_total went up
1720+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1721+
// Check that cortex_ruler_queries_failed_total is zero
1722+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1723+
// Check that cortex_ruler_write_requests_total went up
1724+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1725+
// Check that cortex_ruler_write_requests_failed_total is zero
1726+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1727+
// Check that cortex_query_frontend_queries_total went up
1728+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(matcher, sourceMatcher), e2e.WaitMissingMetrics))
1729+
})
1730+
}
17091731
}
17101732

17111733
func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {

pkg/querier/codec/protobuf_codec.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ func (p ProtobufCodec) ContentType() v1.MIMEType {
2525
if !p.CortexInternal {
2626
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
2727
}
28-
// TODO: switch to use constants.
29-
return v1.MIMEType{Type: "application", SubType: "x-cortex-query+proto"}
28+
29+
return v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType}
3030
}
3131

3232
func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {

pkg/querier/codec/protobuf_codec_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
4242
expected *tripperware.PrometheusResponse
4343
}{
4444
{
45+
name: "vector",
4546
data: &v1.QueryData{
4647
ResultType: parser.ValueTypeVector,
4748
Result: promql.Vector{
@@ -85,6 +86,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
8586
},
8687
},
8788
{
89+
name: "scalar",
8890
data: &v1.QueryData{
8991
ResultType: parser.ValueTypeScalar,
9092
Result: promql.Scalar{T: 1000, V: 1},
@@ -147,6 +149,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
147149
},
148150
},
149151
{
152+
name: "matrix",
150153
data: &v1.QueryData{
151154
ResultType: parser.ValueTypeMatrix,
152155
Result: promql.Matrix{
@@ -180,6 +183,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
180183
},
181184
},
182185
{
186+
name: "matrix with multiple series",
183187
data: &v1.QueryData{
184188
ResultType: parser.ValueTypeMatrix,
185189
Result: promql.Matrix{
@@ -223,6 +227,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
223227
},
224228
},
225229
{
230+
name: "matrix: not cortex internal with histogram",
226231
data: &v1.QueryData{
227232
ResultType: parser.ValueTypeMatrix,
228233
Result: promql.Matrix{
@@ -312,6 +317,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
312317
},
313318
},
314319
{
320+
name: "vector: not cortex internal with histogram",
315321
data: &v1.QueryData{
316322
ResultType: parser.ValueTypeVector,
317323
Result: promql.Vector{
@@ -400,7 +406,7 @@ func TestProtobufCodec_Encode(t *testing.T) {
400406
},
401407
},
402408
{
403-
name: "cortex internal with native histogram",
409+
name: "vector: cortex internal with native histogram",
404410
cortexInternal: true,
405411
data: &v1.QueryData{
406412
ResultType: parser.ValueTypeVector,

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import (
1111
"time"
1212

1313
jsoniter "github.com/json-iterator/go"
14+
"github.com/munnerz/goautoneg"
1415
"github.com/opentracing/opentracing-go"
1516
otlog "github.com/opentracing/opentracing-go/log"
1617
"github.com/prometheus/common/model"
18+
v1 "github.com/prometheus/prometheus/web/api/v1"
1719
"github.com/weaveworks/common/httpgrpc"
1820
"google.golang.org/grpc/status"
1921

@@ -29,6 +31,9 @@ var (
2931
SortMapKeys: true,
3032
ValidateJsonRawMessage: false,
3133
}.Froze()
34+
35+
rulerMIMEType = v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType}
36+
jsonMIMEType = v1.MIMEType{Type: "application", SubType: "json"}
3237
)
3338

3439
type instantQueryCodec struct {
@@ -68,12 +73,18 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
6873
result.Stats = r.FormValue("stats")
6974
result.Path = r.URL.Path
7075

71-
// Include the specified headers from http request in prometheusRequest.
72-
for _, header := range forwardHeaders {
73-
for h, hv := range r.Header {
74-
if strings.EqualFold(h, header) {
75-
result.Headers[h] = hv
76-
break
76+
isSourceRuler := strings.Contains(r.Header.Get("User-Agent"), tripperware.RulerUserAgent)
77+
if isSourceRuler {
78+
// When the source is the Ruler, then forward whole headers
79+
result.Headers = r.Header
80+
} else {
81+
// Include the specified headers from http request in prometheusRequest.
82+
for _, header := range forwardHeaders {
83+
for h, hv := range r.Header {
84+
if strings.EqualFold(h, header) {
85+
result.Headers[h] = hv
86+
break
87+
}
7788
}
7889
}
7990
}
@@ -155,7 +166,11 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
155166
}
156167
}
157168

158-
tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
169+
isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent)
170+
if !isSourceRuler {
171+
// When the source is the Ruler, skip set header
172+
tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
173+
}
159174

160175
req := &http.Request{
161176
Method: "GET",
@@ -168,7 +183,7 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
168183
return req.WithContext(ctx), nil
169184
}
170185

171-
func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Response) (*http.Response, error) {
186+
func (c instantQueryCodec) EncodeResponse(ctx context.Context, req *http.Request, res tripperware.Response) (*http.Response, error) {
172187
sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse")
173188
defer sp.Finish()
174189

@@ -180,7 +195,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
180195
queryStats := stats.FromContext(ctx)
181196
tripperware.SetQueryResponseStats(a, queryStats)
182197

183-
b, err := json.Marshal(a)
198+
contentType, b, err := marshalResponse(a, req.Header.Get("Accept"))
184199
if err != nil {
185200
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)
186201
}
@@ -189,7 +204,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res
189204

190205
resp := http.Response{
191206
Header: http.Header{
192-
"Content-Type": []string{tripperware.ApplicationJson},
207+
"Content-Type": []string{contentType},
193208
},
194209
Body: io.NopCloser(bytes.NewBuffer(b)),
195210
StatusCode: http.StatusOK,
@@ -217,3 +232,18 @@ func decorateWithParamName(err error, field string) error {
217232
}
218233
return fmt.Errorf(errTmpl, field, err)
219234
}
235+
236+
func marshalResponse(resp *tripperware.PrometheusResponse, acceptHeader string) (string, []byte, error) {
237+
for _, clause := range goautoneg.ParseAccept(acceptHeader) {
238+
if jsonMIMEType.Satisfies(clause) {
239+
b, err := json.Marshal(resp)
240+
return tripperware.ApplicationJson, b, err
241+
} else if rulerMIMEType.Satisfies(clause) {
242+
b, err := resp.Marshal()
243+
return tripperware.QueryResponseCortexMIMEType, b, err
244+
}
245+
}
246+
247+
b, err := json.Marshal(resp)
248+
return tripperware.ApplicationJson, b, err
249+
}

0 commit comments

Comments
 (0)