Skip to content

Commit 6e37330

Browse files
committed
remove interface
Signed-off-by: Justin Jung <[email protected]>
1 parent 5cccd60 commit 6e37330

File tree

6 files changed

+67
-66
lines changed

6 files changed

+67
-66
lines changed

pkg/ingester/ingester.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ type Ingester struct {
232232
lifecycler *ring.Lifecycler
233233
limits *validation.Overrides
234234
limiter *Limiter
235-
resourceMonitor resource.IMonitor
235+
resourceMonitor *resource.Monitor
236236
subservicesWatcher *services.FailureWatcher
237237

238238
stoppedMtx sync.RWMutex // protects stopped
@@ -701,7 +701,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
701701
}
702702

703703
// New returns a new Ingester that uses Cortex block storage instead of chunks storage.
704-
func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger, resourceMonitor resource.IMonitor) (*Ingester, error) {
704+
func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger, resourceMonitor *resource.Monitor) (*Ingester, error) {
705705
defaultInstanceLimits = &cfg.DefaultLimits
706706
if cfg.ingesterClientFactory == nil {
707707
cfg.ingesterClientFactory = client.MakeIngesterClient
@@ -2156,7 +2156,6 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) {
21562156

21572157
i.maxInflightQueryRequests.Track(i.inflightQueryRequests.Inc())
21582158

2159-
//if _, ok := i.resourceMonitor.(*resource.Monitor); ok {
21602159
if i.resourceMonitor != nil {
21612160
if resourceName, threshold, utilization, err := i.resourceMonitor.CheckResourceUtilization(); err != nil {
21622161
level.Warn(i.logger).Log("msg", "resource threshold breached", "resource", resourceName, "threshold", threshold, "utilization", utilization)

pkg/ingester/ingester_test.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import (
4949

5050
"github.com/cortexproject/cortex/pkg/chunk"
5151
"github.com/cortexproject/cortex/pkg/chunk/encoding"
52+
"github.com/cortexproject/cortex/pkg/configs"
5253
"github.com/cortexproject/cortex/pkg/cortexpb"
5354
"github.com/cortexproject/cortex/pkg/ingester/client"
5455
"github.com/cortexproject/cortex/pkg/querier/batch"
@@ -3068,11 +3069,14 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) {
30683069
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
30693070
}
30703071

3071-
mockErr := fmt.Errorf("resource monitor error")
3072-
resourceMonitor := testResourceMonitor{
3073-
err: mockErr,
3074-
}
3075-
i, err := prepareIngesterWithResourceMonitor(t, &resourceMonitor)
3072+
thresholds := configs.Resources{Heap: 0.1}
3073+
limits := configs.Resources{Heap: 10}
3074+
resourceMonitor, err := resource.NewMonitor(thresholds, limits, &mockResourceScanner{
3075+
heap: uint64(5),
3076+
}, prometheus.NewRegistry())
3077+
require.NoError(t, err)
3078+
3079+
i, err := prepareIngesterWithResourceMonitor(t, resourceMonitor)
30763080
require.NoError(t, err)
30773081
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
30783082
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -3095,23 +3099,18 @@ func Test_Ingester_Query_ResourceThresholdBreached(t *testing.T) {
30953099
s := &mockQueryStreamServer{ctx: ctx}
30963100
err = i.QueryStream(rreq, s)
30973101
require.Error(t, err)
3098-
require.ErrorContains(t, err, mockErr.Error())
3102+
exhaustedErr := resource.ExhaustedError{}
3103+
require.ErrorContains(t, err, exhaustedErr.Error())
30993104
}
31003105

3101-
type testResourceMonitor struct {
3102-
err error
3106+
type mockResourceScanner struct {
3107+
heap uint64
31033108
}
31043109

3105-
func (t *testResourceMonitor) GetCPUUtilization() float64 {
3106-
return 0
3107-
}
3108-
3109-
func (t *testResourceMonitor) GetHeapUtilization() float64 {
3110-
return 0
3111-
}
3112-
3113-
func (t *testResourceMonitor) CheckResourceUtilization() (string, float64, float64, error) {
3114-
return "", 0, 0, t.err
3110+
func (m *mockResourceScanner) Scan() (resource.Stats, error) {
3111+
return resource.Stats{
3112+
Heap: m.heap,
3113+
}, nil
31153114
}
31163115

31173116
func TestIngester_LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
@@ -3997,7 +3996,7 @@ func prepareIngesterWithBlocksStorageAndLimits(t testing.TB, ingesterCfg Config,
39973996
return ingester, nil
39983997
}
39993998

4000-
func prepareIngesterWithResourceMonitor(t testing.TB, resourceMonitor resource.IMonitor) (*Ingester, error) {
3999+
func prepareIngesterWithResourceMonitor(t testing.TB, resourceMonitor *resource.Monitor) (*Ingester, error) {
40014000
dataDir := t.TempDir()
40024001
bucketDir := t.TempDir()
40034002

@@ -4008,6 +4007,12 @@ func prepareIngesterWithResourceMonitor(t testing.TB, resourceMonitor resource.I
40084007

40094008
overrides, _ := validation.NewOverrides(defaultLimitsTestConfig(), nil)
40104009

4010+
err := resourceMonitor.StartAsync(context.Background())
4011+
if err != nil {
4012+
return nil, err
4013+
}
4014+
time.Sleep(time.Second)
4015+
40114016
ingester, err := New(ingesterCfg, overrides, prometheus.NewRegistry(), log.NewNopLogger(), resourceMonitor)
40124017
if err != nil {
40134018
return nil, err

pkg/storegateway/gateway.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ type StoreGateway struct {
118118
subservices *services.Manager
119119
subservicesWatcher *services.FailureWatcher
120120

121-
resourceMonitor resource.IMonitor
121+
resourceMonitor *resource.Monitor
122122

123123
bucketSync *prometheus.CounterVec
124124
}
@@ -146,7 +146,7 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
146146
return newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, limits, logLevel, logger, reg, resourceMonitor)
147147
}
148148

149-
func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.InstrumentedBucket, ringStore kv.Client, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer, resourceMonitor resource.IMonitor) (*StoreGateway, error) {
149+
func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.InstrumentedBucket, ringStore kv.Client, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer, resourceMonitor *resource.Monitor) (*StoreGateway, error) {
150150
var err error
151151

152152
g := &StoreGateway{
@@ -408,7 +408,7 @@ func (g *StoreGateway) LabelValues(ctx context.Context, req *storepb.LabelValues
408408
}
409409

410410
func (g *StoreGateway) checkResourceUtilization() error {
411-
if _, ok := g.resourceMonitor.(*resource.Monitor); !ok {
411+
if g.resourceMonitor == nil {
412412
return nil
413413
}
414414

pkg/storegateway/gateway_test.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/thanos-io/thanos/pkg/store/storepb"
3333
"google.golang.org/grpc/status"
3434

35+
"github.com/cortexproject/cortex/pkg/configs"
3536
"github.com/cortexproject/cortex/pkg/ring"
3637
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
3738
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -41,6 +42,7 @@ import (
4142
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
4243
"github.com/cortexproject/cortex/pkg/util"
4344
"github.com/cortexproject/cortex/pkg/util/flagext"
45+
"github.com/cortexproject/cortex/pkg/util/resource"
4446
"github.com/cortexproject/cortex/pkg/util/services"
4547
"github.com/cortexproject/cortex/pkg/util/test"
4648
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -1209,36 +1211,37 @@ func TestStoreGateway_SeriesThrottledByResourceMonitor(t *testing.T) {
12091211
gatewayCfg.ShardingEnabled = false
12101212
storageCfg := mockStorageConfig(t)
12111213

1212-
mockErr := fmt.Errorf("resource monitor error")
1213-
resourceMonitor := testResourceMonitor{
1214-
err: mockErr,
1215-
}
1214+
thresholds := configs.Resources{Heap: 0.1}
1215+
limits := configs.Resources{Heap: 10}
1216+
resourceMonitor, err := resource.NewMonitor(thresholds, limits, &mockResourceScanner{
1217+
heap: uint64(5),
1218+
}, prometheus.NewRegistry())
1219+
require.NoError(t, err)
1220+
1221+
err = resourceMonitor.StartAsync(context.Background())
1222+
require.NoError(t, err)
1223+
time.Sleep(time.Second)
12161224

1217-
g, err := newStoreGateway(gatewayCfg, storageCfg, objstore.WithNoopInstr(bucketClient), nil, overrides, mockLoggingLevel(), logger, nil, &resourceMonitor)
1225+
g, err := newStoreGateway(gatewayCfg, storageCfg, objstore.WithNoopInstr(bucketClient), nil, overrides, mockLoggingLevel(), logger, nil, resourceMonitor)
12181226
require.NoError(t, err)
12191227
require.NoError(t, services.StartAndAwaitRunning(ctx, g))
12201228
defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck
12211229

12221230
srv := newBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID))
12231231
err = g.Series(req, srv)
12241232
require.Error(t, err)
1225-
require.ErrorContains(t, err, mockErr.Error())
1226-
}
1227-
1228-
type testResourceMonitor struct {
1229-
err error
1230-
}
1231-
1232-
func (t *testResourceMonitor) GetCPUUtilization() float64 {
1233-
return 0
1233+
exhaustedErr := resource.ExhaustedError{}
1234+
require.ErrorContains(t, err, exhaustedErr.Error())
12341235
}
12351236

1236-
func (t *testResourceMonitor) GetHeapUtilization() float64 {
1237-
return 0
1237+
type mockResourceScanner struct {
1238+
heap uint64
12381239
}
12391240

1240-
func (t *testResourceMonitor) CheckResourceUtilization() (string, float64, float64, error) {
1241-
return "", 0, 0, t.err
1241+
func (m *mockResourceScanner) Scan() (resource.Stats, error) {
1242+
return resource.Stats{
1243+
Heap: m.heap,
1244+
}, nil
12421245
}
12431246

12441247
func mockGatewayConfig() Config {

pkg/util/resource/monitor.go

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func (e *ExhaustedError) Error() string {
2424
return "resource exhausted"
2525
}
2626

27-
const heapMetricName = "/memory/classes/heap/objects:bytes"
27+
const heapMetricName = "/memory/classes/Heap/objects:bytes"
2828
const monitorInterval = time.Second
2929
const dataPointsToAvg = 30
3030

@@ -38,8 +38,8 @@ type Scanner struct {
3838
}
3939

4040
type Stats struct {
41-
cpu float64
42-
heap uint64
41+
CPU float64
42+
Heap uint64
4343
}
4444

4545
func NewScanner() (*Scanner, error) {
@@ -73,17 +73,11 @@ func (s *Scanner) Scan() (Stats, error) {
7373
metrics.Read(s.metricSamples)
7474

7575
return Stats{
76-
cpu: stat.CPUTime(),
77-
heap: s.metricSamples[0].Value.Uint64(),
76+
CPU: stat.CPUTime(),
77+
Heap: s.metricSamples[0].Value.Uint64(),
7878
}, nil
7979
}
8080

81-
type IMonitor interface {
82-
GetCPUUtilization() float64
83-
GetHeapUtilization() float64
84-
CheckResourceUtilization() (string, float64, float64, error)
85-
}
86-
8781
type Monitor struct {
8882
services.Service
8983

@@ -121,19 +115,19 @@ func NewMonitor(thresholds configs.Resources, limits configs.Resources, scanner
121115

122116
promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
123117
Name: "cortex_resource_utilization",
124-
ConstLabels: map[string]string{"resource": "cpu"},
118+
ConstLabels: map[string]string{"resource": "CPU"},
125119
}, m.GetCPUUtilization)
126120
promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
127121
Name: "cortex_resource_utilization",
128-
ConstLabels: map[string]string{"resource": "heap"},
122+
ConstLabels: map[string]string{"resource": "Heap"},
129123
}, m.GetHeapUtilization)
130124
promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
131125
Name: "cortex_resource_threshold",
132-
ConstLabels: map[string]string{"resource": "cpu"},
126+
ConstLabels: map[string]string{"resource": "CPU"},
133127
}).Set(thresholds.CPU)
134128
promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
135129
Name: "cortex_resource_threshold",
136-
ConstLabels: map[string]string{"resource": "heap"},
130+
ConstLabels: map[string]string{"resource": "Heap"},
137131
}).Set(thresholds.Heap)
138132

139133
return m, nil
@@ -167,21 +161,21 @@ func (m *Monitor) storeCPUUtilization(stats Stats) {
167161
now := time.Now()
168162

169163
if m.lastUpdate.IsZero() {
170-
m.lastCPU = stats.cpu
164+
m.lastCPU = stats.CPU
171165
m.lastUpdate = now
172166
return
173167
}
174168

175169
m.totalCPU -= m.cpuRates[m.index]
176170
m.totalInterval -= m.cpuIntervals[m.index]
177171

178-
m.cpuRates[m.index] = stats.cpu - m.lastCPU
172+
m.cpuRates[m.index] = stats.CPU - m.lastCPU
179173
m.cpuIntervals[m.index] = now.Sub(m.lastUpdate).Seconds()
180174

181175
m.totalCPU += m.cpuRates[m.index]
182176
m.totalInterval += m.cpuIntervals[m.index]
183177

184-
m.lastCPU = stats.cpu
178+
m.lastCPU = stats.CPU
185179
m.lastUpdate = now
186180
m.index = (m.index + 1) % dataPointsToAvg
187181

@@ -195,7 +189,7 @@ func (m *Monitor) storeHeapUtilization(stats Stats) {
195189
defer m.lock.Unlock()
196190

197191
if m.containerLimit.Heap > 0 {
198-
m.utilization.Heap = float64(stats.heap) / m.containerLimit.Heap
192+
m.utilization.Heap = float64(stats.Heap) / m.containerLimit.Heap
199193
}
200194
}
201195

@@ -219,12 +213,12 @@ func (m *Monitor) CheckResourceUtilization() (string, float64, float64, error) {
219213

220214
if m.thresholds.CPU > 0 && cpu > m.thresholds.CPU {
221215
err := ExhaustedError{}
222-
return "cpu", m.thresholds.CPU, cpu, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
216+
return "CPU", m.thresholds.CPU, cpu, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
223217
}
224218

225219
if m.thresholds.Heap > 0 && heap > m.thresholds.Heap {
226220
err := ExhaustedError{}
227-
return "heap", m.thresholds.Heap, heap, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
221+
return "Heap", m.thresholds.Heap, heap, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
228222
}
229223

230224
return "", 0, 0, nil

pkg/util/resource/monitor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ type mockScanner struct {
7777

7878
func (m *mockScanner) Scan() (Stats, error) {
7979
return Stats{
80-
cpu: m.CPU,
81-
heap: m.Heap,
80+
CPU: m.CPU,
81+
Heap: m.Heap,
8282
}, nil
8383
}

0 commit comments

Comments
 (0)