Skip to content

Commit 876c8fa

Browse files
authored
Fix ingester_memory_series/users metrics for TSDB storage (#1982)
* Fixed cortex_ingester_memory_users metric tracking for blocks storage
  Signed-off-by: Marco Pracucci <[email protected]>
* Fixed cortex_ingester_memory_series metric tracking for blocks storage
  Signed-off-by: Marco Pracucci <[email protected]>
* Updated changelog
  Signed-off-by: Marco Pracucci <[email protected]>
1 parent 326828b commit 876c8fa

File tree

6 files changed

+166
-38
lines changed

6 files changed

+166
-38
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
* [BUGFIX] TSDB: Fixed TSDB creation conflict with blocks transfer in a `JOINING` ingester with the experimental TSDB blocks storage. #1818
3535
* [BUGFIX] TSDB: `experimental.tsdb.ship-interval` of <=0 treated as disabled instead of allowing panic. #1975
3636
* [BUGFIX] TSDB: Fixed `cortex_ingester_queried_samples` and `cortex_ingester_queried_series` metrics when using block storage. #1981
37+
* [BUGFIX] TSDB: Fixed `cortex_ingester_memory_series` and `cortex_ingester_memory_users` metrics when using the experimental TSDB blocks storage. #1982
3738

3839
## 0.4.0 / 2019-12-02
3940

pkg/ingester/ingester.go

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,17 @@ const (
3636
)
3737

3838
type ingesterMetrics struct {
39-
flushQueueLength prometheus.Gauge
40-
ingestedSamples prometheus.Counter
41-
ingestedSamplesFail prometheus.Counter
42-
queries prometheus.Counter
43-
queriedSamples prometheus.Histogram
44-
queriedSeries prometheus.Histogram
45-
queriedChunks prometheus.Histogram
39+
flushQueueLength prometheus.Gauge
40+
ingestedSamples prometheus.Counter
41+
ingestedSamplesFail prometheus.Counter
42+
queries prometheus.Counter
43+
queriedSamples prometheus.Histogram
44+
queriedSeries prometheus.Histogram
45+
queriedChunks prometheus.Histogram
46+
memSeries prometheus.Gauge
47+
memUsers prometheus.Gauge
48+
memSeriesCreatedTotal *prometheus.CounterVec
49+
memSeriesRemovedTotal *prometheus.CounterVec
4650
}
4751

4852
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
@@ -81,6 +85,22 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
8185
// A small number of chunks per series - 10*(8^(7-1)) = 2.6m.
8286
Buckets: prometheus.ExponentialBuckets(10, 8, 7),
8387
}),
88+
memSeries: prometheus.NewGauge(prometheus.GaugeOpts{
89+
Name: "cortex_ingester_memory_series",
90+
Help: "The current number of series in memory.",
91+
}),
92+
memUsers: prometheus.NewGauge(prometheus.GaugeOpts{
93+
Name: "cortex_ingester_memory_users",
94+
Help: "The current number of users in memory.",
95+
}),
96+
memSeriesCreatedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
97+
Name: "cortex_ingester_memory_series_created_total",
98+
Help: "The total number of series that were created per user.",
99+
}, []string{"user"}),
100+
memSeriesRemovedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
101+
Name: "cortex_ingester_memory_series_removed_total",
102+
Help: "The total number of series that were removed per user.",
103+
}, []string{"user"}),
84104
}
85105

86106
if r != nil {
@@ -92,6 +112,10 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
92112
m.queriedSamples,
93113
m.queriedSeries,
94114
m.queriedChunks,
115+
m.memSeries,
116+
m.memUsers,
117+
m.memSeriesCreatedTotal,
118+
m.memSeriesRemovedTotal,
95119
)
96120
}
97121

@@ -212,7 +236,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
212236

213237
// Init the limiter and instantiate the user states which depend on it
214238
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
215-
i.userStates = newUserStates(i.limiter, cfg)
239+
i.userStates = newUserStates(i.limiter, cfg, i.metrics)
216240

217241
// Now that user states have been created, we can start the lifecycler
218242
i.lifecycler.Start()

pkg/ingester/ingester_v2.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,24 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
6868
},
6969
}
7070

