Skip to content

Commit efb54b3

Browse files
pstibranypracucci
authored andcommitted
TSDB: fix user metrics when using blocks storage (#1990)
* When using TSDB, we get per-user created and removed series from TSDB metrics. This affects cortex_ingester_memory_series_created_total and cortex_ingester_memory_series_removed_total. This change also renames previous shipperMetrics to tsdbMetrics, and registers the same registry to TSDB and TSDB shipper component. Signed-off-by: Peter Štibraný <[email protected]> * Store TSDB registry at the end of TSDB creation. This fixes a problem with collection occurring when TSDB already registered metrics, but wasn't otherwise fully initialized yet, and it crashed. Signed-off-by: Peter Štibraný <[email protected]> * Remove repeated code. Signed-off-by: Peter Štibraný <[email protected]> * Fix tests after change to registry registration. Signed-off-by: Peter Štibraný <[email protected]> * Updated CHANGELOG.md Signed-off-by: Peter Štibraný <[email protected]> * Fix TSDB capitalization. Signed-off-by: Peter Štibraný <[email protected]> * Test for new metrics. Signed-off-by: Peter Štibraný <[email protected]> * Fix variable name. Signed-off-by: Peter Štibraný <[email protected]> * Added new metrics to the test. Signed-off-by: Peter Štibraný <[email protected]>
1 parent add279f commit efb54b3

File tree

6 files changed

+199
-48
lines changed

6 files changed

+199
-48
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ instructions below to upgrade your Postgres.
4343
* [BUGFIX] TSDB: `experimental.tsdb.ship-interval` of <=0 treated as disabled instead of allowing panic. #1975
4444
* [BUGFIX] TSDB: Fixed `cortex_ingester_queried_samples` and `cortex_ingester_queried_series` metrics when using block storage. #1981
4545
* [BUGFIX] TSDB: Fixed `cortex_ingester_memory_series` and `cortex_ingester_memory_users` metrics when using the experimental TSDB blocks storage. #1982
46+
* [BUGFIX] TSDB: Fixed `cortex_ingester_memory_series_created_total` and `cortex_ingester_memory_series_removed_total` metrics when using TSDB blocks storage. #1990
4647

4748
### Upgrading Postgres (if you're using configs service)
4849

pkg/ingester/ingester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
134134
i := &Ingester{
135135
cfg: cfg,
136136
clientConfig: clientConfig,
137-
metrics: newIngesterMetrics(registerer),
137+
metrics: newIngesterMetrics(registerer, true),
138138
limits: limits,
139139
chunkStore: chunkStore,
140140
quit: make(chan struct{}),

pkg/ingester/ingester_v2.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type TSDBState struct {
4646
// transferring to a joining ingester
4747
transferOnce sync.Once
4848

49-
shipperMetrics *shipperMetrics
49+
tsdbMetrics *tsdbMetrics
5050
}
5151

5252
// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
@@ -59,15 +59,15 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
5959
i := &Ingester{
6060
cfg: cfg,
6161
clientConfig: clientConfig,
62-
metrics: newIngesterMetrics(registerer),
62+
metrics: newIngesterMetrics(registerer, false),
6363
limits: limits,
6464
chunkStore: nil,
6565
quit: make(chan struct{}),
6666

6767
TSDBState: TSDBState{
68-
dbs: make(map[string]*tsdb.DB),
69-
bucket: bucketClient,
70-
shipperMetrics: newShipperMetrics(registerer),
68+
dbs: make(map[string]*tsdb.DB),
69+
bucket: bucketClient,
70+
tsdbMetrics: newTSDBMetrics(registerer),
7171
},
7272
}
7373

@@ -419,10 +419,12 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)
419419

420420
// createTSDB creates a TSDB for a given userID, and returns the created db.
421421
func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
422+
tsdbPromReg := prometheus.NewRegistry()
423+
422424
udir := i.cfg.TSDBConfig.BlocksDir(userID)
423425

424426
// Create a new user database
425-
db, err := tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
427+
db, err := tsdb.Open(udir, util.Logger, tsdbPromReg, &tsdb.Options{
426428
RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond),
427429
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(),
428430
NoLockfile: true,
@@ -443,7 +445,7 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
443445

444446
// Create a new shipper for this database
445447
if i.cfg.TSDBConfig.ShipInterval > 0 {
446-
s := shipper.New(util.Logger, i.TSDBState.shipperMetrics.newRegistryForUser(userID), udir, &Bucket{userID, i.TSDBState.bucket}, func() labels.Labels { return l }, metadata.ReceiveSource)
448+
s := shipper.New(util.Logger, tsdbPromReg, udir, &Bucket{userID, i.TSDBState.bucket}, func() labels.Labels { return l }, metadata.ReceiveSource)
447449
i.done.Add(1)
448450
go func() {
449451
defer i.done.Done()
@@ -458,6 +460,7 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
458460
}()
459461
}
460462

463+
i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg)
461464
return db, nil
462465
}
463466

