Skip to content

Commit f1e2a3d

Browse files
committed
Support Ruler to query Query Frontend
Signed-off-by: SungJin1212 <[email protected]>
1 parent 39a168d commit f1e2a3d

13 files changed

+685
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store` that is the total rule groups per tenant in store, which can be used to compare with `cortex_prometheus_rule_group_rules` to count the number of rule groups that are not loaded by a ruler. #5869
66
* [ENHANCEMENT] Ruler: Add query statistics metrics when --ruler.query-stats-enabled=true. #6173
77
* [ENHANCEMENT] Distributor: Add new `cortex_reduced_resolution_histogram_samples_total` metric to track the number of histogram samples whose resolution was reduced. #6182
8+
* [FEATURE] Ruler: Add `ruler.frontend-address` to allow the ruler to send queries to query frontends instead of querying ingesters directly. #6151
89

910
## 1.18.0 in progress
1011

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4067,6 +4067,11 @@ The `redis_config` configures the Redis backend cache.
40674067
The `ruler_config` configures the Cortex ruler.
40684068

40694069
```yaml
4070+
# Address of the Query Frontend service, in host:port format. If set, the ruler
4071+
# sends queries to Query Frontends via gRPC. If not set, the ruler queries Ingesters directly.
4072+
# CLI flag: -ruler.frontend-address
4073+
[frontend_address: <string> | default = ""]
4074+
40704075
# URL of alerts return path.
40714076
# CLI flag: -ruler.external.url
40724077
[external_url: <url> | default = ]

integration/ruler_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,6 +1089,72 @@ func TestRulerKeepFiring(t *testing.T) {
10891089
require.Equal(t, 0, len(alert.Alerts)) // alert should be resolved once keepFiringFor time expires
10901090
}
10911091

1092+
func TestRulerEvalWithQueryFrontend(t *testing.T) {
1093+
s, err := e2e.NewScenario(networkName)
1094+
require.NoError(t, err)
1095+
defer s.Close()
1096+
1097+
// Start dependencies.
1098+
consul := e2edb.NewConsul()
1099+
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
1100+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1101+
1102+
// Configure the ruler.
1103+
flags := mergeFlags(
1104+
BlocksStorageFlags(),
1105+
RulerFlags(),
1106+
map[string]string{
1107+
// Evaluate rules often, so that we don't need to wait for metrics to show up.
1108+
"-ruler.evaluation-interval": "2s",
1109+
// We run single ingester only, no replication.
1110+
"-distributor.replication-factor": "1",
1111+
},
1112+
)
1113+
1114+
const namespace = "test"
1115+
const user = "user"
1116+
1117+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1118+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1119+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1120+
queryFrontend := e2ecortex.NewQueryFrontend("query-frontend", flags, "")
1121+
require.NoError(t, s.Start(queryFrontend))
1122+
1123+
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1124+
"-ruler.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1125+
}), "")
1126+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1127+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1128+
}), "")
1129+
require.NoError(t, s.StartAndWaitReady(ruler, querier))
1130+
1131+
c, err := e2ecortex.NewClient("", "", "", ruler.HTTPEndpoint(), user)
1132+
require.NoError(t, err)
1133+
1134+
expression := "metric"
1135+
groupName := "rule_group"
1136+
ruleName := "rule_name"
1137+
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, ruleName, expression), namespace))
1138+
1139+
rgMatcher := ruleGroupMatcher(user, namespace, groupName)
1140+
// Wait until ruler has loaded the group.
1141+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1142+
// Wait until rule group has tried to evaluate the rule.
1143+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(rgMatcher), e2e.WaitMissingMetrics))
1144+
1145+
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
1146+
// Check that cortex_ruler_query_frontend_clients went up
1147+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ruler_query_frontend_clients"}, e2e.WaitMissingMetrics))
1148+
// Check that cortex_ruler_queries_total went up
1149+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1150+
// Check that cortex_ruler_queries_failed_total is zero
1151+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1152+
// Check that cortex_ruler_write_requests_total went up
1153+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1154+
// Check that cortex_ruler_write_requests_failed_total is zero
1155+
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
1156+
}
1157+
10921158
func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
10931159
responseJson, err := json.Marshal(rules)
10941160
require.NoError(t, err)

pkg/cortex/modules.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,8 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
548548
}
549549

