Skip to content

Commit 463dc13

Browse files
committed
support out of order samples ingestion feature
Signed-off-by: Ben Ye <[email protected]>
1 parent f95976a commit 463dc13

File tree

12 files changed

+160
-10
lines changed

12 files changed

+160
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
* [FEATURE] Querier/Ruler: Support the new thanos promql engine. This is an experimental feature and might change in the future. #5093
2828
* [FEATURE] Added zstd as an option for grpc compression #5092
2929
* [FEATURE] Ring: Add new kv store option `dynamodb`. #5026
30+
* [FEATURE] Enable experimental out-of-order samples support. Added 2 new configs `ingester.out_of_order_time_window` and `blocks-storage.tsdb.out_of_order_cap_max`. #4964
3031
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
3132
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
3233
* [BUGFIX] Fixed no compact block got grouped in shuffle sharding grouper. #5055

docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,4 +896,9 @@ blocks_storage:
896896
# down.
897897
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
898898
[memory_snapshot_on_shutdown: <boolean> | default = false]
899+
900+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
901+
# be out-of-order.
902+
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
903+
[out_of_order_cap_max: <int> | default = 32]
899904
```

docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,4 +966,9 @@ blocks_storage:
966966
# down.
967967
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
968968
[memory_snapshot_on_shutdown: <boolean> | default = false]
969+
970+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
971+
# be out-of-order.
972+
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
973+
[out_of_order_cap_max: <int> | default = 32]
969974
```

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2772,6 +2772,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
27722772
# CLI flag: -ingester.max-global-metadata-per-metric
27732773
[max_global_metadata_per_metric: <int> | default = 0]
27742774
2775+
# [Experimental] Configures the allowed time window for ingestion of
2776+
# out-of-order samples. Disabled (0s) by default.
2777+
# CLI flag: -ingester.out-of-order-time-window
2778+
[out_of_order_time_window: <duration> | default = 0s]
2779+
27752780
# Maximum number of chunks that can be fetched in a single query from ingesters
27762781
# and long-term storage. This limit is enforced in the querier, ruler and
27772782
# store-gateway. 0 to disable.
@@ -2784,8 +2789,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
27842789
# CLI flag: -querier.max-fetched-series-per-query
27852790
[max_fetched_series_per_query: <int> | default = 0]
27862791
2787-
# Deprecated (user max-fetched-data-bytes-per-query instead): The maximum size
2788-
# of all chunks in bytes that a query can fetch from each ingester and storage.
2792+
# Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of
2793+
# all chunks in bytes that a query can fetch from each ingester and storage.
27892794
# This limit is enforced in the querier, ruler and store-gateway. 0 to disable.
27902795
# CLI flag: -querier.max-fetched-chunk-bytes-per-query
27912796
[max_fetched_chunk_bytes_per_query: <int> | default = 0]
@@ -3835,6 +3840,11 @@ tsdb:
38353840
# down.
38363841
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
38373842
[memory_snapshot_on_shutdown: <boolean> | default = false]
3843+
3844+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
3845+
# be out-of-order.
3846+
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
3847+
[out_of_order_cap_max: <int> | default = 32]
38383848
```
38393849

38403850
### `compactor_config`

docs/configuration/v1-guarantees.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,8 @@ Currently experimental features are:
100100
- `-frontend.query-vertical-shard-size` (int) CLI flag
101101
- `query_vertical_shard_size` (int) field in runtime config file
102102
- Snapshotting of in-memory TSDB on disk during shutdown
103-
- `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
103+
- `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
104+
- Out of order samples support
105+
- `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
106+
- `-ingester.out-of-order-time-window` (duration) CLI flag
107+
- `out_of_order_time_window` (duration) field in runtime config file

pkg/distributor/distributor_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3032,7 +3032,6 @@ func TestDistributorValidation(t *testing.T) {
30323032
}},
30333033
err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
30343034
},
3035-
30363035
// Test validation fails for samples from the future.
30373036
{
30383037
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},

pkg/ingester/ingester.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -841,11 +841,13 @@ func (i *Ingester) updateUserTSDBConfigs() {
841841
ExemplarsConfig: &config.ExemplarsConfig{
842842
MaxExemplars: i.getMaxExemplars(userID),
843843
},
844+
TSDBConfig: &config.TSDBConfig{
845+
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
846+
},
844847
},
845848
}
846849