pkg/ingester/ingester_v2_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ func TestIngester_v2Push(t *testing.T) {
3636
"cortex_ingester_ingested_samples_failures_total",
3737
"cortex_ingester_memory_series",
3838
"cortex_ingester_memory_users",
39+
"cortex_ingester_memory_series_created_total",
40+
"cortex_ingester_memory_series_removed_total",
3941
}
4042
userID := "test"
4143

@@ -73,6 +75,12 @@ func TestIngester_v2Push(t *testing.T) {
7375
# HELP cortex_ingester_memory_series The current number of series in memory.
7476
# TYPE cortex_ingester_memory_series gauge
7577
cortex_ingester_memory_series 1
78+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
79+
# TYPE cortex_ingester_memory_series_created_total counter
80+
cortex_ingester_memory_series_created_total{user="test"} 1
81+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
82+
# TYPE cortex_ingester_memory_series_removed_total counter
83+
cortex_ingester_memory_series_removed_total{user="test"} 0
7684
`,
7785
},
7886
"should soft fail on sample out of order": {
@@ -103,6 +111,12 @@ func TestIngester_v2Push(t *testing.T) {
103111
# HELP cortex_ingester_memory_series The current number of series in memory.
104112
# TYPE cortex_ingester_memory_series gauge
105113
cortex_ingester_memory_series 1
114+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
115+
# TYPE cortex_ingester_memory_series_created_total counter
116+
cortex_ingester_memory_series_created_total{user="test"} 1
117+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
118+
# TYPE cortex_ingester_memory_series_removed_total counter
119+
cortex_ingester_memory_series_removed_total{user="test"} 0
106120
`,
107121
},
108122
"should soft fail on sample out of bound": {
@@ -133,6 +147,12 @@ func TestIngester_v2Push(t *testing.T) {
133147
# HELP cortex_ingester_memory_series The current number of series in memory.
134148
# TYPE cortex_ingester_memory_series gauge
135149
cortex_ingester_memory_series 1
150+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
151+
# TYPE cortex_ingester_memory_series_created_total counter
152+
cortex_ingester_memory_series_created_total{user="test"} 1
153+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
154+
# TYPE cortex_ingester_memory_series_removed_total counter
155+
cortex_ingester_memory_series_removed_total{user="test"} 0
136156
`,
137157
},
138158
"should soft fail on two different sample values at the same timestamp": {
@@ -163,6 +183,12 @@ func TestIngester_v2Push(t *testing.T) {
163183
# HELP cortex_ingester_memory_series The current number of series in memory.
164184
# TYPE cortex_ingester_memory_series gauge
165185
cortex_ingester_memory_series 1
186+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
187+
# TYPE cortex_ingester_memory_series_created_total counter
188+
cortex_ingester_memory_series_created_total{user="test"} 1
189+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
190+
# TYPE cortex_ingester_memory_series_removed_total counter
191+
cortex_ingester_memory_series_removed_total{user="test"} 0
166192
`,
167193
},
168194
}
@@ -226,6 +252,8 @@ func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *tes
226252
"cortex_ingester_ingested_samples_failures_total",
227253
"cortex_ingester_memory_series",
228254
"cortex_ingester_memory_users",
255+
"cortex_ingester_memory_series_created_total",
256+
"cortex_ingester_memory_series_removed_total",
229257
}
230258

