Skip to content

Commit aaa2aa2

Browse files
author
Ganesh Vernekar
committed
Merge remote-tracking branch 'upstream/master' into wal
Signed-off-by: Ganesh Vernekar <[email protected]>
2 parents 8b55cdb + efb54b3 commit aaa2aa2

File tree

6 files changed

+199
-48
lines changed

6 files changed

+199
-48
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ instructions below to upgrade your Postgres.
4343
* [BUGFIX] TSDB: `experimental.tsdb.ship-interval` of <=0 treated as disabled instead of allowing panic. #1975
4444
* [BUGFIX] TSDB: Fixed `cortex_ingester_queried_samples` and `cortex_ingester_queried_series` metrics when using block storage. #1981
4545
* [BUGFIX] TSDB: Fixed `cortex_ingester_memory_series` and `cortex_ingester_memory_users` metrics when using the experimental TSDB blocks storage. #1982
46+
* [BUGFIX] TSDB: Fixed `cortex_ingester_memory_series_created_total` and `cortex_ingester_memory_series_removed_total` metrics when using TSDB blocks storage. #1990
4647

4748
### Upgrading Postgres (if you're using configs service)
4849

pkg/ingester/ingester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
157157
i := &Ingester{
158158
cfg: cfg,
159159
clientConfig: clientConfig,
160-
metrics: newIngesterMetrics(registerer),
160+
metrics: newIngesterMetrics(registerer, true),
161161
limits: limits,
162162
chunkStore: chunkStore,
163163
quit: make(chan struct{}),

pkg/ingester/ingester_v2.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type TSDBState struct {
4646
// transferring to a joining ingester
4747
transferOnce sync.Once
4848

49-
shipperMetrics *shipperMetrics
49+
tsdbMetrics *tsdbMetrics
5050
}
5151

5252
// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
@@ -59,15 +59,15 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
5959
i := &Ingester{
6060
cfg: cfg,
6161
clientConfig: clientConfig,
62-
metrics: newIngesterMetrics(registerer),
62+
metrics: newIngesterMetrics(registerer, false),
6363
limits: limits,
6464
chunkStore: nil,
6565
quit: make(chan struct{}),
6666
wal: &noopWAL{},
6767
TSDBState: TSDBState{
68-
dbs: make(map[string]*tsdb.DB),
69-
bucket: bucketClient,
70-
shipperMetrics: newShipperMetrics(registerer),
68+
dbs: make(map[string]*tsdb.DB),
69+
bucket: bucketClient,
70+
tsdbMetrics: newTSDBMetrics(registerer),
7171
},
7272
}
7373

@@ -419,10 +419,12 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)
419419

420420
// createTSDB creates a TSDB for a given userID, and returns the created db.
421421
func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
422+
tsdbPromReg := prometheus.NewRegistry()
423+
422424
udir := i.cfg.TSDBConfig.BlocksDir(userID)
423425

424426
// Create a new user database
425-
db, err := tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
427+
db, err := tsdb.Open(udir, util.Logger, tsdbPromReg, &tsdb.Options{
426428
RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond),
427429
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(),
428430
NoLockfile: true,
@@ -443,7 +445,7 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
443445

444446
// Create a new shipper for this database
445447
if i.cfg.TSDBConfig.ShipInterval > 0 {
446-
s := shipper.New(util.Logger, i.TSDBState.shipperMetrics.newRegistryForUser(userID), udir, &Bucket{userID, i.TSDBState.bucket}, func() labels.Labels { return l }, metadata.ReceiveSource)
448+
s := shipper.New(util.Logger, tsdbPromReg, udir, &Bucket{userID, i.TSDBState.bucket}, func() labels.Labels { return l }, metadata.ReceiveSource)
447449
i.done.Add(1)
448450
go func() {
449451
defer i.done.Done()
@@ -458,6 +460,7 @@ func (i *Ingester) createTSDB(userID string) (*tsdb.DB, error) {
458460
}()
459461
}
460462

463+
i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg)
461464
return db, nil
462465
}
463466

pkg/ingester/ingester_v2_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ func TestIngester_v2Push(t *testing.T) {
3636
"cortex_ingester_ingested_samples_failures_total",
3737
"cortex_ingester_memory_series",
3838
"cortex_ingester_memory_users",
39+
"cortex_ingester_memory_series_created_total",
40+
"cortex_ingester_memory_series_removed_total",
3941
}
4042
userID := "test"
4143