847-
// This method currently updates the MaxExemplars and OutOfOrderTimeWindow. Invoking this method
848-
// with a 0 value of OutOfOrderTimeWindow simply updates Max Exemplars.
850+
// This method currently updates the MaxExemplars and OutOfOrderTimeWindow.
849851
err := userDB.db.ApplyConfig(cfg)
850852
if err != nil {
851853
level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to update user tsdb configuration.")
@@ -994,6 +996,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
994996
startAppend = time.Now()
995997
sampleOutOfBoundsCount = 0
996998
sampleOutOfOrderCount = 0
999+
sampleTooOldCount = 0
9971000
newValueForTimestampCount = 0
9981001
perUserSeriesLimitCount = 0
9991002
perMetricSeriesLimitCount = 0
@@ -1062,6 +1065,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
10621065
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
10631066
continue
10641067

1068+
case storage.ErrTooOldSample:
1069+
sampleTooOldCount++
1070+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1071+
continue
1072+
10651073
case errMaxSeriesPerUserLimitExceeded:
10661074
perUserSeriesLimitCount++
10671075
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
@@ -1152,6 +1160,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
11521160
if sampleOutOfOrderCount > 0 {
11531161
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
11541162
}
1163+
if sampleTooOldCount > 0 {
1164+
validation.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
1165+
}
11551166
if newValueForTimestampCount > 0 {
11561167
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
11571168
}
@@ -1944,6 +1955,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
19441955
MaxExemplars: maxExemplarsForUser,
19451956
HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
19461957
EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown,
1958+
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
1959+
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
19471960
}, nil)
19481961
if err != nil {
19491962
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)