71+
// Replace specific metrics which we can't directly track but we need to read
72+
// them from the underlying system (i.e. TSDB).
73+
if registerer != nil {
74+
registerer.Unregister(i.metrics.memSeries)
75+
registerer.MustRegister(prometheus.NewGaugeFunc(prometheus.GaugeOpts{
76+
Name: "cortex_ingester_memory_series",
77+
Help: "The current number of series in memory.",
78+
}, i.numSeriesInTSDB))
79+
}
80+
7181
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey)
7282
if err != nil {
7383
return nil, err
7484
}
7585

7686
// Init the limiter and instantiate the user states which depend on it
7787
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
78-
i.userStates = newUserStates(i.limiter, cfg)
88+
i.userStates = newUserStates(i.limiter, cfg, i.metrics)
7989

8090
// Scan and open TSDB's that already exist on disk
8191
if err := i.openExistingTSDB(context.Background()); err != nil {
@@ -399,6 +409,8 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)
399409

400410
// Add the db to list of user databases
401411
i.TSDBState.dbs[userID] = db
412+
i.metrics.memUsers.Inc()
413+
402414
return db, nil
403415
}
404416

@@ -533,7 +545,7 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
533545
i.userStatesMtx.Lock()
534546
i.TSDBState.dbs[userID] = db
535547
i.userStatesMtx.Unlock()
536-
548+
i.metrics.memUsers.Inc()
537549
}(userID)
538550

539551
return filepath.SkipDir // Don't descend into directories
@@ -548,3 +560,16 @@ func (i *Ingester) openExistingTSDB(ctx context.Context) error {
548560
}
549561
return err
550562
}
563+
564+
// numSeriesInTSDB returns the total number of in-memory series across all open TSDBs.
565+
func (i *Ingester) numSeriesInTSDB() float64 {
566+
i.userStatesMtx.RLock()
567+
defer i.userStatesMtx.RUnlock()
568+
569+
count := uint64(0)
570+
for _, db := range i.TSDBState.dbs {
571+
count += db.Head().NumSeries()
572+
}
573+
574+
return float64(count)
575+
}

pkg/ingester/ingester_v2_test.go

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ import (
3131
func TestIngester_v2Push(t *testing.T) {
3232
metricLabelAdapters := []client.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
3333
metricLabels := client.FromLabelAdaptersToLabels(metricLabelAdapters)
34+
metricNames := []string{
35+
"cortex_ingester_ingested_samples_total",
36+
"cortex_ingester_ingested_samples_failures_total",
37+
"cortex_ingester_memory_series",
38+
"cortex_ingester_memory_users",
39+
}
3440
userID := "test"
3541

3642
tests := map[string]struct {
@@ -61,6 +67,12 @@ func TestIngester_v2Push(t *testing.T) {
6167
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
6268
# TYPE cortex_ingester_ingested_samples_failures_total counter
6369
cortex_ingester_ingested_samples_failures_total 0
70+
# HELP cortex_ingester_memory_users The current number of users in memory.
71+
# TYPE cortex_ingester_memory_users gauge
72+
cortex_ingester_memory_users 1
73+
# HELP cortex_ingester_memory_series The current number of series in memory.
74+
# TYPE cortex_ingester_memory_series gauge
75+
cortex_ingester_memory_series 1
6476
`,
6577
},
6678
"should soft fail on sample out of order": {
@@ -85,6 +97,12 @@ func TestIngester_v2Push(t *testing.T) {
8597
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
8698
# TYPE cortex_ingester_ingested_samples_failures_total counter
8799
cortex_ingester_ingested_samples_failures_total 1
100+
# HELP cortex_ingester_memory_users The current number of users in memory.
101+
# TYPE cortex_ingester_memory_users gauge
102+
cortex_ingester_memory_users 1
103+
# HELP cortex_ingester_memory_series The current number of series in memory.
104+
# TYPE cortex_ingester_memory_series gauge
105+
cortex_ingester_memory_series 1
88106
`,
89107
},
90108
"should soft fail on sample out of bound": {
@@ -109,6 +127,12 @@ func TestIngester_v2Push(t *testing.T) {
109127
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
110128
# TYPE cortex_ingester_ingested_samples_failures_total counter
111129
cortex_ingester_ingested_samples_failures_total 1
130+
# HELP cortex_ingester_memory_users The current number of users in memory.
131+
# TYPE cortex_ingester_memory_users gauge
132+
cortex_ingester_memory_users 1
133+
# HELP cortex_ingester_memory_series The current number of series in memory.
134+
# TYPE cortex_ingester_memory_series gauge
135+
cortex_ingester_memory_series 1
112136
`,
113137
},
114138
"should soft fail on two different sample values at the same timestamp": {
@@ -133,6 +157,12 @@ func TestIngester_v2Push(t *testing.T) {
133157
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
134158
# TYPE cortex_ingester_ingested_samples_failures_total counter
135159
cortex_ingester_ingested_samples_failures_total 1
160+
# HELP cortex_ingester_memory_users The current number of users in memory.
161+
# TYPE cortex_ingester_memory_users gauge
162+
cortex_ingester_memory_users 1
163+
# HELP cortex_ingester_memory_series The current number of series in memory.
164+
# TYPE cortex_ingester_memory_series gauge
165+
cortex_ingester_memory_series 1
136166
`,
137167
},
138168
}
@@ -182,13 +212,77 @@ func TestIngester_v2Push(t *testing.T) {
182212
assert.Equal(t, testData.expectedIngested, res.Timeseries)
183213

184214
// Check tracked Prometheus metrics
185-
metricNames := []string{"cortex_ingester_ingested_samples_total", "cortex_ingester_ingested_samples_failures_total"}
186215
err = testutil.GatherAndCompare(registry, strings.NewReader(testData.expectedMetrics), metricNames...)
187216
assert.NoError(t, err)
188217
})
189218
}
190219
}
191220

