Skip to content

Commit 9f0b9d8

Browse files
committed
Add exemplar metrics for distributor received/in and validation discarded per reason
Signed-off-by: Martin Disibio <[email protected]>
1 parent 5b59fec commit 9f0b9d8

File tree

4 files changed

+108
-2
lines changed

4 files changed

+108
-2
lines changed

pkg/distributor/distributor.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,10 @@ type Distributor struct {
9393
// Metrics
9494
queryDuration *instrument.HistogramCollector
9595
receivedSamples *prometheus.CounterVec
96+
receivedExemplars *prometheus.CounterVec
9697
receivedMetadata *prometheus.CounterVec
9798
incomingSamples *prometheus.CounterVec
99+
incomingExemplars *prometheus.CounterVec
98100
incomingMetadata *prometheus.CounterVec
99101
nonHASamples *prometheus.CounterVec
100102
dedupedSamples *prometheus.CounterVec
@@ -241,6 +243,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
241243
Name: "distributor_received_samples_total",
242244
Help: "The total number of received samples, excluding rejected and deduped samples.",
243245
}, []string{"user"}),
246+
receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
247+
Namespace: "cortex",
248+
Name: "distributor_received_exemplars_total",
249+
Help: "The total number of received exemplars, excluding rejected and deduped exemplars.",
250+
}, []string{"user"}),
244251
receivedMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
245252
Namespace: "cortex",
246253
Name: "distributor_received_metadata_total",
@@ -251,6 +258,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
251258
Name: "distributor_samples_in_total",
252259
Help: "The total number of samples that have come in to the distributor, including rejected or deduped samples.",
253260
}, []string{"user"}),
261+
incomingExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
262+
Namespace: "cortex",
263+
Name: "distributor_exemplars_in_total",
264+
Help: "The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.",
265+
}, []string{"user"}),
254266
incomingMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
255267
Namespace: "cortex",
256268
Name: "distributor_metadata_in_total",
@@ -375,8 +387,10 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
375387
d.HATracker.cleanupHATrackerMetricsForUser(userID)
376388

377389
d.receivedSamples.DeleteLabelValues(userID)
390+
d.receivedExemplars.DeleteLabelValues(userID)
378391
d.receivedMetadata.DeleteLabelValues(userID)
379392
d.incomingSamples.DeleteLabelValues(userID)
393+
d.incomingExemplars.DeleteLabelValues(userID)
380394
d.incomingMetadata.DeleteLabelValues(userID)
381395
d.nonHASamples.DeleteLabelValues(userID)
382396
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)
@@ -543,11 +557,14 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
543557
removeReplica := false
544558

545559
numSamples := 0
560+
numExemplars := 0
546561
for _, ts := range req.Timeseries {
547562
numSamples += len(ts.Samples)
563+
numExemplars += len(ts.Exemplars)
548564
}
549565
// Count the total samples in, prior to validation or deduplication, for comparison with other metrics.
550566
d.incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
567+
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
551568
// Count the total number of metadata in.
552569
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
553570

@@ -675,6 +692,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
675692
}
676693

677694
d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
695+
d.receivedExemplars.WithLabelValues(userID).Add((float64(validatedExemplars)))
678696
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))
679697

680698
if len(seriesKeys) == 0 && len(metadataKeys) == 0 {
@@ -692,6 +710,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
692710
// Return a 4xx here to have the client discard the data and not retry. If a client
693711
// is sending too much data consistently we will unlikely ever catch up otherwise.
694712
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
713+
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
695714
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
696715
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
697716
}

pkg/distributor/distributor_test.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,19 +273,24 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
273273

274274
metrics := []string{
275275
"cortex_distributor_received_samples_total",
276+
"cortex_distributor_received_exemplars_total",
276277
"cortex_distributor_received_metadata_total",
277278
"cortex_distributor_deduped_samples_total",
278279
"cortex_distributor_samples_in_total",
280+
"cortex_distributor_exemplars_in_total",
279281
"cortex_distributor_metadata_in_total",
280282
"cortex_distributor_non_ha_samples_received_total",
281283
"cortex_distributor_latest_seen_sample_timestamp_seconds",
282284
}
283285

