Skip to content

Commit 79a3cf9

Browse files
committed
support out of order samples ingestion feature
Signed-off-by: Ben Ye <[email protected]>
1 parent f23d591 commit 79a3cf9

File tree

12 files changed

+160
-10
lines changed

12 files changed

+160
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* [FEATURE] AlertManager/Ruler: Added support for `keep_firing_for` on alerting rulers.
2525
* [FEATURE] Alertmanager: Add support for time_intervals. #5102
2626
* [FEATURE] Added `snappy-block` as an option for grpc compression #5215
27+
* [FEATURE] Enable experimental out-of-order samples support. Added 2 new configs `ingester.out_of_order_time_window` and `blocks-storage.tsdb.out_of_order_cap_max`. #4964
2728
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
2829
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
2930
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010

docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1227,4 +1227,9 @@ blocks_storage:
12271227
# down.
12281228
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
12291229
[memory_snapshot_on_shutdown: <boolean> | default = false]
1230+
1231+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
1232+
# be out-of-order.
1233+
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
1234+
[out_of_order_cap_max: <int> | default = 32]
12301235
```

docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,4 +1304,9 @@ blocks_storage:
13041304
# down.
13051305
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
13061306
[memory_snapshot_on_shutdown: <boolean> | default = false]
1307+
1308+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
1309+
# be out-of-order.
1310+
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
1311+
[out_of_order_cap_max: <int> | default = 32]
13071312
```

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1735,6 +1735,11 @@ tsdb:
17351735
# down.
17361736
# CLI flag: -blocks-storage.tsdb.memory-snapshot-on-shutdown
17371737
[memory_snapshot_on_shutdown: <boolean> | default = false]
1738+
1739+
# [EXPERIMENTAL] Configures the maximum number of samples per chunk that can
1740+
# be out-of-order.
1741+
# CLI flag: -blocks-storage.tsdb.out-of-order-cap-max
1742+
[out_of_order_cap_max: <int> | default = 32]
17381743
```
17391744
17401745
### `compactor_config`
@@ -2852,6 +2857,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
28522857
# CLI flag: -ingester.max-global-metadata-per-metric
28532858
[max_global_metadata_per_metric: <int> | default = 0]
28542859
2860+
# [Experimental] Configures the allowed time window for ingestion of
2861+
# out-of-order samples. Disabled (0s) by default.
2862+
# CLI flag: -ingester.out-of-order-time-window
2863+
[out_of_order_time_window: <duration> | default = 0s]
2864+
28552865
# Maximum number of chunks that can be fetched in a single query from ingesters
28562866
# and long-term storage. This limit is enforced in the querier, ruler and
28572867
# store-gateway. 0 to disable.
@@ -2864,8 +2874,8 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
28642874
# CLI flag: -querier.max-fetched-series-per-query
28652875
[max_fetched_series_per_query: <int> | default = 0]
28662876
2867-
# Deprecated (user max-fetched-data-bytes-per-query instead): The maximum size
2868-
# of all chunks in bytes that a query can fetch from each ingester and storage.
2877+
# Deprecated (use max-fetched-data-bytes-per-query instead): The maximum size of
2878+
# all chunks in bytes that a query can fetch from each ingester and storage.
28692879
# This limit is enforced in the querier, ruler and store-gateway. 0 to disable.
28702880
# CLI flag: -querier.max-fetched-chunk-bytes-per-query
28712881
[max_fetched_chunk_bytes_per_query: <int> | default = 0]

docs/configuration/v1-guarantees.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,8 @@ Currently experimental features are:
100100
- `-frontend.query-vertical-shard-size` (int) CLI flag
101101
- `query_vertical_shard_size` (int) field in runtime config file
102102
- Snapshotting of in-memory TSDB on disk during shutdown
103-
- `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
103+
- `-blocks-storage.tsdb.memory-snapshot-on-shutdown` (boolean) CLI flag
104+
- Out of order samples support
105+
- `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag
106+
- `-ingester.out-of-order-time-window` (duration) CLI flag
107+
- `out_of_order_time_window` (duration) field in runtime config file

pkg/distributor/distributor_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3036,7 +3036,6 @@ func TestDistributorValidation(t *testing.T) {
30363036
}},
30373037
err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
30383038
},
3039-
30403039
// Test validation fails for samples from the future.
30413040
{
30423041
labels: []labels.Labels{{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}},

pkg/ingester/ingester.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -842,11 +842,13 @@ func (i *Ingester) updateUserTSDBConfigs() {
842842
ExemplarsConfig: &config.ExemplarsConfig{
843843
MaxExemplars: i.getMaxExemplars(userID),
844844
},
845+
TSDBConfig: &config.TSDBConfig{
846+
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
847+
},
845848
},
846849
}
847850

848-
// This method currently updates the MaxExemplars and OutOfOrderTimeWindow. Invoking this method
849-
// with a 0 value of OutOfOrderTimeWindow simply updates Max Exemplars.
851+
// This method currently updates the MaxExemplars and OutOfOrderTimeWindow.
850852
err := userDB.db.ApplyConfig(cfg)
851853
if err != nil {
852854
level.Error(logutil.WithUserID(userID, i.logger)).Log("msg", "failed to update user tsdb configuration.")
@@ -995,6 +997,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
995997
startAppend = time.Now()
996998
sampleOutOfBoundsCount = 0
997999
sampleOutOfOrderCount = 0
1000+
sampleTooOldCount = 0
9981001
newValueForTimestampCount = 0
9991002
perUserSeriesLimitCount = 0
10001003
perMetricSeriesLimitCount = 0
@@ -1063,6 +1066,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
10631066
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
10641067
continue
10651068

1069+
case storage.ErrTooOldSample:
1070+
sampleTooOldCount++
1071+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1072+
continue
1073+
10661074
case errMaxSeriesPerUserLimitExceeded:
10671075
perUserSeriesLimitCount++
10681076
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
@@ -1153,6 +1161,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
11531161
if sampleOutOfOrderCount > 0 {
11541162
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Add(float64(sampleOutOfOrderCount))
11551163
}
1164+
if sampleTooOldCount > 0 {
1165+
validation.DiscardedSamples.WithLabelValues(sampleTooOld, userID).Add(float64(sampleTooOldCount))
1166+
}
11561167
if newValueForTimestampCount > 0 {
11571168
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Add(float64(newValueForTimestampCount))
11581169
}
@@ -1947,6 +1958,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
19471958
MaxExemplars: maxExemplarsForUser,
19481959
HeadChunksWriteQueueSize: i.cfg.BlocksStorageConfig.TSDB.HeadChunksWriteQueueSize,
19491960
EnableMemorySnapshotOnShutdown: i.cfg.BlocksStorageConfig.TSDB.MemorySnapshotOnShutdown,
1961+
OutOfOrderTimeWindow: time.Duration(i.limits.OutOfOrderTimeWindow(userID)).Milliseconds(),
1962+
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
19501963
}, nil)
19511964
if err != nil {
19521965
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)

pkg/ingester/ingester_test.go

Lines changed: 90 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ func TestIngester_Push(t *testing.T) {
341341
additionalMetrics []string
342342
disableActiveSeries bool
343343
maxExemplars int
344+
oooTimeWindow time.Duration
344345
}{
345346
"should succeed on valid series and metadata": {
346347
reqs: []*cortexpb.WriteRequest{
@@ -553,7 +554,7 @@ func TestIngester_Push(t *testing.T) {
553554
cortex_ingester_memory_series_removed_total{user="test"} 0
554555
`,
555556
},
556-
"should soft fail on sample out of order": {
557+
"ooo disabled, should soft fail on sample out of order": {
557558
reqs: []*cortexpb.WriteRequest{
558559
cortexpb.ToWriteRequest(
559560
[]labels.Labels{metricLabels},
@@ -597,7 +598,7 @@ func TestIngester_Push(t *testing.T) {
597598
cortex_ingester_active_series{user="test"} 1
598599
`,
599600
},
600-
"should soft fail on sample out of bound": {
601+
"ooo disabled, should soft fail on sample out of bound": {
601602
reqs: []*cortexpb.WriteRequest{
602603
cortexpb.ToWriteRequest(
603604
[]labels.Labels{metricLabels},
@@ -641,6 +642,92 @@ func TestIngester_Push(t *testing.T) {
641642
cortex_ingester_active_series{user="test"} 1
642643
`,
643644
},
645+
"ooo enabled, should soft fail on sample too old": {
646+
reqs: []*cortexpb.WriteRequest{
647+
cortexpb.ToWriteRequest(
648+
[]labels.Labels{metricLabels},
649+
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
650+
nil,
651+
cortexpb.API),
652+
cortexpb.ToWriteRequest(
653+
[]labels.Labels{metricLabels},
654+
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (600 * 1000)}},
655+
nil,
656+
cortexpb.API),
657+
},
658+
oooTimeWindow: 5 * time.Minute,
659+
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(wrappedTSDBIngestErr(storage.ErrTooOldSample, model.Time(1575043969-(600*1000)), cortexpb.FromLabelsToLabelAdapters(metricLabels)), userID).Error()),
660+
expectedIngested: []cortexpb.TimeSeries{
661+
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}}},
662+
},
663+
expectedMetrics: `
664+
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
665+
# TYPE cortex_ingester_ingested_samples_total counter
666+
cortex_ingester_ingested_samples_total 1
667+
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
668+
# TYPE cortex_ingester_ingested_samples_failures_total counter
669+
cortex_ingester_ingested_samples_failures_total 1
670+
# HELP cortex_ingester_memory_users The current number of users in memory.
671+
# TYPE cortex_ingester_memory_users gauge
672+
cortex_ingester_memory_users 1
673+
# HELP cortex_ingester_memory_series The current number of series in memory.
674+
# TYPE cortex_ingester_memory_series gauge
675+
cortex_ingester_memory_series 1
676+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
677+
# TYPE cortex_ingester_memory_series_created_total counter
678+
cortex_ingester_memory_series_created_total{user="test"} 1
679+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
680+
# TYPE cortex_ingester_memory_series_removed_total counter
681+
cortex_ingester_memory_series_removed_total{user="test"} 0
682+
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
683+
# TYPE cortex_discarded_samples_total counter
684+
cortex_discarded_samples_total{reason="sample-too-old",user="test"} 1
685+
# HELP cortex_ingester_active_series Number of currently active series per user.
686+
# TYPE cortex_ingester_active_series gauge
687+
cortex_ingester_active_series{user="test"} 1
688+
`,
689+
},
690+
"ooo enabled, should succeed": {
691+
reqs: []*cortexpb.WriteRequest{
692+
cortexpb.ToWriteRequest(
693+
[]labels.Labels{metricLabels},
694+
[]cortexpb.Sample{{Value: 2, TimestampMs: 1575043969}},
695+
nil,
696+
cortexpb.API),
697+
cortexpb.ToWriteRequest(
698+
[]labels.Labels{metricLabels},
699+
[]cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}},
700+
nil,
701+
cortexpb.API),
702+
},
703+
oooTimeWindow: 5 * time.Minute,
704+
expectedIngested: []cortexpb.TimeSeries{
705+
{Labels: metricLabelAdapters, Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1575043969 - (60 * 1000)}, {Value: 2, TimestampMs: 1575043969}}},
706+
},
707+
expectedMetrics: `
708+
# HELP cortex_ingester_ingested_samples_total The total number of samples ingested.
709+
# TYPE cortex_ingester_ingested_samples_total counter
710+
cortex_ingester_ingested_samples_total 2
711+
# HELP cortex_ingester_ingested_samples_failures_total The total number of samples that errored on ingestion.
712+
# TYPE cortex_ingester_ingested_samples_failures_total counter
713+
cortex_ingester_ingested_samples_failures_total 0
714+
# HELP cortex_ingester_memory_users The current number of users in memory.
715+
# TYPE cortex_ingester_memory_users gauge
716+
cortex_ingester_memory_users 1
717+
# HELP cortex_ingester_memory_series The current number of series in memory.
718+
# TYPE cortex_ingester_memory_series gauge
719+
cortex_ingester_memory_series 1
720+
# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
721+
# TYPE cortex_ingester_memory_series_created_total counter
722+
cortex_ingester_memory_series_created_total{user="test"} 1
723+
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
724+
# TYPE cortex_ingester_memory_series_removed_total counter
725+
cortex_ingester_memory_series_removed_total{user="test"} 0
726+
# HELP cortex_ingester_active_series Number of currently active series per user.
727+
# TYPE cortex_ingester_active_series gauge
728+
cortex_ingester_active_series{user="test"} 1
729+
`,
730+
},
644731
"should soft fail on two different sample values at the same timestamp": {
645732
reqs: []*cortexpb.WriteRequest{
646733
cortexpb.ToWriteRequest(
@@ -777,6 +864,7 @@ func TestIngester_Push(t *testing.T) {
777864

778865
limits := defaultLimitsTestConfig()
779866
limits.MaxExemplars = testData.maxExemplars
867+
limits.OutOfOrderTimeWindow = model.Duration(testData.oooTimeWindow)
780868
i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, "", registry)
781869
require.NoError(t, err)
782870
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))

pkg/ingester/series.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ const (
44
sampleOutOfOrder = "sample-out-of-order"
55
newValueForTimestamp = "new-value-for-timestamp"
66
sampleOutOfBounds = "sample-out-of-bounds"
7+
sampleTooOld = "sample-too-old"
78
)

pkg/storage/tsdb/config.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/alecthomas/units"
1010
"github.com/pkg/errors"
11+
"github.com/prometheus/prometheus/tsdb"
1112
"github.com/prometheus/prometheus/tsdb/chunks"
1213
"github.com/prometheus/prometheus/tsdb/wlog"
1314
"github.com/thanos-io/thanos/pkg/store"
@@ -45,6 +46,7 @@ var (
4546
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
4647
errInvalidWALSegmentSizeBytes = errors.New("invalid TSDB WAL segment size bytes")
4748
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
49+
errInvalidOutOfOrderCapMax = errors.New("invalid TSDB OOO chunks capacity (in samples)")
4850
errEmptyBlockranges = errors.New("empty block ranges for TSDB")
4951
)
5052

@@ -145,11 +147,14 @@ type TSDBConfig struct {
145147
// How often to check for idle TSDBs for closing. DefaultCloseIdleTSDBInterval is not suitable for testing, so tests can override.
146148
CloseIdleTSDBInterval time.Duration `yaml:"-"`
147149

148-
// Positive value enables experiemental support for exemplars. 0 or less to disable.
150+
// Positive value enables experimental support for exemplars. 0 or less to disable.
149151
MaxExemplars int `yaml:"max_exemplars"`
150152

151153
// Enable snapshotting of in-memory TSDB data on disk when shutting down.
152154
MemorySnapshotOnShutdown bool `yaml:"memory_snapshot_on_shutdown"`
155+
156+
// OutOfOrderCapMax is maximum capacity for OOO chunks (in samples).
157+
OutOfOrderCapMax int64 `yaml:"out_of_order_cap_max"`
153158
}
154159

155160
// RegisterFlags registers the TSDBConfig flags.
@@ -176,6 +181,7 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
176181
f.IntVar(&cfg.HeadChunksWriteQueueSize, "blocks-storage.tsdb.head-chunks-write-queue-size", chunks.DefaultWriteQueueSize, "The size of the in-memory queue used before flushing chunks to the disk.")
177182
f.IntVar(&cfg.MaxExemplars, "blocks-storage.tsdb.max-exemplars", 0, "Deprecated, use maxExemplars in limits instead. If the MaxExemplars value in limits is set to zero, cortex will fallback on this value. This setting enables support for exemplars in TSDB and sets the maximum number that will be stored. 0 or less means disabled.")
178183
f.BoolVar(&cfg.MemorySnapshotOnShutdown, "blocks-storage.tsdb.memory-snapshot-on-shutdown", false, "True to enable snapshotting of in-memory TSDB data on disk when shutting down.")
184+
f.Int64Var(&cfg.OutOfOrderCapMax, "blocks-storage.tsdb.out-of-order-cap-max", tsdb.DefaultOutOfOrderCapMax, "[EXPERIMENTAL] Configures the maximum number of samples per chunk that can be out-of-order.")
179185
}
180186

181187
// Validate the config.
@@ -212,6 +218,10 @@ func (cfg *TSDBConfig) Validate() error {
212218
return errInvalidWALSegmentSizeBytes
213219
}
214220

221+
if cfg.OutOfOrderCapMax <= 0 {
222+
return errInvalidOutOfOrderCapMax
223+
}
224+
215225
return nil
216226
}
217227

pkg/storage/tsdb/config_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ func TestConfig_Validate(t *testing.T) {
115115
},
116116
expectedErr: errInvalidWALSegmentSizeBytes,
117117
},
118+
"should fail on out of order cap max": {
119+
setup: func(cfg *BlocksStorageConfig) {
120+
cfg.TSDB.OutOfOrderCapMax = 0
121+
},
122+
expectedErr: errInvalidOutOfOrderCapMax,
123+
},
118124
}
119125

120126
for testName, testData := range tests {

0 commit comments

Comments
 (0)