Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
* [ENHANCEMENT] Added FIFO cache metrics for current number of entries and memory usage. #2270
* [ENHANCEMENT] Output all config fields to /config API, including those with empty value. #2209
* [ENHANCEMENT] Add "missing_metric_name" and "metric_name_invalid" reasons to the `cortex_discarded_samples_total` metric. #2346
* [ENHANCEMENT] Experimental TSDB: sample ingestion errors are now reported via the existing `cortex_discarded_samples_total` metric. #2370
* [BUGFIX] Ensure user state metrics are updated if a transfer fails. #2338
* [BUGFIX] Fixed etcd client keepalive settings. #2278
* [BUGFIX] Fixed bug in updating last element of FIFO cache. #2270
Expand Down
9 changes: 9 additions & 0 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,15 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien
firstPartialErr = errors.Wrapf(err, "series=%s, timestamp=%v", client.FromLabelAdaptersToLabels(ts.Labels).String(), model.Time(s.TimestampMs).Time().Format(time.RFC3339Nano))
}

switch cause {
case tsdb.ErrOutOfBounds:
validation.DiscardedSamples.WithLabelValues(sampleOutOfBounds, userID).Inc()
case tsdb.ErrOutOfOrderSample:
validation.DiscardedSamples.WithLabelValues(sampleOutOfOrder, userID).Inc()
case tsdb.ErrAmendSample:
validation.DiscardedSamples.WithLabelValues(newValueForTimestamp, userID).Inc()
}

continue
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestIngester_v2Push(t *testing.T) {
"cortex_ingester_memory_users",
"cortex_ingester_memory_series_created_total",
"cortex_ingester_memory_series_removed_total",
"cortex_discarded_samples_total",
}
userID := "test"

Expand Down Expand Up @@ -124,6 +125,9 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="sample-out-of-order",user="test"} 1
`,
},
"should soft fail on sample out of bound": {
Expand Down Expand Up @@ -160,6 +164,9 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="sample-out-of-bounds",user="test"} 1
`,
},
"should soft fail on two different sample values at the same timestamp": {
Expand Down Expand Up @@ -196,6 +203,9 @@ func TestIngester_v2Push(t *testing.T) {
# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
# TYPE cortex_ingester_memory_series_removed_total counter
cortex_ingester_memory_series_removed_total{user="test"} 0
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="new-value-for-timestamp",user="test"} 1
`,
},
}
Expand All @@ -204,6 +214,9 @@ func TestIngester_v2Push(t *testing.T) {
t.Run(testName, func(t *testing.T) {
registry := prometheus.NewRegistry()

registry.MustRegister(validation.DiscardedSamples)
validation.DiscardedSamples.Reset()

// Create a mocked ingester
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 0
Expand Down
16 changes: 12 additions & 4 deletions pkg/ingester/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ import (
"github.com/cortexproject/cortex/pkg/prom1/storage/metric"
)

// Reason identifiers for samples that are rejected or dropped during
// ingestion. They are shared so that every call site spells a given reason
// identically.
const (
	// Passed to makeNoReportError by memorySeries.add.
	duplicateTimestamp = "duplicate-timestamp"
	duplicateSample    = "duplicate-sample"

	// Passed to makeMetricValidationError by memorySeries.add; the TSDB push
	// path also uses them as the reason label on validation.DiscardedSamples.
	newValueForTimestamp = "new-value-for-timestamp"
	sampleOutOfOrder     = "sample-out-of-order"

	// Used by the TSDB push path as the reason label on
	// validation.DiscardedSamples when the cause is tsdb.ErrOutOfBounds.
	sampleOutOfBounds = "sample-out-of-bounds"
)

type memorySeries struct {
metric labels.Labels

Expand Down Expand Up @@ -51,19 +59,19 @@ func (s *memorySeries) add(v model.SamplePair) error {
// If we don't know what the last sample value is, silently discard.
// This will mask some errors but better than complaining when we don't really know.
if !s.lastSampleValueSet {
return makeNoReportError("duplicate-timestamp")
return makeNoReportError(duplicateTimestamp)
}
// If both timestamp and sample value are the same as for the last append,
// ignore as they are a common occurrence when using client-side timestamps
// (e.g. Pushgateway or federation).
if v.Value.Equal(s.lastSampleValue) {
return makeNoReportError("duplicate-sample")
return makeNoReportError(duplicateSample)
}
return makeMetricValidationError("new-value-for-timestamp", s.metric,
return makeMetricValidationError(newValueForTimestamp, s.metric,
fmt.Errorf("sample with repeated timestamp but different value; last value: %v, incoming value: %v", s.lastSampleValue, v.Value))
}
if v.Timestamp < s.lastTime {
return makeMetricValidationError("sample-out-of-order", s.metric,
return makeMetricValidationError(sampleOutOfOrder, s.metric,
fmt.Errorf("sample timestamp out of order; last timestamp: %v, incoming timestamp: %v", s.lastTime, v.Timestamp))
}

Expand Down