284286
d.receivedSamples.WithLabelValues("userA").Add(5)
285287
d.receivedSamples.WithLabelValues("userB").Add(10)
288+
d.receivedExemplars.WithLabelValues("userA").Add(5)
289+
d.receivedExemplars.WithLabelValues("userB").Add(10)
286290
d.receivedMetadata.WithLabelValues("userA").Add(5)
287291
d.receivedMetadata.WithLabelValues("userB").Add(10)
288292
d.incomingSamples.WithLabelValues("userA").Add(5)
293+
d.incomingExemplars.WithLabelValues("userA").Add(5)
289294
d.incomingMetadata.WithLabelValues("userA").Add(5)
290295
d.nonHASamples.WithLabelValues("userA").Add(5)
291296
d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
@@ -318,10 +323,19 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
318323
cortex_distributor_received_samples_total{user="userA"} 5
319324
cortex_distributor_received_samples_total{user="userB"} 10
320325
326+
# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
327+
# TYPE cortex_distributor_received_exemplars_total counter
328+
cortex_distributor_received_exemplars_total{user="userA"} 5
329+
cortex_distributor_received_exemplars_total{user="userB"} 10
330+
321331
# HELP cortex_distributor_samples_in_total The total number of samples that have come in to the distributor, including rejected or deduped samples.
322332
# TYPE cortex_distributor_samples_in_total counter
323333
cortex_distributor_samples_in_total{user="userA"} 5
324-
`), metrics...))
334+
335+
# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
336+
# TYPE cortex_distributor_exemplars_in_total counter
337+
cortex_distributor_exemplars_in_total{user="userA"} 5
338+
`), metrics...))
325339

326340
d.cleanupInactiveUser("userA")
327341

@@ -346,9 +360,16 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
346360
# TYPE cortex_distributor_received_samples_total counter
347361
cortex_distributor_received_samples_total{user="userB"} 10
348362
363+
# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
364+
# TYPE cortex_distributor_received_exemplars_total counter
365+
cortex_distributor_received_exemplars_total{user="userB"} 10
366+
349367
# HELP cortex_distributor_samples_in_total The total number of samples that have come in to the distributor, including rejected or deduped samples.
350368
# TYPE cortex_distributor_samples_in_total counter
351-
`), metrics...))
369+
370+
# HELP cortex_distributor_exemplars_in_total The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.
371+
# TYPE cortex_distributor_exemplars_in_total counter
372+
`), metrics...))
352373
}
353374

