diff --git a/CHANGELOG.md b/CHANGELOG.md index 1565d242ec4..2b4000653f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ * [ENHANCEMENT] Distributor: Add label name to labelValueTooLongError. #4855 * [ENHANCEMENT] Enhance traces with hostname information. #4898 * [ENHANCEMENT] Improve the documentation around limits. #4905 +* [ENHANCEMENT] Distributor: cache user overrides to reduce lock contention. #4904 * [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784 * [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787 * [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index f7577498b4d..45320683a35 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -477,7 +477,7 @@ func removeLabel(labelName string, labels *[]cortexpb.LabelAdapter) { // Returns a boolean that indicates whether or not we want to remove the replica label going forward, // and an error that indicates whether we want to accept samples based on the cluster/replica found in ts. // nil for the error means accept the sample. -func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string) (removeReplicaLabel bool, _ error) { +func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string, limits *validation.Limits) (removeReplicaLabel bool, _ error) { // If the sample doesn't have either HA label, accept it. // At the moment we want to accept these samples by default. if cluster == "" || replica == "" { @@ -485,7 +485,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica } // If replica label is too long, don't use it. 
We accept the sample here, but it will fail validation later anyway. - if len(replica) > d.limits.MaxLabelValueLength(userID) { + if len(replica) > limits.MaxLabelValueLength { return false, nil } @@ -503,9 +503,10 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica // any are configured to be dropped for the user ID. // Returns the validated series with its labels/samples, and any error. // The returned error may retain the series labels. -func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool) (cortexpb.PreallocTimeseries, validation.ValidationError) { +func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID string, skipLabelNameValidation bool, limits *validation.Limits) (cortexpb.PreallocTimeseries, validation.ValidationError) { d.labelsHistogram.Observe(float64(len(ts.Labels))) - if err := validation.ValidateLabels(d.limits, userID, ts.Labels, skipLabelNameValidation); err != nil { + + if err := validation.ValidateLabels(limits, userID, ts.Labels, skipLabelNameValidation); err != nil { return emptyPreallocSeries, err } @@ -514,7 +515,7 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri // Only alloc when data present samples = make([]cortexpb.Sample, 0, len(ts.Samples)) for _, s := range ts.Samples { - if err := validation.ValidateSample(d.limits, userID, ts.Labels, s); err != nil { + if err := validation.ValidateSample(limits, userID, ts.Labels, s); err != nil { return emptyPreallocSeries, err } samples = append(samples, s) @@ -598,9 +599,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co validatedSamples := 0 validatedExemplars := 0 - if d.limits.AcceptHASamples(userID) && len(req.Timeseries) > 0 { - cluster, replica := findHALabels(d.limits.HAReplicaLabel(userID), d.limits.HAClusterLabel(userID), req.Timeseries[0].Labels) - removeReplica, err = d.checkSample(ctx, userID, cluster, 
replica) + // Cache user limit with overrides so we spend less CPU doing locking. See issue #4904 + limits := d.limits.GetOverridesForUser(userID) + + if limits.AcceptHASamples && len(req.Timeseries) > 0 { + cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, req.Timeseries[0].Labels) + removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits) if err != nil { // Ensure the request slice is reused if the series get deduped. cortexpb.ReuseSlice(req.Timeseries) @@ -634,13 +638,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // For each timeseries, compute a hash to distribute across ingesters; // check each sample and discard if outside limits. + + skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation() for _, ts := range req.Timeseries { // Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong. if len(ts.Samples) > 0 { latestSampleTimestampMs = util_math.Max64(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs) } - if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 { + if mrc := limits.MetricRelabelConfigs; len(mrc) > 0 { l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...) if len(l) == 0 { // all labels are gone, samples will be discarded @@ -657,10 +663,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // storing series in Cortex. If we kept the replica label we would end up with another series for the same // series we're trying to dedupe when HA tracking moves over to a different replica. 
if removeReplica { - removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) + removeLabel(limits.HAReplicaLabel, &ts.Labels) } - for _, labelName := range d.limits.DropLabels(userID) { + for _, labelName := range limits.DropLabels { removeLabel(labelName, &ts.Labels) } @@ -686,9 +692,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co if err != nil { return nil, err } - - skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation() - validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation) + validatedSeries, validationErr := d.validateSeries(ts, userID, skipLabelNameValidation, limits) // Errors in validation are considered non-fatal, as one series in a request may contain // invalid data but all the remaining series could be perfectly valid. @@ -710,7 +714,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co } for _, m := range req.Metadata { - err := validation.ValidateMetadata(d.limits, userID, m) + err := validation.ValidateMetadata(limits, userID, m) if err != nil { if firstPartialErr == nil { @@ -756,7 +760,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co // Obtain a subring if required. if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle { - subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID)) + subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize) } keys := append(seriesKeys, metadataKeys...) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 3d27c5058dd..24869b15506 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -282,7 +282,7 @@ func NewOverrides(defaults Limits, tenantLimits TenantLimits) (*Overrides, error // IngestionRate returns the limit on ingester rate (samples per second). 
func (o *Overrides) IngestionRate(userID string) float64 { - return o.getOverridesForUser(userID).IngestionRate + return o.GetOverridesForUser(userID).IngestionRate } // IngestionRateStrategy returns whether the ingestion rate limit should be individually applied @@ -294,263 +294,263 @@ func (o *Overrides) IngestionRateStrategy() string { // IngestionBurstSize returns the burst size for ingestion rate. func (o *Overrides) IngestionBurstSize(userID string) int { - return o.getOverridesForUser(userID).IngestionBurstSize + return o.GetOverridesForUser(userID).IngestionBurstSize } // AcceptHASamples returns whether the distributor should track and accept samples from HA replicas for this user. func (o *Overrides) AcceptHASamples(userID string) bool { - return o.getOverridesForUser(userID).AcceptHASamples + return o.GetOverridesForUser(userID).AcceptHASamples } // HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica. func (o *Overrides) HAClusterLabel(userID string) string { - return o.getOverridesForUser(userID).HAClusterLabel + return o.GetOverridesForUser(userID).HAClusterLabel } // HAReplicaLabel returns the replica label to look for when deciding whether to accept a sample from a Prometheus HA replica. func (o *Overrides) HAReplicaLabel(userID string) string { - return o.getOverridesForUser(userID).HAReplicaLabel + return o.GetOverridesForUser(userID).HAReplicaLabel } // DropLabels returns the list of labels to be dropped when ingesting HA samples for the user. func (o *Overrides) DropLabels(userID string) flagext.StringSlice { - return o.getOverridesForUser(userID).DropLabels + return o.GetOverridesForUser(userID).DropLabels } // MaxLabelNameLength returns maximum length a label name can be. 
func (o *Overrides) MaxLabelNameLength(userID string) int { - return o.getOverridesForUser(userID).MaxLabelNameLength + return o.GetOverridesForUser(userID).MaxLabelNameLength } // MaxLabelValueLength returns maximum length a label value can be. This also is // the maximum length of a metric name. func (o *Overrides) MaxLabelValueLength(userID string) int { - return o.getOverridesForUser(userID).MaxLabelValueLength + return o.GetOverridesForUser(userID).MaxLabelValueLength } // MaxLabelNamesPerSeries returns maximum number of label/value pairs timeseries. func (o *Overrides) MaxLabelNamesPerSeries(userID string) int { - return o.getOverridesForUser(userID).MaxLabelNamesPerSeries + return o.GetOverridesForUser(userID).MaxLabelNamesPerSeries } // MaxLabelsSizeBytes returns the maximum total size, in bytes, of the labels of a timeseries. func (o *Overrides) MaxLabelsSizeBytes(userID string) int { - return o.getOverridesForUser(userID).MaxLabelsSizeBytes + return o.GetOverridesForUser(userID).MaxLabelsSizeBytes } // MaxMetadataLength returns maximum length metadata can be. Metadata refers // to the Metric Name, HELP and UNIT. func (o *Overrides) MaxMetadataLength(userID string) int { - return o.getOverridesForUser(userID).MaxMetadataLength + return o.GetOverridesForUser(userID).MaxMetadataLength } // RejectOldSamples returns true when we should reject samples older than certain // age. func (o *Overrides) RejectOldSamples(userID string) bool { - return o.getOverridesForUser(userID).RejectOldSamples + return o.GetOverridesForUser(userID).RejectOldSamples } // RejectOldSamplesMaxAge returns the age at which samples should be rejected. func (o *Overrides) RejectOldSamplesMaxAge(userID string) time.Duration { - return time.Duration(o.getOverridesForUser(userID).RejectOldSamplesMaxAge) + return time.Duration(o.GetOverridesForUser(userID).RejectOldSamplesMaxAge) } // CreationGracePeriod is misnamed, and actually returns how far into the future // we should accept samples. 
func (o *Overrides) CreationGracePeriod(userID string) time.Duration { - return time.Duration(o.getOverridesForUser(userID).CreationGracePeriod) + return time.Duration(o.GetOverridesForUser(userID).CreationGracePeriod) } // MaxSeriesPerQuery returns the maximum number of series a query is allowed to hit. func (o *Overrides) MaxSeriesPerQuery(userID string) int { - return o.getOverridesForUser(userID).MaxSeriesPerQuery + return o.GetOverridesForUser(userID).MaxSeriesPerQuery } // MaxLocalSeriesPerUser returns the maximum number of series a user is allowed to store in a single ingester. func (o *Overrides) MaxLocalSeriesPerUser(userID string) int { - return o.getOverridesForUser(userID).MaxLocalSeriesPerUser + return o.GetOverridesForUser(userID).MaxLocalSeriesPerUser } // MaxLocalSeriesPerMetric returns the maximum number of series allowed per metric in a single ingester. func (o *Overrides) MaxLocalSeriesPerMetric(userID string) int { - return o.getOverridesForUser(userID).MaxLocalSeriesPerMetric + return o.GetOverridesForUser(userID).MaxLocalSeriesPerMetric } // MaxGlobalSeriesPerUser returns the maximum number of series a user is allowed to store across the cluster. func (o *Overrides) MaxGlobalSeriesPerUser(userID string) int { - return o.getOverridesForUser(userID).MaxGlobalSeriesPerUser + return o.GetOverridesForUser(userID).MaxGlobalSeriesPerUser } // MaxGlobalSeriesPerMetric returns the maximum number of series allowed per metric across the cluster. func (o *Overrides) MaxGlobalSeriesPerMetric(userID string) int { - return o.getOverridesForUser(userID).MaxGlobalSeriesPerMetric + return o.GetOverridesForUser(userID).MaxGlobalSeriesPerMetric } // MaxChunksPerQueryFromStore returns the maximum number of chunks allowed per query when fetching // chunks from the long-term storage. 
func (o *Overrides) MaxChunksPerQueryFromStore(userID string) int { - return o.getOverridesForUser(userID).MaxChunksPerQuery + return o.GetOverridesForUser(userID).MaxChunksPerQuery } func (o *Overrides) MaxChunksPerQuery(userID string) int { - return o.getOverridesForUser(userID).MaxChunksPerQuery + return o.GetOverridesForUser(userID).MaxChunksPerQuery } // MaxFetchedSeriesPerQuery returns the maximum number of series allowed per query when fetching // chunks from ingesters and blocks storage. func (o *Overrides) MaxFetchedSeriesPerQuery(userID string) int { - return o.getOverridesForUser(userID).MaxFetchedSeriesPerQuery + return o.GetOverridesForUser(userID).MaxFetchedSeriesPerQuery } // MaxFetchedChunkBytesPerQuery returns the maximum number of bytes for chunks allowed per query when fetching // chunks from ingesters and blocks storage. func (o *Overrides) MaxFetchedChunkBytesPerQuery(userID string) int { - return o.getOverridesForUser(userID).MaxFetchedChunkBytesPerQuery + return o.GetOverridesForUser(userID).MaxFetchedChunkBytesPerQuery } // MaxFetchedDataBytesPerQuery returns the maximum number of bytes for all data allowed per query when fetching // from ingesters and blocks storage. func (o *Overrides) MaxFetchedDataBytesPerQuery(userID string) int { - return o.getOverridesForUser(userID).MaxFetchedDataBytesPerQuery + return o.GetOverridesForUser(userID).MaxFetchedDataBytesPerQuery } // MaxQueryLookback returns the max lookback period of queries. func (o *Overrides) MaxQueryLookback(userID string) time.Duration { - return time.Duration(o.getOverridesForUser(userID).MaxQueryLookback) + return time.Duration(o.GetOverridesForUser(userID).MaxQueryLookback) } // MaxQueryLength returns the limit of the length (in time) of a query. 
func (o *Overrides) MaxQueryLength(userID string) time.Duration { - return time.Duration(o.getOverridesForUser(userID).MaxQueryLength) + return time.Duration(o.GetOverridesForUser(userID).MaxQueryLength) } // MaxCacheFreshness returns the period after which results are cacheable, // to prevent caching of very recent results. func (o *Overrides) MaxCacheFreshness(userID string) time.Duration { - return time.Duration(o.getOverridesForUser(userID).MaxCacheFreshness) + return time.Duration(o.GetOverridesForUser(userID).MaxCacheFreshness) } // MaxQueriersPerUser returns the maximum number of queriers that can handle requests for this user. func (o *Overrides) MaxQueriersPerUser(userID string) int { - return o.getOverridesForUser(userID).MaxQueriersPerTenant + return o.GetOverridesForUser(userID).MaxQueriersPerTenant } // QueryVerticalShardSize returns the number of shards to use when distributing shardable PromQL queries. func (o *Overrides) QueryVerticalShardSize(userID string) int { - return o.getOverridesForUser(userID).QueryVerticalShardSize + return o.GetOverridesForUser(userID).QueryVerticalShardSize } // MaxQueryParallelism returns the limit to the number of split queries the // frontend will process in parallel. func (o *Overrides) MaxQueryParallelism(userID string) int { - return o.getOverridesForUser(userID).MaxQueryParallelism + return o.GetOverridesForUser(userID).MaxQueryParallelism } // EnforceMetricName whether to enforce the presence of a metric name. func (o *Overrides) EnforceMetricName(userID string) bool { - return o.getOverridesForUser(userID).EnforceMetricName + return o.GetOverridesForUser(userID).EnforceMetricName } // EnforceMetadataMetricName whether to enforce the presence of a metric name on metadata. 
func (o *Overrides) EnforceMetadataMetricName(userID string) bool { - return o.getOverridesForUser(userID).EnforceMetadataMetricName + return o.GetOverridesForUser(userID).EnforceMetadataMetricName } // MaxLocalMetricsWithMetadataPerUser returns the maximum number of metrics with metadata a user is allowed to store in a single ingester. func (o *Overrides) MaxLocalMetricsWithMetadataPerUser(userID string) int { - return o.getOverridesForUser(userID).MaxLocalMetricsWithMetadataPerUser + return o.GetOverridesForUser(userID).MaxLocalMetricsWithMetadataPerUser } // MaxLocalMetadataPerMetric returns the maximum number of metadata allowed per metric in a single ingester. func (o *Overrides) MaxLocalMetadataPerMetric(userID string) int { - return o.getOverridesForUser(userID).MaxLocalMetadataPerMetric + return o.GetOverridesForUser(userID).MaxLocalMetadataPerMetric } // MaxGlobalMetricsWithMetadataPerUser returns the maximum number of metrics with metadata a user is allowed to store across the cluster. func (o *Overrides) MaxGlobalMetricsWithMetadataPerUser(userID string) int { - return o.getOverridesForUser(userID).MaxGlobalMetricsWithMetadataPerUser + return o.GetOverridesForUser(userID).MaxGlobalMetricsWithMetadataPerUser } // MaxGlobalMetadataPerMetric returns the maximum number of metadata allowed per metric across the cluster. func (o *Overrides) MaxGlobalMetadataPerMetric(userID string) int { - return o.getOverridesForUser(userID).MaxGlobalMetadataPerMetric + return o.GetOverridesForUser(userID).MaxGlobalMetadataPerMetric } // IngestionTenantShardSize returns the ingesters shard size for a given user. func (o *Overrides) IngestionTenantShardSize(userID string) int { - return o.getOverridesForUser(userID).IngestionTenantShardSize + return o.GetOverridesForUser(userID).IngestionTenantShardSize } // EvaluationDelay returns the rules evaluation delay for a given user. 
func (o *Overrides) EvaluationDelay(userID string) time.Duration { - return time.Duration(o.getOverridesForUser(userID).RulerEvaluationDelay) + return time.Duration(o.GetOverridesForUser(userID).RulerEvaluationDelay) } // CompactorBlocksRetentionPeriod returns the retention period for a given user. func (o *Overrides) CompactorBlocksRetentionPeriod(userID string) time.Duration { - return time.Duration(o.getOverridesForUser(userID).CompactorBlocksRetentionPeriod) + return time.Duration(o.GetOverridesForUser(userID).CompactorBlocksRetentionPeriod) } // CompactorTenantShardSize returns shard size (number of compactors) used by this tenant when using shuffle-sharding strategy. func (o *Overrides) CompactorTenantShardSize(userID string) int { - return o.getOverridesForUser(userID).CompactorTenantShardSize + return o.GetOverridesForUser(userID).CompactorTenantShardSize } // MetricRelabelConfigs returns the metric relabel configs for a given user. func (o *Overrides) MetricRelabelConfigs(userID string) []*relabel.Config { - return o.getOverridesForUser(userID).MetricRelabelConfigs + return o.GetOverridesForUser(userID).MetricRelabelConfigs } // RulerTenantShardSize returns shard size (number of rulers) used by this tenant when using shuffle-sharding strategy. func (o *Overrides) RulerTenantShardSize(userID string) int { - return o.getOverridesForUser(userID).RulerTenantShardSize + return o.GetOverridesForUser(userID).RulerTenantShardSize } // RulerMaxRulesPerRuleGroup returns the maximum number of rules per rule group for a given user. func (o *Overrides) RulerMaxRulesPerRuleGroup(userID string) int { - return o.getOverridesForUser(userID).RulerMaxRulesPerRuleGroup + return o.GetOverridesForUser(userID).RulerMaxRulesPerRuleGroup } // RulerMaxRuleGroupsPerTenant returns the maximum number of rule groups for a given user. 
func (o *Overrides) RulerMaxRuleGroupsPerTenant(userID string) int { - return o.getOverridesForUser(userID).RulerMaxRuleGroupsPerTenant + return o.GetOverridesForUser(userID).RulerMaxRuleGroupsPerTenant } // StoreGatewayTenantShardSize returns the store-gateway shard size for a given user. func (o *Overrides) StoreGatewayTenantShardSize(userID string) int { - return o.getOverridesForUser(userID).StoreGatewayTenantShardSize + return o.GetOverridesForUser(userID).StoreGatewayTenantShardSize } // MaxHAClusters returns maximum number of clusters that HA tracker will track for a user. func (o *Overrides) MaxHAClusters(user string) int { - return o.getOverridesForUser(user).HAMaxClusters + return o.GetOverridesForUser(user).HAMaxClusters } // S3SSEType returns the per-tenant S3 SSE type. func (o *Overrides) S3SSEType(user string) string { - return o.getOverridesForUser(user).S3SSEType + return o.GetOverridesForUser(user).S3SSEType } // S3SSEKMSKeyID returns the per-tenant S3 KMS-SSE key id. func (o *Overrides) S3SSEKMSKeyID(user string) string { - return o.getOverridesForUser(user).S3SSEKMSKeyID + return o.GetOverridesForUser(user).S3SSEKMSKeyID } // S3SSEKMSEncryptionContext returns the per-tenant S3 KMS-SSE encryption context. func (o *Overrides) S3SSEKMSEncryptionContext(user string) string { - return o.getOverridesForUser(user).S3SSEKMSEncryptionContext + return o.GetOverridesForUser(user).S3SSEKMSEncryptionContext } // AlertmanagerReceiversBlockCIDRNetworks returns the list of network CIDRs that should be blocked // in the Alertmanager receivers for the given user. func (o *Overrides) AlertmanagerReceiversBlockCIDRNetworks(user string) []flagext.CIDR { - return o.getOverridesForUser(user).AlertmanagerReceiversBlockCIDRNetworks + return o.GetOverridesForUser(user).AlertmanagerReceiversBlockCIDRNetworks } // AlertmanagerReceiversBlockPrivateAddresses returns true if private addresses should be blocked // in the Alertmanager receivers for the given user. 
func (o *Overrides) AlertmanagerReceiversBlockPrivateAddresses(user string) bool { - return o.getOverridesForUser(user).AlertmanagerReceiversBlockPrivateAddresses + return o.GetOverridesForUser(user).AlertmanagerReceiversBlockPrivateAddresses } // Notification limits are special. Limits are returned in following order: @@ -559,7 +559,7 @@ func (o *Overrides) AlertmanagerReceiversBlockPrivateAddresses(user string) bool // 3. per-tenant limits // 4. default limits func (o *Overrides) getNotificationLimitForUser(user, integration string) float64 { - u := o.getOverridesForUser(user) + u := o.GetOverridesForUser(user) if n, ok := u.NotificationRateLimitPerIntegration[integration]; ok { return n } @@ -602,30 +602,31 @@ func (o *Overrides) NotificationBurstSize(user string, integration string) int { } func (o *Overrides) AlertmanagerMaxConfigSize(userID string) int { - return o.getOverridesForUser(userID).AlertmanagerMaxConfigSizeBytes + return o.GetOverridesForUser(userID).AlertmanagerMaxConfigSizeBytes } func (o *Overrides) AlertmanagerMaxTemplatesCount(userID string) int { - return o.getOverridesForUser(userID).AlertmanagerMaxTemplatesCount + return o.GetOverridesForUser(userID).AlertmanagerMaxTemplatesCount } func (o *Overrides) AlertmanagerMaxTemplateSize(userID string) int { - return o.getOverridesForUser(userID).AlertmanagerMaxTemplateSizeBytes + return o.GetOverridesForUser(userID).AlertmanagerMaxTemplateSizeBytes } func (o *Overrides) AlertmanagerMaxDispatcherAggregationGroups(userID string) int { - return o.getOverridesForUser(userID).AlertmanagerMaxDispatcherAggregationGroups + return o.GetOverridesForUser(userID).AlertmanagerMaxDispatcherAggregationGroups } func (o *Overrides) AlertmanagerMaxAlertsCount(userID string) int { - return o.getOverridesForUser(userID).AlertmanagerMaxAlertsCount + return o.GetOverridesForUser(userID).AlertmanagerMaxAlertsCount } func (o *Overrides) AlertmanagerMaxAlertsSizeBytes(userID string) int { - return 
o.getOverridesForUser(userID).AlertmanagerMaxAlertsSizeBytes + return o.GetOverridesForUser(userID).AlertmanagerMaxAlertsSizeBytes } -func (o *Overrides) getOverridesForUser(userID string) *Limits { +// GetOverridesForUser returns the per-tenant limits with overrides. +func (o *Overrides) GetOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits.ByUserID(userID) if l != nil { diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 9f4b5e306df..0985e7db75c 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -101,24 +101,17 @@ func init() { prometheus.MustRegister(DiscardedMetadata) } -// SampleValidationConfig helps with getting required config to validate sample. -type SampleValidationConfig interface { - RejectOldSamples(userID string) bool - RejectOldSamplesMaxAge(userID string) time.Duration - CreationGracePeriod(userID string) time.Duration -} - // ValidateSample returns an err if the sample is invalid. // The returned error may retain the provided series labels. 
-func ValidateSample(cfg SampleValidationConfig, userID string, ls []cortexpb.LabelAdapter, s cortexpb.Sample) ValidationError { +func ValidateSample(limits *Limits, userID string, ls []cortexpb.LabelAdapter, s cortexpb.Sample) ValidationError { unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls) - if cfg.RejectOldSamples(userID) && model.Time(s.TimestampMs) < model.Now().Add(-cfg.RejectOldSamplesMaxAge(userID)) { + if limits.RejectOldSamples && model.Time(s.TimestampMs) < model.Now().Add(-time.Duration(limits.RejectOldSamplesMaxAge)) { DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID).Inc() return newSampleTimestampTooOldError(unsafeMetricName, s.TimestampMs) } - if model.Time(s.TimestampMs) > model.Now().Add(cfg.CreationGracePeriod(userID)) { + if model.Time(s.TimestampMs) > model.Now().Add(time.Duration(limits.CreationGracePeriod)) { DiscardedSamples.WithLabelValues(tooFarInFuture, userID).Inc() return newSampleTimestampTooNewError(unsafeMetricName, s.TimestampMs) } @@ -163,19 +156,10 @@ func ValidateExemplar(userID string, ls []cortexpb.LabelAdapter, e cortexpb.Exem return nil } -// LabelValidationConfig helps with getting required config to validate labels. -type LabelValidationConfig interface { - EnforceMetricName(userID string) bool - MaxLabelNamesPerSeries(userID string) int - MaxLabelNameLength(userID string) int - MaxLabelValueLength(userID string) int - MaxLabelsSizeBytes(userID string) int -} - // ValidateLabels returns an err if the labels are invalid. // The returned error may retain the provided series labels. 
-func ValidateLabels(cfg LabelValidationConfig, userID string, ls []cortexpb.LabelAdapter, skipLabelNameValidation bool) ValidationError { - if cfg.EnforceMetricName(userID) { +func ValidateLabels(limits *Limits, userID string, ls []cortexpb.LabelAdapter, skipLabelNameValidation bool) ValidationError { + if limits.EnforceMetricName { unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(ls) if err != nil { DiscardedSamples.WithLabelValues(missingMetricName, userID).Inc() @@ -189,15 +173,15 @@ func ValidateLabels(cfg LabelValidationConfig, userID string, ls []cortexpb.Labe } numLabelNames := len(ls) - if numLabelNames > cfg.MaxLabelNamesPerSeries(userID) { + if numLabelNames > limits.MaxLabelNamesPerSeries { DiscardedSamples.WithLabelValues(maxLabelNamesPerSeries, userID).Inc() - return newTooManyLabelsError(ls, cfg.MaxLabelNamesPerSeries(userID)) + return newTooManyLabelsError(ls, limits.MaxLabelNamesPerSeries) } - maxLabelNameLength := cfg.MaxLabelNameLength(userID) - maxLabelValueLength := cfg.MaxLabelValueLength(userID) + maxLabelNameLength := limits.MaxLabelNameLength + maxLabelValueLength := limits.MaxLabelValueLength lastLabelName := "" - maxLabelsSizeBytes := cfg.MaxLabelsSizeBytes(userID) + maxLabelsSizeBytes := limits.MaxLabelsSizeBytes labelsSizeBytes := 0 for _, l := range ls { @@ -230,20 +214,14 @@ func ValidateLabels(cfg LabelValidationConfig, userID string, ls []cortexpb.Labe return nil } -// MetadataValidationConfig helps with getting required config to validate metadata. -type MetadataValidationConfig interface { - EnforceMetadataMetricName(userID string) bool - MaxMetadataLength(userID string) int -} - // ValidateMetadata returns an err if a metric metadata is invalid. 
-func ValidateMetadata(cfg MetadataValidationConfig, userID string, metadata *cortexpb.MetricMetadata) error { - if cfg.EnforceMetadataMetricName(userID) && metadata.GetMetricFamilyName() == "" { +func ValidateMetadata(cfg *Limits, userID string, metadata *cortexpb.MetricMetadata) error { + if cfg.EnforceMetadataMetricName && metadata.GetMetricFamilyName() == "" { DiscardedMetadata.WithLabelValues(missingMetricName, userID).Inc() return httpgrpc.Errorf(http.StatusBadRequest, errMetadataMissingMetricName) } - maxMetadataValueLength := cfg.MaxMetadataLength(userID) + maxMetadataValueLength := cfg.MaxMetadataLength var reason string var cause string var metadataType string diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index 16532166067..71dddab5baa 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -16,56 +16,15 @@ import ( util_log "github.com/cortexproject/cortex/pkg/util/log" ) -type validateLabelsCfg struct { - enforceMetricName bool - maxLabelNamesPerSeries int - maxLabelNameLength int - maxLabelValueLength int - maxLabelsSizeBytes int -} - -func (v validateLabelsCfg) EnforceMetricName(userID string) bool { - return v.enforceMetricName -} - -func (v validateLabelsCfg) MaxLabelNamesPerSeries(userID string) int { - return v.maxLabelNamesPerSeries -} - -func (v validateLabelsCfg) MaxLabelNameLength(userID string) int { - return v.maxLabelNameLength -} - -func (v validateLabelsCfg) MaxLabelValueLength(userID string) int { - return v.maxLabelValueLength -} - -func (v validateLabelsCfg) MaxLabelsSizeBytes(userID string) int { - return v.maxLabelsSizeBytes -} - -type validateMetadataCfg struct { - enforceMetadataMetricName bool - maxMetadataLength int -} - -func (vm validateMetadataCfg) EnforceMetadataMetricName(userID string) bool { - return vm.enforceMetadataMetricName -} - -func (vm validateMetadataCfg) MaxMetadataLength(userID string) int { - return vm.maxMetadataLength -} - func 
TestValidateLabels(t *testing.T) { - var cfg validateLabelsCfg + cfg := new(Limits) userID := "testUser" - cfg.maxLabelValueLength = 25 - cfg.maxLabelNameLength = 25 - cfg.maxLabelNamesPerSeries = 2 - cfg.maxLabelsSizeBytes = 90 - cfg.enforceMetricName = true + cfg.MaxLabelValueLength = 25 + cfg.MaxLabelNameLength = 25 + cfg.MaxLabelNamesPerSeries = 2 + cfg.MaxLabelsSizeBytes = 90 + cfg.EnforceMetricName = true for _, c := range []struct { metric model.Metric @@ -101,7 +60,7 @@ func TestValidateLabels(t *testing.T) { newLabelNameTooLongError([]cortexpb.LabelAdapter{ {Name: model.MetricNameLabel, Value: "badLabelName"}, {Name: "this_is_a_really_really_long_name_that_should_cause_an_error", Value: "test_value_please_ignore"}, - }, "this_is_a_really_really_long_name_that_should_cause_an_error", cfg.maxLabelNameLength), + }, "this_is_a_really_really_long_name_that_should_cause_an_error", cfg.MaxLabelNameLength), }, { map[model.LabelName]model.LabelValue{model.MetricNameLabel: "badLabelValue", "much_shorter_name": "test_value_please_ignore_no_really_nothing_to_see_here"}, @@ -109,7 +68,7 @@ func TestValidateLabels(t *testing.T) { newLabelValueTooLongError([]cortexpb.LabelAdapter{ {Name: model.MetricNameLabel, Value: "badLabelValue"}, {Name: "much_shorter_name", Value: "test_value_please_ignore_no_really_nothing_to_see_here"}, - }, "much_shorter_name", "test_value_please_ignore_no_really_nothing_to_see_here", cfg.maxLabelValueLength), + }, "much_shorter_name", "test_value_please_ignore_no_really_nothing_to_see_here", cfg.MaxLabelValueLength), }, { map[model.LabelName]model.LabelValue{model.MetricNameLabel: "foo", "bar": "baz", "blip": "blop"}, @@ -126,7 +85,7 @@ func TestValidateLabels(t *testing.T) { labelSizeBytesExceededError([]cortexpb.LabelAdapter{ {Name: model.MetricNameLabel, Value: "exactly_twenty_five_chars"}, {Name: "exactly_twenty_five_chars", Value: "exactly_twenty_five_chars"}, - }, 91, cfg.maxLabelsSizeBytes), + }, 91, cfg.MaxLabelsSizeBytes), }, { 
map[model.LabelName]model.LabelValue{model.MetricNameLabel: "foo", "invalid%label&name": "bar"}, @@ -209,10 +168,11 @@ func TestValidateExemplars(t *testing.T) { } func TestValidateMetadata(t *testing.T) { + cfg := new(Limits) + cfg.EnforceMetadataMetricName = true + cfg.MaxMetadataLength = 22 + userID := "testUser" - var cfg validateMetadataCfg - cfg.enforceMetadataMetricName = true - cfg.maxMetadataLength = 22 for _, c := range []struct { desc string @@ -274,10 +234,10 @@ func TestValidateMetadata(t *testing.T) { } func TestValidateLabelOrder(t *testing.T) { - var cfg validateLabelsCfg - cfg.maxLabelNameLength = 10 - cfg.maxLabelNamesPerSeries = 10 - cfg.maxLabelValueLength = 10 + cfg := new(Limits) + cfg.MaxLabelNameLength = 10 + cfg.MaxLabelNamesPerSeries = 10 + cfg.MaxLabelValueLength = 10 userID := "testUser" @@ -295,10 +255,10 @@ func TestValidateLabelOrder(t *testing.T) { } func TestValidateLabelDuplication(t *testing.T) { - var cfg validateLabelsCfg - cfg.maxLabelNameLength = 10 - cfg.maxLabelNamesPerSeries = 10 - cfg.maxLabelValueLength = 10 + cfg := new(Limits) + cfg.MaxLabelNameLength = 10 + cfg.MaxLabelNamesPerSeries = 10 + cfg.MaxLabelValueLength = 10 userID := "testUser"