Skip to content

Commit 84d16b4

Browse files
committed
Support Ruler to query Query Frontend
Signed-off-by: SungJin1212 <[email protected]>
1 parent 22245aa commit 84d16b4

File tree

6 files changed

+184
-4
lines changed

6 files changed

+184
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
2121
* [FEATURE] Store Gateway: Token bucket limiter. #6016
2222
* [FEATURE] Ruler: Add support for `query_offset` field on RuleGroup and new `ruler_query_offset` per-tenant limit. #6085
23+
* [FEATURE] Ruler: Add `ruler.frontend-address` and `ruler.frontend-timeout` to allow the ruler to send rule queries to query frontends instead of querying ingesters directly. #6151
2324
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
2425
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
2526
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4062,6 +4062,15 @@ The `redis_config` configures the Redis backend cache.
40624062
The `ruler_config` configures the Cortex ruler.
40634063

40644064
```yaml
4065+
# Address of the Query Frontend service, in host:port format. Ruler queries to
4066+
# Query Frontends. If not set, ruler queries to Ingesters directly.
4067+
# CLI flag: -ruler.frontend-address
4068+
[frontend_address: <string> | default = ""]
4069+
4070+
# HTTP timeout duration when querying to the Query Frontend.
4071+
# CLI flag: -ruler.frontend-timeout
4072+
[frontend_timeout: <duration> | default = 10s]
4073+
40654074
# URL of alerts return path.
40664075
# CLI flag: -ruler.external.url
40674076
[external_url: <url> | default = ]

pkg/cortex/modules.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -583,10 +583,15 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
583583
} else {
584584
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
585585
// TODO: Consider wrapping logger to differentiate from querier module logger
586-
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)
587-
588-
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
589-
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
586+
if t.Cfg.Ruler.FrontendAddress == "" {
587+
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)
588+
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
589+
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
590+
} else {
591+
queryable, _, _ := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)
592+
managerFactory := ruler.TenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, t.Overrides, metrics)
593+
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
594+
}
590595
}
591596

592597
if err != nil {

pkg/ruler/compat.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,33 @@ func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides R
226226
}
227227
}
228228

229+
func QueryFrontendQueryFunc(promClient *promClient, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
230+
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
231+
// Enforce the max query length.
232+
maxQueryLength := overrides.MaxQueryLength(userID)
233+
if maxQueryLength > 0 {
234+
expr, err := parser.ParseExpr(qs)
235+
// If failed to parse expression, skip checking select range.
236+
// Fail the query in the engine.
237+
if err == nil {
238+
// Enforce query length across all selectors in the query.
239+
length := promql_util.FindNonOverlapQueryLength(expr, 0, 0, lookbackDelta)
240+
if length > maxQueryLength {
241+
return nil, validation.LimitError(fmt.Sprintf(validation.ErrQueryTooLong, length, maxQueryLength))
242+
}
243+
}
244+
}
245+
246+
evaluationDelay := overrides.EvaluationDelay(userID)
247+
v, err := promClient.InstantQuery(ctx, qs, t.Add(-evaluationDelay))
248+
if err != nil {
249+
return nil, err
250+
}
251+
252+
return v, nil
253+
}
254+
}
255+
229256
func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Counter) rules.QueryFunc {
230257
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
231258
queries.Inc()
@@ -366,6 +393,48 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
366393
}
367394
}
368395

396+
// TenantManagerFactory return a rule manager factory making rule manager using query frontend query func
397+
func TenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, overrides RulesLimits, evalMetrics *RuleEvalMetrics) ManagerFactory {
398+
// Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors
399+
// and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors.
400+
// Errors from PromQL are always "user" errors.
401+
q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors)
402+
403+
return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager {
404+
var queryTime prometheus.Counter
405+
if evalMetrics.RulerQuerySeconds != nil {
406+
queryTime = evalMetrics.RulerQuerySeconds.WithLabelValues(userID)
407+
}
408+
failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID)
409+
totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID)
410+
totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID)
411+
failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID)
412+
413+
promClient := NewPromClient(cfg.FrontendAddress, userID, cfg.FrontendTimeout)
414+
queryFrontendQueryFunc := QueryFrontendQueryFunc(promClient, overrides, userID, cfg.LookbackDelta)
415+
metricsQueryFunc := MetricsQueryFunc(queryFrontendQueryFunc, totalQueries, failedQueries)
416+
417+
return rules.NewManager(&rules.ManagerOptions{
418+
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
419+
Queryable: q,
420+
QueryFunc: RecordAndReportRuleQueryMetrics(metricsQueryFunc, queryTime, logger),
421+
Context: user.InjectOrgID(ctx, userID),
422+
ExternalURL: cfg.ExternalURL.URL,
423+
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()),
424+
Logger: log.With(logger, "user", userID),
425+
Registerer: reg,
426+
OutageTolerance: cfg.OutageTolerance,
427+
ForGracePeriod: cfg.ForGracePeriod,
428+
ResendDelay: cfg.ResendDelay,
429+
ConcurrentEvalsEnabled: cfg.ConcurrentEvalsEnabled,
430+
MaxConcurrentEvals: cfg.MaxConcurrentEvals,
431+
DefaultRuleQueryOffset: func() time.Duration {
432+
return overrides.RulerQueryOffset(userID)
433+
},
434+
})
435+
}
436+
}
437+
369438
type QueryableError struct {
370439
err error
371440
}

pkg/ruler/prom_client.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package ruler
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"strings"
8+
"time"
9+
10+
"github.com/go-kit/log/level"
11+
promapi "github.com/prometheus/client_golang/api"
12+
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
13+
"github.com/prometheus/common/model"
14+
"github.com/prometheus/prometheus/model/labels"
15+
"github.com/prometheus/prometheus/promql"
16+
17+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
18+
)
19+
20+
const orgIDHeader = "X-Scope-OrgId"
21+
22+
type promClient struct {
23+
promAPI promv1.API
24+
timeout time.Duration
25+
}
26+
27+
func NewPromClient(queryFrontendUrl, orgID string, timeout time.Duration) *promClient {
28+
client, _ := promapi.NewClient(promapi.Config{
29+
Address: queryFrontendUrl + "/api/prom",
30+
RoundTripper: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport},
31+
})
32+
33+
return &promClient{
34+
promAPI: promv1.NewAPI(client),
35+
timeout: timeout,
36+
}
37+
}
38+
39+
func (p *promClient) InstantQuery(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
40+
log, ctx := spanlogger.New(ctx, "promClient.InstantQuery")
41+
defer log.Span.Finish()
42+
43+
value, warns, err := p.promAPI.Query(ctx, qs, t, promv1.WithTimeout(p.timeout))
44+
if err != nil {
45+
level.Error(log).Log("err", err, "query", qs)
46+
return nil, err
47+
}
48+
if len(warns) > 0 {
49+
level.Warn(log).Log("warnings", strings.Join(warns, ", "), "query", qs)
50+
}
51+
52+
switch value.Type() {
53+
case model.ValVector:
54+
vector := value.(model.Vector)
55+
v := make([]promql.Sample, 0, len(vector))
56+
for _, sample := range value.(model.Vector) {
57+
metric := make([]labels.Label, 0, len(sample.Metric))
58+
for k, v := range sample.Metric {
59+
metric = append(metric, labels.Label{
60+
Name: string(k),
61+
Value: string(v),
62+
})
63+
}
64+
v = append(v, promql.Sample{
65+
T: sample.Timestamp.Unix(),
66+
F: float64(sample.Value),
67+
Metric: metric,
68+
})
69+
}
70+
return v, nil
71+
default:
72+
return nil, errors.New("rule result is not a vector")
73+
}
74+
}
75+
76+
type addOrgIDRoundTripper struct {
77+
orgID string
78+
next http.RoundTripper
79+
}
80+
81+
func (r *addOrgIDRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
82+
req.Header.Set(orgIDHeader, r.orgID)
83+
84+
return r.next.RoundTrip(req)
85+
}

pkg/ruler/ruler.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ var (
5050
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
5151
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
5252
errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0")
53+
errInvalidFrontendAddress = errors.New("invalid query frontend address")
5354
)
5455

5556
const (
@@ -93,6 +94,10 @@ func (e *DisabledRuleGroupErr) Error() string {
9394

9495
// Config is the configuration for the recording rules server.
9596
type Config struct {
97+
// Address of the query frontend that the ruler sends rule evaluation queries to
98+
FrontendAddress string `yaml:"frontend_address"`
99+
// HTTP timeout duration used when querying the query frontend to evaluate rules
100+
FrontendTimeout time.Duration `yaml:"frontend_timeout"`
96101
// This is used for template expansion in alerts; must be a valid URL.
97102
ExternalURL flagext.URLValue `yaml:"external_url"`
98103
// Labels to add to all alerts
@@ -167,6 +172,10 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error {
167172
return errors.Wrap(err, "invalid ruler gRPC client config")
168173
}
169174

175+
if _, err := url.Parse(cfg.FrontendAddress); err != nil {
176+
return errInvalidFrontendAddress
177+
}
178+
170179
if cfg.ConcurrentEvalsEnabled && cfg.MaxConcurrentEvals <= 0 {
171180
return errInvalidMaxConcurrentEvals
172181
}
@@ -190,6 +199,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
190199
//lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
191200
flagext.DeprecatedFlag(f, "ruler.alertmanager-use-v2", "This flag is no longer functional. V1 API is deprecated and removed", util_log.Logger)
192201

202+
f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "Address of the Query Frontend service, in host:port format. Ruler queries to Query Frontends. If not set, ruler queries to Ingesters directly.")
203+
f.DurationVar(&cfg.FrontendTimeout, "ruler.frontend-timeout", 10*time.Second, "HTTP timeout duration when querying to the Query Frontend.")
193204
cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil
194205
f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.")
195206
f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 1*time.Minute, "How frequently to evaluate rules")

0 commit comments

Comments
 (0)