Skip to content

Commit 3c1280f

Browse files
committed
support out of order samples ingestion feature
Signed-off-by: Ben Ye <[email protected]>
1 parent da2ad99 commit 3c1280f

File tree

12 files changed

+160
-10
lines changed

12 files changed

+160
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
* [FEATURE] Cache: Support redis as backend for caching bucket and index cache. #5057
3333
* [FEATURE] Querier/Store-Gateway: Added `-blocks-storage.bucket-store.ignore-blocks-within` allowing to filter out the recently created blocks from being synced by queriers and store-gateways. #5166
3434
* [FEATURE] AlertManager/Ruler: Added support for `keep_firing_for` on alerting rules.
35+
* [FEATURE] Enable experimental out-of-order samples support. Added 2 new configs `-ingester.out-of-order-time-window` and `-blocks-storage.tsdb.out-of-order-cap-max`. #4964
3536
* [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008
3637
* [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044
3738
* [BUGFIX] Fixed blocks marked as no-compact being grouped in the shuffle sharding grouper. #5055

docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,4 +1227,9 @@ blocks_storage:
12271227
# down.
12281228
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
12291229
[memory_snapshot_on_shutdown: <boolean> | default = false]
1230+
1231+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
1232+
# be out-of-order.
1233+
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
1234+
[out_of_order_cap_max: <int> | default = 32]
12301235
```

docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,4 +1304,9 @@ blocks_storage:
13041304
# down.
13051305
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
13061306
[memory_snapshot_on_shutdown: <boolean> | default = false]
1307+
1308+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
1309+
# be out-of-order.
1310+
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
1311+
[out_of_order_cap_max: <int> | default = 32]
13071312
```

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2772,6 +2772,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
27722772
# CLI flag: -ingester.max-global-metadata-per-metric
27732773
[max_global_metadata_per_metric: <int> | default = 0]
27742774
2775+
# [EXPERIMENTAL] Configures the allowed time window for ingestion of
2776+
# out-of-order samples. Disabled (0s) by default.
2777+
# CLI flag: -ingester.out-of-order-time-window
2778+
[out_of_order_time_window: <duration> | default = 0s]
2779+
27752780
# Maximum number of chunks that can be fetched in a single query from ingesters
27762781
# and long-term storage. This limit is enforced in the querier, ruler and
27772782
# store-gateway. 0 to disable.
@@ -2784,8 +2789,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
27842789
# CLI flag: -querier.max-fetched-series-per-query
27852790
[max_fetched_series_per_query: <int> | default = 0]
27862791
2787-
# Deprecated (user max-fetched-data-bytes-per-query instead): The maximum size
2788-
# of all chunks in bytes that a query can fetch from each ingester and storage.
2792+
# Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of
2793+
# all chunks in bytes that a query can fetch from each ingester and storage.
27892794
# This limit is enforced in the querier, ruler and store-gateway. 0 to disable.
27902795
# CLI flag: -querier.max-fetched-chunk-bytes-per-query
27912796
[max_fetched_chunk_bytes_per_query: <int> | default = 0]
@@ -4166,6 +4171,11 @@ tsdb:
41664171
# down.
41674172
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
41684173
[memory_snapshot_on_shutdown: <boolean> | default = false]
4174+
4175+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
4176+
# be out-of-order.
4177+
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
4178+
[out_of_order_cap_max: <int> | default = 32]
41694179
```
41704180
41714181
### `compactor_config`

docs/configuration/v1-guarantees.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,8 @@ Currently experimental features are:
100100
- `-frontend.query-vertical-shard-size` (int) CLI flag
101101
- `query_vertical_shard_size` (int) field in runtime config file
102102
- Snapshotting of in-memory TSDB on disk during shutdown
103-
- `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
103+
- `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
104+
- Out of order samples support
105+
- `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
106+
- `-ingester.out-of-order-time-window` (duration) CLI flag
107+
- `out_of_order_time_window` (duration) field in runtime config file

pkg/distributor/distributor_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3032,7 +3032,6 @@ func TestDistributorValidation(t *testing.T) {
30323032
}},
30333033
err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
30343034
},
3035-
30363035
// Test validation fails for samples from the future.
30373036
{
30383037
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},

pkg/ingester/ingester.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -842,11 +842,13 @@ func (i *Ingester) updateUserTSDBConfigs() {
842842
ExemplarsConfig: &config.ExemplarsConfig{
843843
MaxExemplars: i.getMaxExemplars(userID),
844844
},
845+
TSDBConfig: &config.TSDBConfig{
846+
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
847+
},
845848
},
846849
}
847850

