@@ -69,6 +69,9 @@ const (
69
69
// mergeSlicesParallelism is a constant of how much go routines we should use to merge slices, and
70
70
// it was based on empirical observation: See BenchmarkMergeSlicesParallel
71
71
mergeSlicesParallelism = 8
72
+
73
+ sampleMetricTypeFloat = "float"
74
+ sampleMetricTypeHistogram = "histogram"
72
75
)
73
76
74
77
// Distributor is a storage.SampleAppender and a client.Querier which
@@ -276,7 +279,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
276
279
Namespace : "cortex" ,
277
280
Name : "distributor_received_samples_total" ,
278
281
Help : "The total number of received samples, excluding rejected and deduped samples." ,
279
- }, []string {"user" }),
282
+ }, []string {"user" , "type" }),
280
283
receivedExemplars : promauto .With (reg ).NewCounterVec (prometheus.CounterOpts {
281
284
Namespace : "cortex" ,
282
285
Name : "distributor_received_exemplars_total" ,
@@ -291,7 +294,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
291
294
Namespace : "cortex" ,
292
295
Name : "distributor_samples_in_total" ,
293
296
Help : "The total number of samples that have come in to the distributor, including rejected or deduped samples." ,
294
- }, []string {"user" }),
297
+ }, []string {"user" , "type" }),
295
298
incomingExemplars : promauto .With (reg ).NewCounterVec (prometheus.CounterOpts {
296
299
Namespace : "cortex" ,
297
300
Name : "distributor_exemplars_in_total" ,
@@ -428,10 +431,12 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
428
431
429
432
d .HATracker .CleanupHATrackerMetricsForUser (userID )
430
433
431
- d .receivedSamples .DeleteLabelValues (userID )
434
+ d .receivedSamples .DeleteLabelValues (userID , sampleMetricTypeFloat )
435
+ d .receivedSamples .DeleteLabelValues (userID , sampleMetricTypeHistogram )
432
436
d .receivedExemplars .DeleteLabelValues (userID )
433
437
d .receivedMetadata .DeleteLabelValues (userID )
434
- d .incomingSamples .DeleteLabelValues (userID )
438
+ d .incomingSamples .DeleteLabelValues (userID , sampleMetricTypeFloat )
439
+ d .incomingSamples .DeleteLabelValues (userID , sampleMetricTypeHistogram )
435
440
d .incomingExemplars .DeleteLabelValues (userID )
436
441
d .incomingMetadata .DeleteLabelValues (userID )
437
442
d .nonHASamples .DeleteLabelValues (userID )
@@ -547,7 +552,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
547
552
// Only alloc when data present
548
553
samples = make ([]cortexpb.Sample , 0 , len (ts .Samples ))
549
554
for _ , s := range ts .Samples {
550
- if err := validation .ValidateSample (d .validateMetrics , limits , userID , ts .Labels , s ); err != nil {
555
+ if err := validation .ValidateSampleTimestamp (d .validateMetrics , limits , userID , ts .Labels , s . TimestampMs ); err != nil {
551
556
return emptyPreallocSeries , err
552
557
}
553
558
samples = append (samples , s )
@@ -574,8 +579,13 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
574
579
if len (ts .Histograms ) > 0 {
575
580
// Only alloc when data present
576
581
histograms = make ([]cortexpb.Histogram , 0 , len (ts .Histograms ))
577
- // TODO(yeya24): we need to have validations for native histograms
578
- // at some point. Skip validations for now.
582
+ for _ , h := range ts .Histograms {
583
+ // TODO(yeya24): add other validations for native histogram.
584
+ // For example, Prometheus scrape has bucket limit and schema check.
585
+ if err := validation .ValidateSampleTimestamp (d .validateMetrics , limits , userID , ts .Labels , h .TimestampMs ); err != nil {
586
+ return emptyPreallocSeries , err
587
+ }
588
+ }
579
589
histograms = append (histograms , ts .Histograms ... )
580
590
}
581
591
@@ -607,14 +617,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
607
617
now := time .Now ()
608
618
d .activeUsers .UpdateUserTimestamp (userID , now )
609
619
610
- numSamples := 0
620
+ numFloatSamples := 0
621
+ numHistogramSamples := 0
611
622
numExemplars := 0
612
623
for _ , ts := range req .Timeseries {
613
- numSamples += len (ts .Samples ) + len (ts .Histograms )
624
+ numFloatSamples += len (ts .Samples )
625
+ numHistogramSamples += len (ts .Histograms )
614
626
numExemplars += len (ts .Exemplars )
615
627
}
616
628
// Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics.
617
- d .incomingSamples .WithLabelValues (userID ).Add (float64 (numSamples ))
629
+ d .incomingSamples .WithLabelValues (userID , sampleMetricTypeFloat ).Add (float64 (numFloatSamples ))
630
+ d .incomingSamples .WithLabelValues (userID , sampleMetricTypeHistogram ).Add (float64 (numFloatSamples ))
618
631
d .incomingExemplars .WithLabelValues (userID ).Add (float64 (numExemplars ))
619
632
// Count the total number of metadata in.
620
633
d .incomingMetadata .WithLabelValues (userID ).Add (float64 (len (req .Metadata )))
@@ -642,31 +655,32 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
642
655
643
656
if errors .Is (err , ha.ReplicasNotMatchError {}) {
644
657
// These samples have been deduped.
645
- d .dedupedSamples .WithLabelValues (userID , cluster ).Add (float64 (numSamples ))
658
+ d .dedupedSamples .WithLabelValues (userID , cluster ).Add (float64 (numFloatSamples + numHistogramSamples ))
646
659
return nil , httpgrpc .Errorf (http .StatusAccepted , err .Error ())
647
660
}
648
661
649
662
if errors .Is (err , ha.TooManyReplicaGroupsError {}) {
650
- d .validateMetrics .DiscardedSamples .WithLabelValues (validation .TooManyHAClusters , userID ).Add (float64 (numSamples ))
663
+ d .validateMetrics .DiscardedSamples .WithLabelValues (validation .TooManyHAClusters , userID ).Add (float64 (numFloatSamples + numHistogramSamples ))
651
664
return nil , httpgrpc .Errorf (http .StatusBadRequest , err .Error ())
652
665
}
653
666
654
667
return nil , err
655
668
}
656
669
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
657
670
if ! removeReplica {
658
- d .nonHASamples .WithLabelValues (userID ).Add (float64 (numSamples ))
671
+ d .nonHASamples .WithLabelValues (userID ).Add (float64 (numFloatSamples + numHistogramSamples ))
659
672
}
660
673
}
661
674
662
675
// A WriteRequest can only contain series or metadata but not both. This might change in the future.
663
- seriesKeys , validatedTimeseries , validatedSamples , validatedExemplars , firstPartialErr , err := d .prepareSeriesKeys (ctx , req , userID , limits , removeReplica )
676
+ seriesKeys , validatedTimeseries , validatedFloatSamples , validatedHistogramSamples , validatedExemplars , firstPartialErr , err := d .prepareSeriesKeys (ctx , req , userID , limits , removeReplica )
664
677
if err != nil {
665
678
return nil , err
666
679
}
667
680
metadataKeys , validatedMetadata , firstPartialErr := d .prepareMetadataKeys (req , limits , userID , firstPartialErr )
668
681
669
- d .receivedSamples .WithLabelValues (userID ).Add (float64 (validatedSamples ))
682
+ d .receivedSamples .WithLabelValues (userID , sampleMetricTypeFloat ).Add (float64 (validatedFloatSamples ))
683
+ d .receivedSamples .WithLabelValues (userID , sampleMetricTypeHistogram ).Add (float64 (validatedHistogramSamples ))
670
684
d .receivedExemplars .WithLabelValues (userID ).Add (float64 (validatedExemplars ))
671
685
d .receivedMetadata .WithLabelValues (userID ).Add (float64 (len (validatedMetadata )))
672
686
@@ -677,18 +691,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
677
691
return & cortexpb.WriteResponse {}, firstPartialErr
678
692
}
679
693
680
- totalN := validatedSamples + validatedExemplars + len (validatedMetadata )
694
+ totalSamples := validatedFloatSamples + validatedHistogramSamples
695
+ totalN := totalSamples + validatedExemplars + len (validatedMetadata )
681
696
if ! d .ingestionRateLimiter .AllowN (now , userID , totalN ) {
682
697
// Ensure the request slice is reused if the request is rate limited.
683
698
cortexpb .ReuseSlice (req .Timeseries )
684
699
685
- d .validateMetrics .DiscardedSamples .WithLabelValues (validation .RateLimited , userID ).Add (float64 (validatedSamples ))
700
+ d .validateMetrics .DiscardedSamples .WithLabelValues (validation .RateLimited , userID ).Add (float64 (totalSamples ))
686
701
d .validateMetrics .DiscardedExemplars .WithLabelValues (validation .RateLimited , userID ).Add (float64 (validatedExemplars ))
687
702
d .validateMetrics .DiscardedMetadata .WithLabelValues (validation .RateLimited , userID ).Add (float64 (len (validatedMetadata )))
688
703
// Return a 429 here to tell the client it is going too fast.
689
704
// Client may discard the data or slow down and re-send.
690
705
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
691
- return nil , httpgrpc .Errorf (http .StatusTooManyRequests , "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata" , d .ingestionRateLimiter .Limit (now , userID ), validatedSamples , len (validatedMetadata ))
706
+ return nil , httpgrpc .Errorf (http .StatusTooManyRequests , "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata" , d .ingestionRateLimiter .Limit (now , userID ), totalSamples , len (validatedMetadata ))
692
707
}
693
708
694
709
// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
@@ -810,15 +825,16 @@ func (d *Distributor) prepareMetadataKeys(req *cortexpb.WriteRequest, limits *va
810
825
return metadataKeys , validatedMetadata , firstPartialErr
811
826
}
812
827
813
- func (d * Distributor ) prepareSeriesKeys (ctx context.Context , req * cortexpb.WriteRequest , userID string , limits * validation.Limits , removeReplica bool ) ([]uint32 , []cortexpb.PreallocTimeseries , int , int , error , error ) {
828
+ func (d * Distributor ) prepareSeriesKeys (ctx context.Context , req * cortexpb.WriteRequest , userID string , limits * validation.Limits , removeReplica bool ) ([]uint32 , []cortexpb.PreallocTimeseries , int , int , int , error , error ) {
814
829
pSpan , _ := opentracing .StartSpanFromContext (ctx , "prepareSeriesKeys" )
815
830
defer pSpan .Finish ()
816
831
817
832
// For each timeseries or samples, we compute a hash to distribute across ingesters;
818
833
// check each sample/metadata and discard if outside limits.
819
834
validatedTimeseries := make ([]cortexpb.PreallocTimeseries , 0 , len (req .Timeseries ))
820
835
seriesKeys := make ([]uint32 , 0 , len (req .Timeseries ))
821
- validatedSamples := 0
836
+ validatedFloatSamples := 0
837
+ validatedHistogramSamples := 0
822
838
validatedExemplars := 0
823
839
824
840
var firstPartialErr error
@@ -839,7 +855,9 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
839
855
if len (ts .Samples ) > 0 {
840
856
latestSampleTimestampMs = max (latestSampleTimestampMs , ts .Samples [len (ts .Samples )- 1 ].TimestampMs )
841
857
}
842
- // TODO(yeya24): use timestamp of the latest native histogram in the series as well.
858
+ if len (ts .Histograms ) > 0 {
859
+ latestSampleTimestampMs = max (latestSampleTimestampMs , ts .Histograms [len (ts .Histograms )- 1 ].TimestampMs )
860
+ }
843
861
844
862
if mrc := limits .MetricRelabelConfigs ; len (mrc ) > 0 {
845
863
l , _ := relabel .Process (cortexpb .FromLabelAdaptersToLabels (ts .Labels ), mrc ... )
@@ -885,7 +903,7 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
885
903
// label and dropped labels (if any)
886
904
key , err := d .tokenForLabels (userID , ts .Labels )
887
905
if err != nil {
888
- return nil , nil , 0 , 0 , nil , err
906
+ return nil , nil , 0 , 0 , 0 , nil , err
889
907
}
890
908
validatedSeries , validationErr := d .validateSeries (ts , userID , skipLabelNameValidation , limits )
891
909
@@ -904,11 +922,11 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
904
922
905
923
seriesKeys = append (seriesKeys , key )
906
924
validatedTimeseries = append (validatedTimeseries , validatedSeries )
907
- // TODO(yeya24): add histogram samples as well when supported.
908
- validatedSamples += len (ts .Samples )
925
+ validatedFloatSamples += len ( ts . Samples )
926
+ validatedHistogramSamples += len (ts .Histograms )
909
927
validatedExemplars += len (ts .Exemplars )
910
928
}
911
- return seriesKeys , validatedTimeseries , validatedSamples , validatedExemplars , firstPartialErr , nil
929
+ return seriesKeys , validatedTimeseries , validatedFloatSamples , validatedHistogramSamples , validatedExemplars , firstPartialErr , nil
912
930
}
913
931
914
932
func sortLabelsIfNeeded (labels []cortexpb.LabelAdapter ) {
0 commit comments