Skip to content

Commit 2f5538a

Browse files
committed
support ingesting native histograms
Signed-off-by: Ben Ye <[email protected]>
1 parent e6a4e49 commit 2f5538a

24 files changed

+827
-320
lines changed

pkg/cortexpb/compat.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
"github.com/cortexproject/cortex/pkg/util"
1919
)
2020

21-
// ToWriteRequest converts matched slices of Labels, Samples and Metadata into a WriteRequest proto.
21+
// ToWriteRequest converts matched slices of Labels, Samples, Metadata and Histograms into a WriteRequest proto.
2222
// It gets timeseries from the pool, so ReuseSlice() should be called when done.
2323
func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMetadata, histograms []Histogram, source WriteRequest_SourceEnum) *WriteRequest {
2424
req := &WriteRequest{
@@ -27,13 +27,17 @@ func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMe
2727
Source: source,
2828
}
2929

30-
for i, s := range samples {
30+
i := 0
31+
for i < len(samples) || i < len(histograms) {
3132
ts := TimeseriesFromPool()
3233
ts.Labels = append(ts.Labels, FromLabelsToLabelAdapters(lbls[i])...)
33-
ts.Samples = append(ts.Samples, s)
34+
if i < len(samples) {
35+
ts.Samples = append(ts.Samples, samples[i])
36+
}
3437
if i < len(histograms) {
3538
ts.Histograms = append(ts.Histograms, histograms[i])
3639
}
40+
i++
3741
req.Timeseries = append(req.Timeseries, PreallocTimeseries{TimeSeries: ts})
3842
}
3943

pkg/cortexpb/histograms.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2017 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package cortexpb
15+
16+
import "github.com/prometheus/prometheus/model/histogram"
17+
18+
func (h Histogram) IsFloatHistogram() bool {
19+
_, ok := h.GetCount().(*Histogram_CountFloat)
20+
return ok
21+
}
22+
23+
// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
24+
// provided proto message. The caller has to make sure that the proto message
25+
// represents an interger histogram and not a float histogram.
26+
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L626-L645
27+
func HistogramProtoToHistogram(hp Histogram) *histogram.Histogram {
28+
if hp.IsFloatHistogram() {
29+
panic("HistogramProtoToHistogram called with a float histogram")
30+
}
31+
return &histogram.Histogram{
32+
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
33+
Schema: hp.Schema,
34+
ZeroThreshold: hp.ZeroThreshold,
35+
ZeroCount: hp.GetZeroCountInt(),
36+
Count: hp.GetCountInt(),
37+
Sum: hp.Sum,
38+
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
39+
PositiveBuckets: hp.GetPositiveDeltas(),
40+
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
41+
NegativeBuckets: hp.GetNegativeDeltas(),
42+
}
43+
}
44+
45+
// FloatHistogramProtoToFloatHistogram extracts a float Histogram from the provided proto message.
46+
// The caller has to make sure that the proto message represents a float histogram and not an
47+
// integer histogram, or it panics.
48+
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L647-L667
49+
func FloatHistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
50+
if !hp.IsFloatHistogram() {
51+
panic("FloatHistogramProtoToFloatHistogram called with an integer histogram")
52+
}
53+
return &histogram.FloatHistogram{
54+
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
55+
Schema: hp.Schema,
56+
ZeroThreshold: hp.ZeroThreshold,
57+
ZeroCount: hp.GetZeroCountFloat(),
58+
Count: hp.GetCountFloat(),
59+
Sum: hp.Sum,
60+
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
61+
PositiveBuckets: hp.GetPositiveCounts(),
62+
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
63+
NegativeBuckets: hp.GetNegativeCounts(),
64+
}
65+
}
66+
67+
// HistogramToHistogramProto converts a (normal integer) Histogram to its protobuf message type.
68+
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L709-L723
69+
func HistogramToHistogramProto(timestamp int64, h *histogram.Histogram) Histogram {
70+
return Histogram{
71+
Count: &Histogram_CountInt{CountInt: h.Count},
72+
Sum: h.Sum,
73+
Schema: h.Schema,
74+
ZeroThreshold: h.ZeroThreshold,
75+
ZeroCount: &Histogram_ZeroCountInt{ZeroCountInt: h.ZeroCount},
76+
NegativeSpans: spansToSpansProto(h.NegativeSpans),
77+
NegativeDeltas: h.NegativeBuckets,
78+
PositiveSpans: spansToSpansProto(h.PositiveSpans),
79+
PositiveDeltas: h.PositiveBuckets,
80+
ResetHint: Histogram_ResetHint(h.CounterResetHint),
81+
TimestampMs: timestamp,
82+
}
83+
}
84+
85+
// FloatHistogramToHistogramProto converts a float Histogram to a normal
86+
// Histogram's protobuf message type.
87+
// Changed from https://github.com/prometheus/prometheus/blob/0ab95536115adfe50af249d36d73674be694ca3f/storage/remote/codec.go#L725-L739
88+
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram {
89+
return Histogram{
90+
Count: &Histogram_CountFloat{CountFloat: fh.Count},
91+
Sum: fh.Sum,
92+
Schema: fh.Schema,
93+
ZeroThreshold: fh.ZeroThreshold,
94+
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
95+
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
96+
NegativeCounts: fh.NegativeBuckets,
97+
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
98+
PositiveCounts: fh.PositiveBuckets,
99+
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
100+
TimestampMs: timestamp,
101+
}
102+
}
103+
104+
func spansProtoToSpans(s []BucketSpan) []histogram.Span {
105+
spans := make([]histogram.Span, len(s))
106+
for i := 0; i < len(s); i++ {
107+
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
108+
}
109+
110+
return spans
111+
}
112+
113+
func spansToSpansProto(s []histogram.Span) []BucketSpan {
114+
spans := make([]BucketSpan, len(s))
115+
for i := 0; i < len(s); i++ {
116+
spans[i] = BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
117+
}
118+
119+
return spans
120+
}

pkg/distributor/distributor.go

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ const (
6969
// mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and
7070
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
7171
mergeSlicesParallelism = 8
72+
73+
sampleMetricTypeFloat = "float"
74+
sampleMetricTypeHistogram = "histogram"
7275
)
7376

7477
// Distributor is a storage.SampleAppender and a client.Querier which
@@ -274,7 +277,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
274277
Namespace: "cortex",
275278
Name: "distributor_received_samples_total",
276279
Help: "The total number of received samples, excluding rejected and deduped samples.",
277-
}, []string{"user"}),
280+
}, []string{"user", "type"}),
278281
receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
279282
Namespace: "cortex",
280283
Name: "distributor_received_exemplars_total",
@@ -289,7 +292,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
289292
Namespace: "cortex",
290293
Name: "distributor_samples_in_total",
291294
Help: "The total number of samples that have come in to the distributor, including rejected or deduped samples.",
292-
}, []string{"user"}),
295+
}, []string{"user", "type"}),
293296
incomingExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
294297
Namespace: "cortex",
295298
Name: "distributor_exemplars_in_total",
@@ -424,10 +427,12 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
424427

