Skip to content

Commit 08ddf88

Browse files
cstyangouthamve
authored andcommitted
Add a limits config option to allow for dropping of the cluster label (#1726)
* Rename removeReplicaLabel to more generic removeLabel. Signed-off-by: Callum Styan <[email protected]> * Refactor Push; move some logic into another function validateSeries for testability. Signed-off-by: Callum Styan <[email protected]> * Add a per user option to remove cluster label when ingesting HA samples in Distributor. Signed-off-by: Callum Styan <[email protected]> * Make label dropping generic instead of cluster label/HA tracker specific. Signed-off-by: Callum Styan <[email protected]> * review comments; fix some comments, more descriptive var name, check for empty labels after dropping labels Signed-off-by: Callum Styan <[email protected]>
1 parent 19db5f6 commit 08ddf88

File tree

4 files changed

+190
-37
lines changed

4 files changed

+190
-37
lines changed

docs/arguments.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,9 @@ The ingester query API was improved over time, but defaults to the old behaviour
108108
- `distributor.ha-tracker.enable`
109109
Enable the distributors HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels). Global (for distributors), this ensures that the necessary internal data structures for the HA handling are created. The option `enable-for-all-users` is still needed to enable ingestion of HA samples for all users.
110110

111+
- `distributor.drop-label`
112+
This flag can be used to specify label names to drop during sample ingestion within the distributor. It can be repeated in order to drop multiple labels.
113+
111114
### Ring/HA Tracker Store
112115

113116
The KVStore client is used by both the Ring and HA Tracker.
@@ -177,7 +180,7 @@ Flags for configuring KV store based on memberlist library. This feature is expe
177180

178181
### HA Tracker
179182

180-
HA tracking has two of it's own flags:
183+
HA tracking has two of its own flags:
181184
- `distributor.ha-tracker.cluster`
182185
Prometheus label to look for in samples to identify a Prometheus HA cluster. (default "cluster")
183186
- `distributor.ha-tracker.replica`

pkg/distributor/distributor.go

+60-19
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ var (
9090
Name: "distributor_replication_factor",
9191
Help: "The configured replication factor.",
9292
})
93+
emptyPreallocSeries = ingester_client.PreallocTimeseries{}
9394
)
9495

9596
// Distributor is a storage.SampleAppender and a client.Querier which
@@ -250,11 +251,11 @@ func shardByAllLabels(userID string, labels []client.LabelAdapter) (uint32, erro
250251
return h, nil
251252
}
252253

