
Add a limits config option to allow for dropping of the cluster label #1726


Merged · 5 commits · Dec 3, 2019
docs/arguments.md — 4 additions & 1 deletion
@@ -108,6 +108,9 @@ The ingester query API was improved over time, but defaults to the old behaviour
- `distributor.ha-tracker.enable`
Enable the distributor's HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels). Global (for distributors), this ensures that the necessary internal data structures for the HA handling are created. The option `enable-for-all-users` is still needed to enable ingestion of HA samples for all users.

- `distributor.drop-label`
This flag can be used to specify label names to drop during sample ingestion within the distributor, and can be repeated in order to drop multiple labels; see the sketch below.

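For illustration, a hypothetical invocation dropping two labels (the label names here are examples only):

```
-distributor.drop-label=cluster -distributor.drop-label=datacenter
```

The per-user equivalent is the `drop_labels` key in the limits YAML — a sketch based on the `Limits` struct added in this PR:

```yaml
drop_labels:
  - cluster
  - datacenter
```
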
### Ring/HA Tracker Store

The KVStore client is used by both the Ring and HA Tracker.
@@ -144,7 +147,7 @@ prefix these flags with `distributor.ha-tracker.`

### HA Tracker

HA tracking has two of it's own flags:
HA tracking has two of its own flags:
- `distributor.ha-tracker.cluster`
Prometheus label to look for in samples to identify a Prometheus HA cluster. (default "cluster")
- `distributor.ha-tracker.replica`
pkg/distributor/distributor.go — 60 additions & 19 deletions
@@ -96,6 +96,7 @@ var (
Name: "distributor_replication_factor",
Help: "The configured replication factor.",
})
emptyPreallocSeries = ingester_client.PreallocTimeseries{}
)

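An aside on the new `emptyPreallocSeries` sentinel: `PreallocTimeseries` wraps only a `*TimeSeries` pointer, so the struct is comparable and its zero value can signal "this series was dropped entirely" without an extra boolean return. A minimal sketch of the idea, with types reduced to their assumed shape (not the actual `ingester_client` definitions):

```go
// Sketch: TimeSeries holds slices, so it is not comparable itself, but a
// wrapper holding only a pointer is — == against the zero value works.
type LabelAdapter struct{ Name, Value string }

type Sample struct {
	Value       float64
	TimestampMs int64
}

type TimeSeries struct {
	Labels  []LabelAdapter
	Samples []Sample
}

type PreallocTimeseries struct {
	*TimeSeries
}

var emptyPreallocSeries = PreallocTimeseries{}

// dropped reports whether a series was discarded: equality with the zero
// value holds exactly when the inner TimeSeries pointer is nil.
func dropped(ts PreallocTimeseries) bool {
	return ts == emptyPreallocSeries
}
```
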
// Distributor is a storage.SampleAppender and a client.Querier which
@@ -249,11 +250,11 @@ func shardByAllLabels(userID string, labels []client.LabelAdapter) (uint32, error) {
return h, nil
}

// Remove the replica label from a slice of LabelPairs if it exists.
func removeReplicaLabel(replica string, labels *[]client.LabelAdapter) {
// Remove the label labelName from a slice of LabelPairs if it exists.
func removeLabel(labelName string, labels *[]client.LabelAdapter) {
Contributor:

This function is very similar to labelPairs.removeBlanks(), so they could be merged.
This one preserves order, which I don't think is necessary.
It might be a good idea to comment why this code does it.

Contributor Author:

I'll have a look at the labelPairs function in the morning.

Contributor Author:

labelPairs is nested within the ingester package. If we're going to merge the two functions, should that exist in a new package under util?

And the reason for preserving order is only that I've generally seen that be the case within Prometheus itself. For example within remote write, when processing a sample through external labels and relabel rules, we preserve the sorted order of the labels.

for i := 0; i < len(*labels); i++ {
pair := (*labels)[i]
if pair.Name == replica {
if pair.Name == labelName {
*labels = append((*labels)[:i], (*labels)[i+1:]...)
return
}
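As an aside on the review thread above: a swap-delete variant (hypothetical, not part of this PR) would avoid shifting the tail, at the cost of the sorted label order that the author notes Prometheus itself preserves through external labels and relabel rules:

```go
// Hypothetical alternative: overwrite the match with the last element and
// truncate. O(1) per removal, but the labels are no longer sorted by name.
func removeLabelUnordered(labelName string, labels *[]client.LabelAdapter) {
	for i := 0; i < len(*labels); i++ {
		if (*labels)[i].Name == labelName {
			(*labels)[i] = (*labels)[len(*labels)-1]
			*labels = (*labels)[:len(*labels)-1]
			return
		}
	}
}
```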
@@ -280,6 +281,52 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica string) (bool, error) {
return true, nil
}

// Validates a single series from a write request. Will remove labels if
// any are configured to be dropped for the user ID.
// Returns the validated series with its labels/samples, and any error.
func (d *Distributor) validateSeries(key uint32, ts ingester_client.PreallocTimeseries, userID string, removeReplica bool) (client.PreallocTimeseries, error) {
// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
if removeReplica {
removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
}

for _, labelName := range d.limits.DropLabels(userID) {
removeLabel(labelName, &ts.Labels)
}
if len(ts.Labels) == 0 {
return emptyPreallocSeries, nil
}

key, err := d.tokenForLabels(userID, ts.Labels)
if err != nil {
return emptyPreallocSeries, err
}

labelsHistogram.Observe(float64(len(ts.Labels)))
if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil {
return emptyPreallocSeries, err
}

metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels)
samples := make([]client.Sample, 0, len(ts.Samples))
for _, s := range ts.Samples {
if err := validation.ValidateSample(d.limits, userID, metricName, s); err != nil {
return emptyPreallocSeries, err
}
samples = append(samples, s)
}

return client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: ts.Labels,
Samples: samples,
},
},
nil
}

// Push implements client.IngesterServer
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
userID, err := user.ExtractOrgID(ctx)
@@ -307,7 +354,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
}
return nil, err
}
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
if !removeReplica {
nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
}
@@ -319,20 +366,20 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
keys := make([]uint32, 0, len(req.Timeseries))
validatedSamples := 0
for _, ts := range req.Timeseries {
// If we found both the cluster and replica labels, we only want to include the cluster label when
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
// series we're trying to dedupe when HA tracking moves over to a different replica.
if removeReplica {
removeReplicaLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
}
key, err := d.tokenForLabels(userID, ts.Labels)
if err != nil {
return nil, err
}

labelsHistogram.Observe(float64(len(ts.Labels)))
if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil {
validatedSeries, err := d.validateSeries(key, ts, userID, removeReplica)
// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
if err != nil {
lastPartialErr = err
}

// validateSeries returns emptyPreallocSeries if the series had no labels left after dropping, or failed validation.
if validatedSeries == emptyPreallocSeries {
continue
}

@@ -347,13 +394,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
}

keys = append(keys, key)
validatedTimeseries = append(validatedTimeseries, client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: ts.Labels,
Samples: samples,
},
})

validatedTimeseries = append(validatedTimeseries, validatedSeries)
validatedSamples += len(ts.Samples)
}
receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))
pkg/distributor/distributor_test.go — 101 additions & 1 deletion
@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"net/http"
"reflect"
"sort"
"strconv"
"sync"
@@ -299,6 +300,105 @@ func TestDistributorPushQuery(t *testing.T) {
}
}

func TestDistributorValidateSeriesLabelRemoval(t *testing.T) {
ctx = user.InjectOrgID(context.Background(), "user")

type testcase struct {
series client.PreallocTimeseries
outputSeries client.PreallocTimeseries
removeReplica bool
removeLabels []string
}

cases := []testcase{
{ // Remove both cluster and replica label.
series: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"}},
},
},
outputSeries: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
},
Samples: []client.Sample{},
},
},
removeReplica: true,
removeLabels: []string{"cluster"},
},
{ // Remove multiple labels and replica.
series: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
{Name: "foo", Value: "bar"},
{Name: "some", Value: "thing"}},
},
},
outputSeries: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
},
Samples: []client.Sample{},
},
},
removeReplica: true,
removeLabels: []string{"foo", "some"},
},
{ // Don't remove any labels.
series: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"}},
},
},
outputSeries: client.PreallocTimeseries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
Samples: []client.Sample{},
},
},
removeReplica: false,
},
}

for _, tc := range cases {
var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.DropLabels = tc.removeLabels
d := prepare(t, 1, 1, 0, true, &limits)

userID, err := user.ExtractOrgID(ctx)
assert.NoError(t, err)

key, err := d.tokenForLabels(userID, tc.series.Labels)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

series, err := d.validateSeries(key, tc.series, userID, tc.removeReplica)
if !reflect.DeepEqual(series, tc.outputSeries) {
t.Fatalf("output of validate series did not match expected output:\n\texpected: %+v\n\t got: %+v", tc.outputSeries, series)
}
}
}

func TestSlowQueries(t *testing.T) {
nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo")
nIngesters := 3
@@ -751,7 +851,7 @@ func TestRemoveReplicaLabel(t *testing.T) {
}

for _, c := range cases {
removeReplicaLabel(replicaLabel, &c.labelsIn)
removeLabel(replicaLabel, &c.labelsIn)
assert.Equal(t, c.labelsOut, c.labelsIn)
}
}
pkg/util/validation/limits.go — 25 additions & 16 deletions
@@ -6,24 +6,27 @@ import (
"time"

"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/util/flagext"
)

// Limits describe all the limits for users; can be used to describe global default
// limits via flags, or per-user limits via yaml config.
type Limits struct {
// Distributor enforced limits.
IngestionRate float64 `yaml:"ingestion_rate"`
IngestionBurstSize int `yaml:"ingestion_burst_size"`
AcceptHASamples bool `yaml:"accept_ha_samples"`
HAClusterLabel string `yaml:"ha_cluster_label"`
HAReplicaLabel string `yaml:"ha_replica_label"`
MaxLabelNameLength int `yaml:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"`
RejectOldSamples bool `yaml:"reject_old_samples"`
RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"`
CreationGracePeriod time.Duration `yaml:"creation_grace_period"`
EnforceMetricName bool `yaml:"enforce_metric_name"`
IngestionRate float64 `yaml:"ingestion_rate"`
IngestionBurstSize int `yaml:"ingestion_burst_size"`
AcceptHASamples bool `yaml:"accept_ha_samples"`
HAClusterLabel string `yaml:"ha_cluster_label"`
HAReplicaLabel string `yaml:"ha_replica_label"`
DropLabels flagext.StringSlice `yaml:"drop_labels"`
MaxLabelNameLength int `yaml:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"`
RejectOldSamples bool `yaml:"reject_old_samples"`
RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"`
CreationGracePeriod time.Duration `yaml:"creation_grace_period"`
EnforceMetricName bool `yaml:"enforce_metric_name"`

// Ingester enforced limits.
MaxSeriesPerQuery int `yaml:"max_series_per_query"`
@@ -48,8 +51,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples). Warning, very high limits will be reset every -distributor.limiter-reload-period.")
f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.")
f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.")
f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.")
f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.")
f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names to drop during sample ingestion within the distributor, and can be repeated in order to drop multiple labels.")
f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names")
f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
@@ -142,14 +146,19 @@ func (o *Overrides) AcceptHASamples(userID string) bool {
return o.overridesManager.GetLimits(userID).(*Limits).AcceptHASamples
}

// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica.
func (o *Overrides) HAClusterLabel(userID string) string {
return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel
}

// HAReplicaLabel returns the replica label to look for when deciding whether to accept a sample from a Prometheus HA replica.
func (o *Overrides) HAReplicaLabel(userID string) string {
return o.overridesManager.GetLimits(userID).(*Limits).HAReplicaLabel
}

// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica.
func (o *Overrides) HAClusterLabel(userID string) string {
return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel
// DropLabels returns the list of labels to be dropped when ingesting HA samples for the user.
func (o *Overrides) DropLabels(userID string) flagext.StringSlice {
return o.overridesManager.GetLimits(userID).(*Limits).DropLabels
}

// MaxLabelNameLength returns maximum length a label name can be.
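For context, `-distributor.drop-label` can be repeated on the command line because `flagext.StringSlice` implements `flag.Value`. A minimal sketch of such a type — an assumption about its shape, not the actual `pkg/util/flagext` source:

```go
package flagext

import "strings"

// StringSlice collects one string per occurrence of a repeated flag.
type StringSlice []string

// String implements flag.Value.
func (v StringSlice) String() string {
	return strings.Join(v, ",")
}

// Set implements flag.Value; the flag package calls it once per occurrence,
// so each -distributor.drop-label=<name> appends one entry.
func (v *StringSlice) Set(s string) error {
	*v = append(*v, s)
	return nil
}
```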