Skip to content

Commit d579b4b

Browse files
committed
Support Ruler to query Query Frontend
Signed-off-by: SungJin1212 <[email protected]>
1 parent 22245aa commit d579b4b

File tree

7 files changed

+194
-21
lines changed

7 files changed

+194
-21
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
2121
* [FEATURE] Store Gateway: Token bucket limiter. #6016
2222
* [FEATURE] Ruler: Add support for `query_offset` field on RuleGroup and new `ruler_query_offset` per-tenant limit. #6085
23+
* [FEATURE] Ruler: Add `ruler.frontend-address` and `ruler.frontend-timeout' to allow query to query frontends instead of ingesters. #6151
2324
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
2425
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
2526
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4062,6 +4062,15 @@ The `redis_config` configures the Redis backend cache.
40624062
The `ruler_config` configures the Cortex ruler.
40634063

40644064
```yaml
4065+
# Address of the Query Frontend service, in host:port format. Ruler queries to
4066+
# Query Frontends. If not set, ruler queries to Ingesters directly.
4067+
# CLI flag: -ruler.frontend-address
4068+
[frontend_address: <string> | default = ""]
4069+
4070+
# HTTP timeout duration when querying to the Query Frontend.
4071+
# CLI flag: -ruler.frontend-timeout
4072+
[frontend_timeout: <duration> | default = 10s]
4073+
40654074
# URL of alerts return path.
40664075
# CLI flag: -ruler.external.url
40674076
[external_url: <url> | default = ]

pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
548548
}
549549

550550
t.Cfg.Ruler.LookbackDelta = t.Cfg.Querier.LookbackDelta
551+
t.Cfg.Ruler.PrometheusHTTPPrefix = t.Cfg.API.PrometheusHTTPPrefix
551552
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
552553
metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer)
553554

pkg/ruler/compat.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ type RulesLimits interface {
185185
// EngineQueryFunc returns a new engine query function by passing an altered timestamp.
186186
// Modified from Prometheus rules.EngineQueryFunc
187187
// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189.
188-
func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
188+
func EngineQueryFunc(engine promql.QueryEngine, promClient *promClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
189189
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
190190
// Enforce the max query length.
191191
maxQueryLength := overrides.MaxQueryLength(userID)
@@ -203,25 +203,34 @@ func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides R
203203
}
204204

205205
evaluationDelay := overrides.EvaluationDelay(userID)
206-
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t.Add(-evaluationDelay))
207-
if err != nil {
208-
return nil, err
209-
}
210-
res := q.Exec(ctx)
211-
if res.Err != nil {
212-
return nil, res.Err
213-
}
214-
switch v := res.Value.(type) {
215-
case promql.Vector:
206+
if promClient != nil {
207+
v, err := promClient.InstantQuery(ctx, qs, t.Add(-evaluationDelay))
208+
if err != nil {
209+
return nil, err
210+
}
211+
216212
return v, nil
217-
case promql.Scalar:
218-
return promql.Vector{promql.Sample{
219-
T: v.T,
220-
F: v.V,
221-
Metric: labels.Labels{},
222-
}}, nil
223-
default:
224-
return nil, errors.New("rule result is not a vector or scalar")
213+
} else {
214+
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t.Add(-evaluationDelay))
215+
if err != nil {
216+
return nil, err
217+
}
218+
res := q.Exec(ctx)
219+
if res.Err != nil {
220+
return nil, res.Err
221+
}
222+
switch v := res.Value.(type) {
223+
case promql.Vector:
224+
return v, nil
225+
case promql.Scalar:
226+
return promql.Vector{promql.Sample{
227+
T: v.T,
228+
F: v.V,
229+
Metric: labels.Labels{},
230+
}}, nil
231+
default:
232+
return nil, errors.New("rule result is not a vector or scalar")
233+
}
225234
}
226235
}
227236
}
@@ -342,7 +351,8 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
342351
totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID)
343352
failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID)
344353

345-
engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID, cfg.LookbackDelta)
354+
promClient := NewPromClient(cfg.FrontendAddress, cfg.PrometheusHTTPPrefix, userID, cfg.FrontendTimeout)
355+
engineQueryFunc := EngineQueryFunc(engine, promClient, q, overrides, userID, cfg.LookbackDelta)
346356
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)
347357

