Cache user overrides in distributor.Push to reduce lock contention. #4906

Merged
merged 4 commits into from Oct 17, 2022
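
Every hunk in the diff below has the same shape: instead of going through `d.limits` accessors such as `d.limits.MaxLabelValueLength(userID)`, each of which has to look up the tenant's overrides under a lock, `Push` now fetches the tenant's `validation.Limits` once via `GetOverridesForUser` and threads the resulting pointer through `checkSample` and `validateSeries`. Below is a minimal sketch of that pattern, assuming the overrides sit behind a `sync.RWMutex`; the types and fields are illustrative stand-ins, not the actual `pkg/util/validation` implementation.

```go
package main

import "sync"

// Limits stands in for Cortex's per-tenant validation.Limits struct that the
// diff passes around; only a couple of fields are shown for illustration.
type Limits struct {
	MaxLabelValueLength int
	AcceptHASamples     bool
}

// Overrides is a hedged sketch of a tenant-overrides holder. The real type
// lives in pkg/util/validation and its internals may differ.
type Overrides struct {
	mtx          sync.RWMutex
	tenantLimits map[string]*Limits
	defaults     *Limits
}

// Per-field accessors take the lock on every call. Before this PR, Push made
// one such call per limit lookup, per request.
func (o *Overrides) MaxLabelValueLength(userID string) int {
	o.mtx.RLock()
	defer o.mtx.RUnlock()
	if l, ok := o.tenantLimits[userID]; ok {
		return l.MaxLabelValueLength
	}
	return o.defaults.MaxLabelValueLength
}

// GetOverridesForUser takes the lock once and returns the whole limits
// struct, so the rest of Push can do plain, lock-free field reads.
func (o *Overrides) GetOverridesForUser(userID string) *Limits {
	o.mtx.RLock()
	defer o.mtx.RUnlock()
	if l, ok := o.tenantLimits[userID]; ok {
		return l
	}
	return o.defaults
}
```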
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -50,6 +50,7 @@
* [ENHANCEMENT] Distributor: Add label name to labelValueTooLongError. #4855
* [ENHANCEMENT] Enhance traces with hostname information. #4898
* [ENHANCEMENT] Improve the documentation around limits. #4905
* [ENHANCEMENT] Distributor: cache user overrides to reduce lock contention. #4904
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
* [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818
36 changes: 20 additions & 16 deletions pkg/distributor/distributor.go
@@ -477,15 +477,15 @@ func removeLabel(labelName string, labels *[]cortexpb.LabelAdapter) {
// Returns a boolean that indicates whether or not we want to remove the replica label going forward,
// and an error that indicates whether we want to accept samples based on the cluster/replica found in ts.
// nil for the error means accept the sample.
func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string) (removeReplicaLabel bool, _ error) {
func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string, limits *validation.Limits) (removeReplicaLabel bool, _ error) {
// If the sample doesn't have either HA label, accept it.
// At the moment we want to accept these samples by default.
if cluster == "" || replica == "" {
return false, nil
}

// If replica label is too long, don't use it. We accept the sample here, but it will fail validation later anyway.
if len(replica) > d.limits.MaxLabelValueLength(userID) {
if len(replica) > limits.MaxLabelValueLength {
return false, nil
}

@@ -503,9 +503,10 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// any are configured to be dropped for the user ID.
// Returns the validated series with its labels/samples, and any error.
// The returned error may retain the series labels.
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool) (cortexpb.PreallocTimeseries, validation.ValidationError) {
func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool, limits *validation.Limits) (cortexpb.PreallocTimeseries, validation.ValidationError) {
d.labelsHistogram.Observe(float64(len(ts.Labels)))
if err := validation.ValidateLabels(d.limits, userID, ts.Labels, skipLabelNameValidation); err != nil {

if err := validation.ValidateLabels(limits, userID, ts.Labels, skipLabelNameValidation); err != nil {
return emptyPreallocSeries, err
}

@@ -514,7 +515,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
// Only alloc when data present
samples = make([]cortexpb.Sample, 0, len(ts.Samples))
for _, s := range ts.Samples {
if err := validation.ValidateSample(d.limits, userID, ts.Labels, s); err != nil {
if err := validation.ValidateSample(limits, userID, ts.Labels, s); err != nil {
return emptyPreallocSeries, err
}
samples = append(samples, s)
@@ -598,9 +599,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
validatedSamples := 0
validatedExemplars := 0

if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 {
cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels)
removeReplica, err = d.checkSample(ctx, userID, cluster, replica)
// Cache the user's limits (with overrides applied) so we take the overrides lock once per push instead of once per limit lookup. See issue #4904
limits := d.limits.GetOverridesForUser(userID)

if limits.AcceptHASamples && len(req.Timeseries) > 0 {
cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, req.Timeseries[0].Labels)
removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits)
if err != nil {
// Ensure the request slice is reused if the series get deduped.
cortexpb.ReuseSlice(req.Timeseries)
@@ -634,13 +638,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

// For each timeseries, compute a hash to distribute across ingesters;
// check each sample and discard if outside limits.

skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
for _, ts := range req.Timeseries {
// Use the timestamp of the latest sample in the series. If samples for a series are not ordered, the per-user metric may be wrong.
if len(ts.Samples) > 0 {
latestSampleTimestampMs = util_math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)
}

if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
if mrc := limits.MetricRelabelConfigs; len(mrc) > 0 {
l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
if len(l) == 0 {
// all labels are gone, samples will be discarded
@@ -657,10 +663,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
if removeReplica {
removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
removeLabel(limits.HAReplicaLabel, &ts.Labels)
}

for _, labelName := range d.limits.DropLabels(userID) {
for _, labelName := range limits.DropLabels {
removeLabel(labelName, &ts.Labels)
}

@@ -686,9 +692,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
if err != nil {
return nil, err
}

skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation)
validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits)

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
@@ -710,7 +714,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

for _, m := range req.Metadata {
err := validation.ValidateMetadata(d.limits, userID, m)
err := validation.ValidateMetadata(limits, userID, m)

if err != nil {
if firstPartialErr == nil {
@@ -756,7 +760,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

// Obtain a subring if required.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
}

keys := append(seriesKeys, metadataKeys...)
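
A rough way to see the contention the snapshot avoids: the hypothetical harness below completes the sketch above with a `main` (add `"fmt"` and `"time"` to its imports) and contrasts ten locked accessor calls per simulated push against one snapshot fetch. Absolute numbers are meaningless; the point is that the per-lookup variant acquires the read lock ten times more often under parallel pushes.

```go
// Hypothetical harness, reusing the Overrides and Limits types defined in the
// sketch above; not part of the PR.
func main() {
	o := &Overrides{
		tenantLimits: map[string]*Limits{"user-1": {MaxLabelValueLength: 2048}},
		defaults:     &Limits{MaxLabelValueLength: 2048},
	}

	const goroutines, pushes, lookupsPerPush = 16, 100000, 10

	run := func(snapshot bool) time.Duration {
		start := time.Now()
		var wg sync.WaitGroup
		for g := 0; g < goroutines; g++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for p := 0; p < pushes; p++ {
					if snapshot {
						limits := o.GetOverridesForUser("user-1") // lock once per push
						for i := 0; i < lookupsPerPush; i++ {
							_ = limits.MaxLabelValueLength // plain field read
						}
					} else {
						for i := 0; i < lookupsPerPush; i++ {
							_ = o.MaxLabelValueLength("user-1") // lock per lookup
						}
					}
				}
			}()
		}
		wg.Wait()
		return time.Since(start)
	}

	fmt.Println("per-lookup locking:", run(false))
	fmt.Println("one snapshot/push: ", run(true))
}
```

A side effect of returning a pointer to the cached struct is that every limit read within one `Push` observes a consistent view of the tenant's configuration, rather than possibly straddling a runtime-config reload mid-request.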