Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617
* [ENHANCEMENT] AlertManager: Add `keep_instance_in_the_ring_on_shutdown` and `tokens_file_path` configs for alertmanager ring. #6628
* [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676
* [ENHANCEMENT] Ingester: Push request should fail when label set is out of order #6746
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
15 changes: 15 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,17 @@ type extendedAppender interface {
storage.GetRef
}

func (i *Ingester) isLabelSetOutOfOrder(labels labels.Labels) bool {
last := ""
for _, l := range labels {
if strings.Compare(last, l.Name) > 0 {
return true
}
last = l.Name
}
return false
}

// Push adds metrics to a block
func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
if err := i.checkRunning(); err != nil {
Expand Down Expand Up @@ -1297,6 +1308,10 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// Look up a reference for this series.
tsLabels := cortexpb.FromLabelAdaptersToLabels(ts.Labels)
if i.isLabelSetOutOfOrder(tsLabels) {
i.metrics.oooLabelsTotal.WithLabelValues(userID).Inc()
return nil, wrapWithUser(errors.Errorf("out-of-order label set found when push: %s", tsLabels), userID)
}
tsLabelsHash := tsLabels.Hash()
ref, copiedLabels := app.GetRef(tsLabels, tsLabelsHash)

Expand Down
62 changes: 49 additions & 13 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2365,6 +2365,42 @@ func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) {
assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...))
}

func TestIngester_Push_OutOfOrderLabels(t *testing.T) {
// Create ingester
cfg := defaultIngesterTestConfig(t)
r := prometheus.NewRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, r)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

ctx := user.InjectOrgID(context.Background(), "test-user")

outOfOrderLabels := labels.Labels{
{Name: labels.MetricName, Value: "test_metric"},
{Name: "c", Value: "3"},
{Name: "a", Value: "1"}, // Out of order (a comes before c)
}

req, _ := mockWriteRequest(t, outOfOrderLabels, 1, 2)
_, err = i.Push(ctx, req)
require.Error(t, err)
require.Contains(t, err.Error(), "out-of-order label set found")

metric := `
# HELP cortex_ingester_out_of_order_labels_total The total number of out of order label found per user.
# TYPE cortex_ingester_out_of_order_labels_total counter
cortex_ingester_out_of_order_labels_total{user="test-user"} 1
`
err = testutil.GatherAndCompare(r, bytes.NewBufferString(metric), "cortex_ingester_out_of_order_labels_total")
require.NoError(t, err)
}

func BenchmarkIngesterPush(b *testing.B) {
limits := defaultLimitsTestConfig()
benchmarkIngesterPush(b, limits, false)
Expand Down Expand Up @@ -2730,8 +2766,8 @@ func Test_Ingester_LabelNames(t *testing.T) {
value float64
timestamp int64
}{
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
}

Expand Down Expand Up @@ -2786,8 +2822,8 @@ func Test_Ingester_LabelValues(t *testing.T) {
value float64
timestamp int64
}{
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
}

Expand Down Expand Up @@ -2864,7 +2900,7 @@ func Test_Ingester_LabelValue_MaxInflightQueryRequest(t *testing.T) {
// Mock request
ctx := user.InjectOrgID(context.Background(), "test")

wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000)
_, err = i.Push(ctx, wreq)
require.NoError(t, err)

Expand Down Expand Up @@ -3023,7 +3059,7 @@ func Test_Ingester_Query_MaxInflightQueryRequest(t *testing.T) {
// Mock request
ctx := user.InjectOrgID(context.Background(), "test")

wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000)
_, err = i.Push(ctx, wreq)
require.NoError(t, err)

Expand Down Expand Up @@ -4904,8 +4940,8 @@ func Test_Ingester_UserStats(t *testing.T) {
value float64
timestamp int64
}{
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
{labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
{labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000},
}

Expand Down Expand Up @@ -4950,8 +4986,8 @@ func Test_Ingester_AllUserStats(t *testing.T) {
value float64
timestamp int64
}{
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_2"}}, 2, 200000},
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_1"}}, 2, 200000},
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_2"}}, 2, 200000},
Expand Down Expand Up @@ -5018,8 +5054,8 @@ func Test_Ingester_AllUserStatsHandler(t *testing.T) {
value float64
timestamp int64
}{
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000},
{"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_2"}}, 2, 200000},
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_1"}}, 2, 200000},
{"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_2"}}, 2, 200000},
Expand Down Expand Up @@ -6476,7 +6512,7 @@ func Test_Ingester_QueryExemplar_MaxInflightQueryRequest(t *testing.T) {
// Mock request
ctx := user.InjectOrgID(context.Background(), "test")

wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000)
wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000)
_, err = i.Push(ctx, wreq)
require.NoError(t, err)

Expand Down
6 changes: 6 additions & 0 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ingesterMetrics struct {
ingestedHistogramsFail prometheus.Counter
ingestedExemplarsFail prometheus.Counter
ingestedMetadataFail prometheus.Counter
oooLabelsTotal *prometheus.CounterVec
queries prometheus.Counter
queriedSamples prometheus.Histogram
queriedExemplars prometheus.Histogram
Expand Down Expand Up @@ -112,6 +113,10 @@ func newIngesterMetrics(r prometheus.Registerer,
Name: "cortex_ingester_ingested_metadata_failures_total",
Help: "The total number of metadata that errored on ingestion.",
}),
oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_out_of_order_labels_total",
Help: "The total number of out of order label found per user.",
}, []string{"user"}),
queries: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_queries_total",
Help: "The total number of queries the ingester has handled.",
Expand Down Expand Up @@ -283,6 +288,7 @@ func newIngesterMetrics(r prometheus.Registerer,
}

func (m *ingesterMetrics) deletePerUserMetrics(userID string) {
m.oooLabelsTotal.DeleteLabelValues(userID)
m.memMetadataCreatedTotal.DeleteLabelValues(userID)
m.memMetadataRemovedTotal.DeleteLabelValues(userID)
m.activeSeriesPerUser.DeleteLabelValues(userID)
Expand Down
Loading