Skip to content

Commit e402433

Browse files
committed
address comments
Signed-off-by: Ben Ye <[email protected]>
1 parent a5b5b9b commit e402433

File tree

9 files changed

+71
-14
lines changed

9 files changed

+71
-14
lines changed

docs/blocks-storage/querier.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -879,8 +879,8 @@ blocks_storage:
879879
# CLI flag: -blocks-storage.tsdb.max-exemplars
880880
[max_exemplars: <int> | default = 0]
881881
882-
# [EXPERIMENTAL] Configures the maximum capacity for out-of-order chunks (in
883-
# samples). If set to <=0, default value 32 is assumed.
882+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
883+
# be out-of-order.
884884
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
885885
[out_of_order_cap_max: <int> | default = 32]
886886
```

docs/blocks-storage/store-gateway.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -943,8 +943,8 @@ blocks_storage:
943943
# CLI flag: -blocks-storage.tsdb.max-exemplars
944944
[max_exemplars: <int> | default = 0]
945945
946-
# [EXPERIMENTAL] Configures the maximum capacity for out-of-order chunks (in
947-
# samples). If set to <=0, default value 32 is assumed.
946+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
947+
# be out-of-order.
948948
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
949949
[out_of_order_cap_max: <int> | default = 32]
950950
```

docs/configuration/config-file-reference.md

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2587,6 +2587,16 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
25872587
# CLI flag: -validation.max-metadata-length
25882588
[max_metadata_length: <int> | default = 1024]
25892589
2590+
# Deprecated (use ingester.out-of-order-time-window instead): Reject old
2591+
# samples.
2592+
# CLI flag: -validation.reject-old-samples
2593+
[reject_old_samples: <boolean> | default = false]
2594+
2595+
# Deprecated (use ingester.out-of-order-time-window instead): Maximum accepted
2596+
# sample age before rejecting.
2597+
# CLI flag: -validation.reject-old-samples.max-age
2598+
[reject_old_samples_max_age: <duration> | default = 2w]
2599+
25902600
# Duration which table will be created/deleted before/after it's needed; we
25912601
# won't accept sample from before this time.
25922602
# CLI flag: -validation.create-grace-period
@@ -2676,8 +2686,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
26762686
# CLI flag: -querier.max-fetched-series-per-query
26772687
[max_fetched_series_per_query: <int> | default = 0]
26782688
2679-
# Deprecated (user max-fetched-data-bytes-per-query instead): The maximum size
2680-
# of all chunks in bytes that a query can fetch from each ingester and storage.
2689+
# Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of
2690+
# all chunks in bytes that a query can fetch from each ingester and storage.
26812691
# This limit is enforced in the querier, ruler and store-gateway. 0 to disable.
26822692
# CLI flag: -querier.max-fetched-chunk-bytes-per-query
26832693
[max_fetched_chunk_bytes_per_query: <int> | default = 0]
@@ -3712,8 +3722,8 @@ tsdb:
37123722
# CLI flag: -blocks-storage.tsdb.max-exemplars
37133723
[max_exemplars: <int> | default = 0]
37143724
3715-
# [EXPERIMENTAL] Configures the maximum capacity for out-of-order chunks (in
3716-
# samples). If set to <=0, default value 32 is assumed.
3725+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
3726+
# be out-of-order.
37173727
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
37183728
[out_of_order_cap_max: <int> | default = 32]
37193729
```

docs/configuration/v1-guarantees.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,8 @@ Currently experimental features are:
9898
- Enabled via `-compactor.sharding-enabled=true`, `-compactor.sharding-strategy=shuffle-sharding`, and `-compactor.tenant-shard-size` set to a value larger than 0.
9999
- Vertical sharding at query frontend for range/instant queries
100100
- `-frontend.query-vertical-shard-size` (int) CLI flag
101-
- `query_vertical_shard_size` (int) field in runtime config file
101+
- `query_vertical_shard_size` (int) field in runtime config file
102+
- Out of order samples support
103+
- `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
104+
- `-ingester.out-of-order-time-window` (duration) CLI flag
105+
- `out_of_order_time_window` (duration) field in runtime config file

pkg/distributor/distributor_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2949,7 +2949,15 @@ func TestDistributorValidation(t *testing.T) {
29492949
Value: 1,
29502950
}},
29512951
},
2952-
2952+
// Test validation fails for very old samples.
2953+
{
2954+
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
2955+
samples: []cortexpb.Sample{{
2956+
TimestampMs: int64(past),
2957+
Value: 2,
2958+
}},
2959+
err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
2960+
},
29532961
// Test validation fails for samples from the future.
29542962
{
29552963
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},
@@ -2997,6 +3005,8 @@ func TestDistributorValidation(t *testing.T) {
29973005
flagext.DefaultValues(&limits)
29983006

29993007
limits.CreationGracePeriod = model.Duration(2 * time.Hour)
3008+
limits.RejectOldSamples = true
3009+
limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour)
30003010
limits.MaxLabelNamesPerSeries = 2
30013011