348358
return rules.NewManager(&rules.ManagerOptions{

pkg/ruler/prom_client.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package ruler
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"strings"
8+
"time"
9+
10+
"github.com/go-kit/log/level"
11+
promapi "github.com/prometheus/client_golang/api"
12+
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
13+
"github.com/prometheus/common/model"
14+
"github.com/prometheus/prometheus/model/labels"
15+
"github.com/prometheus/prometheus/promql"
16+
17+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
18+
)
19+
20+
const orgIDHeader = "X-Scope-OrgId"
21+
22+
type promClient struct {
23+
promAPI promv1.API
24+
timeout time.Duration
25+
}
26+
27+
func NewPromClient(queryFrontendUrl, prometheusHTTPPrefix, orgID string, timeout time.Duration) *promClient {
28+
if queryFrontendUrl == "" {
29+
return nil
30+
}
31+
client, err := promapi.NewClient(promapi.Config{
32+
Address: queryFrontendUrl + prometheusHTTPPrefix,
33+
RoundTripper: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport},
34+
})
35+
if err != nil {
36+
// use distributor queryable instead
37+
return nil
38+
}
39+
40+
return &promClient{
41+
promAPI: promv1.NewAPI(client),
42+
timeout: timeout,
43+
}
44+
}
45+
46+
func (p *promClient) InstantQuery(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
47+
log, ctx := spanlogger.New(ctx, "promClient.InstantQuery")
48+
defer log.Span.Finish()
49+
50+
value, warns, err := p.promAPI.Query(ctx, qs, t, promv1.WithTimeout(p.timeout))
51+
if err != nil {
52+
level.Error(log).Log("err", err, "query", qs)
53+
return nil, err
54+
}
55+
if len(warns) > 0 {
56+
level.Warn(log).Log("warnings", strings.Join(warns, ", "), "query", qs)
57+
}
58+
59+
switch value.Type() {
60+
case model.ValVector:
61+
vector := value.(model.Vector)
62+
v := make([]promql.Sample, 0, len(vector))
63+
for _, sample := range value.(model.Vector) {
64+
metric := make([]labels.Label, 0, len(sample.Metric))
65+
for k, v := range sample.Metric {
66+
metric = append(metric, labels.Label{
67+
Name: string(k),
68+
Value: string(v),
69+
})
70+
}
71+
v = append(v, promql.Sample{
72+
T: sample.Timestamp.Unix(),
73+
F: float64(sample.Value),
74+
Metric: metric,
75+
})
76+
}
77+
return v, nil
78+
default:
79+
return nil, errors.New("rule result is not a vector")
80+
}
81+
}
82+
83+
type addOrgIDRoundTripper struct {
84+
orgID string
85+
next http.RoundTripper
86+
}
87+
88+
func (r *addOrgIDRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
89+
req.Header.Set(orgIDHeader, r.orgID)
90+
return r.next.RoundTrip(req)
91+
}

pkg/ruler/prom_client_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package ruler
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestNewPromClient(t *testing.T) {
11+
prometheusHTTPPrefix := "/prometheus"
12+
timeout := time.Second * 10
13+
orgId := "user"
14+
15+
tests := []struct {
16+
queryFrontendUrl string
17+
isNil bool
18+
description string
19+
}{
20+
{
21+
queryFrontendUrl: "",
22+
isNil: true,
23+
description: "empty string of query frontend url should return nil",
24+
},
25+
{
26+
queryFrontendUrl: "http://\tlocalhost:3333",
27+
isNil: true,
28+
description: "invalid url of query frontend url should return nil",
29+
},
30+
{
31+
queryFrontendUrl: "service.namespace:80",
32+
isNil: false,
33+
description: "k8s service address should return not nil",
34+
},
35+
{
36+
queryFrontendUrl: "service.namespace.svc.cluster.local",
37+
isNil: false,
38+
description: "k8s service address should return not nil",
39+
},
40+
}
41+
42+
for _, test := range tests {
43+
if test.isNil {
44+
require.Nil(t, NewPromClient(test.queryFrontendUrl, prometheusHTTPPrefix, orgId, timeout), test.description)
45+
} else {
46+
require.NotNil(t, NewPromClient(test.queryFrontendUrl, prometheusHTTPPrefix, orgId, timeout), test.description)
47+
}
48+
}
49+
}

pkg/ruler/ruler.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ var (
5050
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
5151
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
5252
errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0")
53+
errInvalidFrontendAddress = errors.New("invalid query frontend address")
5354
)
5455

5556
const (
@@ -93,6 +94,10 @@ func (e *DisabledRuleGroupErr) Error() string {
9394

9495
// Config is the configuration for the recording rules server.
9596
type Config struct {
97+
// This is used for query to query frontend to evaluate rules
98+
FrontendAddress string `yaml:"frontend_address"`
99+
// HTTP timeout duration when querying to query frontend to evaluate rules
100+
FrontendTimeout time.Duration `yaml:"frontend_timeout"`
96101
// This is used for template expansion in alerts; must be a valid URL.
97102
ExternalURL flagext.URLValue `yaml:"external_url"`
98103
// Labels to add to all alerts
@@ -147,7 +152,8 @@ type Config struct {
147152
RingCheckPeriod time.Duration `yaml:"-"`
148153

149154
// Field will be populated during runtime.
150-
LookbackDelta time.Duration `yaml:"-"`
155+
LookbackDelta time.Duration `yaml:"-"`
156+
PrometheusHTTPPrefix string `yaml:"-"`
151157

152158
EnableQueryStats bool `yaml:"query_stats_enabled"`
153159
DisableRuleGroupLabel bool `yaml:"disable_rule_group_label"`
@@ -167,6 +173,10 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error {
167173
return errors.Wrap(err, "invalid ruler gRPC client config")
168174
}
169175

176+
if _, err := url.Parse(cfg.FrontendAddress); err != nil {
177+
return errInvalidFrontendAddress
178+
}
179+
170180
if cfg.ConcurrentEvalsEnabled && cfg.MaxConcurrentEvals <= 0 {
171181
return errInvalidMaxConcurrentEvals
172182
}
@@ -190,6 +200,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
190200
//lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
191201
flagext.DeprecatedFlag(f, "ruler.alertmanager-use-v2", "This flag is no longer functional. V1 API is deprecated and removed", util_log.Logger)
192202

203+
f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "Address of the Query Frontend service, in host:port format. Ruler queries to Query Frontends. If not set, ruler queries to Ingesters directly.")
204+
f.DurationVar(&cfg.FrontendTimeout, "ruler.frontend-timeout", 10*time.Second, "HTTP timeout duration when querying to the Query Frontend.")
193205
cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil
194206
f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.")
195207
f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 1*time.Minute, "How frequently to evaluate rules")

0 commit comments

Comments
 (0)