diff --git a/integration/resource_based_limiter_test.go b/integration/resource_based_limiter_test.go
new file mode 100644
index 00000000000..fc7e5b80916
--- /dev/null
+++ b/integration/resource_based_limiter_test.go
@@ -0,0 +1,42 @@
+//go:build requires_docker
+// +build requires_docker
+
+package integration
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/cortexproject/cortex/integration/e2e"
+	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
+	"github.com/cortexproject/cortex/integration/e2ecortex"
+)
+
+// Test_ResourceBasedLimiter_shouldStartWithoutError starts an ingester and a store-gateway
+// with CPU and heap utilization limits set and waits for both to become ready.
+func Test_ResourceBasedLimiter_shouldStartWithoutError(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	flags := mergeFlags(BlocksStorageFlags(), map[string]string{
+		"-monitored.resources": "cpu,heap",
+	})
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	// Start Cortex components.
+	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
+		"-ingester.instance-limits.cpu-utilization":  "0.8",
+		"-ingester.instance-limits.heap-utilization": "0.8",
+	}), "")
+	storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
+		"-store-gateway.instance-limits.cpu-utilization":  "0.8",
+		"-store-gateway.instance-limits.heap-utilization": "0.8",
+	}), "")
+	require.NoError(t, s.StartAndWaitReady(ingester, storeGateway))
+}
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index 3f6c06b79d4..4dee07dbfca 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -770,10 +770,12 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) {
 }
 
 func (t *Cortex) initResourceMonitor() (services.Service, error) {
-	if len(t.Cfg.MonitoredResources) == 0 {
+	if t.Cfg.MonitoredResources.String() == "" || len(t.Cfg.MonitoredResources) == 0 {
 		return nil, nil
 	}
 
+	util_log.WarnExperimentalUse(fmt.Sprintf("resource monitor for [%s]", t.Cfg.MonitoredResources.String()))
+
 	containerLimits := make(map[resource.Type]float64)
 	for _, res := range t.Cfg.MonitoredResources {
 		switch resource.Type(res) {
@@ -781,15 +783,13 @@ func (t *Cortex) initResourceMonitor() (services.Service, error) {
 			containerLimits[resource.Type(res)] = float64(runtime.GOMAXPROCS(0))
 		case resource.Heap:
 			containerLimits[resource.Type(res)] = float64(debug.SetMemoryLimit(-1))
+		default:
+			return nil, fmt.Errorf("unknown resource type: %s", res)
 		}
 	}
 
 	var err error
 	t.ResourceMonitor, err = resource.NewMonitor(containerLimits, prometheus.DefaultRegisterer)
-	if t.ResourceMonitor != nil {
-		util_log.WarnExperimentalUse("resource monitor")
-	}
-
 	return t.ResourceMonitor, err
 }
 
@@ -798,7 +798,7 @@ func (t *Cortex) setupModuleManager() error {
 
 	// Register all modules here.
 	// RegisterModule(name string, initFn func()(services.Service, error))
-	mm.RegisterModule(ResourceMonitor, t.initResourceMonitor)
+	mm.RegisterModule(ResourceMonitor, t.initResourceMonitor, modules.UserInvisibleModule)
 	mm.RegisterModule(Server, t.initServer, modules.UserInvisibleModule)
 	mm.RegisterModule(API, t.initAPI, modules.UserInvisibleModule)
 	mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule)
diff --git a/pkg/cortex/modules_test.go b/pkg/cortex/modules_test.go
index 7316e072747..171c5509673 100644
--- a/pkg/cortex/modules_test.go
+++ b/pkg/cortex/modules_test.go
@@ -232,3 +232,16 @@ func Test_setupModuleManager(t *testing.T) {
 		}
 	}
 }
+
+func Test_initResourceMonitor_shouldFailOnInvalidResource(t *testing.T) {
+	cortex := &Cortex{
+		Server: &server.Server{},
+		Cfg: Config{
+			MonitoredResources: []string{"invalid"},
+		},
+	}
+
+	// An unrecognized resource name must be rejected with an error.
+	_, err := cortex.initResourceMonitor()
+	require.ErrorContains(t, err, "unknown resource type")
+}
diff --git a/pkg/util/limiter/resource_based_limiter.go b/pkg/util/limiter/resource_based_limiter.go
index 40e4768cd0c..40415e39195 100644
--- a/pkg/util/limiter/resource_based_limiter.go
+++ b/pkg/util/limiter/resource_based_limiter.go
@@ -30,7 +30,7 @@ func NewResourceBasedLimiter(resourceMonitor resource.IMonitor, limits map[resou
 			promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
 				Name:        "cortex_resource_based_limiter_limit",
 				Help:        "Limit set for the resource utilization.",
-				ConstLabels: map[string]string{"component": component},
+				ConstLabels: map[string]string{"component": component, "resource": string(resType)},
 			}).Set(limit)
 		default:
 			return nil, fmt.Errorf("unsupported resource type: [%s]", resType)
