Skip to content

Commit c78a066

Browse files
committed
Support Ruler to query Query Frontend
Signed-off-by: SungJin1212 <[email protected]>
1 parent f077e8e commit c78a066

File tree

9 files changed

+215
-27
lines changed

9 files changed

+215
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit max number of bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
2929
* [FEATURE] Store Gateway: Token bucket limiter. #6016
3030
* [FEATURE] Ruler: Add support for `query_offset` field on RuleGroup and new `ruler_query_offset` per-tenant limit. #6085
31+
* [FEATURE] Ruler: Add `ruler.frontend-address` and `ruler.frontend-timeout' to allow query to query frontends instead of ingesters. #6151
3132
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
3233
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
3334
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4063,6 +4063,15 @@ The `redis_config` configures the Redis backend cache.
40634063
The `ruler_config` configures the Cortex ruler.
40644064

40654065
```yaml
4066+
# Address of the Query Frontend service, in host:port format. Ruler queries to
4067+
# Query Frontends. If not set, ruler queries to Ingesters directly.
4068+
# CLI flag: -ruler.frontend-address
4069+
[frontend_address: <string> | default = ""]
4070+
4071+
# HTTP timeout duration when querying to the Query Frontend.
4072+
# CLI flag: -ruler.frontend-timeout
4073+
[frontend_timeout: <duration> | default = 10s]
4074+
40664075
# URL of alerts return path.
40674076
# CLI flag: -ruler.external.url
40684077
[external_url: <url> | default = ]

pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
548548
}
549549

550550
t.Cfg.Ruler.LookbackDelta = t.Cfg.Querier.LookbackDelta
551+
t.Cfg.Ruler.PrometheusHTTPPrefix = t.Cfg.API.PrometheusHTTPPrefix
551552
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
552553
metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer)
553554

pkg/ruler/compat.go

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ type RulesLimits interface {
157157
// EngineQueryFunc returns a new engine query function validating max queryLength.
158158
// Modified from Prometheus rules.EngineQueryFunc
159159
// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189.
160-
func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
160+
func EngineQueryFunc(engine promql.QueryEngine, promClient *promClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
161161
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
162162
// Enforce the max query length.
163163
maxQueryLength := overrides.MaxQueryLength(userID)
@@ -174,25 +174,34 @@ func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable, overrides R
174174
}
175175
}
176176

177-
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
178-
if err != nil {
179-
return nil, err
180-
}
181-
res := q.Exec(ctx)
182-
if res.Err != nil {
183-
return nil, res.Err
184-
}
185-
switch v := res.Value.(type) {
186-
case promql.Vector:
177+
if promClient != nil {
178+
v, err := promClient.InstantQuery(ctx, qs, t)
179+
if err != nil {
180+
return nil, err
181+
}
182+
187183
return v, nil
188-
case promql.Scalar:
189-
return promql.Vector{promql.Sample{
190-
T: v.T,
191-
F: v.V,
192-
Metric: labels.Labels{},
193-
}}, nil
194-
default:
195-
return nil, errors.New("rule result is not a vector or scalar")
184+
} else {
185+
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
186+
if err != nil {
187+
return nil, err
188+
}
189+
res := q.Exec(ctx)
190+
if res.Err != nil {
191+
return nil, res.Err
192+
}
193+
switch v := res.Value.(type) {
194+
case promql.Vector:
195+
return v, nil
196+
case promql.Scalar:
197+
return promql.Vector{promql.Sample{
198+
T: v.T,
199+
F: v.V,
200+
Metric: labels.Labels{},
201+
}}, nil
202+
default:
203+
return nil, errors.New("rule result is not a vector or scalar")
204+
}
196205
}
197206
}
198207
}
@@ -294,16 +303,18 @@ type RulesManager interface {
294303
}
295304

296305
// ManagerFactory is a function that creates new RulesManager for given user and notifier.Manager.
297-
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager
306+
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) (RulesManager, error)
298307

299308
func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine promql.QueryEngine, overrides RulesLimits, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer) ManagerFactory {
300309
// Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors
301310
// and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors.
302311
// Errors from PromQL are always "user" errors.
303312
q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors)
304313

305-
return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager {
314+
return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) (RulesManager, error) {
306315
var queryTime prometheus.Counter
316+
var promClient *promClient
317+
var err error
307318
if evalMetrics.RulerQuerySeconds != nil {
308319
queryTime = evalMetrics.RulerQuerySeconds.WithLabelValues(userID)
309320
}
@@ -313,7 +324,13 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
313324
totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID)
314325
failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID)
315326

