Skip to content

Ruler external support #4808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
* [FEATURE] Compactor: Added -compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818
* [FEATURE] Ruler: Add support to pass custom implementations of queryable and pusher #4782
* [BUGFIX] Memberlist: Add join with no retrying when starting service. #4804

## 1.13.0 2022-07-14
Expand Down
3 changes: 3 additions & 0 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ type Config struct {
PrintConfig bool `yaml:"-"`
HTTPPrefix string `yaml:"http_prefix"`

ExternalQueryable prom_storage.Queryable `yaml:"-"`
ExternalPusher ruler.Pusher `yaml:"-"`

API api.Config `yaml:"api"`
Server server.Config `yaml:"server"`
Distributor distributor.Config `yaml:"distributor"`
Expand Down
48 changes: 43 additions & 5 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"fmt"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
prom_storage "github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/discovery/dns"
Expand Down Expand Up @@ -493,19 +495,52 @@ func (t *Cortex) initRulerStorage() (serv services.Service, err error) {
return
}

func createActiveQueryTracker(cfg querier.Config, logger log.Logger) promql.QueryTracker {
dir := cfg.ActiveQueryTrackerDir

if dir != "" {
return promql.NewActiveQueryTracker(dir, cfg.MaxConcurrent, logger)
}

return nil
}

func (t *Cortex) initRuler() (serv services.Service, err error) {
var manager *ruler.DefaultMultiTenantManager
if t.RulerStorage == nil {
level.Info(util_log.Logger).Log("msg", "RulerStorage is nil. Not starting the ruler.")
return nil, nil
}

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer)
manager, err := ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
engine := promql.NewEngine(promql.EngineOpts{
Logger: util_log.Logger,
Reg: rulerRegisterer,
ActiveQueryTracker: createActiveQueryTracker(t.Cfg.Querier, util_log.Logger),
MaxSamples: t.Cfg.Querier.MaxSamples,
Timeout: t.Cfg.Querier.Timeout,
LookbackDelta: t.Cfg.Querier.LookbackDelta,
EnablePerStepStats: t.Cfg.Querier.EnablePerStepStats,
EnableAtModifier: t.Cfg.Querier.AtModifierEnabled,
NoStepSubqueryIntervalFn: func(int64) int64 {
return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds()
},
})

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, engine, t.Overrides, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
} else {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, t.TombstonesLoader, rulerRegisterer, util_log.Logger)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
}

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -706,6 +741,9 @@ func (t *Cortex) setupModuleManager() error {
TenantFederation: {Queryable},
All: {QueryFrontend, Querier, Ingester, Distributor, Purger, StoreGateway, Ruler},
}
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
deps[Ruler] = []string{Overrides, DeleteRequestsStore, RulerStorage}
}
for mod, targets := range deps {
if err := mm.AddDependency(mod, targets...); err != nil {
return err
Expand Down
79 changes: 79 additions & 0 deletions pkg/cortex/modules_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package cortex

import (
"context"
"net/http/httptest"
"os"
"testing"

"github.com/gorilla/mux"
prom_storage "github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/server"

"github.com/cortexproject/cortex/pkg/cortexpb"
)

func changeTargetConfig(c *Config) {
Expand Down Expand Up @@ -153,3 +157,78 @@ func TestCortex_InitRulerStorage(t *testing.T) {
})
}
}

type myPusher struct{}

func (p *myPusher) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
return nil, nil
}

type myQueryable struct{}

func (q *myQueryable) Querier(ctx context.Context, mint, maxt int64) (prom_storage.Querier, error) {
return prom_storage.NoopQuerier(), nil
}

func Test_setupModuleManager(t *testing.T) {
tests := []struct {
config *Config
expectedOriginal bool
}{
{
config: func() *Config {
cfg := newDefaultConfig()
cfg.Target = []string{"all"}
cfg.RulerStorage.Backend = "local"
cfg.RulerStorage.Local.Directory = os.TempDir()
cfg.ExternalPusher = &myPusher{}
cfg.ExternalQueryable = &myQueryable{}
return cfg
}(),
expectedOriginal: false,
},
{
config: func() *Config {
cfg := newDefaultConfig()
cfg.Target = []string{"all"}
cfg.RulerStorage.Backend = "local"
cfg.RulerStorage.Local.Directory = os.TempDir()
return cfg
}(),
expectedOriginal: true,
},
}

for _, test := range tests {
cortex := &Cortex{
Cfg: *test.config,
Server: &server.Server{},
}

err := cortex.setupModuleManager()
require.Nil(t, err)

deps := cortex.ModuleManager.DependenciesForModule(Ruler)
originalDependecies := []string{DistributorService, StoreQueryable}

if test.expectedOriginal {
check := []bool{false, false}
for _, dep := range deps {
for i, o := range originalDependecies {
if dep == o {
check[i] = true
}
}
}
for _, val := range check {
require.True(t, val)
}
} else {
for _, dep := range deps {
for _, o := range originalDependecies {
require.NotEqual(t, dep, o)
}
}
}
}
}