pkg/ingester/ingester_test.go

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ func TestIngester_Push(t *testing.T) {
341341
additionalMetrics []string
342342
disableActiveSeries bool
343343
maxExemplars int
344+
oooTimeWindow time.Duration
344345
}{
345346
"should succeed on valid series and metadata": {
346347
reqs: []*cortexpb.WriteRequest{
@@ -553,7 +554,7 @@ func TestIngester_Push(t *testing.T) {
553554
cortex_ingester_memory_series_removed_total{user="test"} 0
554555
`,
555556
},
556-
"should soft fail on sample out of order": {
557+
"ooo disabled, should soft fail on sample out of order": {
557558
reqs: []*cortexpb.WriteRequest{
558559
cortexpb.ToWriteRequest(
559560
[]labels.Labels{metricLabels},
@@ -597,7 +598,7 @@ func TestIngester_Push(t *testing.T) {
597598
cortex_ingester_active_series{user="test"} 1
598599
`,
599600
},
600-
"should soft fail on sample out of bound": {
601+
"ooo disabled, should soft fail on sample out of bound": {
601602
reqs: []*cortexpb.WriteRequest{
602603
cortexpb.ToWriteRequest(
603604
[]labels.Labels{metricLabels},
@@ -641,6 +642,92 @@ func TestIngester_Push(t *testing.T) {
641642
cortex_ingester_active_series{user="test"} 1
642643
`,
643644
},
645+
"ooo enabled, should soft fail on sample too old": {
646+
reqs: []*cortexpb.WriteRequest{
647+
cortexpb.ToWriteRequest(
648+
[]labels.Labels{metricLabels},
649+
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
650+
nil,
651+
cortexpb.API),
652+
cortexpb.ToWriteRequest(
653+
[]labels.Labels{metricLabels},
654+
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (600 * 1000)}},
655+
nil,
656+
cortexpb.API),
657+
},
658+
oooTimeWindow: 5 * time.Minute,
659+
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErr(storage.ErrTooOldSample, model.Time(1575043969-(600*1000)), cortexpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
660+
expectedIngested: []cortexpb.TimeSeries{
661+
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}}},
662+
},
663+
expectedMetrics: `
664+
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
665+
# TYPE cortex_ingester_ingested_samples_total counter
666+
cortex_ingester_ingested_samples_total 1
667+
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
668+
# TYPE cortex_ingester_ingested_samples_failures_total counter
669+
cortex_ingester_ingested_samples_failures_total 1
670+
# HELP cortex_ingester_memory_users The current number of users in memory.
671+
# TYPE cortex_ingester_memory_users gauge
672+
cortex_ingester_memory_users 1
673+
# HELP cortex_ingester_memory_series The current number of series in memory.
674+
# TYPE cortex_ingester_memory_series gauge
675+
cortex_ingester_memory_series 1
676+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
677+
# TYPE cortex_ingester_memory_series_created_total counter
678+
cortex_ingester_memory_series_created_total{user="test"} 1
679+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
680+
# TYPE cortex_ingester_memory_series_removed_total counter
681+
cortex_ingester_memory_series_removed_total{user="test"} 0
682+
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
683+
# TYPE cortex_discarded_samples_total counter
684+
cortex_discarded_samples_total{reason="sample-too-old",user="test"} 1
685+
# HELP cortex_ingester_active_series Number of currently active series per user.
686+
# TYPE cortex_ingester_active_series gauge
687+
cortex_ingester_active_series{user="test"} 1
688+
`,
689+
},
690+
"ooo enabled, should succeed": {
691+
reqs: []*cortexpb.WriteRequest{
692+
cortexpb.ToWriteRequest(
693+
[]labels.Labels{metricLabels},
694+
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
695+
nil,
696+
cortexpb.API),
697+
cortexpb.ToWriteRequest(
698+
[]labels.Labels{metricLabels},
699+
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}},
700+
nil,
701+
cortexpb.API),
702+
},
703+
oooTimeWindow: 5 * time.Minute,
704+
expectedIngested: []cortexpb.TimeSeries{
705+
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}, {Value: 2, TimestampMs: 1575043969}}},
706+
},
707+
expectedMetrics: `
708+
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
709+
# TYPE cortex_ingester_ingested_samples_total counter
710+
cortex_ingester_ingested_samples_total 2
711+
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
712+
# TYPE cortex_ingester_ingested_samples_failures_total counter
713+
cortex_ingester_ingested_samples_failures_total 0
714+
# HELP cortex_ingester_memory_users The current number of users in memory.
715+
# TYPE cortex_ingester_memory_users gauge
716+
cortex_ingester_memory_users 1
717+
# HELP cortex_ingester_memory_series The current number of series in memory.
718+
# TYPE cortex_ingester_memory_series gauge
719+
cortex_ingester_memory_series 1
720+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
721+
# TYPE cortex_ingester_memory_series_created_total counter
722+
cortex_ingester_memory_series_created_total{user="test"} 1
723+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
724+
# TYPE cortex_ingester_memory_series_removed_total counter
725+
cortex_ingester_memory_series_removed_total{user="test"} 0
726+
# HELP cortex_ingester_active_series Number of currently active series per user.
727+
# TYPE cortex_ingester_active_series gauge
728+
cortex_ingester_active_series{user="test"} 1
729+
`,
730+
},
644731
"should soft fail on two different sample values at the same timestamp": {
645732
reqs: []*cortexpb.WriteRequest{
646733
cortexpb.ToWriteRequest(
@@ -777,6 +864,7 @@ func TestIngester_Push(t *testing.T) {
777864

778865
limits := defaultLimitsTestConfig()
779866
limits.MaxExemplars = testData.maxExemplars
867+
limits.OutOfOrderTimeWindow = model.Duration(testData.oooTimeWindow)
780868
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, "", registry)
781869
require.NoError(t, err)
782870
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))

pkg/ingester/series.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ const (
44
sampleOutOfOrder = "sample-out-of-order"
55
newValueForTimestamp = "new-value-for-timestamp"
66
sampleOutOfBounds = "sample-out-of-bounds"
7+
sampleTooOld = "sample-too-old"
78
)

pkg/storage/tsdb/config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/alecthomas/units"
1010
"github.com/pkg/errors"
11+
"github.com/prometheus/prometheus/tsdb"
1112
"github.com/prometheus/prometheus/tsdb/chunks"
1213
"github.com/prometheus/prometheus/tsdb/wlog"
1314
"github.com/thanos-io/thanos/pkg/store"
@@ -45,6 +46,7 @@ var (
4546
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
4647
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
4748
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
49+
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
4850
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
4951
)
5052

@@ -145,11 +147,14 @@ type TSDBConfig struct {
145147
// How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override.
146148
CloseIdleTSDBInterval time.Duration `yaml:"-"`
147149

148-
// Positive value enables experiemental support for exemplars. 0 or less to disable.
150+
// Positive value enables experimental support for exemplars. 0 or less to disable.
149151
MaxExemplars int `yaml:"max_exemplars"`
150152

151153
// Enable snapshotting of in-memory TSDB data on disk when shutting down.
152154
MemorySnapshotOnShutdown bool `yaml:"memory_snapshot_on_shutdown"`
155+
156+
// OutOfOrderCapMax is maximum capacity for OOO chunks (in samples).
157+
OutOfOrderCapMax int64 `yaml:"out_of_order_cap_max"`
153158
}
154159

155160
// RegisterFlags registers the TSDBConfig flags.
@@ -176,6 +181,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
176181
f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
177182
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Deprecated, use maxExemplars in limits instead. If the MaxExemplars value in limits is set to zero, cortex will fallback on this value. This setting enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
178183
f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.")
184+
f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.")
179185
}
180186

181187
// Validate the config.
@@ -212,6 +218,10 @@ func (cfg *TSDBConfig) Validate() error {
212218
return errInvalidWALSegmentSizeBytes
213219
}
214220

221+
if cfg.OutOfOrderCapMax <= 0 {
222+
return errInvalidOutOfOrderCapMax
223+
}
224+
215225
return nil
216226
}
217227

0 commit comments

Comments
 (0)