253-
// Remove the replica label from a slice of LabelPairs if it exists.
254-
func removeReplicaLabel(replica string, labels *[]client.LabelAdapter) {
254+
// removeLabel removes the first label named labelName from a slice of LabelAdapters, if present.
255+
func removeLabel(labelName string, labels *[]client.LabelAdapter) {
255256
for i := 0; i < len(*labels); i++ {
256257
pair := (*labels)[i]
257-
if pair.Name == replica {
258+
if pair.Name == labelName {
258259
*labels = append((*labels)[:i], (*labels)[i+1:]...)
259260
return
260261
}
@@ -281,6 +282,52 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
281282
return true, nil
282283
}
283284

285+
// Validates a single series from a write request. Will remove labels if
286+
// any are configured to be dropped for the user ID.
287+
// Returns the validated series with it's labels/samples, and any error.
288+
func (d *Distributor) validateSeries(key uint32, ts ingester_client.PreallocTimeseries, userID string, removeReplica bool) (client.PreallocTimeseries, error) {
289+
// If we found both the cluster and replica labels, we only want to include the cluster label when
290+
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
291+
// series we're trying to dedupe when HA tracking moves over to a different replica.
292+
if removeReplica {
293+
removeLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
294+
}
295+
296+
for _, labelName := range d.limits.DropLabels(userID) {
297+
removeLabel(labelName, &ts.Labels)
298+
}
299+
if len(ts.Labels) == 0 {
300+
return emptyPreallocSeries, nil
301+
}
302+
303+
key, err := d.tokenForLabels(userID, ts.Labels)
304+
if err != nil {
305+
return emptyPreallocSeries, err
306+
}
307+
308+
labelsHistogram.Observe(float64(len(ts.Labels)))
309+
if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil {
310+
return emptyPreallocSeries, err
311+
}
312+
313+
metricName, _ := extract.MetricNameFromLabelAdapters(ts.Labels)
314+
samples := make([]client.Sample, 0, len(ts.Samples))
315+
for _, s := range ts.Samples {
316+
if err := validation.ValidateSample(d.limits, userID, metricName, s); err != nil {
317+
return emptyPreallocSeries, err
318+
}
319+
samples = append(samples, s)
320+
}
321+
322+
return client.PreallocTimeseries{
323+
TimeSeries: &client.TimeSeries{
324+
Labels: ts.Labels,
325+
Samples: samples,
326+
},
327+
},
328+
nil
329+
}
330+
284331
// Push implements client.IngesterServer
285332
func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
286333
userID, err := user.ExtractOrgID(ctx)
@@ -308,7 +355,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
308355
}
309356
return nil, err
310357
}
311-
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels
358+
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
312359
if !removeReplica {
313360
nonHASamples.WithLabelValues(userID).Add(float64(numSamples))
314361
}
@@ -320,20 +367,20 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
320367
keys := make([]uint32, 0, len(req.Timeseries))
321368
validatedSamples := 0
322369
for _, ts := range req.Timeseries {
323-
// If we found both the cluster and replica labels, we only want to include the cluster label when
324-
// storing series in Cortex. If we kept the replica label we would end up with another series for the same
325-
// series we're trying to dedupe when HA tracking moves over to a different replica.
326-
if removeReplica {
327-
removeReplicaLabel(d.limits.HAReplicaLabel(userID), &ts.Labels)
328-
}
329370
key, err := d.tokenForLabels(userID, ts.Labels)
330371
if err != nil {
331372
return nil, err
332373
}
333374

334-
labelsHistogram.Observe(float64(len(ts.Labels)))
335-
if err := validation.ValidateLabels(d.limits, userID, ts.Labels); err != nil {
375+
validatedSeries, err := d.validateSeries(key, ts, userID, removeReplica)
376+
// Errors in validation are considered non-fatal, as one series in a request may contain
377+
// invalid data but all the remaining series could be perfectly valid.
378+
if err != nil {
336379
lastPartialErr = err
380+
}
381+
382+
// validateSeries returns emptyPreallocSeries when the series failed validation or had no labels left after dropping.
383+
if validatedSeries == emptyPreallocSeries {
337384
continue
338385
}
339386

@@ -348,13 +395,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
348395
}
349396

350397
keys = append(keys, key)
351-
validatedTimeseries = append(validatedTimeseries, client.PreallocTimeseries{
352-
TimeSeries: &client.TimeSeries{
353-
Labels: ts.Labels,
354-
Samples: samples,
355-
},
356-
})
357-
398+
validatedTimeseries = append(validatedTimeseries, validatedSeries)
358399
validatedSamples += len(ts.Samples)
359400
}
360401
receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples))

pkg/distributor/distributor_test.go

+101-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io"
77
"net/http"
8+
"reflect"
89
"sort"
910
"strconv"
1011
"sync"
@@ -299,6 +300,105 @@ func TestDistributorPushQuery(t *testing.T) {
299300
}
300301
}
301302