848-
// This method currently updates the MaxExemplars and OutOfOrderTimeWindow. Invoking this method
849-
// with a 0 value of OutOfOrderTimeWindow simply updates Max Exemplars.
851+
// This method currently updates the MaxExemplars and OutOfOrderTimeWindow.
850852
err := userDB.db.ApplyConfig(cfg)
851853
if err != nil {
852854
level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to update user tsdb configuration.")
@@ -995,6 +997,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
995997
startAppend = time.Now()
996998
sampleOutOfBoundsCount = 0
997999
sampleOutOfOrderCount = 0
1000+
sampleTooOldCount = 0
9981001
newValueForTimestampCount = 0
9991002
perUserSeriesLimitCount = 0
10001003
perMetricSeriesLimitCount = 0
@@ -1063,6 +1066,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
10631066
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
10641067
continue
10651068

1069+
case storage.ErrTooOldSample:
1070+
sampleTooOldCount++
1071+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1072+
continue
1073+
10661074
case errMaxSeriesPerUserLimitExceeded:
10671075
perUserSeriesLimitCount++
10681076
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
@@ -1153,6 +1161,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
11531161
if sampleOutOfOrderCount > 0 {
11541162
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
11551163
}
1164+
if sampleTooOldCount > 0 {
1165+
validation.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
1166+
}
11561167
if newValueForTimestampCount > 0 {
11571168
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
11581169
}
@@ -1947,6 +1958,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
19471958
MaxExemplars: maxExemplarsForUser,
19481959
HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
19491960
EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown,
1961+
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
1962+
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
19501963
}, nil)
19511964
if err != nil {
19521965
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)