425428
d.HATracker.CleanupHATrackerMetricsForUser(userID)
426429

427-
d.receivedSamples.DeleteLabelValues(userID)
430+
d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
431+
d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
428432
d.receivedExemplars.DeleteLabelValues(userID)
429433
d.receivedMetadata.DeleteLabelValues(userID)
430-
d.incomingSamples.DeleteLabelValues(userID)
434+
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
435+
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
431436
d.incomingExemplars.DeleteLabelValues(userID)
432437
d.incomingMetadata.DeleteLabelValues(userID)
433438
d.nonHASamples.DeleteLabelValues(userID)
@@ -543,7 +548,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
543548
// Only alloc when data present
544549
samples = make([]cortexpb.Sample, 0, len(ts.Samples))
545550
for _, s := range ts.Samples {
546-
if err := validation.ValidateSample(limits, userID, ts.Labels, s); err != nil {
551+
if err := validation.ValidateSampleTimestamp(limits, userID, ts.Labels, s.TimestampMs); err != nil {
547552
return emptyPreallocSeries, err
548553
}
549554
samples = append(samples, s)
@@ -570,8 +575,13 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
570575
if len(ts.Histograms) > 0 {
571576
// Only alloc when data present
572577
histograms = make([]cortexpb.Histogram, 0, len(ts.Histograms))
573-
// TODO(yeya24): we need to have validations for native histograms
574-
// at some point. Skip validations for now.
578+
for _, h := range ts.Histograms {
579+
// TODO(yeya24): add other validations for native histogram.
580+
// For example, Prometheus scrape has bucket limit and schema check.
581+
if err := validation.ValidateSampleTimestamp(limits, userID, ts.Labels, h.TimestampMs); err != nil {
582+
return emptyPreallocSeries, err
583+
}
584+
}
575585
histograms = append(histograms, ts.Histograms...)
576586
}
577587

@@ -603,14 +613,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
603613
now := time.Now()
604614
d.activeUsers.UpdateUserTimestamp(userID, now)
605615

606-
numSamples := 0
616+
numFloatSamples := 0
617+
numHistogramSamples := 0
607618
numExemplars := 0
608619
for _, ts := range req.Timeseries {
609-
numSamples += len(ts.Samples) + len(ts.Histograms)
620+
numFloatSamples += len(ts.Samples)
621+
numHistogramSamples += len(ts.Histograms)
610622
numExemplars += len(ts.Exemplars)
611623
}
612624
// Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics.
613-
d.incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
625+
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(numFloatSamples))
626+
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(numFloatSamples))
614627
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
615628
// Count the total number of metadata in.
616629
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
@@ -638,31 +651,32 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
638651