354375
func TestDistributor_PushIngestionRateLimiter(t *testing.T) {

pkg/util/validation/validate.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ const (
4545
labelsNotSorted = "labels_not_sorted"
4646
labelValueTooLong = "label_value_too_long"
4747

48+
// Exemplar-specific validation reasons
49+
exemplarLabelsMissing = "exemplar_labels_missing"
50+
exemplarLabelsTooLong = "exemplar_labels_too_long"
51+
exemplarTimestampInvalid = "exemplar_timestamp_invalid"
52+
4853
// RateLimited is one of the values for the reason to discard samples.
4954
// Declared here to avoid duplication in ingester and distributor.
5055
RateLimited = "rate_limited"
@@ -66,6 +71,15 @@ var DiscardedSamples = prometheus.NewCounterVec(
6671
[]string{discardReasonLabel, "user"},
6772
)
6873

74+
// DiscardedExemplars is a metric of the number of discarded exemplars, by reason.
75+
var DiscardedExemplars = prometheus.NewCounterVec(
76+
prometheus.CounterOpts{
77+
Name: "cortex_discarded_exemplars_total",
78+
Help: "The total number of exemplars that were discarded.",
79+
},
80+
[]string{discardReasonLabel, "user"},
81+
)
82+
6983
// DiscardedMetadata is a metric of the number of discarded metadata, by reason.
7084
var DiscardedMetadata = prometheus.NewCounterVec(
7185
prometheus.CounterOpts{
@@ -77,6 +91,7 @@ var DiscardedMetadata = prometheus.NewCounterVec(
7791

7892
func init() {
7993
prometheus.MustRegister(DiscardedSamples)
94+
prometheus.MustRegister(DiscardedExemplars)
8095
prometheus.MustRegister(DiscardedMetadata)
8196
}
8297

@@ -107,11 +122,13 @@ func ValidateSample(cfg SampleValidationConfig, userID string, ls []cortexpb.Lab
107122

108123
func ValidateExemplar(userID string, ls []cortexpb.LabelAdapter, e cortexpb.Exemplar) ValidationError {
109124
if len(e.Labels) <= 0 {
125+
DiscardedExemplars.WithLabelValues(exemplarLabelsMissing, userID).Inc()
110126
return fmt.Errorf(`exemplar missing labels: series: %s`,
111127
cortexpb.FromLabelAdaptersToLabels(ls).String())
112128
}
113129

114130
if e.TimestampMs == 0 {
131+
DiscardedExemplars.WithLabelValues(exemplarTimestampInvalid, userID).Inc()
115132
return fmt.Errorf(`exemplar missing timestamp: series: %s labels: %s`,
116133
cortexpb.FromLabelAdaptersToLabels(ls).String(),
117134
cortexpb.FromLabelAdaptersToLabels(e.Labels).String())
@@ -124,6 +141,7 @@ func ValidateExemplar(userID string, ls []cortexpb.LabelAdapter, e cortexpb.Exem
124141
}
125142

126143
if labelSetLen > ExemplarMaxLabelSetLength {
144+
DiscardedExemplars.WithLabelValues(exemplarLabelsTooLong, userID).Inc()
127145
return fmt.Errorf(`exemplar combined labelset too long: series: %s labels: %s`,
128146
cortexpb.FromLabelAdaptersToLabels(ls).String(),
129147
cortexpb.FromLabelAdaptersToLabels(e.Labels).String())
@@ -235,6 +253,9 @@ func DeletePerUserValidationMetrics(userID string, log log.Logger) {
235253
if err := util.DeleteMatchingLabels(DiscardedSamples, filter); err != nil {
236254
level.Warn(log).Log("msg", "failed to remove cortex_discarded_samples_total metric for user", "user", userID, "err", err)
237255
}
256+
if err := util.DeleteMatchingLabels(DiscardedExemplars, filter); err != nil {
257+
level.Warn(log).Log("msg", "failed to remove cortex_discarded_exemplars_total metric for user", "user", userID, "err", err)
258+
}
238259
if err := util.DeleteMatchingLabels(DiscardedMetadata, filter); err != nil {
239260
level.Warn(log).Log("msg", "failed to remove cortex_discarded_metadata_total metric for user", "user", userID, "err", err)
240261
}

pkg/util/validation/validate_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,51 @@ func TestValidateLabels(t *testing.T) {
148148
`), "cortex_discarded_samples_total"))
149149
}
150150

151+
func TestValidateExemplars(t *testing.T) {
152+
userID := "testUser"
153+
154+
invalidExemplars := []cortexpb.Exemplar{
155+
{
156+
// Missing labels
157+
Labels: nil,
158+
},
159+
{
160+
// Invalid timestamp
161+
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
162+
},
163+
{
164+
// Combined labelset too long
165+
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: strings.Repeat("0", 126)}},
166+
TimestampMs: 1000,
167+
},
168+
}
169+
170+
for _, ie := range invalidExemplars {
171+
err := ValidateExemplar(userID, []cortexpb.LabelAdapter{}, ie)
172+
assert.NotNil(t, err)
173+
}
174+
175+
DiscardedExemplars.WithLabelValues("random reason", "different user").Inc()
176+
177+
require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(`
178+
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
179+
# TYPE cortex_discarded_exemplars_total counter
180+
cortex_discarded_exemplars_total{reason="exemplar_labels_missing",user="testUser"} 1
181+
cortex_discarded_exemplars_total{reason="exemplar_labels_too_long",user="testUser"} 1
182+
cortex_discarded_exemplars_total{reason="exemplar_timestamp_invalid",user="testUser"} 1
183+
184+
cortex_discarded_exemplars_total{reason="random reason",user="different user"} 1
185+
`), "cortex_discarded_exemplars_total"))
186+
187+
// Delete the test user's metrics and verify that only the other user's series remains
188+
DeletePerUserValidationMetrics(userID, util_log.Logger)
189+
require.NoError(t, testutil.GatherAndCompare(prometheus.DefaultGatherer, strings.NewReader(`
190+
# HELP cortex_discarded_exemplars_total The total number of exemplars that were discarded.
191+
# TYPE cortex_discarded_exemplars_total counter
192+
cortex_discarded_exemplars_total{reason="random reason",user="different user"} 1
193+
`), "cortex_discarded_exemplars_total"))
194+
}
195+
151196
func TestValidateMetadata(t *testing.T) {
152197
userID := "testUser"
153198
var cfg validateMetadataCfg

0 commit comments

Comments
 (0)