303+
func TestDistributorValidateSeriesLabelRemoval(t *testing.T) {
304+
ctx = user.InjectOrgID(context.Background(), "user")
305+
306+
type testcase struct {
307+
series client.PreallocTimeseries
308+
outputSeries client.PreallocTimeseries
309+
removeReplica bool
310+
removeLabels []string
311+
}
312+
313+
cases := []testcase{
314+
{ // Remove both cluster and replica label.
315+
series: client.PreallocTimeseries{
316+
TimeSeries: &client.TimeSeries{
317+
Labels: []client.LabelAdapter{
318+
{Name: "__name__", Value: "some_metric"},
319+
{Name: "cluster", Value: "one"},
320+
{Name: "__replica__", Value: "two"}},
321+
},
322+
},
323+
outputSeries: client.PreallocTimeseries{
324+
TimeSeries: &client.TimeSeries{
325+
Labels: []client.LabelAdapter{
326+
{Name: "__name__", Value: "some_metric"},
327+
},
328+
Samples: []client.Sample{},
329+
},
330+
},
331+
removeReplica: true,
332+
removeLabels: []string{"cluster"},
333+
},
334+
{ // Remove multiple labels and replica.
335+
series: client.PreallocTimeseries{
336+
TimeSeries: &client.TimeSeries{
337+
Labels: []client.LabelAdapter{
338+
{Name: "__name__", Value: "some_metric"},
339+
{Name: "cluster", Value: "one"},
340+
{Name: "__replica__", Value: "two"},
341+
{Name: "foo", Value: "bar"},
342+
{Name: "some", Value: "thing"}},
343+
},
344+
},
345+
outputSeries: client.PreallocTimeseries{
346+
TimeSeries: &client.TimeSeries{
347+
Labels: []client.LabelAdapter{
348+
{Name: "__name__", Value: "some_metric"},
349+
{Name: "cluster", Value: "one"},
350+
},
351+
Samples: []client.Sample{},
352+
},
353+
},
354+
removeReplica: true,
355+
removeLabels: []string{"foo", "some"},
356+
},
357+
{ // Don't remove any labels.
358+
series: client.PreallocTimeseries{
359+
TimeSeries: &client.TimeSeries{
360+
Labels: []client.LabelAdapter{
361+
{Name: "__name__", Value: "some_metric"},
362+
{Name: "cluster", Value: "one"},
363+
{Name: "__replica__", Value: "two"}},
364+
},
365+
},
366+
outputSeries: client.PreallocTimeseries{
367+
TimeSeries: &client.TimeSeries{
368+
Labels: []client.LabelAdapter{
369+
{Name: "__name__", Value: "some_metric"},
370+
{Name: "cluster", Value: "one"},
371+
{Name: "__replica__", Value: "two"},
372+
},
373+
Samples: []client.Sample{},
374+
},
375+
},
376+
removeReplica: false,
377+
},
378+
}
379+
380+
for _, tc := range cases {
381+
var err error
382+
var limits validation.Limits
383+
flagext.DefaultValues(&limits)
384+
limits.DropLabels = tc.removeLabels
385+
d := prepare(t, 1, 1, 0, true, &limits)
386+
387+
userID, err := user.ExtractOrgID(ctx)
388+
assert.NoError(t, err)
389+
390+
key, err := d.tokenForLabels(userID, tc.series.Labels)
391+
if err != nil {
392+
t.Fatalf("unexpected error: %s", err)
393+
}
394+
395+
series, err := d.validateSeries(key, tc.series, userID, tc.removeReplica)
396+
if !reflect.DeepEqual(series, tc.outputSeries) {
397+
t.Fatalf("output of validate series did not match expected output:\n\texpected: %+v\n\t got: %+v", tc.outputSeries, series)
398+
}
399+
}
400+
}
401+
302402
func TestSlowQueries(t *testing.T) {
303403
nameMatcher := mustEqualMatcher(model.MetricNameLabel, "foo")
304404
nIngesters := 3
@@ -759,7 +859,7 @@ func TestRemoveReplicaLabel(t *testing.T) {
759859
}
760860

761861
for _, c := range cases {
762-
removeReplicaLabel(replicaLabel, &c.labelsIn)
862+
removeLabel(replicaLabel, &c.labelsIn)
763863
assert.Equal(t, c.labelsOut, c.labelsIn)
764864
}
765865
}

pkg/util/validation/limits.go

+25-16
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import (
77
"time"
88

99
"gopkg.in/yaml.v2"
10+
11+
"github.com/cortexproject/cortex/pkg/util/flagext"
1012
)
1113