@@ -73,6 +75,12 @@ func TestIngester_v2Push(t *testing.T) {
7375
# HELP cortex_ingester_memory_series The current number of series in memory.
7476
# TYPE cortex_ingester_memory_series gauge
7577
cortex_ingester_memory_series 1
78+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
79+
# TYPE cortex_ingester_memory_series_created_total counter
80+
cortex_ingester_memory_series_created_total{user="test"} 1
81+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
82+
# TYPE cortex_ingester_memory_series_removed_total counter
83+
cortex_ingester_memory_series_removed_total{user="test"} 0
7684
`,
7785
},
7886
"should soft fail on sample out of order": {
@@ -103,6 +111,12 @@ func TestIngester_v2Push(t *testing.T) {
103111
# HELP cortex_ingester_memory_series The current number of series in memory.
104112
# TYPE cortex_ingester_memory_series gauge
105113
cortex_ingester_memory_series 1
114+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
115+
# TYPE cortex_ingester_memory_series_created_total counter
116+
cortex_ingester_memory_series_created_total{user="test"} 1
117+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
118+
# TYPE cortex_ingester_memory_series_removed_total counter
119+
cortex_ingester_memory_series_removed_total{user="test"} 0
106120
`,
107121
},
108122
"should soft fail on sample out of bound": {
@@ -133,6 +147,12 @@ func TestIngester_v2Push(t *testing.T) {
133147
# HELP cortex_ingester_memory_series The current number of series in memory.
134148
# TYPE cortex_ingester_memory_series gauge
135149
cortex_ingester_memory_series 1
150+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
151+
# TYPE cortex_ingester_memory_series_created_total counter
152+
cortex_ingester_memory_series_created_total{user="test"} 1
153+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
154+
# TYPE cortex_ingester_memory_series_removed_total counter
155+
cortex_ingester_memory_series_removed_total{user="test"} 0
136156
`,
137157
},
138158
"should soft fail on two different sample values at the same timestamp": {
@@ -163,6 +183,12 @@ func TestIngester_v2Push(t *testing.T) {
163183
# HELP cortex_ingester_memory_series The current number of series in memory.
164184
# TYPE cortex_ingester_memory_series gauge
165185
cortex_ingester_memory_series 1
186+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
187+
# TYPE cortex_ingester_memory_series_created_total counter
188+
cortex_ingester_memory_series_created_total{user="test"} 1
189+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
190+
# TYPE cortex_ingester_memory_series_removed_total counter
191+
cortex_ingester_memory_series_removed_total{user="test"} 0
166192
`,
167193
},
168194
}
@@ -226,6 +252,8 @@ func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *tes
226252
"cortex_ingester_ingested_samples_failures_total",
227253
"cortex_ingester_memory_series",
228254
"cortex_ingester_memory_users",
255+
"cortex_ingester_memory_series_created_total",
256+
"cortex_ingester_memory_series_removed_total",
229257
}
230258

