Skip to content

Commit 27d40bd

Browse files
committed
Allow ruler to retrieve proto format query response
Signed-off-by: SungJin1212 <[email protected]>
1 parent 320e475 commit 27d40bd

23 files changed

+1009
-84
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [CHANGE] Change all max async concurrency default values `50` to `3` #6268
1010
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
1111
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
12+
* [FEATURE] Ruler: Add an experimental flag `-ruler.query-response-format` to retrieve query responses in protobuf format. #6345
1213
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
1314
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
1415
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4298,6 +4298,11 @@ The `ruler_config` configures the Cortex ruler.
42984298
# CLI flag: -ruler.frontend-address
42994299
[frontend_address: <string> | default = ""]
43004300
4301+
# [Experimental] Query response format to get query results from Query Frontend
4302+
# during rule evaluation. Supported values: json,protobuf
4303+
# CLI flag: -ruler.query-response-format
4304+
[query_response_format: <string> | default = "protobuf"]
4305+
43014306
frontend_client:
43024307
# gRPC client max receive message size (bytes).
43034308
# CLI flag: -ruler.frontendClient.grpc-max-recv-msg-size

docs/configuration/v1-guarantees.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ Cortex is an actively developed project and we want to encourage the introductio
3535

3636
Currently experimental features are:
3737

38-
- Ruler: Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address` )
38+
- Ruler
39+
- Evaluate rules to query frontend instead of ingesters (enabled via `-ruler.frontend-address`).
40+
- When `-ruler.frontend-address` is specified, the response format can be specified (via `-ruler.query-response-format`).
3941
- S3 Server Side Encryption (SSE) using KMS (including per-tenant KMS config overrides).
4042
- Azure blob storage.
4143
- Zone awareness based replication.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ require (
8080
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3
8181
github.com/cespare/xxhash/v2 v2.3.0
8282
github.com/google/go-cmp v0.6.0
83+
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
8384
github.com/sercand/kuberesolver/v5 v5.1.1
8485
go.opentelemetry.io/collector/pdata v1.20.0
8586
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
@@ -188,7 +189,6 @@ require (
188189
github.com/mitchellh/mapstructure v1.5.0 // indirect
189190
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
190191
github.com/modern-go/reflect2 v1.0.2 // indirect
191-
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
192192
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
193193
github.com/ncw/swift v1.0.53 // indirect
194194
github.com/oklog/run v1.1.0 // indirect

integration/ruler_test.go

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,42 +1670,61 @@ func TestRulerEvalWithQueryFrontend(t *testing.T) {
16701670
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
16711671
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
16721672
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1673-
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
1674-
require.NoError(t, s.Start(queryFrontend))
1675-
1676-
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1677-
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1678-
}), "")
1679-
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1680-
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1681-
}), "")
1682-
require.NoError(t, s.StartAndWaitReady(ruler, querier))
1683-
1684-
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
1685-
require.NoError(t, err)
1673+
for _, format := range []string{"protobuf", "json"} {
1674+
t.Run(fmt.Sprintf("format:%s", format), func(t *testing.T) {
1675+
queryFrontendFlag := mergeFlags(flags, map[string]string{
1676+
"-ruler.query-response-format": format,
1677+
})
1678+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", queryFrontendFlag, "")
1679+
require.NoError(t, s.Start(queryFrontend))
16861680

1687-
expression := "metric"
1688-
groupName := "rule_group"
1689-
ruleName := "rule_name"
1690-
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
1681+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(queryFrontendFlag, map[string]string{
1682+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1683+
}), "")
1684+
require.NoError(t, s.StartAndWaitReady(querier))
16911685

1692-
rgMatcher := ruleGroupMatcher(user, namespace, groupName)
1693-
// Wait until ruler has loaded the group.
1694-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1695-
// Wait until rule group has tried to evaluate the rule.
1696-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1686+
rulerFlag := mergeFlags(queryFrontendFlag, map[string]string{
1687+
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1688+
})
1689+
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), rulerFlag, "")
1690+
require.NoError(t, s.StartAndWaitReady(ruler))
16971691

1698-
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
1699-
// Check that cortex_ruler_query_frontend_clients went up
1700-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
1701-
// Check that cortex_ruler_queries_total went up
1702-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1703-
// Check that cortex_ruler_queries_failed_total is zero
1704-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1705-
// Check that cortex_ruler_write_requests_total went up
1706-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1707-
// Check that cortex_ruler_write_requests_failed_total is zero
1708-
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1692+
t.Cleanup(func() {
1693+
_ = s.Stop(ruler)
1694+
_ = s.Stop(queryFrontend)
1695+
_ = s.Stop(querier)
1696+
})
1697+
1698+
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
1699+
require.NoError(t, err)
1700+
1701+
expression := "metric" // vector
1702+
//expression := "scalar(count(up == 1)) > bool 1" // scalar
1703+
groupName := "rule_group"
1704+
ruleName := "rule_name"
1705+
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
1706+
1707+
rgMatcher := ruleGroupMatcher(user, namespace, groupName)
1708+
// Wait until ruler has loaded the group.
1709+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1710+
// Wait until rule group has tried to evaluate the rule.
1711+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1712+
// Make sure the rule evaluation did not fail
1713+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1714+
1715+
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
1716+
// Check that cortex_ruler_query_frontend_clients went up
1717+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
1718+
// Check that cortex_ruler_queries_total went up
1719+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1720+
// Check that cortex_ruler_queries_failed_total is zero
1721+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1722+
// Check that cortex_ruler_write_requests_total went up
1723+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1724+
// Check that cortex_ruler_write_requests_failed_total is zero
1725+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1726+
})
1727+
}
17091728
}
17101729

17111730
func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
475475
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
476476
// ShardedPrometheusCodec is the same as PrometheusCodec but is used for sharded queries (it sums up the stats)
477477
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
478-
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
478+
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec, t.Cfg.Ruler.QueryResponseFormat)
479479

480480
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
481481
t.Cfg.QueryRange,

pkg/querier/codec/protobuf_codec.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ func (p ProtobufCodec) ContentType() v1.MIMEType {
2525
if !p.CortexInternal {
2626
return v1.MIMEType{Type: "application", SubType: "x-protobuf"}
2727
}
28-
// TODO: switch to use constants.
29-
return v1.MIMEType{Type: "application", SubType: "x-cortex-query+proto"}
28+
29+
return v1.MIMEType{Type: "application", SubType: tripperware.QueryResponseCortexMIMESubType}
3030
}
3131

3232
func (p ProtobufCodec) CanEncode(resp *v1.Response) bool {

0 commit comments

Comments
 (0)