550550
t.Cfg.Ruler.LookbackDelta = t.Cfg.Querier.LookbackDelta
551+
t.Cfg.Ruler.FrontendTimeout = t.Cfg.Querier.Timeout
552+
t.Cfg.Ruler.PrometheusHTTPPrefix = t.Cfg.API.PrometheusHTTPPrefix
551553
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
552554
metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer)
553555

pkg/ruler/compat.go

Lines changed: 41 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/cortexproject/cortex/pkg/cortexpb"
2525
"github.com/cortexproject/cortex/pkg/querier"
2626
"github.com/cortexproject/cortex/pkg/querier/stats"
27+
"github.com/cortexproject/cortex/pkg/ring/client"
2728
util_log "github.com/cortexproject/cortex/pkg/util/log"
2829
promql_util "github.com/cortexproject/cortex/pkg/util/promql"
2930
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -157,7 +158,7 @@ type RulesLimits interface {
157158
// EngineQueryFunc returns a new engine query function validating max queryLength.
158159
// Modified from Prometheus rules.EngineQueryFunc
159160
// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189.
160-
func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
161+
func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
161162
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
162163
// Enforce the max query length.
163164
maxQueryLength := overrides.MaxQueryLength(userID)
@@ -174,25 +175,34 @@ func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides R
174175
}
175176
}
176177

177-
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
178-
if err != nil {
179-
return nil, err
180-
}
181-
res := q.Exec(ctx)
182-
if res.Err != nil {
183-
return nil, res.Err
184-
}
185-
switch v := res.Value.(type) {
186-
case promql.Vector:
178+
if frontendClient != nil {
179+
v, err := frontendClient.InstantQuery(ctx, qs, t)
180+
if err != nil {
181+
return nil, err
182+
}
183+
187184
return v, nil
188-
case promql.Scalar:
189-
return promql.Vector{promql.Sample{
190-
T: v.T,
191-
F: v.V,
192-
Metric: labels.Labels{},
193-
}}, nil
194-
default:
195-
return nil, errors.New("rule result is not a vector or scalar")
185+
} else {
186+
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
187+
if err != nil {
188+
return nil, err
189+
}
190+
res := q.Exec(ctx)
191+
if res.Err != nil {
192+
return nil, res.Err
193+
}
194+
switch v := res.Value.(type) {
195+
case promql.Vector:
196+
return v, nil
197+
case promql.Scalar:
198+
return promql.Vector{promql.Sample{
199+
T: v.T,
200+
F: v.V,
201+
Metric: labels.Labels{},
202+
}}, nil
203+
default:
204+
return nil, errors.New("rule result is not a vector or scalar")
205+
}
196206
}
197207
}
198208
}
@@ -300,22 +310,30 @@ type RulesManager interface {
300310
}
301311

302312
// ManagerFactory is a function that creates new RulesManager for given user and notifier.Manager.
303-
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager
313+
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, frontendPool *client.Pool, reg prometheus.Registerer) (RulesManager, error)
304314

305315
func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine promql.QueryEngine, overrides RulesLimits, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer) ManagerFactory {
306316
// Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors
307317
// and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors.
308318
// Errors from PromQL are always "user" errors.
309319
q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors)
310320

311-
return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager {
321+
return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, frontendPool *client.Pool, reg prometheus.Registerer) (RulesManager, error) {
322+
var client *frontendClient
312323
failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID)
313324
totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID)
314325
totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID)
315326
failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID)
316327

328+
if cfg.FrontendAddress != "" {
329+
c, err := frontendPool.GetClientFor(cfg.FrontendAddress)
330+
if err != nil {
331+
return nil, err
332+
}
333+
client = c.(*frontendClient)
334+
}
317335
var queryFunc rules.QueryFunc
318-
engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID, cfg.LookbackDelta)
336+
engineQueryFunc := EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta)
319337
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)
320338
if cfg.EnableQueryStats {
321339
queryFunc = RecordAndReportRuleQueryMetrics(metricsQueryFunc, userID, evalMetrics, logger)
@@ -340,7 +358,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
340358
DefaultRuleQueryOffset: func() time.Duration {
341359
return overrides.RulerQueryOffset(userID)
342360
},
343-
})
361+
}), nil
344362
}
345363
}
346364

