Skip to content

Commit 60f24cf

Browse files
authored
Create ValidateMetrics (#5988)
* Create ValidateMetrics

  The validation package is exporting Prometheus counters that are using the default Prometheus registry. This is problematic when unit tests are running in parallel, because the distributors and ingesters will use the exported counters in their tests.

  Signed-off-by: Charlie Le <[email protected]>

* Address linting errors

  Signed-off-by: Charlie Le <[email protected]>

* Ignore AlreadyRegisterErr when registering ValidateMetrics

  These metrics could be registered by both the distributor and the ingester in single-binary mode.

  Signed-off-by: Charlie Le <[email protected]>

* Set registry when creating ingesters

  Signed-off-by: Charlie Le <[email protected]>

---------

Signed-off-by: Charlie Le <[email protected]>
1 parent 7e07d24 commit 60f24cf

File tree

7 files changed

+168
-166
lines changed

7 files changed

+168
-166
lines changed

pkg/distributor/distributor.go

Lines changed: 15 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -119,6 +119,8 @@ type Distributor struct {
119119
ingesterQueryFailures *prometheus.CounterVec
120120
replicationFactor prometheus.Gauge
121121
latestSeenSampleTimestampPerUser *prometheus.GaugeVec
122+
123+
validateMetrics *validation.ValidateMetrics
122124
}
123125

124126
// Config contains the configuration required to
@@ -345,6 +347,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
345347
Name: "cortex_distributor_latest_seen_sample_timestamp_seconds",
346348
Help: "Unix timestamp of latest received sample per user.",
347349
}, []string{"user"}),
350+
351+
validateMetrics: validation.NewValidateMetrics(reg),
348352
}
349353

350354
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
@@ -437,7 +441,7 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
437441
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
438442
}
439443

440-
validation.DeletePerUserValidationMetrics(userID, d.log)
444+
validation.DeletePerUserValidationMetrics(d.validateMetrics, userID, d.log)
441445
}
442446