316-
engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID, cfg.LookbackDelta)
327+
if cfg.FrontendAddress != "" {
328+
promClient, err = NewPromClient(cfg.FrontendAddress, cfg.PrometheusHTTPPrefix, userID, cfg.FrontendTimeout)
329+
if err != nil {
330+
return nil, err
331+
}
332+
}
333+
engineQueryFunc := EngineQueryFunc(engine, promClient, q, overrides, userID, cfg.LookbackDelta)
317334
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)
318335

319336
return rules.NewManager(&rules.ManagerOptions{
@@ -333,7 +350,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
333350
DefaultRuleQueryOffset: func() time.Duration {
334351
return overrides.RulerQueryOffset(userID)
335352
},
336-
})
353+
}), nil
337354
}
338355
}
339356

pkg/ruler/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ func (r *DefaultMultiTenantManager) newManager(ctx context.Context, userID strin
269269
return nil, err
270270
}
271271

272-
return r.managerFactory(ctx, userID, notifier, r.logger, reg), nil
272+
return r.managerFactory(ctx, userID, notifier, r.logger, reg)
273273
}
274274

275275
func (r *DefaultMultiTenantManager) removeNotifier(userID string) {

pkg/ruler/manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,13 +304,13 @@ func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
304304
}
305305

306306
func RuleManagerFactory(groupsToReturn [][]*promRules.Group, waitDurations []time.Duration) ManagerFactory {
307-
return func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) RulesManager {
307+
return func(_ context.Context, _ string, _ *notifier.Manager, _ log.Logger, _ prometheus.Registerer) (RulesManager, error) {
308308
return &mockRulesManager{
309309
done: make(chan struct{}),
310310
groupsToReturn: groupsToReturn,
311311
waitDurations: waitDurations,
312312
iteration: -1,
313-
}
313+
}, nil
314314
}
315315
}
316316

pkg/ruler/prom_client.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package ruler
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"strings"
8+
"time"
9+
10+
"github.com/go-kit/log/level"
11+
promapi "github.com/prometheus/client_golang/api"
12+
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
13+
"github.com/prometheus/common/model"
14+
"github.com/prometheus/prometheus/model/histogram"
15+
"github.com/prometheus/prometheus/model/labels"
16+
"github.com/prometheus/prometheus/promql"
17+
18+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
19+
)
20+
21+
const orgIDHeader = "X-Scope-OrgId"
22+
23+
type promClient struct {
24+
promAPI promv1.API
25+
timeout time.Duration
26+
}
27+
28+
func NewPromClient(queryFrontendUrl, prometheusHTTPPrefix, orgID string, timeout time.Duration) (*promClient, error) {
29+
client, err := promapi.NewClient(promapi.Config{
30+
Address: queryFrontendUrl + prometheusHTTPPrefix,
31+
RoundTripper: &addOrgIDRoundTripper{orgID: orgID, next: http.DefaultTransport},
32+
})
33+
if err != nil {
34+
return nil, err
35+
}
36+
37+
return &promClient{
38+
promAPI: promv1.NewAPI(client),
39+
timeout: timeout,
40+
}, nil
41+
}
42+
43+
func (p *promClient) InstantQuery(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
44+
log, ctx := spanlogger.New(ctx, "promClient.InstantQuery")
45+
defer log.Span.Finish()
46+
47+
value, warns, err := p.promAPI.Query(ctx, qs, t, promv1.WithTimeout(p.timeout))
48+
if err != nil {
49+
level.Error(log).Log("err", err, "query", qs)
50+
return nil, err
51+
}
52+
if len(warns) > 0 {
53+
level.Warn(log).Log("warnings", strings.Join(warns, ", "), "query", qs)
54+
}
55+
56+
switch value.Type() {
57+
case model.ValVector:
58+
vector := value.(model.Vector)
59+
v := make([]promql.Sample, 0, len(vector))
60+
for _, sample := range value.(model.Vector) {
61+
metric := make([]labels.Label, 0, len(sample.Metric))
62+
for k, v := range sample.Metric {
63+
metric = append(metric, labels.Label{
64+
Name: string(k),
65+
Value: string(v),
66+
})
67+
}
68+
69+
if sample.Histogram != nil {
70+
v = append(v, promql.Sample{
71+
T: sample.Timestamp.Unix(),
72+
H: &histogram.FloatHistogram{
73+
Count: float64(sample.Histogram.Count),
74+
Sum: float64(sample.Histogram.Sum),
75+
},
76+
Metric: metric,
77+
})
78+
} else {
79+
v = append(v, promql.Sample{
80+
T: sample.Timestamp.Unix(),
81+
F: float64(sample.Value),
82+
Metric: metric,
83+
})
84+
}
85+
}
86+
return v, nil
87+
case model.ValScalar:
88+
scalar := value.(*model.Scalar)
89+
return promql.Vector{promql.Sample{
90+
T: scalar.Timestamp.Unix(),
91+
F: float64(scalar.Value),
92+
Metric: labels.Labels{},
93+
}}, nil
94+
default:
95+
return nil, errors.New("rule result is not a vector or scalar")
96+
}
97+
}
98+
99+
type addOrgIDRoundTripper struct {
100+
orgID string
101+
next http.RoundTripper
102+
}
103+
104+
func (r *addOrgIDRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
105+
req.Header.Set(orgIDHeader, r.orgID)
106+
return r.next.RoundTrip(req)
107+
}