pkg/ingester/ingester_test.go

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ func TestIngester_Push(t *testing.T) {
341341
additionalMetrics []string
342342
disableActiveSeries bool
343343
maxExemplars int
344+
oooTimeWindow time.Duration
344345
}{
345346
"should succeed on valid series and metadata": {
346347
reqs: []*cortexpb.WriteRequest{
@@ -553,7 +554,7 @@ func TestIngester_Push(t *testing.T) {
553554
cortex_ingester_memory_series_removed_total{user="test"} 0
554555
`,
555556
},
556-
"should soft fail on sample out of order": {
557+
"ooo disabled, should soft fail on sample out of order": {
557558
reqs: []*cortexpb.WriteRequest{
558559
cortexpb.ToWriteRequest(
559560
[]labels.Labels{metricLabels},
@@ -597,7 +598,7 @@ func TestIngester_Push(t *testing.T) {
597598
cortex_ingester_active_series{user="test"} 1
598599
`,
599600
},
600-
"should soft fail on sample out of bound": {
601+
"ooo disabled, should soft fail on sample out of bound": {
601602
reqs: []*cortexpb.WriteRequest{
602603
cortexpb.ToWriteRequest(
603604
[]labels.Labels{metricLabels},
@@ -641,6 +642,92 @@ func TestIngester_Push(t *testing.T) {
641642
cortex_ingester_active_series{user="test"} 1
642643
`,
643644
},
645+
"ooo enabled, should soft fail on sample too old": {
646+
reqs: []*cortexpb.WriteRequest{
647+
cortexpb.ToWriteRequest(
648+
[]labels.Labels{metricLabels},
649+
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
650+
nil,
651+
cortexpb.API),
652+
cortexpb.ToWriteRequest(
653+
[]labels.Labels{metricLabels},
654+
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (600 * 1000)}},
655+
nil,
656+
cortexpb.API),
657+
},
658+
oooTimeWindow: 5 * time.Minute,
659+
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErr(storage.ErrTooOldSample, model.Time(1575043969-(600*1000)), cortexpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
660+
expectedIngested: []cortexpb.TimeSeries{
661+
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}}},
662+
},
663+
expectedMetrics: `
664+
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
665+
# TYPE cortex_ingester_ingested_samples_total counter
666+
cortex_ingester_ingested_samples_total 1
667+
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
668+
# TYPE cortex_ingester_ingested_samples_failures_total counter
669+
cortex_ingester_ingested_samples_failures_total 1
670+
# HELP cortex_ingester_memory_users The current number of users in memory.
671+
# TYPE cortex_ingester_memory_users gauge
672+
cortex_ingester_memory_users 1
673+
# HELP cortex_ingester_memory_series The current number of series in memory.
674+
# TYPE cortex_ingester_memory_series gauge
675+
cortex_ingester_memory_series 1
676+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
677+
# TYPE cortex_ingester_memory_series_created_total counter
678+
cortex_ingester_memory_series_created_total{user="test"} 1
679+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
680+
# TYPE cortex_ingester_memory_series_removed_total counter
681+
cortex_ingester_memory_series_removed_total{user="test"} 0
682+
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
683+
# TYPE cortex_discarded_samples_total counter
684+
cortex_discarded_samples_total{reason="sample-too-old",user="test"} 1
685+
# HELP cortex_ingester_active_series Number of currently active series per user.
686+
# TYPE cortex_ingester_active_series gauge
687+
cortex_ingester_active_series{user="test"} 1
688+
`,
689+
},
690+
"ooo enabled, should succeed": {
691+
reqs: []*cortexpb.WriteRequest{
692+
cortexpb.ToWriteRequest(
693+
[]labels.Labels{metricLabels},
694+
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
695+
nil,
696+
cortexpb.API),
697+
cortexpb.ToWriteRequest(
698+
[]labels.Labels{metricLabels},
699+
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}},
700+
nil,
701+
cortexpb.API),
702+
},
703+
oooTimeWindow: 5 * time.Minute,
704+
expectedIngested: []cortexpb.TimeSeries{
705+
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}, {Value: 2, TimestampMs: 1575043969}}},
706+
},
707+
expectedMetrics: `
708+
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
709+
# TYPE cortex_ingester_ingested_samples_total counter
710+
cortex_ingester_ingested_samples_total 2
711+
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
712+
# TYPE cortex_ingester_ingested_samples_failures_total counter
713+
cortex_ingester_ingested_samples_failures_total 0
714+
# HELP cortex_ingester_memory_users The current number of users in memory.
715+
# TYPE cortex_ingester_memory_users gauge
716+
cortex_ingester_memory_users 1
717+
# HELP cortex_ingester_memory_series The current number of series in memory.
718+
# TYPE cortex_ingester_memory_series gauge
719+
cortex_ingester_memory_series 1
720+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
721+
# TYPE cortex_ingester_memory_series_created_total counter
722+
cortex_ingester_memory_series_created_total{user="test"} 1
723+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
724+
# TYPE cortex_ingester_memory_series_removed_total counter
725+
cortex_ingester_memory_series_removed_total{user="test"} 0
726+
# HELP cortex_ingester_active_series Number of currently active series per user.
727+
# TYPE cortex_ingester_active_series gauge
728+
cortex_ingester_active_series{user="test"} 1
729+
`,
730+
},
644731
"should soft fail on two different sample values at the same timestamp": {
645732
reqs: []*cortexpb.WriteRequest{
646733
cortexpb.ToWriteRequest(
@@ -777,6 +864,7 @@ func TestIngester_Push(t *testing.T) {
777864

778865
limits := defaultLimitsTestConfig()
779866
limits.MaxExemplars = testData.maxExemplars
867+
limits.OutOfOrderTimeWindow = model.Duration(testData.oooTimeWindow)
780868
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, "", registry)
781869
require.NoError(t, err)
782870
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))

pkg/ingester/series.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ const (
44
sampleOutOfOrder = "sample-out-of-order"
55
newValueForTimestamp = "new-value-for-timestamp"
66
sampleOutOfBounds = "sample-out-of-bounds"
7+
sampleTooOld = "sample-too-old"
78
)

pkg/storage/tsdb/config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/alecthomas/units"
1010
"github.com/pkg/errors"
11+
"github.com/prometheus/prometheus/tsdb"
1112
"github.com/prometheus/prometheus/tsdb/chunks"
1213
"github.com/prometheus/prometheus/tsdb/wlog"
1314
"github.com/thanos-io/thanos/pkg/store"
@@ -45,6 +46,7 @@ var (
4546
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
4647
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
4748
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
49+
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
4850
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
4951
)
5052

@@ -145,11 +147,14 @@ type TSDBConfig struct {
145147
// How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override.
146148
CloseIdleTSDBInterval time.Duration `yaml:"-"`
147149

148-
// Positive value enables experiemental support for exemplars. 0 or less to disable.
150+
// Positive value enables experimental support for exemplars. 0 or less to disable.
149151
MaxExemplars int `yaml:"max_exemplars"`
150152

151153
// Enable snapshotting of in-memory TSDB data on disk when shutting down.
152154
MemorySnapshotOnShutdown bool `yaml:"memory_snapshot_on_shutdown"`
155+
156+
// OutOfOrderCapMax is the maximum capacity for OOO chunks (in samples).
157+
OutOfOrderCapMax int64 `yaml:"out_of_order_cap_max"`
153158
}
154159

155160
// RegisterFlags registers the TSDBConfig flags.
@@ -176,6 +181,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
176181
f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
177182
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Deprecated, use maxExemplars in limits instead. If the MaxExemplars value in limits is set to zero, cortex will fallback on this value. This setting enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
178183
f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.")
184+
f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.")
179185
}
180186

181187
// Validate the config.
@@ -212,6 +218,10 @@ func (cfg *TSDBConfig) Validate() error {
212218
return errInvalidWALSegmentSizeBytes
213219
}
214220

221+
if cfg.OutOfOrderCapMax <= 0 {
222+
return errInvalidOutOfOrderCapMax
223+
}
224+
215225
return nil
216226
}
217227

pkg/storage/tsdb/config_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ func TestConfig_Validate(t *testing.T) {
115115
},
116116
expectedErr: errInvalidWALSegmentSizeBytes,
117117
},
118+
"should fail on out of order cap max": {
119+
setup: func(cfg *BlocksStorageConfig) {
120+
cfg.TSDB.OutOfOrderCapMax = 0
121+
},
122+
expectedErr: errInvalidOutOfOrderCapMax,
123+
},
118124
}
119125

120126
for testName, testData := range tests {

0 commit comments

Comments
 (0)