443447
// Called after distributor is asked to stop via StopAsync.
@@ -534,7 +538,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
534538
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool, limits *validation.Limits) (cortexpb.PreallocTimeseries, validation.ValidationError) {
535539
d.labelsHistogram.Observe(float64(len(ts.Labels)))
536540

537-
if err := validation.ValidateLabels(limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
541+
if err := validation.ValidateLabels(d.validateMetrics, limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
538542
return emptyPreallocSeries, err
539543
}
540544

@@ -543,7 +547,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
543547
// Only alloc when data present
544548
samples = make([]cortexpb.Sample, 0, len(ts.Samples))
545549
for _, s := range ts.Samples {
546-
if err := validation.ValidateSample(limits, userID, ts.Labels, s); err != nil {
550+
if err := validation.ValidateSample(d.validateMetrics, limits, userID, ts.Labels, s); err != nil {
547551
return emptyPreallocSeries, err
548552
}
549553
samples = append(samples, s)
@@ -555,7 +559,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
555559
// Only alloc when data present
556560
exemplars = make([]cortexpb.Exemplar, 0, len(ts.Exemplars))
557561
for _, e := range ts.Exemplars {
558-
if err := validation.ValidateExemplar(userID, ts.Labels, e); err != nil {
562+
if err := validation.ValidateExemplar(d.validateMetrics, userID, ts.Labels, e); err != nil {
559563
// An exemplar validation error prevents ingesting samples
560564
// in the same series object. However, because the current Prometheus
561565
// remote write implementation only populates one or the other,
@@ -643,7 +647,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
643647
}
644648

645649
if errors.Is(err, ha.TooManyReplicaGroupsError{}) {
646-
validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
650+
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
647651
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
648652
}
649653

@@ -678,9 +682,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
678682
// Ensure the request slice is reused if the request is rate limited.
679683
cortexpb.ReuseSlice(req.Timeseries)
680684

681-
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
682-
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
683-
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
685+
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
686+
d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
687+
d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
684688
// Return a 429 here to tell the client it is going too fast.
685689
// Client may discard the data or slow down and re-send.
686690
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
@@ -790,7 +794,7 @@ func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *va
790794
metadataKeys := make([]uint32, 0, len(req.Metadata))
791795

792796
for _, m := range req.Metadata {
793-
err := validation.ValidateMetadata(limits, userID, m)
797+
err := validation.ValidateMetadata(d.validateMetrics, limits, userID, m)
794798

795799
if err != nil {
796800
if firstPartialErr == nil {
@@ -841,7 +845,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
841845
l, _ := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
842846
if len(l) == 0 {
843847
// all labels are gone, samples will be discarded
844-
validation.DiscardedSamples.WithLabelValues(
848+
d.validateMetrics.DiscardedSamples.WithLabelValues(
845849
validation.DroppedByRelabelConfiguration,
846850
userID,
847851
).Add(float64(len(ts.Samples)))
@@ -862,7 +866,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
862866
}
863867

864868
if len(ts.Labels) == 0 {
865-
validation.DiscardedExemplars.WithLabelValues(
869+
d.validateMetrics.DiscardedExemplars.WithLabelValues(
866870
validation.DroppedByUserConfigurationOverride,
867871
userID,
868872
).Add(float64(len(ts.Samples)))

pkg/distributor/distributor_test.go

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3589,9 +3589,6 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
35893589
limits: &limits,
35903590
})
35913591

3592-
regs[0].MustRegister(validation.DiscardedSamples)
3593-
validation.DiscardedSamples.Reset()
3594-
35953592
// Push the series to the distributor
35963593
req := mockWriteRequest(inputSeries, 1, 1)
35973594
ctx := user.InjectOrgID(context.Background(), "userDistributorPushRelabelDropWillExportMetricOfDroppedSamples")
@@ -3615,7 +3612,6 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
36153612
# TYPE cortex_distributor_received_samples_total counter
36163613
cortex_distributor_received_samples_total{user="userDistributorPushRelabelDropWillExportMetricOfDroppedSamples"} 1
36173614
`
3618-
36193615
require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...))
36203616
}
36213617

pkg/ingester/ingester.go

Lines changed: 15 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -194,8 +194,10 @@ type Ingester struct {
194194

195195
cfg Config
196196

197-
metrics *ingesterMetrics
198-
logger log.Logger
197+
metrics *ingesterMetrics
198+
validateMetrics *validation.ValidateMetrics
199+
200+
logger log.Logger
199201

200202
lifecycler *ring.Lifecycler
201203
limits *validation.Overrides
@@ -659,6 +661,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
659661
i.ingestionRate,
660662
&i.inflightPushRequests,
661663
&i.maxInflightQueryRequests)
664+
i.validateMetrics = validation.NewValidateMetrics(registerer)
662665

663666
// Replace specific metrics which we can't directly track but we need to read
664667
// them from the underlying system (ie. TSDB).
@@ -1235,29 +1238,29 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
12351238
i.metrics.ingestedExemplarsFail.Add(float64(failedExemplarsCount))
12361239

12371240
if sampleOutOfBoundsCount > 0 {
1238-
validation.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Add(float64(sampleOutOfBoundsCount))
1241+
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Add(float64(sampleOutOfBoundsCount))
12391242
}
12401243
if sampleOutOfOrderCount > 0 {
1241-
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
1244+
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
12421245
}
12431246
if sampleTooOldCount > 0 {
1244-
validation.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
1247+
i.validateMetrics.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
12451248
}
12461249
if newValueForTimestampCount > 0 {
1247-
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
1250+
i.validateMetrics.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
12481251
}
12491252
if perUserSeriesLimitCount > 0 {
1250-
validation.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount))
1253+
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount))
12511254
}
12521255
if perMetricSeriesLimitCount > 0 {
1253-
validation.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
1256+
i.validateMetrics.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
12541257
}
12551258
if perLabelSetSeriesLimitCount > 0 {
1256-
validation.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID).Add(float64(perLabelSetSeriesLimitCount))
1259+
i.validateMetrics.DiscardedSamples.WithLabelValues(perLabelsetSeriesLimit, userID).Add(float64(perLabelSetSeriesLimitCount))
12571260
}
12581261

12591262
if nativeHistogramCount > 0 {
1260-
validation.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(nativeHistogramCount))
1263+
i.validateMetrics.DiscardedSamples.WithLabelValues(nativeHistogramSample, userID).Add(float64(nativeHistogramCount))
12611264
}
12621265

12631266
// Distributor counts both samples and metadata, so for consistency ingester does the same.
@@ -2529,7 +2532,7 @@ func (i *Ingester) closeAndDeleteUserTSDBIfIdle(userID string) tsdbCloseCheckRes
25292532
i.deleteUserMetadata(userID)
25302533
i.metrics.deletePerUserMetrics(userID)
25312534

2532-
validation.DeletePerUserValidationMetrics(userID, i.logger)
2535+
validation.DeletePerUserValidationMetrics(i.validateMetrics, userID, i.logger)
25332536

25342537
// And delete local data.
25352538
if err := os.RemoveAll(dir); err != nil {
@@ -2603,7 +2606,7 @@ func (i *Ingester) getOrCreateUserMetadata(userID string) *userMetricsMetadata {
26032606
// Ensure it was not created between switching locks.
26042607
userMetadata, ok := i.usersMetadata[userID]
26052608
if !ok {
2606-
userMetadata = newMetadataMap(i.limiter, i.metrics, userID)
2609+
userMetadata = newMetadataMap(i.limiter, i.metrics, i.validateMetrics, userID)
26072610
i.usersMetadata[userID] = userMetadata
26082611
}
26092612
return userMetadata

0 commit comments

Comments (0)