pkg/ruler/prom_client_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package ruler
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestNewPromClient(t *testing.T) {
11+
prometheusHTTPPrefix := "/prometheus"
12+
timeout := time.Second * 10
13+
orgId := "user"
14+
15+
tests := []struct {
16+
queryFrontendUrl string
17+
isError bool
18+
description string
19+
}{
20+
{
21+
queryFrontendUrl: "http://\tlocalhost:3333",
22+
isError: true,
23+
description: "invalid url of query frontend url should return nil",
24+
},
25+
{
26+
queryFrontendUrl: "service.namespace:80",
27+
isError: false,
28+
description: "k8s service address should return not nil",
29+
},
30+
{
31+
queryFrontendUrl: "service.namespace.svc.cluster.local",
32+
isError: false,
33+
description: "k8s service address should return not nil",
34+
},
35+
}
36+
37+
for _, test := range tests {
38+
if test.isError {
39+
_, err := NewPromClient(test.queryFrontendUrl, prometheusHTTPPrefix, orgId, timeout)
40+
require.Error(t, err, test.description)
41+
} else {
42+
_, err := NewPromClient(test.queryFrontendUrl, prometheusHTTPPrefix, orgId, timeout)
43+
require.NoError(t, err, test.description)
44+
}
45+
}
46+
}

pkg/ruler/ruler.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ func (e *DisabledRuleGroupErr) Error() string {
9393

9494
// Config is the configuration for the recording rules server.
9595
type Config struct {
96+
// This is used for query to query frontend to evaluate rules
97+
FrontendAddress string `yaml:"frontend_address"`
98+
// HTTP timeout duration when querying to query frontend to evaluate rules
99+
FrontendTimeout time.Duration `yaml:"frontend_timeout"`
96100
// This is used for template expansion in alerts; must be a valid URL.
97101
ExternalURL flagext.URLValue `yaml:"external_url"`
98102
// Labels to add to all alerts
@@ -147,7 +151,8 @@ type Config struct {
147151
RingCheckPeriod time.Duration `yaml:"-"`
148152

149153
// Field will be populated during runtime.
150-
LookbackDelta time.Duration `yaml:"-"`
154+
LookbackDelta time.Duration `yaml:"-"`
155+
PrometheusHTTPPrefix string `yaml:"-"`
151156

152157
EnableQueryStats bool `yaml:"query_stats_enabled"`
153158
DisableRuleGroupLabel bool `yaml:"disable_rule_group_label"`
@@ -190,6 +195,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
190195
//lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
191196
flagext.DeprecatedFlag(f, "ruler.alertmanager-use-v2", "This flag is no longer functional. V1 API is deprecated and removed", util_log.Logger)
192197

198+
f.StringVar(&cfg.FrontendAddress, "ruler.frontend-address", "", "Address of the Query Frontend service, in host:port format. Ruler queries to Query Frontends. If not set, ruler queries to Ingesters directly.")
199+
f.DurationVar(&cfg.FrontendTimeout, "ruler.frontend-timeout", 10*time.Second, "HTTP timeout duration when querying to the Query Frontend.")
193200
cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil
194201
f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.")
195202
f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 1*time.Minute, "How frequently to evaluate rules")

0 commit comments

Comments
 (0)