1214
var (
@@ -17,18 +19,19 @@ var (
1719
// limits via flags, or per-user limits via yaml config.
1820
type Limits struct {
1921
// Distributor enforced limits.
20-
IngestionRate float64 `yaml:"ingestion_rate"`
21-
IngestionBurstSize int `yaml:"ingestion_burst_size"`
22-
AcceptHASamples bool `yaml:"accept_ha_samples"`
23-
HAClusterLabel string `yaml:"ha_cluster_label"`
24-
HAReplicaLabel string `yaml:"ha_replica_label"`
25-
MaxLabelNameLength int `yaml:"max_label_name_length"`
26-
MaxLabelValueLength int `yaml:"max_label_value_length"`
27-
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"`
28-
RejectOldSamples bool `yaml:"reject_old_samples"`
29-
RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"`
30-
CreationGracePeriod time.Duration `yaml:"creation_grace_period"`
31-
EnforceMetricName bool `yaml:"enforce_metric_name"`
22+
IngestionRate float64 `yaml:"ingestion_rate"`
23+
IngestionBurstSize int `yaml:"ingestion_burst_size"`
24+
AcceptHASamples bool `yaml:"accept_ha_samples"`
25+
HAClusterLabel string `yaml:"ha_cluster_label"`
26+
HAReplicaLabel string `yaml:"ha_replica_label"`
27+
DropLabels flagext.StringSlice `yaml:"drop_labels"`
28+
MaxLabelNameLength int `yaml:"max_label_name_length"`
29+
MaxLabelValueLength int `yaml:"max_label_value_length"`
30+
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series"`
31+
RejectOldSamples bool `yaml:"reject_old_samples"`
32+
RejectOldSamplesMaxAge time.Duration `yaml:"reject_old_samples_max_age"`
33+
CreationGracePeriod time.Duration `yaml:"creation_grace_period"`
34+
EnforceMetricName bool `yaml:"enforce_metric_name"`
3235

3336
// Ingester enforced limits.
3437
MaxSeriesPerQuery int `yaml:"max_series_per_query"`
@@ -55,8 +58,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
5558
f.Float64Var(&l.IngestionRate, "distributor.ingestion-rate-limit", 25000, "Per-user ingestion rate limit in samples per second.")
5659
f.IntVar(&l.IngestionBurstSize, "distributor.ingestion-burst-size", 50000, "Per-user allowed ingestion burst size (in number of samples). Warning, very high limits will be reset every -distributor.limiter-reload-period.")
5760
f.BoolVar(&l.AcceptHASamples, "distributor.ha-tracker.enable-for-all-users", false, "Flag to enable, for all users, handling of samples with external labels identifying replicas in an HA Prometheus setup.")
58-
f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.")
5961
f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.")
62+
f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.")
63+
f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.")
6064
f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names")
6165
f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name")
6266
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
@@ -163,14 +167,19 @@ func (o *Overrides) AcceptHASamples(userID string) bool {
163167
return o.overridesManager.GetLimits(userID).(*Limits).AcceptHASamples
164168
}
165169

170+
// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica.
171+
func (o *Overrides) HAClusterLabel(userID string) string {
172+
return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel
173+
}
174+
166175
// HAReplicaLabel returns the replica label to look for when deciding whether to accept a sample from a Prometheus HA replica.
167176
func (o *Overrides) HAReplicaLabel(userID string) string {
168177
return o.overridesManager.GetLimits(userID).(*Limits).HAReplicaLabel
169178
}
170179

171-
// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica.
172-
func (o *Overrides) HAClusterLabel(userID string) string {
173-
return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel
180+
// DropLabels returns the list of labels to be dropped when ingesting HA samples for the user.
181+
func (o *Overrides) DropLabels(userID string) flagext.StringSlice {
182+
return o.overridesManager.GetLimits(userID).(*Limits).DropLabels
174183
}
175184

176185
// MaxLabelNameLength returns maximum length a label name can be.

0 commit comments

Comments
 (0)