639652
if errors.Is(err, ha.ReplicasNotMatchError{}) {
640653
// These samples have been deduped.
641-
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numSamples))
654+
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numFloatSamples + numHistogramSamples))
642655
return nil, httpgrpc.Errorf(http.StatusAccepted, err.Error())
643656
}
644657

645658
if errors.Is(err, ha.TooManyReplicaGroupsError{}) {
646-
validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numSamples))
659+
validation.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numFloatSamples + numHistogramSamples))
647660
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
648661
}
649662

650663
return nil, err
651664
}
652665
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
653666
if !removeReplica {
654-
d.nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
667+
d.nonHASamples.WithLabelValues(userID).Add(float64(numFloatSamples + numHistogramSamples))
655668
}
656669
}
657670

658671
// A WriteRequest can only contain series or metadata but not both. This might change in the future.
659-
seriesKeys, validatedTimeseries, validatedSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
672+
seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, err := d.prepareSeriesKeys(ctx, req, userID, limits, removeReplica)
660673
if err != nil {
661674
return nil, err
662675
}
663676
metadataKeys, validatedMetadata, firstPartialErr := d.prepareMetadataKeys(req, limits, userID, firstPartialErr)
664677

665-
d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
678+
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(validatedFloatSamples))
679+
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(validatedHistogramSamples))
666680
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
667681
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))
668682

@@ -673,18 +687,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
673687
return &cortexpb.WriteResponse{}, firstPartialErr
674688
}
675689

676-
totalN := validatedSamples + validatedExemplars + len(validatedMetadata)
690+
totalSamples := validatedFloatSamples + validatedHistogramSamples
691+
totalN := totalSamples + validatedExemplars + len(validatedMetadata)
677692
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
678693
// Ensure the request slice is reused if the request is rate limited.
679694
cortexpb.ReuseSlice(req.Timeseries)
680695

681-
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
696+
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples))
682697
validation.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
683698
validation.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
684699
// Return a 429 here to tell the client it is going too fast.
685700
// Client may discard the data or slow down and re-send.
686701
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
687-
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
702+
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
688703
}
689704

690705
// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
@@ -806,15 +821,16 @@ func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *va
806821
return metadataKeys, validatedMetadata, firstPartialErr
807822
}
808823

809-
func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, error, error) {
824+
func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.WriteRequest, userID string, limits *validation.Limits, removeReplica bool) ([]uint32, []cortexpb.PreallocTimeseries, int, int, int, error, error) {
810825
pSpan, _ := opentracing.StartSpanFromContext(ctx, "prepareSeriesKeys")
811826
defer pSpan.Finish()
812827

813828
// For each timeseries or samples, we compute a hash to distribute across ingesters;
814829
// check each sample/metadata and discard if outside limits.
815830
validatedTimeseries := make([]cortexpb.PreallocTimeseries, 0, len(req.Timeseries))
816831
seriesKeys := make([]uint32, 0, len(req.Timeseries))
817-
validatedSamples := 0
832+
validatedFloatSamples := 0
833+
validatedHistogramSamples := 0
818834
validatedExemplars := 0
819835

820836
var firstPartialErr error
@@ -835,7 +851,9 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
835851
if len(ts.Samples) > 0 {
836852
latestSampleTimestampMs = max(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
837853
}
838-
// TODO(yeya24): use timestamp of the latest native histogram in the series as well.
854+
if len(ts.Histograms) > 0 {
855+
latestSampleTimestampMs = max(latestSampleTimestampMs, ts.Histograms[len(ts.Histograms)-1].TimestampMs)
856+
}
839857

840858
if mrc := limits.MetricRelabelConfigs; len(mrc) > 0 {
841859
l, _ := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
@@ -881,7 +899,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
881899
// label and dropped labels (if any)
882900
key, err := d.tokenForLabels(userID, ts.Labels)
883901
if err != nil {
884-
return nil, nil, 0, 0, nil, err
902+
return nil, nil, 0, 0, 0, nil, err
885903
}
886904
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)
887905

@@ -900,11 +918,11 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
900918

901919
seriesKeys = append(seriesKeys, key)
902920
validatedTimeseries = append(validatedTimeseries, validatedSeries)
903-
// TODO(yeya24): add histogram samples as well when supported.
904-
validatedSamples += len(ts.Samples)
921+
validatedFloatSamples += len(ts.Samples)
922+
validatedHistogramSamples += len(ts.Histograms)
905923
validatedExemplars += len(ts.Exemplars)
906924
}
907-
return seriesKeys, validatedTimeseries, validatedSamples, validatedExemplars, firstPartialErr, nil
925+
return seriesKeys, validatedTimeseries, validatedFloatSamples, validatedHistogramSamples, validatedExemplars, firstPartialErr, nil
908926
}
909927

910928
func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {

0 commit comments

Comments
 (0)