diff --git a/docs/arguments.md b/docs/arguments.md index 9c876f6fd9..ba1153aa68 100644 --- a/docs/arguments.md +++ b/docs/arguments.md @@ -108,6 +108,9 @@ The ingester query API was improved over time, but defaults to the old behaviour - `distributor.ha-tracker.enable` Enable the distributors HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels). Global (for distributors), this ensures that the necessary internal data structures for the HA handling are created. The option `enable-for-all-users` is still needed to enable ingestion of HA samples for all users. +- `distributor.drop-label` + This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels. + ### Ring/HA Tracker Store The KVStore client is used by both the Ring and HA Tracker. @@ -144,7 +147,7 @@ prefix these flags with `distributor.ha-tracker.` ### HA Tracker -HA tracking has two of it's own flags: +HA tracking has two of its own flags: - `distributor.ha-tracker.cluster` Prometheus label to look for in samples to identify a Prometheus HA cluster. (default "cluster") - `distributor.ha-tracker.replica` diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 4dc292186e..51ab89d356 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -96,6 +96,7 @@ var ( Name: "distributor_replication_factor", Help: "The configured replication factor.", }) + emptyPreallocSeries = ingester_client.PreallocTimeseries{} ) // Distributor is a storage.SampleAppender and a client.Querier which @@ -249,11 +250,11 @@ func shardByAllLabels(userID string, labels []client.LabelAdapter) (uint32, erro return h, nil } -// Remove the replica label from a slice of LabelPairs if it exists. -func removeReplicaLabel(replica string, labels *[]client.LabelAdapter) { +// Remove the label labelname from a slice of LabelPairs if it exists. +func removeLabel(labelName string, labels *[]client.LabelAdapter) { for i := 0; i < len(*labels); i++ { pair := (*labels)[i] - if pair.Name == replica { + if pair.Name == labelName { *labels = append((*labels)[:i], (*labels)[i+1:]...) return } @@ -280,6 +281,52 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica return true, nil } +// Validates a single series from a write request. Will remove labels if +// any are configured to be dropped for the user ID. +// Returns the validated series with it's labels/samples, and any error. +func (d *Distributor) validateSeries(key uint32, ts ingester_client.PreallocTimeseries, userID string, removeReplica bool) (client.PreallocTimeseries, error) { + // If we found both the cluster and replica labels, we only want to include the cluster label when + // storing series in Cortex. If we kept the replica label we would end up with another series for the same + // series we're trying to dedupe when HA tracking moves over to a different replica. + if removeReplica { + removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) + } + + for _, labelName := range d.limits.DropLabels(userID) { + removeLabel(labelName, &ts.Labels) + } + if len(ts.Labels) == 0 { + return emptyPreallocSeries, nil + } + + key, err := d.tokenForLabels(userID, ts.Labels) + if err != nil { + return emptyPreallocSeries, err + } + + labelsHistogram.Observe(float64(len(ts.Labels))) + if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil { + return emptyPreallocSeries, err + } + + metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels) + samples := make([]client.Sample, 0, len(ts.Samples)) + for _, s := range ts.Samples { + if err := validation.ValidateSample(d.limits, userID, metricName, s); err != nil { + return emptyPreallocSeries, err + } + samples = append(samples, s) + } + + return client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: ts.Labels, + Samples: samples, + }, + }, + nil +} + // Push implements client.IngesterServer func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) { userID, err := user.ExtractOrgID(ctx) @@ -307,7 +354,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie } return nil, err } - // If there wasn't an error but removeReplica is false that means we didn't find both HA labels + // If there wasn't an error but removeReplica is false that means we didn't find both HA labels. if !removeReplica { nonHASamples.WithLabelValues(userID).Add(float64(numSamples)) } @@ -319,20 +366,20 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie keys := make([]uint32, 0, len(req.Timeseries)) validatedSamples := 0 for _, ts := range req.Timeseries { - // If we found both the cluster and replica labels, we only want to include the cluster label when - // storing series in Cortex. If we kept the replica label we would end up with another series for the same - // series we're trying to dedupe when HA tracking moves over to a different replica. - if removeReplica { - removeReplicaLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) - } key, err := d.tokenForLabels(userID, ts.Labels) if err != nil { return nil, err } - labelsHistogram.Observe(float64(len(ts.Labels))) - if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil { + validatedSeries, err := d.validateSeries(key, ts, userID, removeReplica) + // Errors in validation are considered non-fatal, as one series in a request may contain + // invalid data but all the remaining series could be perfectly valid. + if err != nil { lastPartialErr = err + } + + // validateSeries would have returned an emptyPreallocSeries if there were no valid samples. + if validatedSeries == emptyPreallocSeries { continue } @@ -347,13 +394,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie } keys = append(keys, key) - validatedTimeseries = append(validatedTimeseries, client.PreallocTimeseries{ - TimeSeries: &client.TimeSeries{ - Labels: ts.Labels, - Samples: samples, - }, - }) - + validatedTimeseries = append(validatedTimeseries, validatedSeries) validatedSamples += len(ts.Samples) } receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples)) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index d2ef007d53..dd5e8bf767 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net/http" + "reflect" "sort" "strconv" "sync" @@ -299,6 +300,105 @@ func TestDistributorPushQuery(t *testing.T) { } } +func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { + ctx = user.InjectOrgID(context.Background(), "user") + + type testcase struct { + series client.PreallocTimeseries + outputSeries client.PreallocTimeseries + removeReplica bool + removeLabels []string + } + + cases := []testcase{ + { // Remove both cluster and replica label. + series: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}}, + }, + }, + outputSeries: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + }, + Samples: []client.Sample{}, + }, + }, + removeReplica: true, + removeLabels: []string{"cluster"}, + }, + { // Remove multiple labels and replica. + series: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}, + {Name: "foo", Value: "bar"}, + {Name: "some", Value: "thing"}}, + }, + }, + outputSeries: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + }, + Samples: []client.Sample{}, + }, + }, + removeReplica: true, + removeLabels: []string{"foo", "some"}, + }, + { // Don't remove any labels. + series: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}}, + }, + }, + outputSeries: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}, + }, + Samples: []client.Sample{}, + }, + }, + removeReplica: false, + }, + } + + for _, tc := range cases { + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.DropLabels = tc.removeLabels + d := prepare(t, 1, 1, 0, true, &limits) + + userID, err := user.ExtractOrgID(ctx) + assert.NoError(t, err) + + key, err := d.tokenForLabels(userID, tc.series.Labels) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + series, err := d.validateSeries(key, tc.series, userID, tc.removeReplica) + if !reflect.DeepEqual(series, tc.outputSeries) { + t.Fatalf("output of validate series did not match expected output:\n\texpected: %+v\n\t got: %+v", tc.outputSeries, series) + } + } +} + func TestSlowQueries(t *testing.T) { nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo") nIngesters := 3 @@ -751,7 +851,7 @@ func TestRemoveReplicaLabel(t *testing.T) { } for _, c := range cases { - removeReplicaLabel(replicaLabel, &c.labelsIn) + removeLabel(replicaLabel, &c.labelsIn) assert.Equal(t, c.labelsOut, c.labelsIn) } } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index a6b42571a2..139295099d 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -6,24 +6,27 @@ import ( "time" "gopkg.in/yaml.v2" + + "github.com/cortexproject/cortex/pkg/util/flagext" ) // Limits describe all the limits for users; can be used to describe global default // limits via flags, or per-user limits via yaml config. type Limits struct { // Distributor enforced limits. - IngestionRate float64 `yaml:"ingestion_rate"` - IngestionBurstSize int `yaml:"ingestion_burst_size"` - AcceptHASamples bool `yaml:"accept_ha_samples"` - HAClusterLabel string `yaml:"ha_cluster_label"` - HAReplicaLabel string `yaml:"ha_replica_label"` - MaxLabelNameLength int `yaml:"max_label_name_length"` - MaxLabelValueLength int `yaml:"max_label_value_length"` - MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"` - RejectOldSamples bool `yaml:"reject_old_samples"` - RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"` - CreationGracePeriod time.Duration `yaml:"creation_grace_period"` - EnforceMetricName bool `yaml:"enforce_metric_name"` + IngestionRate float64 `yaml:"ingestion_rate"` + IngestionBurstSize int `yaml:"ingestion_burst_size"` + AcceptHASamples bool `yaml:"accept_ha_samples"` + HAClusterLabel string `yaml:"ha_cluster_label"` + HAReplicaLabel string `yaml:"ha_replica_label"` + DropLabels flagext.StringSlice `yaml:"drop_labels"` + MaxLabelNameLength int `yaml:"max_label_name_length"` + MaxLabelValueLength int `yaml:"max_label_value_length"` + MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"` + RejectOldSamples bool `yaml:"reject_old_samples"` + RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"` + CreationGracePeriod time.Duration `yaml:"creation_grace_period"` + EnforceMetricName bool `yaml:"enforce_metric_name"` // Ingester enforced limits. MaxSeriesPerQuery int `yaml:"max_series_per_query"` @@ -48,8 +51,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples). Warning, very high limits will be reset every -distributor.limiter-reload-period.") f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.") - f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.") f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") + f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.") + f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.") f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") @@ -142,14 +146,19 @@ func (o *Overrides) AcceptHASamples(userID string) bool { return o.overridesManager.GetLimits(userID).(*Limits).AcceptHASamples } +// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica. +func (o *Overrides) HAClusterLabel(userID string) string { + return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel +} + // HAReplicaLabel returns the replica label to look for when deciding whether to accept a sample from a Prometheus HA replica. func (o *Overrides) HAReplicaLabel(userID string) string { return o.overridesManager.GetLimits(userID).(*Limits).HAReplicaLabel } -// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica. -func (o *Overrides) HAClusterLabel(userID string) string { - return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel +// DropLabels returns the list of labels to be dropped when ingesting HA samples for the user. +func (o *Overrides) DropLabels(userID string) flagext.StringSlice { + return o.overridesManager.GetLimits(userID).(*Limits).DropLabels } // MaxLabelNameLength returns maximum length a label name can be.