30023012
ds, _, _, _ := prepare(t, prepConfig{

pkg/storage/tsdb/config.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package tsdb
22

33
import (
44
"flag"
5-
"fmt"
65
"path/filepath"
76
"strings"
87
"time"
@@ -51,6 +50,7 @@ var (
5150
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
5251
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
5352
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
53+
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
5454
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
5555
)
5656

@@ -153,7 +153,6 @@ type TSDBConfig struct {
153153
MaxExemplars int `yaml:"max_exemplars"`
154154

155155
// OutOfOrderCapMax is maximum capacity for OOO chunks (in samples).
156-
// If it is <=0, the default value is assumed.
157156
OutOfOrderCapMax int64 `yaml:"out_of_order_cap_max"`
158157
}
159158

@@ -179,7 +178,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
179178
f.BoolVar(&cfg.FlushBlocksOnShutdown, "blocks-storage.tsdb.flush-blocks-on-shutdown", false, "True to flush blocks to storage on shutdown. If false, incomplete blocks will be reused after restart.")
180179
f.DurationVar(&cfg.CloseIdleTSDBTimeout, "blocks-storage.tsdb.close-idle-tsdb-timeout", 0, "If TSDB has not received any data for this duration, and all blocks from TSDB have been shipped, TSDB is closed and deleted from local disk. If set to positive value, this value should be equal or higher than -querier.query-ingesters-within flag to make sure that TSDB is not closed prematurely, which could cause partial query results. 0 or negative value disables closing of idle TSDB.")
181180
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
182-
f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, fmt.Sprintf("[EXPERIMENTAL] Configures the maximum capacity for out-of-order chunks (in samples). If set to <=0, default value %d is assumed.", tsdb.DefaultOutOfOrderCapMax))
181+
f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.")
183182
}
184183

185184
// Validate the config.
@@ -216,6 +215,10 @@ func (cfg *TSDBConfig) Validate() error {
216215
return errInvalidWALSegmentSizeBytes
217216
}
218217

218+
if cfg.OutOfOrderCapMax <= 0 {
219+
return errInvalidOutOfOrderCapMax
220+
}
221+
219222
return nil
220223
}
221224

pkg/util/validation/errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,14 @@ func (e *sampleValidationError) Error() string {
168168
return fmt.Sprintf(e.message, e.timestamp, e.metricName)
169169
}
170170