221+
func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *testing.T) {
222+
metricLabelAdapters := []client.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
223+
metricLabels := client.FromLabelAdaptersToLabels(metricLabelAdapters)
224+
metricNames := []string{
225+
"cortex_ingester_ingested_samples_total",
226+
"cortex_ingester_ingested_samples_failures_total",
227+
"cortex_ingester_memory_series",
228+
"cortex_ingester_memory_users",
229+
}
230+
231+
registry := prometheus.NewRegistry()
232+
233+
// Create a mocked ingester
234+
cfg := defaultIngesterTestConfig()
235+
cfg.LifecyclerConfig.JoinAfter = 0
236+
237+
i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, registry)
238+
require.NoError(t, err)
239+
defer i.Shutdown()
240+
defer cleanup()
241+
242+
// Wait until the ingester is ACTIVE
243+
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
244+
return i.lifecycler.GetState()
245+
})
246+
247+
// Push timeseries for each user
248+
for _, userID := range []string{"test-1", "test-2"} {
249+
reqs := []*client.WriteRequest{
250+
client.ToWriteRequest(
251+
[]labels.Labels{metricLabels},
252+
[]client.Sample{{Value: 1, TimestampMs: 9}},
253+
client.API),
254+
client.ToWriteRequest(
255+
[]labels.Labels{metricLabels},
256+
[]client.Sample{{Value: 2, TimestampMs: 10}},
257+
client.API),
258+
}
259+
260+
for _, req := range reqs {
261+
ctx := user.InjectOrgID(context.Background(), userID)
262+
_, err := i.v2Push(ctx, req)
263+
require.NoError(t, err)
264+
}
265+
}
266+
267+
// Check tracked Prometheus metrics
268+
expectedMetrics := `
269+
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
270+
# TYPE cortex_ingester_ingested_samples_total counter
271+
cortex_ingester_ingested_samples_total 4
272+
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
273+
# TYPE cortex_ingester_ingested_samples_failures_total counter
274+
cortex_ingester_ingested_samples_failures_total 0
275+
# HELP cortex_ingester_memory_users The current number of users in memory.
276+
# TYPE cortex_ingester_memory_users gauge
277+
cortex_ingester_memory_users 2
278+
# HELP cortex_ingester_memory_series The current number of series in memory.
279+
# TYPE cortex_ingester_memory_series gauge
280+
cortex_ingester_memory_series 2
281+
`
282+
283+
assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
284+
}
285+
192286
func Test_Ingester_v2LabelNames(t *testing.T) {
193287
series := []struct {
194288
lbls labels.Labels

pkg/ingester/transfer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e
6868
fromIngesterID := ""
6969
seriesReceived := 0
7070
xfer := func() error {
71-
userStates := newUserStates(i.limiter, i.cfg)
71+
userStates := newUserStates(i.limiter, i.cfg, i.metrics)
7272

7373
for {
7474
wireSeries, err := stream.Recv()

pkg/ingester/user_state.go

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88

99
"github.com/go-kit/kit/log/level"
1010
"github.com/prometheus/client_golang/prometheus"
11-
"github.com/prometheus/client_golang/prometheus/promauto"
1211
"github.com/prometheus/common/model"
1312
"github.com/prometheus/prometheus/pkg/labels"
1413
"github.com/segmentio/fasthash/fnv1a"
@@ -23,31 +22,13 @@ import (
2322
"github.com/weaveworks/common/user"
2423
)
2524

26-
var (
27-
memSeries = promauto.NewGauge(prometheus.GaugeOpts{
28-
Name: "cortex_ingester_memory_series",
29-
Help: "The current number of series in memory.",
30-
})
31-
memUsers = promauto.NewGauge(prometheus.GaugeOpts{
32-
Name: "cortex_ingester_memory_users",
33-
Help: "The current number of users in memory.",
34-
})
35-
memSeriesCreatedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
36-
Name: "cortex_ingester_memory_series_created_total",
37-
Help: "The total number of series that were created per user.",
38-
}, []string{"user"})
39-
memSeriesRemovedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
40-
Name: "cortex_ingester_memory_series_removed_total",
41-
Help: "The total number of series that were removed per user.",
42-
}, []string{"user"})
43-
)
44-
4525
// userStates holds the userState object for all users (tenants),
4626
// each one containing all the in-memory series for a given user.
4727
type userStates struct {
4828
states sync.Map
4929
limiter *SeriesLimiter
5030
cfg Config
31+
metrics *ingesterMetrics
5132
}
5233

