From ce957682a25c6ef48496e689320eed527c948898 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 15 May 2025 13:45:20 -0700 Subject: [PATCH 1/4] Push request should fail when label set is out of order Signed-off-by: Alex Le --- pkg/ingester/ingester.go | 14 ++++++++++++++ pkg/ingester/ingester_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index af618488efd..632f50e3c9c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1132,6 +1132,17 @@ type extendedAppender interface { storage.GetRef } +func (i *Ingester) isLabelSetOutOfOrder(labels labels.Labels) bool { + last := "" + for _, l := range labels { + if strings.Compare(last, l.Name) > 0 { + return true + } + last = l.Name + } + return false +} + // Push adds metrics to a block func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { if err := i.checkRunning(); err != nil { @@ -1297,6 +1308,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Look up a reference for this series. tsLabels := cortexpb.FromLabelAdaptersToLabels(ts.Labels) + if i.isLabelSetOutOfOrder(tsLabels) { + return nil, wrapWithUser(errors.Errorf("out-of-order label set found when push: %s", tsLabels), userID) + } tsLabelsHash := tsLabels.Hash() ref, copiedLabels := app.GetRef(tsLabels, tsLabelsHash) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 0da6a49161c..68a3e933578 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2365,6 +2365,33 @@ func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) { assert.NoError(t, testutil.GatherAndCompare(registry, strings.NewReader(expectedMetrics), metricNames...)) } +func TestIngester_Push_OutOfOrderLabels(t *testing.T) { + // Create ingester + cfg := defaultIngesterTestConfig(t) + i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Wait until it's ACTIVE + test.Poll(t, time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + ctx := user.InjectOrgID(context.Background(), "test-user") + + outOfOrderLabels := labels.Labels{ + {Name: labels.MetricName, Value: "test_metric"}, + {Name: "c", Value: "3"}, + {Name: "a", Value: "1"}, // Out of order (a comes before c) + } + + req, _ := mockWriteRequest(t, outOfOrderLabels, 1, 2) + _, err = i.Push(ctx, req) + require.Error(t, err) + require.Contains(t, err.Error(), "out-of-order label set found") +} + func BenchmarkIngesterPush(b *testing.B) { limits := defaultLimitsTestConfig() benchmarkIngesterPush(b, limits, false) From 8f81604b9a22220e8464bee9ffcf1ef4c08b1529 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 15 May 2025 14:07:51 -0700 Subject: [PATCH 2/4] updated changelog Signed-off-by: Alex Le --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bdc480d88d..1b7af662c22 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ * [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617 * [ENHANCEMENT] AlertManager: Add `keep_instance_in_the_ring_on_shutdown` and `tokens_file_path` configs for alertmanager ring. #6628 * [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676 +* [ENHANCEMENT] Ingester: Push request should fail when label set is out of order #6746 * [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 * [BUGFIX] Ingester: Fix labelset data race condition. #6573 * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 From af7c9ee174d139d42297ef1d63a16e0c6a020545 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 15 May 2025 14:28:45 -0700 Subject: [PATCH 3/4] fix test Signed-off-by: Alex Le --- pkg/ingester/ingester_test.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 68a3e933578..3dc9a8914a2 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2757,8 +2757,8 @@ func Test_Ingester_LabelNames(t *testing.T) { value float64 timestamp int64 }{ - {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000}, - {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000}, + {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000}, + {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000}, {labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000}, } @@ -2813,8 +2813,8 @@ func Test_Ingester_LabelValues(t *testing.T) { value float64 timestamp int64 }{ - {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000}, - {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000}, + {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000}, + {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000}, {labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000}, } @@ -2891,7 +2891,7 @@ func Test_Ingester_LabelValue_MaxInflightQueryRequest(t *testing.T) { // Mock request ctx := user.InjectOrgID(context.Background(), "test") - wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000) + wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000) _, err = i.Push(ctx, wreq) require.NoError(t, err) @@ -3050,7 +3050,7 @@ func Test_Ingester_Query_MaxInflightQueryRequest(t *testing.T) { // Mock request ctx := user.InjectOrgID(context.Background(), "test") - wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000) + wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000) _, err = i.Push(ctx, wreq) require.NoError(t, err) @@ -4931,8 +4931,8 @@ func Test_Ingester_UserStats(t *testing.T) { value float64 timestamp int64 }{ - {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000}, - {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000}, + {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000}, + {labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000}, {labels.Labels{{Name: labels.MetricName, Value: "test_2"}}, 2, 200000}, } @@ -4977,8 +4977,8 @@ func Test_Ingester_AllUserStats(t *testing.T) { value float64 timestamp int64 }{ - {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000}, - {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000}, + {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000}, + {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000}, {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_2"}}, 2, 200000}, {"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_1"}}, 2, 200000}, {"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_2"}}, 2, 200000}, @@ -5045,8 +5045,8 @@ func Test_Ingester_AllUserStatsHandler(t *testing.T) { value float64 timestamp int64 }{ - {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000}, - {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "status", Value: "500"}, {Name: "route", Value: "get_user"}}, 1, 110000}, + {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000}, + {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "500"}}, 1, 110000}, {"user-1", labels.Labels{{Name: labels.MetricName, Value: "test_1_2"}}, 2, 200000}, {"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_1"}}, 2, 200000}, {"user-2", labels.Labels{{Name: labels.MetricName, Value: "test_2_2"}}, 2, 200000}, @@ -6503,7 +6503,7 @@ func Test_Ingester_QueryExemplar_MaxInflightQueryRequest(t *testing.T) { // Mock request ctx := user.InjectOrgID(context.Background(), "test") - wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "status", Value: "200"}, {Name: "route", Value: "get_user"}}, 1, 100000) + wreq, _ := mockWriteRequest(t, labels.Labels{{Name: labels.MetricName, Value: "test_1"}, {Name: "route", Value: "get_user"}, {Name: "status", Value: "200"}}, 1, 100000) _, err = i.Push(ctx, wreq) require.NoError(t, err) From 875b31222b4aeb93dd419435b94fa24b9a50d620 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Thu, 15 May 2025 16:33:12 -0700 Subject: [PATCH 4/4] add metric Signed-off-by: Alex Le --- pkg/ingester/ingester.go | 1 + pkg/ingester/ingester_test.go | 11 ++++++++++- pkg/ingester/metrics.go | 6 ++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 632f50e3c9c..f2f15cf5c0d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1309,6 +1309,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // Look up a reference for this series. tsLabels := cortexpb.FromLabelAdaptersToLabels(ts.Labels) if i.isLabelSetOutOfOrder(tsLabels) { + i.metrics.oooLabelsTotal.WithLabelValues(userID).Inc() return nil, wrapWithUser(errors.Errorf("out-of-order label set found when push: %s", tsLabels), userID) } tsLabelsHash := tsLabels.Hash() diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 3dc9a8914a2..adfe83bbee6 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -2368,7 +2368,8 @@ func TestIngester_Push_DecreaseInactiveSeries(t *testing.T) { func TestIngester_Push_OutOfOrderLabels(t *testing.T) { // Create ingester cfg := defaultIngesterTestConfig(t) - i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry()) + r := prometheus.NewRegistry() + i, err := prepareIngesterWithBlocksStorage(t, cfg, r) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -2390,6 +2391,14 @@ func TestIngester_Push_OutOfOrderLabels(t *testing.T) { _, err = i.Push(ctx, req) require.Error(t, err) require.Contains(t, err.Error(), "out-of-order label set found") + + metric := ` + # HELP cortex_ingester_out_of_order_labels_total The total number of out of order label found per user. + # TYPE cortex_ingester_out_of_order_labels_total counter + cortex_ingester_out_of_order_labels_total{user="test-user"} 1 +` + err = testutil.GatherAndCompare(r, bytes.NewBufferString(metric), "cortex_ingester_out_of_order_labels_total") + require.NoError(t, err) } func BenchmarkIngesterPush(b *testing.B) { diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index bcb8148149d..260d4cf0281 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -31,6 +31,7 @@ type ingesterMetrics struct { ingestedHistogramsFail prometheus.Counter ingestedExemplarsFail prometheus.Counter ingestedMetadataFail prometheus.Counter + oooLabelsTotal *prometheus.CounterVec queries prometheus.Counter queriedSamples prometheus.Histogram queriedExemplars prometheus.Histogram @@ -112,6 +113,10 @@ func newIngesterMetrics(r prometheus.Registerer, Name: "cortex_ingester_ingested_metadata_failures_total", Help: "The total number of metadata that errored on ingestion.", }), + oooLabelsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_out_of_order_labels_total", + Help: "The total number of out of order label found per user.", + }, []string{"user"}), queries: promauto.With(r).NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_queries_total", Help: "The total number of queries the ingester has handled.", @@ -283,6 +288,7 @@ func newIngesterMetrics(r prometheus.Registerer, } func (m *ingesterMetrics) deletePerUserMetrics(userID string) { + m.oooLabelsTotal.DeleteLabelValues(userID) m.memMetadataCreatedTotal.DeleteLabelValues(userID) m.memMetadataRemovedTotal.DeleteLabelValues(userID) m.activeSeriesPerUser.DeleteLabelValues(userID)