231259
registry := prometheus.NewRegistry()
@@ -278,6 +306,14 @@ func TestIngester_v2Push_ShouldCorrectlyTrackMetricsInMultiTenantScenario(t *tes
278306
# HELP cortex_ingester_memory_series The current number of series in memory.
279307
# TYPE cortex_ingester_memory_series gauge
280308
cortex_ingester_memory_series 2
309+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
310+
# TYPE cortex_ingester_memory_series_created_total counter
311+
cortex_ingester_memory_series_created_total{user="test-1"} 1
312+
cortex_ingester_memory_series_created_total{user="test-2"} 1
313+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
314+
# TYPE cortex_ingester_memory_series_removed_total counter
315+
cortex_ingester_memory_series_removed_total{user="test-1"} 0
316+
cortex_ingester_memory_series_removed_total{user="test-2"} 0
281317
`
282318

283319
assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))

pkg/ingester/metrics.go

Lines changed: 103 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,14 @@ import (
99
dto "github.com/prometheus/client_model/go"
1010
)
1111

12+
const (
13+
memSeriesCreatedTotalName = "cortex_ingester_memory_series_created_total"
14+
memSeriesCreatedTotalHelp = "The total number of series that were created per user."
15+
16+
memSeriesRemovedTotalName = "cortex_ingester_memory_series_removed_total"
17+
memSeriesRemovedTotalHelp = "The total number of series that were removed per user."
18+
)
19+
1220
type ingesterMetrics struct {
1321
flushQueueLength prometheus.Gauge
1422
ingestedSamples prometheus.Counter
@@ -24,7 +32,7 @@ type ingesterMetrics struct {
2432
walReplayDuration prometheus.Gauge
2533
}
2634

27-
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
35+
func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithTSDB bool) *ingesterMetrics {
2836
m := &ingesterMetrics{
2937
flushQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
3038
Name: "cortex_ingester_flush_queue_length",
@@ -69,12 +77,12 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
6977
Help: "The current number of users in memory.",
7078
}),
7179
memSeriesCreatedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
72-
Name: "cortex_ingester_memory_series_created_total",
73-
Help: "The total number of series that were created per user.",
80+
Name: memSeriesCreatedTotalName,
81+
Help: memSeriesCreatedTotalHelp,
7482
}, []string{"user"}),
7583
memSeriesRemovedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
76-
Name: "cortex_ingester_memory_series_removed_total",
77-
Help: "The total number of series that were removed per user.",
84+
Name: memSeriesRemovedTotalName,
85+
Help: memSeriesRemovedTotalHelp,
7886
}, []string{"user"}),
7987
walReplayDuration: prometheus.NewGauge(prometheus.GaugeOpts{
8088
Name: "cortex_ingester_wal_replay_duration_seconds",
@@ -93,29 +101,43 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
93101
m.queriedChunks,
94102
m.memSeries,
95103
m.memUsers,
96-
m.memSeriesCreatedTotal,
97-
m.memSeriesRemovedTotal,
98104
m.walReplayDuration,
99105
)
106+
107+
if registerMetricsConflictingWithTSDB {
108+
r.MustRegister(
109+
m.memSeriesCreatedTotal,
110+
m.memSeriesRemovedTotal,
111+
)
112+
}
100113
}
101114

102115
return m
103116
}
104117

105-
// TSDB shipper metrics. We aggregate metrics from individual TSDB shippers into
106-
// a single set of counters, which are exposed as Cortex metrics.
107-
type shipperMetrics struct {
118+
// TSDB metrics. Each tenant has its own registry, which the TSDB code uses.
119+
type tsdbMetrics struct {
120+
// We aggregate metrics from individual TSDB registries into
121+
// a single set of counters, which are exposed as Cortex metrics.
108122
dirSyncs *prometheus.Desc // sum(thanos_shipper_dir_syncs_total)
109123
dirSyncFailures *prometheus.Desc // sum(thanos_shipper_dir_sync_failures_total)
110124
uploads *prometheus.Desc // sum(thanos_shipper_uploads_total)
111125
uploadFailures *prometheus.Desc // sum(thanos_shipper_upload_failures_total)
112126

127+
// These two metrics replace metrics in ingesterMetrics, as we count them differently
128+
memSeriesCreatedTotal *prometheus.Desc
129+
memSeriesRemovedTotal *prometheus.Desc
130+
131+
// These maps drive the collection output. Key = original metric name to group.
132+
sumCountersGlobally map[string]*prometheus.Desc
133+
sumCountersPerUser map[string]*prometheus.Desc
134+
113135
regsMu sync.RWMutex // custom mutex for shipper registry, to avoid blocking main user state mutex on collection
114-
regs map[string]*prometheus.Registry // One prometheus registry (used by shipper) per tenant
136+
regs map[string]*prometheus.Registry // One prometheus registry per tenant
115137
}
116138

117-
func newShipperMetrics(r prometheus.Registerer) *shipperMetrics {
118-
m := &shipperMetrics{
139+
func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
140+
m := &tsdbMetrics{
119141
regs: make(map[string]*prometheus.Registry),
120142

121143
dirSyncs: prometheus.NewDesc(
@@ -134,6 +156,21 @@ func newShipperMetrics(r prometheus.Registerer) *shipperMetrics {
134156
"cortex_ingester_shipper_upload_failures_total",
135157
"TSDB: Total number of failed object uploads",
136158
nil, nil),
159+
160+
memSeriesCreatedTotal: prometheus.NewDesc(memSeriesCreatedTotalName, memSeriesCreatedTotalHelp, []string{"user"}, nil),
161+
memSeriesRemovedTotal: prometheus.NewDesc(memSeriesRemovedTotalName, memSeriesRemovedTotalHelp, []string{"user"}, nil),
162+
}
163+
164+
m.sumCountersGlobally = map[string]*prometheus.Desc{
165+
"thanos_shipper_dir_syncs_total": m.dirSyncs,
166+
"thanos_shipper_dir_sync_failures_total": m.dirSyncFailures,
167+
"thanos_shipper_uploads_total": m.uploads,
168+
"thanos_shipper_upload_failures_total": m.uploadFailures,
169+
}
170+
171+
m.sumCountersPerUser = map[string]*prometheus.Desc{
172+
"prometheus_tsdb_head_series_created_total": m.memSeriesCreatedTotal,
173+
"prometheus_tsdb_head_series_removed_total": m.memSeriesRemovedTotal,
137174
}
138175

139176
if r != nil {
@@ -142,51 +179,58 @@ func newShipperMetrics(r prometheus.Registerer) *shipperMetrics {
142179
return m
143180
}
144181

145-
func (sm *shipperMetrics) Describe(out chan<- *prometheus.Desc) {
182+
func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
146183
out <- sm.dirSyncs
147184
out <- sm.dirSyncFailures
148185
out <- sm.uploads
149186
out <- sm.uploadFailures
187+
out <- sm.memSeriesCreatedTotal
188+
out <- sm.memSeriesRemovedTotal
150189
}
151190

152-
func (sm *shipperMetrics) Collect(out chan<- prometheus.Metric) {
153-
gathered := make(map[string][]*dto.MetricFamily)
191+
func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
192+
regs := sm.registries()
193+
data := gatheredMetricsPerUser{}
154194

155-
regs := sm.shipperRegistries()
156195
for userID, r := range regs {
157196
m, err := r.Gather()
158197
if err != nil {
159198
level.Warn(util.Logger).Log("msg", "failed to gather metrics from TSDB shipper", "user", userID, "err", err)
160199
continue
161200
}
162201

163-
addToGatheredMap(gathered, m)
202+
data.addGatheredDataForUser(userID, m)
164203
}
165204

166205
// OK, we have it all. Let's build results.
167-
out <- prometheus.MustNewConstMetric(sm.dirSyncs, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_dir_syncs_total"]))
168-
out <- prometheus.MustNewConstMetric(sm.dirSyncFailures, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_dir_sync_failures_total"]))
169-
out <- prometheus.MustNewConstMetric(sm.uploads, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_uploads_total"]))
170-
out <- prometheus.MustNewConstMetric(sm.uploadFailures, prometheus.CounterValue, sumCounters(gathered["thanos_shipper_upload_failures_total"]))
206+
for metric, desc := range sm.sumCountersGlobally {
207+
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, data.sumCountersAcrossAllUsers(metric))
208+
}
209+
210+
for metric, desc := range sm.sumCountersPerUser {
211+
userValues := data.sumCountersPerUser(metric)
212+
for user, val := range userValues {
213+
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, val, user)
214+
}
215+
}
171216
}
172217

173-
func (sm *shipperMetrics) shipperRegistries() []*prometheus.Registry {
218+
// registries returns a copy of the registry map, so that metrics can be gathered while a new registry is being added.
219+
func (sm *tsdbMetrics) registries() map[string]*prometheus.Registry {
174220
sm.regsMu.RLock()
175221
defer sm.regsMu.RUnlock()
176222

177-
regs := make([]*prometheus.Registry, 0, len(sm.regs))
178-
for _, r := range sm.regs {
179-
regs = append(regs, r)
223+
regs := make(map[string]*prometheus.Registry, len(sm.regs))
224+
for u, r := range sm.regs {
225+
regs[u] = r
180226
}
181227
return regs
182228
}
183229

184-
func (sm *shipperMetrics) newRegistryForUser(userID string) prometheus.Registerer {
185-
reg := prometheus.NewRegistry()
230+
func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Registry) {
186231
sm.regsMu.Lock()
187-
sm.regs[userID] = reg
232+
sm.regs[userID] = registry
188233
sm.regsMu.Unlock()
189-
return reg
190234
}
191235

192236
func sumCounters(mfs []*dto.MetricFamily) float64 {
@@ -207,11 +251,37 @@ func sumCounters(mfs []*dto.MetricFamily) float64 {
207251
return result
208252
}
209253

210-
func addToGatheredMap(all map[string][]*dto.MetricFamily, mfs []*dto.MetricFamily) {
211-
for _, m := range mfs {
254+
// gatheredMetricsPerUser holds gathered metrics: the first key is the userID, the second key is the metric name, and the value is the slice of gathered metric families with that metric name.
255+
type gatheredMetricsPerUser map[string]map[string][]*dto.MetricFamily
256+
257+
func (d gatheredMetricsPerUser) addGatheredDataForUser(userID string, metrics []*dto.MetricFamily) {
258+
// first, create new map which maps metric names to a slice of MetricFamily instances.
259+
// That makes it easier to do searches later.
260+
perMetricName := map[string][]*dto.MetricFamily{}
261+
262+
for _, m := range metrics {
212263
if m.Name == nil {
213264
continue
214265
}
215-
all[*m.Name] = append(all[*m.Name], m)
266+
perMetricName[*m.Name] = append(perMetricName[*m.Name], m)
267+
}
268+
269+
d[userID] = perMetricName
270+
}
271+
272+
func (d gatheredMetricsPerUser) sumCountersAcrossAllUsers(counter string) float64 {
273+
result := float64(0)
274+
for _, perMetric := range d {
275+
result += sumCounters(perMetric[counter])
216276
}
277+
return result
278+
}
279+
280+
func (d gatheredMetricsPerUser) sumCountersPerUser(counter string) map[string]float64 {
281+
result := map[string]float64{}
282+
for user, perMetric := range d {
283+
v := sumCounters(perMetric[counter])
284+
result[user] = v
285+
}
286+
return result
217287
}

0 commit comments

Comments
 (0)