From 83318bae80552da1b31aedd9ed087638d152ad9b Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 17 Oct 2019 19:33:03 -0700 Subject: [PATCH 1/5] Rename removeReplicaLabel to more generic removeLabel. Signed-off-by: Callum Styan --- pkg/distributor/distributor.go | 8 ++++---- pkg/distributor/distributor_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 4dc292186e..e8b374bbcb 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -249,11 +249,11 @@ func shardByAllLabels(userID string, labels []client.LabelAdapter) (uint32, erro return h, nil } -// Remove the replica label from a slice of LabelPairs if it exists. -func removeReplicaLabel(replica string, labels *[]client.LabelAdapter) { +// Remove the label labelname from a slice of LabelPairs if it exists. +func removeLabel(labelName string, labels *[]client.LabelAdapter) { for i := 0; i < len(*labels); i++ { pair := (*labels)[i] - if pair.Name == replica { + if pair.Name == labelName { *labels = append((*labels)[:i], (*labels)[i+1:]...) return } @@ -323,7 +323,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie // storing series in Cortex. If we kept the replica label we would end up with another series for the same // series we're trying to dedupe when HA tracking moves over to a different replica. 
if removeReplica { - removeReplicaLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) + removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) } key, err := d.tokenForLabels(userID, ts.Labels) if err != nil { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index d2ef007d53..8d70197e8e 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -751,7 +751,7 @@ func TestRemoveReplicaLabel(t *testing.T) { } for _, c := range cases { - removeReplicaLabel(replicaLabel, &c.labelsIn) + removeLabel(replicaLabel, &c.labelsIn) assert.Equal(t, c.labelsOut, c.labelsIn) } } From 0c18eb20abd62338e51f60a2a2d0e180f2ca6549 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 17 Oct 2019 20:25:16 -0700 Subject: [PATCH 2/5] Refactor Push; move some logic into another function validateSeries for testability. Signed-off-by: Callum Styan --- pkg/distributor/distributor.go | 61 +++++++++++++++++------ pkg/distributor/distributor_test.go | 75 +++++++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 16 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e8b374bbcb..6d27734b6a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -96,6 +96,7 @@ var ( Name: "distributor_replication_factor", Help: "The configured replication factor.", }) + emptyPreallocSeries = ingester_client.PreallocTimeseries{} ) // Distributor is a storage.SampleAppender and a client.Querier which @@ -280,6 +281,42 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica return true, nil } + +// Validates a single series from a write request. Will remove HA labels if necessary. +// Takes a pointer for a partial error so that we can get partial errors, errors during validation +// of a single sample, back from the function without adding an additional return param. 
+// Returns a token for the series if it is valid, the validated series with it's labels/samples, and any fatal error. +func (d *Distributor) validateSeries(key uint32, ts ingester_client.PreallocTimeseries, userID string, removeReplica bool) (client.PreallocTimeseries, error) { + // If we found both the cluster and replica labels, we only want to include the cluster label when + // storing series in Cortex. If we kept the replica label we would end up with another series for the same + // series we're trying to dedupe when HA tracking moves over to a different replica. + if removeReplica { + removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) + } + + labelsHistogram.Observe(float64(len(ts.Labels))) + if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil { + return emptyPreallocSeries, err + } + + metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels) + samples := make([]client.Sample, 0, len(ts.Samples)) + for _, s := range ts.Samples { + if err := validation.ValidateSample(d.limits, userID, metricName, s); err != nil { + return emptyPreallocSeries, err + } + samples = append(samples, s) + } + + return client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: ts.Labels, + Samples: samples, + }, + }, + nil +} + // Push implements client.IngesterServer func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) { userID, err := user.ExtractOrgID(ctx) @@ -307,7 +344,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie } return nil, err } - // If there wasn't an error but removeReplica is false that means we didn't find both HA labels + // If there wasn't an error but removeReplica is false that means we didn't find both HA labels. 
if !removeReplica { nonHASamples.WithLabelValues(userID).Add(float64(numSamples)) } @@ -319,20 +356,18 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie keys := make([]uint32, 0, len(req.Timeseries)) validatedSamples := 0 for _, ts := range req.Timeseries { - // If we found both the cluster and replica labels, we only want to include the cluster label when - // storing series in Cortex. If we kept the replica label we would end up with another series for the same - // series we're trying to dedupe when HA tracking moves over to a different replica. - if removeReplica { - removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) - } key, err := d.tokenForLabels(userID, ts.Labels) if err != nil { return nil, err } - labelsHistogram.Observe(float64(len(ts.Labels))) - if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil { + validatedSeries, err := d.validateSeries(key, ts, userID, removeReplica) + if err != nil { lastPartialErr = err + } + + // validateSeries would have returned an emptyPreallocSeries if there were no valid samples. 
+ if validatedSeries == emptyPreallocSeries { continue } @@ -347,13 +382,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie } keys = append(keys, key) - validatedTimeseries = append(validatedTimeseries, client.PreallocTimeseries{ - TimeSeries: &client.TimeSeries{ - Labels: ts.Labels, - Samples: samples, - }, - }) - + validatedTimeseries = append(validatedTimeseries, validatedSeries) validatedSamples += len(ts.Samples) } receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples)) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 8d70197e8e..8756dc2c1c 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net/http" + "reflect" "sort" "strconv" "sync" @@ -299,6 +300,80 @@ func TestDistributorPushQuery(t *testing.T) { } } +func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { + ctx = user.InjectOrgID(context.Background(), "user") + + type testcase struct { + series client.PreallocTimeseries + outputSeries client.PreallocTimeseries + removeReplica bool + } + + cases := []testcase{ + { // Remove replica label. + series: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}}, + }, + }, + outputSeries: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + }, + Samples: []client.Sample{}, + }, + }, + removeReplica: true, + }, + { // Don't remove either HA. 
+ series: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}}, + }, + }, + outputSeries: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}, + }, + Samples: []client.Sample{}, + }, + }, + removeReplica: false, + }, + } + + for _, tc := range cases { + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + d := prepare(t, 1, 1, 0, true, &limits) + + userID, err := user.ExtractOrgID(ctx) + assert.NoError(t, err) + + key, err := d.tokenForLabels(userID, tc.series.Labels) + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + + series, err := d.validateSeries(key, tc.series, userID, tc.removeReplica) + if !reflect.DeepEqual(series, tc.outputSeries) { + t.Fatalf("output of validate series did not match expected output:\n\texpected: %+v\n\t got: %+v", tc.outputSeries, series) + } + } +} + func TestSlowQueries(t *testing.T) { nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo") nIngesters := 3 From e8e42d9cde6f4335f3ec87596c40cc92b40b4a28 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 17 Oct 2019 20:39:48 -0700 Subject: [PATCH 3/5] Add a per user option to remove cluster label when ingesting HA samples in Distributor. 
Signed-off-by: Callum Styan --- docs/arguments.md | 4 +++- pkg/distributor/distributor.go | 4 +++- pkg/distributor/distributor_test.go | 24 ++++++++++++++++++++++++ pkg/util/validation/limits.go | 15 +++++++++++---- 4 files changed, 41 insertions(+), 6 deletions(-) diff --git a/docs/arguments.md b/docs/arguments.md index 9c876f6fd9..c093378eed 100644 --- a/docs/arguments.md +++ b/docs/arguments.md @@ -144,11 +144,13 @@ prefix these flags with `distributor.ha-tracker.` ### HA Tracker -HA tracking has two of it's own flags: +HA tracking has three of its own flags: - `distributor.ha-tracker.cluster` Prometheus label to look for in samples to identify a Prometheus HA cluster. (default "cluster") - `distributor.ha-tracker.replica` Prometheus label to look for in samples to identify a Prometheus HA replica. (default "`__replica__`") +- `distributor.ha-tracker.drop-cluster-label` + Drops the cluster label before ingesting samples. The replica label is always dropped. This flag should only be used for users who already have some combination of more than two labels to uniquely identify a Prometheus replica, but for whom it is not feasible to switch over to a new set of just two labels. It's reasonable to assume people probably already have a `cluster` label, or something similar. If not, they should add one along with `__replica__` via external labels in their Prometheus config. If you stick to these default values your Prometheus config could look like this (`POD_NAME` is an environment variable which must be set by you): diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6d27734b6a..847fc218c6 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -281,7 +281,6 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica return true, nil } - // Validates a single series from a write request. Will remove HA labels if necessary. 
// Takes a pointer for a partial error so that we can get partial errors, errors during validation // of a single sample, back from the function without adding an additional return param. @@ -292,6 +291,9 @@ func (d *Distributor) validateSeries(key uint32, ts ingester_client.PreallocTime // series we're trying to dedupe when HA tracking moves over to a different replica. if removeReplica { removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) + if d.limits.HADropClusterLabel(userID) { + removeLabel(d.limits.HAClusterLabel(userID), &ts.Labels) + } } labelsHistogram.Observe(float64(len(ts.Labels))) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 8756dc2c1c..6fbd0e8c5f 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -307,9 +307,30 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { series client.PreallocTimeseries outputSeries client.PreallocTimeseries removeReplica bool + removeCluster bool } cases := []testcase{ + { // Remove both cluster and replica label. + series: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}}, + }, + }, + outputSeries: client.PreallocTimeseries{ + TimeSeries: &client.TimeSeries{ + Labels: []client.LabelAdapter{ + {Name: "__name__", Value: "some_metric"}, + }, + Samples: []client.Sample{}, + }, + }, + removeReplica: true, + removeCluster: true, + }, { // Remove replica label. series: client.PreallocTimeseries{ TimeSeries: &client.TimeSeries{ @@ -329,6 +350,7 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { }, }, removeReplica: true, + removeCluster: false, }, { // Don't remove either HA. 
series: client.PreallocTimeseries{ @@ -350,6 +372,7 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { }, }, removeReplica: false, + removeCluster: false, }, } @@ -357,6 +380,7 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { var err error var limits validation.Limits flagext.DefaultValues(&limits) + limits.HADropClusterLabel = tc.removeCluster d := prepare(t, 1, 1, 0, true, &limits) userID, err := user.ExtractOrgID(ctx) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index a6b42571a2..610836548a 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -17,6 +17,7 @@ type Limits struct { AcceptHASamples bool `yaml:"accept_ha_samples"` HAClusterLabel string `yaml:"ha_cluster_label"` HAReplicaLabel string `yaml:"ha_replica_label"` + HADropClusterLabel bool `yaml:"drop_cluster_label"` MaxLabelNameLength int `yaml:"max_label_name_length"` MaxLabelValueLength int `yaml:"max_label_value_length"` MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"` @@ -48,8 +49,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.") f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples). 
Warning, very high limits will be reset every -distributor.limiter-reload-period.") f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.") - f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.") f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") + f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.") + f.BoolVar(&l.HADropClusterLabel, "distributor.ha-tracker.drop-cluster-label", false, "Enable this flag to drop the cluster label when ingesting HA samples for a user, in addition to dropping the replica label. Note that this flag should only be used when users already have their own combination of more than two labels to uniquely identify Prometheus replicas.") f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") @@ -142,14 +144,19 @@ func (o *Overrides) AcceptHASamples(userID string) bool { return o.overridesManager.GetLimits(userID).(*Limits).AcceptHASamples } +// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica. 
+func (o *Overrides) HAClusterLabel(userID string) string { + return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel +} + // HAReplicaLabel returns the replica label to look for when deciding whether to accept a sample from a Prometheus HA replica. func (o *Overrides) HAReplicaLabel(userID string) string { return o.overridesManager.GetLimits(userID).(*Limits).HAReplicaLabel } -// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica. -func (o *Overrides) HAClusterLabel(userID string) string { - return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel +// HADropClusterLabel returns whether the cluster label should be dropped when ingesting HA samples for the user. +func (o *Overrides) HADropClusterLabel(userID string) bool { + return o.overridesManager.GetLimits(userID).(*Limits).HADropClusterLabel } // MaxLabelNameLength returns maximum length a label name can be. From 82969da152ff845b339d62e6849a177e6903e2ca Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 21 Nov 2019 15:14:07 -0800 Subject: [PATCH 4/5] Make label dropping generic instead of cluster label/HA tracker specific. Signed-off-by: Callum Styan --- docs/arguments.md | 7 +++--- pkg/distributor/distributor.go | 10 +++++--- pkg/distributor/distributor_test.go | 17 +++++++------- pkg/util/validation/limits.go | 36 +++++++++++++++-------------- 4 files changed, 39 insertions(+), 31 deletions(-) diff --git a/docs/arguments.md b/docs/arguments.md index c093378eed..ba1153aa68 100644 --- a/docs/arguments.md +++ b/docs/arguments.md @@ -108,6 +108,9 @@ The ingester query API was improved over time, but defaults to the old behaviour - `distributor.ha-tracker.enable` Enable the distributors HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels). Global (for distributors), this ensures that the necessary internal data structures for the HA handling are created. 
The option `enable-for-all-users` is still needed to enable ingestion of HA samples for all users. +- `distributor.drop-label` + This flag can be used to specify label names to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels. + ### Ring/HA Tracker Store The KVStore client is used by both the Ring and HA Tracker. @@ -144,13 +147,11 @@ prefix these flags with `distributor.ha-tracker.` ### HA Tracker -HA tracking has three of its own flags: +HA tracking has two of its own flags: - `distributor.ha-tracker.cluster` Prometheus label to look for in samples to identify a Prometheus HA cluster. (default "cluster") - `distributor.ha-tracker.replica` Prometheus label to look for in samples to identify a Prometheus HA replica. (default "`__replica__`") -- `distributor.ha-tracker.drop-cluster-label` - Drops the cluster label before ingesting samples. The replica label is always dropped. This flag should only be used for users who already have some combination of more than two labels to uniquely identify a Prometheus replica, but for whom it is not feasible to switch over to a new set of just two labels. It's reasonable to assume people probably already have a `cluster` label, or something similar. If not, they should add one along with `__replica__` via external labels in their Prometheus config. If you stick to these default values your Prometheus config could look like this (`POD_NAME` is an environment variable which must be set by you): diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 847fc218c6..a25ec58730 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -291,9 +291,13 @@ func (d *Distributor) validateSeries(key uint32, ts ingester_client.PreallocTime // series we're trying to dedupe when HA tracking moves over to a different replica. 
if removeReplica { removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) - if d.limits.HADropClusterLabel(userID) { - removeLabel(d.limits.HAClusterLabel(userID), &ts.Labels) - } + } + for _, s := range d.limits.DropLabels(userID) { + removeLabel(s, &ts.Labels) + } + key, err := d.tokenForLabels(userID, ts.Labels) + if err != nil { + return emptyPreallocSeries, err } labelsHistogram.Observe(float64(len(ts.Labels))) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 6fbd0e8c5f..dd5e8bf767 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -307,7 +307,7 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { series client.PreallocTimeseries outputSeries client.PreallocTimeseries removeReplica bool - removeCluster bool + removeLabels []string } cases := []testcase{ @@ -329,15 +329,17 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { }, }, removeReplica: true, - removeCluster: true, + removeLabels: []string{"cluster"}, }, - { // Remove replica label. + { // Remove multiple labels and replica. series: client.PreallocTimeseries{ TimeSeries: &client.TimeSeries{ Labels: []client.LabelAdapter{ {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, - {Name: "__replica__", Value: "two"}}, + {Name: "__replica__", Value: "two"}, + {Name: "foo", Value: "bar"}, + {Name: "some", Value: "thing"}}, }, }, outputSeries: client.PreallocTimeseries{ @@ -350,9 +352,9 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { }, }, removeReplica: true, - removeCluster: false, + removeLabels: []string{"foo", "some"}, }, - { // Don't remove either HA. + { // Don't remove any labels. 
series: client.PreallocTimeseries{ TimeSeries: &client.TimeSeries{ Labels: []client.LabelAdapter{ @@ -372,7 +374,6 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { }, }, removeReplica: false, - removeCluster: false, }, } @@ -380,7 +381,7 @@ func TestDistributorValidateSeriesLabelRemoval(t *testing.T) { var err error var limits validation.Limits flagext.DefaultValues(&limits) - limits.HADropClusterLabel = tc.removeCluster + limits.DropLabels = tc.removeLabels d := prepare(t, 1, 1, 0, true, &limits) userID, err := user.ExtractOrgID(ctx) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 610836548a..f224a6e6c5 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -6,25 +6,27 @@ import ( "time" "gopkg.in/yaml.v2" + + "github.com/cortexproject/cortex/pkg/util/flagext" ) // Limits describe all the limits for users; can be used to describe global default // limits via flags, or per-user limits via yaml config. type Limits struct { // Distributor enforced limits. 
- IngestionRate float64 `yaml:"ingestion_rate"` - IngestionBurstSize int `yaml:"ingestion_burst_size"` - AcceptHASamples bool `yaml:"accept_ha_samples"` - HAClusterLabel string `yaml:"ha_cluster_label"` - HAReplicaLabel string `yaml:"ha_replica_label"` - HADropClusterLabel bool `yaml:"drop_cluster_label"` - MaxLabelNameLength int `yaml:"max_label_name_length"` - MaxLabelValueLength int `yaml:"max_label_value_length"` - MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"` - RejectOldSamples bool `yaml:"reject_old_samples"` - RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"` - CreationGracePeriod time.Duration `yaml:"creation_grace_period"` - EnforceMetricName bool `yaml:"enforce_metric_name"` + IngestionRate float64 `yaml:"ingestion_rate"` + IngestionBurstSize int `yaml:"ingestion_burst_size"` + AcceptHASamples bool `yaml:"accept_ha_samples"` + HAClusterLabel string `yaml:"ha_cluster_label"` + HAReplicaLabel string `yaml:"ha_replica_label"` + DropLabels flagext.StringSlice `yaml:"drop_labels"` + MaxLabelNameLength int `yaml:"max_label_name_length"` + MaxLabelValueLength int `yaml:"max_label_value_length"` + MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"` + RejectOldSamples bool `yaml:"reject_old_samples"` + RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"` + CreationGracePeriod time.Duration `yaml:"creation_grace_period"` + EnforceMetricName bool `yaml:"enforce_metric_name"` // Ingester enforced limits. 
MaxSeriesPerQuery int `yaml:"max_series_per_query"` @@ -51,7 +53,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.") f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.") - f.BoolVar(&l.HADropClusterLabel, "distributor.ha-tracker.drop-cluster-label", false, "Enable this flag to drop the cluster label when ingesting HA samples for a user, in addition to dropping the replica label. Note that this flag should only be used when users already have their own combination of more than two labels to uniquely identify Prometheus replicas.") + f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.") f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.") @@ -154,9 +156,9 @@ func (o *Overrides) HAReplicaLabel(userID string) string { return o.overridesManager.GetLimits(userID).(*Limits).HAReplicaLabel } -// HADropClusterLabel returns whether the cluster label should be dropped when ingesting HA samples for the user. 
-func (o *Overrides) HADropClusterLabel(userID string) bool { - return o.overridesManager.GetLimits(userID).(*Limits).HADropClusterLabel +// DropLabels returns whether the cluster label should be dropped when ingesting HA samples for the user. +func (o *Overrides) DropLabels(userID string) flagext.StringSlice { + return o.overridesManager.GetLimits(userID).(*Limits).DropLabels } // MaxLabelNameLength returns maximum length a label name can be. From 9954be49372932d1fc54e9e0354d901309f512f4 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Tue, 3 Dec 2019 15:09:35 -0800 Subject: [PATCH 5/5] review comments; fix some comments, more descriptive var name, check for empty labels after dropping labels Signed-off-by: Callum Styan --- pkg/distributor/distributor.go | 18 ++++++++++++------ pkg/util/validation/limits.go | 2 +- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index a25ec58730..51ab89d356 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -281,10 +281,9 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica return true, nil } -// Validates a single series from a write request. Will remove HA labels if necessary. -// Takes a pointer for a partial error so that we can get partial errors, errors during validation -// of a single sample, back from the function without adding an additional return param. -// Returns a token for the series if it is valid, the validated series with it's labels/samples, and any fatal error. +// Validates a single series from a write request. Will remove labels if +// any are configured to be dropped for the user ID. +// Returns the validated series with its labels/samples, and any error. 
func (d *Distributor) validateSeries(key uint32, ts ingester_client.PreallocTimeseries, userID string, removeReplica bool) (client.PreallocTimeseries, error) { // If we found both the cluster and replica labels, we only want to include the cluster label when // storing series in Cortex. If we kept the replica label we would end up with another series for the same @@ -292,9 +291,14 @@ func (d *Distributor) validateSeries(key uint32, ts ingester_client.PreallocTime if removeReplica { removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels) } - for _, s := range d.limits.DropLabels(userID) { - removeLabel(s, &ts.Labels) + + for _, labelName := range d.limits.DropLabels(userID) { + removeLabel(labelName, &ts.Labels) + } + if len(ts.Labels) == 0 { + return emptyPreallocSeries, nil } + key, err := d.tokenForLabels(userID, ts.Labels) if err != nil { return emptyPreallocSeries, err @@ -368,6 +372,8 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie } validatedSeries, err := d.validateSeries(key, ts, userID, removeReplica) + // Errors in validation are considered non-fatal, as one series in a request may contain + // invalid data but all the remaining series could be perfectly valid. if err != nil { lastPartialErr = err } diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index f224a6e6c5..139295099d 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -156,7 +156,7 @@ func (o *Overrides) HAReplicaLabel(userID string) string { return o.overridesManager.GetLimits(userID).(*Limits).HAReplicaLabel } -// DropLabels returns whether the cluster label should be dropped when ingesting HA samples for the user. +// DropLabels returns the list of labels to be dropped when ingesting HA samples for the user. func (o *Overrides) DropLabels(userID string) flagext.StringSlice { return o.overridesManager.GetLimits(userID).(*Limits).DropLabels }