pkg/ruler/frontend_client.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package ruler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
"net/textproto"
8+
"net/url"
9+
"strconv"
10+
"time"
11+
12+
"github.com/go-kit/log/level"
13+
"github.com/prometheus/common/version"
14+
"github.com/prometheus/prometheus/promql"
15+
"github.com/weaveworks/common/httpgrpc"
16+
"github.com/weaveworks/common/user"
17+
18+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
19+
)
20+
21+
// Fixed values used when building instant-query requests.
const (
	// orgIDHeader is the name of the header that carries the org ID.
	orgIDHeader = "X-Scope-OrgID"
	// instantQueryPath is appended to the Prometheus HTTP prefix to form the request URL.
	instantQueryPath = "/api/v1/query"
	// mimeTypeForm is the Content-Type sent with the form-encoded request body.
	mimeTypeForm = "application/x-www-form-urlencoded"
	// contentTypeJSON is the value sent in the Accept header.
	contentTypeJSON = "application/json"
)
27+
28+
type FrontendClient struct {
29+
client httpgrpc.HTTPClient
30+
timeout time.Duration
31+
prometheusHTTPPrefix string
32+
jsonDecoder JsonDecoder
33+
}
34+
35+
func NewFrontendClient(client httpgrpc.HTTPClient, timeout time.Duration, prometheusHTTPPrefix string) *FrontendClient {
36+
return &FrontendClient{
37+
client: client,
38+
timeout: timeout,
39+
prometheusHTTPPrefix: prometheusHTTPPrefix,
40+
jsonDecoder: JsonDecoder{},
41+
}
42+
}
43+
44+
func (p *FrontendClient) makeRequest(ctx context.Context, qs string, ts time.Time) (*httpgrpc.HTTPRequest, error) {
45+
args := make(url.Values)
46+
args.Set("query", qs)
47+
if !ts.IsZero() {
48+
args.Set("time", ts.Format(time.RFC3339Nano))
49+
}
50+
body := []byte(args.Encode())
51+
52+
//lint:ignore faillint wrapper around upstream method
53+
orgID, err := user.ExtractOrgID(ctx)
54+
if err != nil {
55+
return nil, err
56+
}
57+
58+
req := &httpgrpc.HTTPRequest{
59+
Method: http.MethodPost,
60+
Url: p.prometheusHTTPPrefix + instantQueryPath,
61+
Body: body,
62+
Headers: []*httpgrpc.Header{
63+
{Key: textproto.CanonicalMIMEHeaderKey("User-Agent"), Values: []string{fmt.Sprintf("Cortex/%s", version.Version)}},
64+
{Key: textproto.CanonicalMIMEHeaderKey("Content-Type"), Values: []string{mimeTypeForm}},
65+
{Key: textproto.CanonicalMIMEHeaderKey("Content-Length"), Values: []string{strconv.Itoa(len(body))}},
66+
{Key: textproto.CanonicalMIMEHeaderKey("Accept"), Values: []string{contentTypeJSON}},
67+
{Key: textproto.CanonicalMIMEHeaderKey(orgIDHeader), Values: []string{orgID}},
68+
},
69+
}
70+
71+
return req, nil
72+
}
73+
74+
func (p *FrontendClient) InstantQuery(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
75+
log, ctx := spanlogger.New(ctx, "FrontendClient.InstantQuery")
76+
defer log.Span.Finish()
77+
78+
req, err := p.makeRequest(ctx, qs, t)
79+
if err != nil {
80+
level.Error(log).Log("err", err, "query", qs)
81+
return nil, err
82+
}
83+
84+
ctx, cancel := context.WithTimeout(ctx, p.timeout)
85+
defer cancel()
86+
87+
resp, err := p.client.Handle(ctx, req)
88+
89+
if err != nil {
90+
level.Error(log).Log("err", err, "query", qs)
91+
return nil, err
92+
}
93+
94+
vector, warning, err := p.jsonDecoder.Decode(resp.Body)
95+
if err != nil {
96+
level.Error(log).Log("err", err, "query", qs)
97+
return nil, err
98+
}
99+
100+
if len(warning) > 0 {
101+
level.Warn(log).Log("warnings", warning, "query", qs)
102+
}
103+
104+
return vector, nil
105+
}

0 commit comments

Comments
 (0)