diff --git a/pkg/util/limiter/resource_based_limiter_test.go b/pkg/util/limiter/resource_based_limiter_test.go
new file mode 100644
index 00000000000..c84d59009f6
--- /dev/null
+++ b/pkg/util/limiter/resource_based_limiter_test.go
@@ -0,0 +1,36 @@
+package limiter
+
+import (
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+
+	"github.com/cortexproject/cortex/pkg/util/resource"
+)
+
+// Test_ResourceBasedLimiter checks that a limiter can be built with limits for
+// both CPU and heap under a single component name.
+func Test_ResourceBasedLimiter(t *testing.T) {
+	limits := map[resource.Type]float64{
+		resource.CPU:  0.5,
+		resource.Heap: 0.5,
+	}
+
+	// Register on a private registry rather than the process-wide default:
+	// the limiter registers its gauges through promauto, so a repeated run
+	// (e.g. go test -count=2) must not meet already-registered metrics.
+	_, err := NewResourceBasedLimiter(&mockMonitor{}, limits, prometheus.NewRegistry(), "ingester")
+	require.NoError(t, err)
+}
+
+// mockMonitor is a monitor stub that reports zero utilization for CPU and heap.
+type mockMonitor struct{}
+
+func (m *mockMonitor) GetCPUUtilization() float64 {
+	return 0
+}
+
+func (m *mockMonitor) GetHeapUtilization() float64 {
+	return 0
+}
diff --git a/pkg/util/resource/monitor.go b/pkg/util/resource/monitor.go
index 057fe4e1c87..7c1c699758b 100644
--- a/pkg/util/resource/monitor.go
+++ b/pkg/util/resource/monitor.go
@@ -50,6 +50,7 @@ func NewMonitor(limits map[Type]float64, registerer prometheus.Registerer) (*Mon
 	m := &Monitor{
 		containerLimit: limits,
 		scanners:       make(map[Type]scanner),
+		utilization:    make(map[Type]float64),
 
 		cpuRates:     [dataPointsToAvg]float64{},
 		cpuIntervals: [dataPointsToAvg]float64{},
diff --git a/pkg/util/resource/monitor_test.go b/pkg/util/resource/monitor_test.go
new file mode 100644
index 00000000000..bc79f27fc13
--- /dev/null
+++ b/pkg/util/resource/monitor_test.go
@@ -0,0 +1,20 @@
+package resource
+
+import (
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/require"
+)
+
+// Test_Monitor checks that NewMonitor returns a monitor whose internal maps
+// are initialized: assigning into a nil map would panic.
+func Test_Monitor(t *testing.T) {
+	m, err := NewMonitor(map[Type]float64{}, prometheus.NewRegistry())
+	// Fail here on a constructor error instead of dereferencing m below.
+	require.NoError(t, err)
+
+	m.scanners[CPU] = &noopScanner{}
+	m.containerLimit[CPU] = 1
+	m.utilization[CPU] = 0.5
+}
diff --git a/pkg/util/resource/scanner.go b/pkg/util/resource/scanner.go
index 1d4f0906c70..28916d3af29 100644
--- a/pkg/util/resource/scanner.go
+++ b/pkg/util/resource/scanner.go
@@ -6,7 +6,7 @@ import (
 )
 
 const (
-	heapMetricName = "/memory/classes/Heap/objects:bytes"
+	heapMetricName = "/memory/classes/heap/objects:bytes"
 )
 
 type scanner interface {
@@ -39,5 +39,5 @@ func newHeapScanner() (scanner, error) {
 
 func (s *heapScanner) scan() (float64, error) {
 	metrics.Read(s.metricSamples)
-	return s.metricSamples[0].Value.Float64(), nil
+	return float64(s.metricSamples[0].Value.Uint64()), nil
 }
diff --git a/pkg/util/resource/scanner_test.go b/pkg/util/resource/scanner_test.go
index 0e632d21e7f..0d3b71cead2 100644
--- a/pkg/util/resource/scanner_test.go
+++ b/pkg/util/resource/scanner_test.go
@@ -12,3 +12,8 @@ func Test_NoopScanner(t *testing.T) {
 	require.NoError(t, err)
 	require.Zero(t, val)
 }
+
+func Test_HeapScanner(t *testing.T) {
+	_, err := newHeapScanner()
+	require.NoError(t, err)
+}