Skip to content

Commit c815b3c

Browse files
authored
Ruler external support (#4808)
* Added external ruler support and proper unit tests Signed-off-by: jmoscoso1 <[email protected]> * adressing comments Signed-off-by: jmoscoso1 <[email protected]> * adding check for nil ExternalPusher and ExternalQueryable Signed-off-by: jmoscoso1 <[email protected]> * moving all changes into ruler module Signed-off-by: jmoscoso1 <[email protected]> * formatting Signed-off-by: jmoscoso1 <[email protected]> * adding test for setupModuleManager and adressing comment Signed-off-by: jmoscoso1 <[email protected]> * adding promql import Signed-off-by: jmoscoso1 <[email protected]> * fixing lint problems Signed-off-by: jmoscoso1 <[email protected]> Signed-off-by: jmoscoso1 <[email protected]>
1 parent 5578eeb commit c815b3c

File tree

4 files changed

+126
-5
lines changed

4 files changed

+126
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
4747
* [FEATURE] Compactor: Added -compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787
4848
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818
49+
* [FEATURE] Ruler: Add support to pass custom implementations of queryable and pusher #4782
4950
* [BUGFIX] Memberlist: Add join with no retrying when starting service. #4804
5051
* [BUGFIX] Ruler: Fix /ruler/rule_groups returns YAML with extra fields #4767
5152

pkg/cortex/cortex.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ type Config struct {
8989
PrintConfig bool `yaml:"-"`
9090
HTTPPrefix string `yaml:"http_prefix"`
9191

92+
ExternalQueryable prom_storage.Queryable `yaml:"-"`
93+
ExternalPusher ruler.Pusher `yaml:"-"`
94+
9295
API api.Config `yaml:"api"`
9396
Server server.Config `yaml:"server"`
9497
Distributor distributor.Config `yaml:"distributor"`

pkg/cortex/modules.go

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ import (
66
"fmt"
77
"net/http"
88

9+
"github.com/go-kit/log"
910
"github.com/go-kit/log/level"
1011
"github.com/opentracing-contrib/go-stdlib/nethttp"
1112
"github.com/opentracing/opentracing-go"
1213
"github.com/pkg/errors"
1314
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/prometheus/promql"
1416
"github.com/prometheus/prometheus/rules"
1517
prom_storage "github.com/prometheus/prometheus/storage"
1618
"github.com/thanos-io/thanos/pkg/discovery/dns"
@@ -493,19 +495,52 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) {
493495
return
494496
}
495497

498+
func createActiveQueryTracker(cfg querier.Config, logger log.Logger) promql.QueryTracker {
499+
dir := cfg.ActiveQueryTrackerDir
500+
501+
if dir != "" {
502+
return promql.NewActiveQueryTracker(dir, cfg.MaxConcurrent, logger)
503+
}
504+
505+
return nil
506+
}
507+
496508
func (t *Cortex) initRuler() (serv services.Service, err error) {
509+
var manager *ruler.DefaultMultiTenantManager
497510
if t.RulerStorage == nil {
498511
level.Info(util_log.Logger).Log("msg", "RulerStorage is nil. Not starting the ruler.")
499512
return nil, nil
500513
}
501514

502515
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
503-
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
504-
// TODO: Consider wrapping logger to differentiate from querier module logger
505-
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger)
506516

507-
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer)
508-
manager, err := ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
517+
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
518+
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
519+
engine := promql.NewEngine(promql.EngineOpts{
520+
Logger: util_log.Logger,
521+
Reg: rulerRegisterer,
522+
ActiveQueryTracker: createActiveQueryTracker(t.Cfg.Querier, util_log.Logger),
523+
MaxSamples: t.Cfg.Querier.MaxSamples,
524+
Timeout: t.Cfg.Querier.Timeout,
525+
LookbackDelta: t.Cfg.Querier.LookbackDelta,
526+
EnablePerStepStats: t.Cfg.Querier.EnablePerStepStats,
527+
EnableAtModifier: t.Cfg.Querier.AtModifierEnabled,
528+
NoStepSubqueryIntervalFn: func(int64) int64 {
529+
return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds()
530+
},
531+
})
532+
533+
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, engine, t.Overrides, prometheus.DefaultRegisterer)
534+
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
535+
} else {
536+
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
537+
// TODO: Consider wrapping logger to differentiate from querier module logger
538+
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger)
539+
540+
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer)
541+
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
542+
}
543+
509544
if err != nil {
510545
return nil, err
511546
}
@@ -706,6 +741,9 @@ func (t *Cortex) setupModuleManager() error {
706741
TenantFederation: {Queryable},
707742
All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler},
708743
}
744+
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
745+
deps[Ruler] = []string{Overrides, DeleteRequestsStore, RulerStorage}
746+
}
709747
for mod, targets := range deps {
710748
if err := mm.AddDependency(mod, targets...); err != nil {
711749
return err

pkg/cortex/modules_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
package cortex
22

33
import (
4+
"context"
45
"net/http/httptest"
56
"os"
67
"testing"
78

89
"github.com/gorilla/mux"
10+
prom_storage "github.com/prometheus/prometheus/storage"
911
"github.com/stretchr/testify/assert"
1012
"github.com/stretchr/testify/require"
1113
"github.com/weaveworks/common/server"
14+
15+
"github.com/cortexproject/cortex/pkg/cortexpb"
1216
)
1317

1418
func changeTargetConfig(c *Config) {
@@ -153,3 +157,78 @@ func TestCortex_InitRulerStorage(t *testing.T) {
153157
})
154158
}
155159
}
160+
161+
type myPusher struct{}
162+
163+
func (p *myPusher) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
164+
return nil, nil
165+
}
166+
167+
type myQueryable struct{}
168+
169+
func (q *myQueryable) Querier(ctx context.Context, mint, maxt int64) (prom_storage.Querier, error) {
170+
return prom_storage.NoopQuerier(), nil
171+
}
172+
173+
func Test_setupModuleManager(t *testing.T) {
174+
tests := []struct {
175+
config *Config
176+
expectedOriginal bool
177+
}{
178+
{
179+
config: func() *Config {
180+
cfg := newDefaultConfig()
181+
cfg.Target = []string{"all"}
182+
cfg.RulerStorage.Backend = "local"
183+
cfg.RulerStorage.Local.Directory = os.TempDir()
184+
cfg.ExternalPusher = &myPusher{}
185+
cfg.ExternalQueryable = &myQueryable{}
186+
return cfg
187+
}(),
188+
expectedOriginal: false,
189+
},
190+
{
191+
config: func() *Config {
192+
cfg := newDefaultConfig()
193+
cfg.Target = []string{"all"}
194+
cfg.RulerStorage.Backend = "local"
195+
cfg.RulerStorage.Local.Directory = os.TempDir()
196+
return cfg
197+
}(),
198+
expectedOriginal: true,
199+
},
200+
}
201+
202+
for _, test := range tests {
203+
cortex := &Cortex{
204+
Cfg: *test.config,
205+
Server: &server.Server{},
206+
}
207+
208+
err := cortex.setupModuleManager()
209+
require.Nil(t, err)
210+
211+
deps := cortex.ModuleManager.DependenciesForModule(Ruler)
212+
originalDependecies := []string{DistributorService, StoreQueryable}
213+
214+
if test.expectedOriginal {
215+
check := []bool{false, false}
216+
for _, dep := range deps {
217+
for i, o := range originalDependecies {
218+
if dep == o {
219+
check[i] = true
220+
}
221+
}
222+
}
223+
for _, val := range check {
224+
require.True(t, val)
225+
}
226+
} else {
227+
for _, dep := range deps {
228+
for _, o := range originalDependecies {
229+
require.NotEqual(t, dep, o)
230+
}
231+
}
232+
}
233+
}
234+
}

0 commit comments

Comments
 (0)