Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions integration/resource_based_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//go:build requires_docker
// +build requires_docker

package integration

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
)

func Test_ResourceBasedLimiter_shouldStartWithoutError(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-monitored.resources": "cpu,heap",
})

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Cortex components.
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-ingester.instance-limits.cpu-utilization": "0.8",
"-ingester.instance-limits.heap-utilization": "0.8",
}), "")
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-store-gateway.instance-limits.cpu-utilization": "0.8",
"-store-gateway.instance-limits.heap-utilization": "0.8",
}), "")
require.NoError(t, s.StartAndWaitReady(ingester, storeGateway))
}
12 changes: 6 additions & 6 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,26 +770,26 @@ func (t *Cortex) initQueryScheduler() (services.Service, error) {
}

func (t *Cortex) initResourceMonitor() (services.Service, error) {
if len(t.Cfg.MonitoredResources) == 0 {
if t.Cfg.MonitoredResources.String() == "" || len(t.Cfg.MonitoredResources) == 0 {
return nil, nil
}

util_log.WarnExperimentalUse(fmt.Sprintf("resource monitor for [%s]", t.Cfg.MonitoredResources.String()))

containerLimits := make(map[resource.Type]float64)
for _, res := range t.Cfg.MonitoredResources {
switch resource.Type(res) {
case resource.CPU:
containerLimits[resource.Type(res)] = float64(runtime.GOMAXPROCS(0))
case resource.Heap:
containerLimits[resource.Type(res)] = float64(debug.SetMemoryLimit(-1))
default:
return nil, fmt.Errorf("unknown resource type: %s", res)
}
}

var err error
t.ResourceMonitor, err = resource.NewMonitor(containerLimits, prometheus.DefaultRegisterer)
if t.ResourceMonitor != nil {
util_log.WarnExperimentalUse("resource monitor")
}

return t.ResourceMonitor, err
}

Expand All @@ -798,7 +798,7 @@ func (t *Cortex) setupModuleManager() error {

// Register all modules here.
// RegisterModule(name string, initFn func()(services.Service, error))
mm.RegisterModule(ResourceMonitor, t.initResourceMonitor)
mm.RegisterModule(ResourceMonitor, t.initResourceMonitor, modules.UserInvisibleModule)
mm.RegisterModule(Server, t.initServer, modules.UserInvisibleModule)
mm.RegisterModule(API, t.initAPI, modules.UserInvisibleModule)
mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule)
Expand Down
13 changes: 13 additions & 0 deletions pkg/cortex/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,16 @@ func Test_setupModuleManager(t *testing.T) {
}
}
}

func Test_initResourceMonitor_shouldFailOnInvalidResource(t *testing.T) {
cortex := &Cortex{
Server: &server.Server{},
Cfg: Config{
MonitoredResources: []string{"invalid"},
},
}

// log warning message and spin up other cortex services
_, err := cortex.initResourceMonitor()
require.ErrorContains(t, err, "unknown resource type")
}
2 changes: 1 addition & 1 deletion pkg/util/limiter/resource_based_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewResourceBasedLimiter(resourceMonitor resource.IMonitor, limits map[resou
promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Name: "cortex_resource_based_limiter_limit",
Help: "Limit set for the resource utilization.",
ConstLabels: map[string]string{"component": component},
ConstLabels: map[string]string{"component": component, "resource": string(resType)},
}).Set(limit)
default:
return nil, fmt.Errorf("unsupported resource type: [%s]", resType)
Expand Down
30 changes: 30 additions & 0 deletions pkg/util/limiter/resource_based_limiter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package limiter

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util/resource"
)

func Test_ResourceBasedLimiter(t *testing.T) {
limits := map[resource.Type]float64{
resource.CPU: 0.5,
resource.Heap: 0.5,
}

_, err := NewResourceBasedLimiter(&mockMonitor{}, limits, prometheus.DefaultRegisterer, "ingester")
require.NoError(t, err)
}

type mockMonitor struct{}

func (m *mockMonitor) GetCPUUtilization() float64 {
return 0
}

func (m *mockMonitor) GetHeapUtilization() float64 {
return 0
}
1 change: 1 addition & 0 deletions pkg/util/resource/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func NewMonitor(limits map[Type]float64, registerer prometheus.Registerer) (*Mon
m := &Monitor{
containerLimit: limits,
scanners: make(map[Type]scanner),
utilization: make(map[Type]float64),

cpuRates: [dataPointsToAvg]float64{},
cpuIntervals: [dataPointsToAvg]float64{},
Expand Down
18 changes: 18 additions & 0 deletions pkg/util/resource/monitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package resource

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func Test_Monitor(t *testing.T) {
m, err := NewMonitor(map[Type]float64{}, prometheus.DefaultRegisterer)

m.scanners[CPU] = &noopScanner{}
m.containerLimit[CPU] = 1
m.utilization[CPU] = 0.5

require.NoError(t, err)
}
4 changes: 2 additions & 2 deletions pkg/util/resource/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

const (
heapMetricName = "/memory/classes/Heap/objects:bytes"
heapMetricName = "/memory/classes/heap/objects:bytes"
)

type scanner interface {
Expand Down Expand Up @@ -39,5 +39,5 @@ func newHeapScanner() (scanner, error) {

func (s *heapScanner) scan() (float64, error) {
metrics.Read(s.metricSamples)
return s.metricSamples[0].Value.Float64(), nil
return float64(s.metricSamples[0].Value.Uint64()), nil
}
5 changes: 5 additions & 0 deletions pkg/util/resource/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ func Test_NoopScanner(t *testing.T) {
require.NoError(t, err)
require.Zero(t, val)
}

func Test_HeapScanner(t *testing.T) {
_, err := newHeapScanner()
require.NoError(t, err)
}
Loading