231259
registry := prometheus.NewRegistry()
@@ -278,6 +306,14 @@ func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *tes
278306
# HELP cortex_ingester_memory_series The current number of series in memory.
279307
# TYPE cortex_ingester_memory_series gauge
280308
cortex_ingester_memory_series 2
309+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
310+
# TYPE cortex_ingester_memory_series_created_total counter
311+
cortex_ingester_memory_series_created_total{user="test-1"} 1
312+
cortex_ingester_memory_series_created_total{user="test-2"} 1
313+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
314+
# TYPE cortex_ingester_memory_series_removed_total counter
315+
cortex_ingester_memory_series_removed_total{user="test-1"} 0
316+
cortex_ingester_memory_series_removed_total{user="test-2"} 0
281317
`
282318

283319
assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))

pkg/ingester/metrics.go

Lines changed: 103 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ import (
99
dto "github.com/prometheus/client_model/go"
1010
)
1111

12+
const (
13+
memSeriesCreatedTotalName = "cortex_ingester_memory_series_created_total"
14+
memSeriesCreatedTotalHelp = "The total number of series that were created per user."
15+
16+
memSeriesRemovedTotalName = "cortex_ingester_memory_series_removed_total"
17+
memSeriesRemovedTotalHelp = "The total number of series that were removed per user."
18+
)
19+
1220
type ingesterMetrics struct {
1321
flushQueueLength prometheus.Gauge
1422
ingestedSamples prometheus.Counter
@@ -23,7 +31,7 @@ type ingesterMetrics struct {
2331
memSeriesRemovedTotal *prometheus.CounterVec
2432
}
2533

26-
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
34+
func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithTSDB bool) *ingesterMetrics {
2735
m := &ingesterMetrics{
2836
flushQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
2937
Name: "cortex_ingester_flush_queue_length",
@@ -68,12 +76,12 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
6876
Help: "The current number of users in memory.",
6977
}),
7078
memSeriesCreatedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
71-
Name: "cortex_ingester_memory_series_created_total",
72-
Help: "The total number of series that were created per user.",
79+
Name: memSeriesCreatedTotalName,
80+
Help: memSeriesCreatedTotalHelp,
7381
}, []string{"user"}),
7482
memSeriesRemovedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
75-
Name: "cortex_ingester_memory_series_removed_total",
76-
Help: "The total number of series that were removed per user.",
83+
Name: memSeriesRemovedTotalName,
84+
Help: memSeriesRemovedTotalHelp,
7785
}, []string{"user"}),
7886
}
7987

@@ -88,28 +96,42 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
8896
m.queriedChunks,
8997
m.memSeries,
9098
m.memUsers,
91-
m.memSeriesCreatedTotal,
92-
m.memSeriesRemovedTotal,
9399
)
100+
101+
if registerMetricsConflictingWithTSDB {
102+
r.MustRegister(
103+
m.memSeriesCreatedTotal,
104+
m.memSeriesRemovedTotal,
105+
)
106+
}
94107
}
95108

96109
return m
97110
}
98111

99-
// TSDB shipper metrics. We aggregate metrics from individual TSDB shippers into
100-
// a single set of counters, which are exposed as Cortex metrics.
101-
type shipperMetrics struct {
112+
// TSDB metrics. Each tenant has its own registry, which the TSDB code uses.
113+
type tsdbMetrics struct {
114+
// We aggregate metrics from individual TSDB registries into
115+
// a single set of counters, which are exposed as Cortex metrics.
102116
dirSyncs *prometheus.Desc // sum(thanos_shipper_dir_syncs_total)
103117
dirSyncFailures *prometheus.Desc // sum(thanos_shipper_dir_sync_failures_total)
104118
uploads *prometheus.Desc // sum(thanos_shipper_uploads_total)
105119
uploadFailures *prometheus.Desc // sum(thanos_shipper_upload_failures_total)
106120

121+
// These two metrics replace metrics in ingesterMetrics, as we count them differently
122+
memSeriesCreatedTotal *prometheus.Desc
123+
memSeriesRemovedTotal *prometheus.Desc
124+
125+
// These maps drive the collection output. Key = original metric name to group.
126+
sumCountersGlobally map[string]*prometheus.Desc
127+
sumCountersPerUser map[string]*prometheus.Desc
128+
107129
regsMu sync.RWMutex // custom mutex for shipper registry, to avoid blocking main user state mutex on collection
108-
regs map[string]*prometheus.Registry // One prometheus registry (used by shipper) per tenant
130+
regs map[string]*prometheus.Registry // One prometheus registry per tenant
109131
}
110132

111-
func newShipperMetrics(r prometheus.Registerer) *shipperMetrics {
112-
m := &shipperMetrics{
133+
func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
134+
m := &tsdbMetrics{
113135
regs: make(map[string]*prometheus.Registry),
114136

115137
dirSyncs: prometheus.NewDesc(
@@ -128,6 +150,21 @@ func newShipperMetrics(r prometheus.Registerer) *shipperMetrics {
128150
"cortex_ingester_shipper_upload_failures_total",
129151
"TSDB: Total number of failed object uploads",
130152
nil, nil),
153+
154+
memSeriesCreatedTotal: prometheus.NewDesc(memSeriesCreatedTotalName, memSeriesCreatedTotalHelp, []string{"user"}, nil),
155+
memSeriesRemovedTotal: prometheus.NewDesc(memSeriesRemovedTotalName, memSeriesRemovedTotalHelp, []string{"user"}, nil),
156+
}
157+
158+
m.sumCountersGlobally = map[string]*prometheus.Desc{
159+
"thanos_shipper_dir_syncs_total": m.dirSyncs,
160+
"thanos_shipper_dir_sync_failures_total": m.dirSyncFailures,
161+
"thanos_shipper_uploads_total": m.uploads,
162+
"thanos_shipper_upload_failures_total": m.uploadFailures,
163+
}
164+
165+
m.sumCountersPerUser = map[string]*prometheus.Desc{
166+
"prometheus_tsdb_head_series_created_total": m.memSeriesCreatedTotal,
167+
"prometheus_tsdb_head_series_removed_total": m.memSeriesRemovedTotal,
131168
}
132169

133170
if r != nil {
@@ -136,51 +173,58 @@ func newShipperMetrics(r prometheus.Registerer) *shipperMetrics {
136173
return m
137174
}
138175

139-
func (sm *shipperMetrics) Describe(out chan<- *prometheus.Desc) {
176+
func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
140177
out <- sm.dirSyncs
141178
out <- sm.dirSyncFailures
142179
out <- sm.uploads
143180
out <- sm.uploadFailures
181+
out <- sm.memSeriesCreatedTotal
182+
out <- sm.memSeriesRemovedTotal
144183
}
145184

146-
func (sm *shipperMetrics) Collect(out chan<- prometheus.Metric) {
147-
gathered := make(map[string][]*dto.MetricFamily)
185+
func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
186+
regs := sm.registries()
187+
data := gatheredMetricsPerUser{}
148188

149-
regs := sm.shipperRegistries()
150189
for userID, r := range regs {
151190
m, err := r.Gather()
152191
if err != nil {
153192
level.Warn(util.Logger).Log("msg", "failed to gather metrics from TSDB shipper", "user", userID, "err", err)
154193
continue
155194
}
156195

157-
addToGatheredMap(gathered, m)
196+
data.addGatheredDataForUser(userID, m)
158197
}
159198

160199
// OK, we have it all. Let's build results.
161-
out <- prometheus.MustNewConstMetric(sm.dirSyncs, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_dir_syncs_total"]))
162-
out <- prometheus.MustNewConstMetric(sm.dirSyncFailures, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_dir_sync_failures_total"]))
163-
out <- prometheus.MustNewConstMetric(sm.uploads, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_uploads_total"]))
164-
out <- prometheus.MustNewConstMetric(sm.uploadFailures, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_upload_failures_total"]))
200+
for metric, desc := range sm.sumCountersGlobally {
201+
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, data.sumCountersAcrossAllUsers(metric))
202+
}
203+
204+
for metric, desc := range sm.sumCountersPerUser {
205+
userValues := data.sumCountersPerUser(metric)
206+
for user, val := range userValues {
207+
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, val, user)
208+
}
209+
}
165210
}
166211

167-
func (sm *shipperMetrics) shipperRegistries() []*prometheus.Registry {
212+
// registries makes a copy of the map, so that metrics can be gathered while a new registry is being added.
213+
func (sm *tsdbMetrics) registries() map[string]*prometheus.Registry {
168214
sm.regsMu.RLock()
169215
defer sm.regsMu.RUnlock()
170216

171-
regs := make([]*prometheus.Registry, 0, len(sm.regs))
172-
for _, r := range sm.regs {
173-
regs = append(regs, r)
217+
regs := make(map[string]*prometheus.Registry, len(sm.regs))
218+
for u, r := range sm.regs {
219+
regs[u] = r
174220
}
175221
return regs
176222
}
177223

178-
func (sm *shipperMetrics) newRegistryForUser(userID string) prometheus.Registerer {
179-
reg := prometheus.NewRegistry()
224+
func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Registry) {
180225
sm.regsMu.Lock()
181-
sm.regs[userID] = reg
226+
sm.regs[userID] = registry
182227
sm.regsMu.Unlock()
183-
return reg
184228
}
185229

186230
func sumCounters(mfs []*dto.MetricFamily) float64 {
@@ -201,11 +245,37 @@ func sumCounters(mfs []*dto.MetricFamily) float64 {
201245
return result
202246
}
203247

204-
func addToGatheredMap(all map[string][]*dto.MetricFamily, mfs []*dto.MetricFamily) {
205-
for _, m := range mfs {
248+
// first key = userID, second key = metric name. Value = slice of gathered values with the same metric name.
249+
type gatheredMetricsPerUser map[string]map[string][]*dto.MetricFamily
250+
251+
func (d gatheredMetricsPerUser) addGatheredDataForUser(userID string, metrics []*dto.MetricFamily) {
252+
// first, create new map which maps metric names to a slice of MetricFamily instances.
253+
// That makes it easier to do searches later.
254+
perMetricName := map[string][]*dto.MetricFamily{}
255+
256+
for _, m := range metrics {
206257
if m.Name == nil {
207258
continue
208259
}
209-
all[*m.Name] = append(all[*m.Name], m)
260+
perMetricName[*m.Name] = append(perMetricName[*m.Name], m)
261+
}
262+
263+
d[userID] = perMetricName
264+
}
265+
266+
func (d gatheredMetricsPerUser) sumCountersAcrossAllUsers(counter string) float64 {
267+
result := float64(0)
268+
for _, perMetric := range d {
269+
result += sumCounters(perMetric[counter])
210270
}
271+
return result
272+
}
273+
274+
func (d gatheredMetricsPerUser) sumCountersPerUser(counter string) map[string]float64 {
275+
result := map[string]float64{}
276+
for user, perMetric := range d {
277+
v := sumCounters(perMetric[counter])
278+
result[user] = v
279+
}
280+
return result
211281
}

0 commit comments

Comments
 (0)