5334
type userState struct {
@@ -62,6 +43,7 @@ type userState struct {
6243

6344
seriesInMetric []metricCounterShard
6445

46+
memSeries prometheus.Gauge
6547
memSeriesCreatedTotal prometheus.Counter
6648
memSeriesRemovedTotal prometheus.Counter
6749
discardedSamples *prometheus.CounterVec
@@ -80,10 +62,11 @@ type metricCounterShard struct {
8062
m map[string]int
8163
}
8264

83-
func newUserStates(limiter *SeriesLimiter, cfg Config) *userStates {
65+
func newUserStates(limiter *SeriesLimiter, cfg Config, metrics *ingesterMetrics) *userStates {
8466
return &userStates{
8567
limiter: limiter,
8668
cfg: cfg,
69+
metrics: metrics,
8770
}
8871
}
8972

@@ -157,14 +140,15 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labe
157140
ingestedRuleSamples: newEWMARate(0.2, us.cfg.RateUpdatePeriod),
158141
seriesInMetric: seriesInMetric,
159142

160-
memSeriesCreatedTotal: memSeriesCreatedTotal.WithLabelValues(userID),
161-
memSeriesRemovedTotal: memSeriesRemovedTotal.WithLabelValues(userID),
143+
memSeries: us.metrics.memSeries,
144+
memSeriesCreatedTotal: us.metrics.memSeriesCreatedTotal.WithLabelValues(userID),
145+
memSeriesRemovedTotal: us.metrics.memSeriesRemovedTotal.WithLabelValues(userID),
162146
discardedSamples: validation.DiscardedSamples.MustCurryWith(prometheus.Labels{"user": userID}),
163147
}
164148
state.mapper = newFPMapper(state.fpToSeries)
165149
stored, ok := us.states.LoadOrStore(userID, state)
166150
if !ok {
167-
memUsers.Inc()
151+
us.metrics.memUsers.Inc()
168152
}
169153
state = stored.(*userState)
170154
}
@@ -212,7 +196,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri
212196
}
213197

214198
u.memSeriesCreatedTotal.Inc()
215-
memSeries.Inc()
199+
u.memSeries.Inc()
216200

217201
labels := u.index.Add(metric, fp)
218202
series = newMemorySeries(labels)
@@ -256,7 +240,7 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) {
256240
}
257241

258242
u.memSeriesRemovedTotal.Inc()
259-
memSeries.Dec()
243+
u.memSeries.Dec()
260244
}
261245

262246
// forSeriesMatching passes all series matching the given matchers to the

0 commit comments

Comments
 (0)