Skip to content

Commit fd29ea4

Browse files
authored
Optimized metric name extraction in distributor (#4001)
Signed-off-by: Marco Pracucci <[email protected]>
1 parent 73744e5 commit fd29ea4

File tree

4 files changed

+62
-15
lines changed

4 files changed

+62
-15
lines changed

pkg/distributor/distributor.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -334,11 +334,11 @@ func (d *Distributor) tokenForLabels(userID string, labels []cortexpb.LabelAdapt
334334
return shardByAllLabels(userID, labels), nil
335335
}
336336

337-
metricName, err := extract.MetricNameFromLabelAdapters(labels)
337+
unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(labels)
338338
if err != nil {
339339
return 0, err
340340
}
341-
return shardByMetricName(userID, metricName), nil
341+
return shardByMetricName(userID, unsafeMetricName), nil
342342
}
343343

344344
func (d *Distributor) tokenForMetadata(userID string, metricName string) uint32 {
@@ -349,6 +349,8 @@ func (d *Distributor) tokenForMetadata(userID string, metricName string) uint32
349349
return shardByUser(userID)
350350
}
351351

352+
// shardByMetricName returns the token for the given metric. The provided metricName
353+
// is guaranteed to not be retained.
352354
func shardByMetricName(userID string, metricName string) uint32 {
353355
h := shardByUser(userID)
354356
h = ingester_client.HashAdd32(h, metricName)
@@ -410,16 +412,16 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
410412
// Validates a single series from a write request. Will remove labels if
411413
// any are configured to be dropped for the user ID.
412414
// Returns the validated series with its labels/samples, and any error.
415+
// The returned error may retain the series labels.
413416
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool) (cortexpb.PreallocTimeseries, validation.ValidationError) {
414417
d.labelsHistogram.Observe(float64(len(ts.Labels)))
415418
if err := validation.ValidateLabels(d.limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
416419
return emptyPreallocSeries, err
417420
}
418421

419-
metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels)
420422
samples := make([]cortexpb.Sample, 0, len(ts.Samples))
421423
for _, s := range ts.Samples {
422-
if err := validation.ValidateSample(d.limits, userID, metricName, s); err != nil {
424+
if err := validation.ValidateSample(d.limits, userID, ts.Labels, s); err != nil {
423425
return emptyPreallocSeries, err
424426
}
425427
samples = append(samples, s)
@@ -549,6 +551,8 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
549551
// Errors in validation are considered non-fatal, as one series in a request may contain
550552
// invalid data but all the remaining series could be perfectly valid.
551553
if validationErr != nil && firstPartialErr == nil {
554+
// The series labels may be retained by validationErr but that's not a problem for this
555+
// use case because we format it by calling Error() and then we discard it.
552556
firstPartialErr = httpgrpc.Errorf(http.StatusBadRequest, validationErr.Error())
553557
}
554558

pkg/distributor/distributor_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -969,7 +969,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {
969969
}
970970
}
971971

972-
func BenchmarkDistributor_PushOnError(b *testing.B) {
972+
func BenchmarkDistributor_Push(b *testing.B) {
973973
const (
974974
numSeriesPerRequest = 1000
975975
)
@@ -979,6 +979,29 @@ func BenchmarkDistributor_PushOnError(b *testing.B) {
979979
prepareSeries func() ([]labels.Labels, []cortexpb.Sample)
980980
expectedErr string
981981
}{
982+
"all samples successfully pushed": {
983+
prepareConfig: func(limits *validation.Limits) {},
984+
prepareSeries: func() ([]labels.Labels, []cortexpb.Sample) {
985+
metrics := make([]labels.Labels, numSeriesPerRequest)
986+
samples := make([]cortexpb.Sample, numSeriesPerRequest)
987+
988+
for i := 0; i < numSeriesPerRequest; i++ {
989+
lbls := labels.NewBuilder(labels.Labels{{Name: model.MetricNameLabel, Value: "foo"}})
990+
for i := 0; i < 10; i++ {
991+
lbls.Set(fmt.Sprintf("name_%d", i), fmt.Sprintf("value_%d", i))
992+
}
993+
994+
metrics[i] = lbls.Labels()
995+
samples[i] = cortexpb.Sample{
996+
Value: float64(i),
997+
TimestampMs: time.Now().UnixNano() / int64(time.Millisecond),
998+
}
999+
}
1000+
1001+
return metrics, samples
1002+
},
1003+
expectedErr: "",
1004+
},
9821005
"ingestion rate limit reached": {
9831006
prepareConfig: func(limits *validation.Limits) {
9841007
limits.IngestionRate = 1
@@ -1202,7 +1225,11 @@ func BenchmarkDistributor_PushOnError(b *testing.B) {
12021225

12031226
for n := 0; n < b.N; n++ {
12041227
_, err := distributor.Push(ctx, cortexpb.ToWriteRequest(metrics, samples, nil, cortexpb.API))
1205-
if err == nil || !strings.Contains(err.Error(), testData.expectedErr) {
1228+
1229+
if testData.expectedErr == "" && err != nil {
1230+
b.Fatalf("no error expected but got %v", err)
1231+
}
1232+
if testData.expectedErr != "" && (err == nil || !strings.Contains(err.Error(), testData.expectedErr)) {
12061233
b.Fatalf("expected %v error but got %v", testData.expectedErr, err)
12071234
}
12081235
}

pkg/util/extract/extract.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,24 @@ var (
1414
)
1515

1616
// MetricNameFromLabelAdapters extracts the metric name from a list of LabelAdapters.
17+
// The returned metric name string is a copy of the label value.
1718
func MetricNameFromLabelAdapters(labels []cortexpb.LabelAdapter) (string, error) {
19+
unsafeMetricName, err := UnsafeMetricNameFromLabelAdapters(labels)
20+
if err != nil {
21+
return "", err
22+
}
23+
24+
// Force a string copy since LabelAdapter is often a pointer into
25+
// a large gRPC buffer which we don't want to keep alive on the heap.
26+
return string([]byte(unsafeMetricName)), nil
27+
}
28+
29+
// UnsafeMetricNameFromLabelAdapters extracts the metric name from a list of LabelAdapters.
30+
// The returned metric name string is a reference to the label value (no copy).
31+
func UnsafeMetricNameFromLabelAdapters(labels []cortexpb.LabelAdapter) (string, error) {
1832
for _, label := range labels {
1933
if label.Name == model.MetricNameLabel {
20-
// Force a string copy since LabelAdapter is often a pointer into
21-
// a large gRPC buffer which we don't want to keep alive on the heap.
22-
return string([]byte(label.Value)), nil
34+
return label.Value, nil
2335
}
2436
}
2537
return "", errNoMetricNameLabel

pkg/util/validation/validate.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,18 @@ type SampleValidationConfig interface {
8383
}
8484

8585
// ValidateSample returns an err if the sample is invalid.
86-
func ValidateSample(cfg SampleValidationConfig, userID string, metricName string, s cortexpb.Sample) ValidationError {
86+
// The returned error may retain the provided series labels.
87+
func ValidateSample(cfg SampleValidationConfig, userID string, ls []cortexpb.LabelAdapter, s cortexpb.Sample) ValidationError {
88+
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls)
89+
8790
if cfg.RejectOldSamples(userID) && model.Time(s.TimestampMs) < model.Now().Add(-cfg.RejectOldSamplesMaxAge(userID)) {
8891
DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID).Inc()
89-
return newSampleTimestampTooOldError(metricName, s.TimestampMs)
92+
return newSampleTimestampTooOldError(unsafeMetricName, s.TimestampMs)
9093
}
9194

9295
if model.Time(s.TimestampMs) > model.Now().Add(cfg.CreationGracePeriod(userID)) {
9396
DiscardedSamples.WithLabelValues(tooFarInFuture, userID).Inc()
94-
return newSampleTimestampTooNewError(metricName, s.TimestampMs)
97+
return newSampleTimestampTooNewError(unsafeMetricName, s.TimestampMs)
9598
}
9699

97100
return nil
@@ -106,17 +109,18 @@ type LabelValidationConfig interface {
106109
}
107110

108111
// ValidateLabels returns an err if the labels are invalid.
112+
// The returned error may retain the provided series labels.
109113
func ValidateLabels(cfg LabelValidationConfig, userID string, ls []cortexpb.LabelAdapter, skipLabelNameValidation bool) ValidationError {
110114
if cfg.EnforceMetricName(userID) {
111-
metricName, err := extract.MetricNameFromLabelAdapters(ls)
115+
unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(ls)
112116
if err != nil {
113117
DiscardedSamples.WithLabelValues(missingMetricName, userID).Inc()
114118
return newNoMetricNameError()
115119
}
116120

117-
if !model.IsValidMetricName(model.LabelValue(metricName)) {
121+
if !model.IsValidMetricName(model.LabelValue(unsafeMetricName)) {
118122
DiscardedSamples.WithLabelValues(invalidMetricName, userID).Inc()
119-
return newInvalidMetricNameError(metricName)
123+
return newInvalidMetricNameError(unsafeMetricName)
120124
}
121125
}
122126

0 commit comments

Comments
 (0)