171+
func newSampleTimestampTooOldError(metricName string, timestamp int64) ValidationError {
172+
return &sampleValidationError{
173+
message: "timestamp too old: %d metric: %.200q",
174+
metricName: metricName,
175+
timestamp: timestamp,
176+
}
177+
}
178+
171179
func newSampleTimestampTooNewError(metricName string, timestamp int64) ValidationError {
172180
return &sampleValidationError{
173181
message: "timestamp too new: %d metric: %.200q",

pkg/util/validation/limits.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ type Limits struct {
4848
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
4949
MaxLabelsSizeBytes int `yaml:"max_labels_size_bytes" json:"max_labels_size_bytes"`
5050
MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"`
51+
RejectOldSamples bool `yaml:"reject_old_samples" json:"reject_old_samples"`
52+
RejectOldSamplesMaxAge model.Duration `yaml:"reject_old_samples_max_age" json:"reject_old_samples_max_age"`
5153
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period"`
5254
EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name"`
5355
EnforceMetricName bool `yaml:"enforce_metric_name" json:"enforce_metric_name"`
@@ -130,6 +132,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
130132
f.IntVar(&l.MaxLabelNamesPerSeries, "validation.max-label-names-per-series", 30, "Maximum number of label names per series.")
131133
f.IntVar(&l.MaxLabelsSizeBytes, "validation.max-labels-size-bytes", 0, "Maximum combined size in bytes of all labels and label values accepted for a series. 0 to disable the limit.")
132134
f.IntVar(&l.MaxMetadataLength, "validation.max-metadata-length", 1024, "Maximum length accepted for metric metadata. Metadata refers to Metric Name, HELP and UNIT.")
135+
f.BoolVar(&l.RejectOldSamples, "validation.reject-old-samples", false, "Deprecated (use ingester.out-of-order-time-window instead): Reject old samples.")
136+
_ = l.RejectOldSamplesMaxAge.Set("14d")
137+
f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Deprecated (use ingester.out-of-order-time-window instead): Maximum accepted sample age before rejecting.")
133138
_ = l.CreationGracePeriod.Set("10m")
134139
f.Var(&l.CreationGracePeriod, "validation.create-grace-period", "Duration which table will be created/deleted before/after it's needed; we won't accept sample from before this time.")
135140
f.BoolVar(&l.EnforceMetricName, "validation.enforce-metric-name", true, "Enforce every sample has a metric name.")
@@ -148,7 +153,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
148153
f.IntVar(&l.MaxGlobalMetadataPerMetric, "ingester.max-global-metadata-per-metric", 0, "The maximum number of metadata per metric, across the cluster. 0 to disable.")
149154
f.IntVar(&l.MaxChunksPerQuery, "querier.max-fetched-chunks-per-query", 2000000, "Maximum number of chunks that can be fetched in a single query from ingesters and long-term storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
150155
f.IntVar(&l.MaxFetchedSeriesPerQuery, "querier.max-fetched-series-per-query", 0, "The maximum number of unique series for which a query can fetch samples from each ingesters and blocks storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable")
151-
f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (user max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
156+
f.IntVar(&l.MaxFetchedChunkBytesPerQuery, "querier.max-fetched-chunk-bytes-per-query", 0, "Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of all chunks in bytes that a query can fetch from each ingester and storage. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.")
152157
f.IntVar(&l.MaxFetchedDataBytesPerQuery, "querier.max-fetched-data-bytes-per-query", 0, "The maximum combined size of all data that a query can fetch from each ingester and storage. This limit is enforced in the querier and ruler for `query`, `query_range` and `series` APIs. 0 to disable.")
153158
f.Var(&l.MaxQueryLength, "store.max-query-length", "Limit the query time range (end - start time). This limit is enforced in the query-frontend (on the received query) and in the querier (on the query possibly split by the query-frontend). 0 to disable.")
154159
f.Var(&l.MaxQueryLookback, "querier.max-query-lookback", "Limit how long back data (series and metadata) can be queried, up until <lookback> duration ago. This limit is enforced in the query-frontend, querier and ruler. If the requested time range is outside the allowed range, the request will not fail but will be manipulated to only query data within the allowed time range. 0 to disable.")
@@ -341,6 +346,17 @@ func (o *Overrides) MaxMetadataLength(userID string) int {
341346
return o.GetOverridesForUser(userID).MaxMetadataLength
342347
}
343348

349+
// RejectOldSamples returns true when we should reject samples older than certain
350+
// age.
351+
func (o *Overrides) RejectOldSamples(userID string) bool {
352+
return o.GetOverridesForUser(userID).RejectOldSamples
353+
}
354+
355+
// RejectOldSamplesMaxAge returns the age at which samples should be rejected.
356+
func (o *Overrides) RejectOldSamplesMaxAge(userID string) time.Duration {
357+
return time.Duration(o.GetOverridesForUser(userID).RejectOldSamplesMaxAge)
358+
}
359+
344360
// CreationGracePeriod is misnamed, and actually returns how far into the future
345361
// we should accept samples.
346362
func (o *Overrides) CreationGracePeriod(userID string) time.Duration {

pkg/util/validation/validate.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const (
3636

3737
missingMetricName = "missing_metric_name"
3838
invalidMetricName = "metric_name_invalid"
39+
greaterThanMaxSampleAge = "greater_than_max_sample_age"
3940
maxLabelNamesPerSeries = "max_label_names_per_series"
4041
tooFarInFuture = "too_far_in_future"
4142
invalidLabel = "label_invalid"
@@ -105,6 +106,11 @@ func init() {
105106
func ValidateSample(limits *Limits, userID string, ls []cortexpb.LabelAdapter, s cortexpb.Sample) ValidationError {
106107
unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls)
107108

109+
if limits.RejectOldSamples && model.Time(s.TimestampMs) < model.Now().Add(-time.Duration(limits.RejectOldSamplesMaxAge)) {
110+
DiscardedSamples.WithLabelValues(greaterThanMaxSampleAge, userID).Inc()
111+
return newSampleTimestampTooOldError(unsafeMetricName, s.TimestampMs)
112+
}
113+
108114
if model.Time(s.TimestampMs) > model.Now().Add(time.Duration(limits.CreationGracePeriod)) {
109115
DiscardedSamples.WithLabelValues(tooFarInFuture, userID).Inc()
110116
return newSampleTimestampTooNewError(unsafeMetricName, s.TimestampMs)

0 commit comments

Comments
 (0)