From 599b04dd88cfc5699d72859820f85e8fdedc9db5 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Jun 2024 21:55:01 -0700 Subject: [PATCH 1/5] integration test for native histograms Signed-off-by: Ben Ye --- integration/e2e/util.go | 43 +++++++++++ integration/native_histogram_test.go | 108 +++++++++++++++++++++++++++ 2 files changed, 151 insertions(+) create mode 100644 integration/native_histogram_test.go diff --git a/integration/e2e/util.go b/integration/e2e/util.go index ef877020c0..1f5de98037 100644 --- a/integration/e2e/util.go +++ b/integration/e2e/util.go @@ -15,14 +15,17 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/runutil" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + histogram_util "github.com/cortexproject/cortex/pkg/util/histogram" ) func RunCommandAndGetOutput(name string, args ...string) ([]byte, error) { @@ -147,6 +150,46 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label) return } +func GenerateHistogramSeries(name string, ts time.Time, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) { + tsMillis := TimeToMilliseconds(ts) + i := rand.Uint32() + + lbls := append( + []prompb.Label{ + {Name: labels.MetricName, Value: name}, + }, + additionalLabels..., + ) + + // Generate the expected vector when querying it + metric := model.Metric{} + metric[labels.MetricName] = model.LabelValue(name) + for _, lbl := range additionalLabels { + metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) + } + + var ( + h *histogram.Histogram + fh *histogram.FloatHistogram + 
ph prompb.Histogram + ) + if floatHistogram { + fh = histogram_util.GenerateTestFloatHistogram(int(i)) + ph = remote.FloatHistogramToHistogramProto(tsMillis, fh) + } else { + h = histogram_util.GenerateTestHistogram(int(i)) + ph = remote.HistogramToHistogramProto(tsMillis, h) + } + + // Generate the series + series = append(series, prompb.TimeSeries{ + Labels: lbls, + Histograms: []prompb.Histogram{ph}, + }) + + return +} + func GenerateSeriesWithSamples( name string, startTime time.Time, diff --git a/integration/native_histogram_test.go b/integration/native_histogram_test.go new file mode 100644 index 0000000000..f75bc9fa01 --- /dev/null +++ b/integration/native_histogram_test.go @@ -0,0 +1,108 @@ +//go:build requires_docker +// +build requires_docker + +package integration + +import ( + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/integration/e2e" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/prometheus/prometheus/prompb" +) + +func TestNativeHistogramIngestionAndQuery(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.tsdb.enable-native-histograms": "true", + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components for the write path. 
+ distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + + // Wait until the distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push some series to Cortex. + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + seriesTimestamp := time.Now() + series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2) + series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) + series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) + res, err := c.Push(append(series1, series1Float...)) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"}) + series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"}) + res, err = c.Push(append(series2, series2Float...)) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 2 series from `series_1`, while the 2 series from `series_2` remain in the head. 
+ require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(4), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series")) + + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + // Start the querier and store-gateway, and configure them to sync blocks frequently enough to trigger the consistency check. + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "5s", + }), "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "1s", + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(querier, storeGateway)) + + // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for the consistency check. + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) + require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount)) + + // Sleep 3 * bucket sync interval to make sure the consistency checker + // no longer considers the block to be recently uploaded. + time.Sleep(3 * time.Second) + + // Query back the series. 
+ c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + result, err := c.Query(`series_1`, series2Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + require.Equal(t, 2, result.(model.Vector).Len()) + + result, err = c.Query(`series_2`, series2Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + require.Equal(t, 2, result.(model.Vector).Len()) +} From 2dc42c615102c9038605bd60e5afb898d071121b Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Jun 2024 22:41:06 -0700 Subject: [PATCH 2/5] lint Signed-off-by: Ben Ye --- integration/native_histogram_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/native_histogram_test.go b/integration/native_histogram_test.go index f75bc9fa01..450c3f2e92 100644 --- a/integration/native_histogram_test.go +++ b/integration/native_histogram_test.go @@ -8,12 +8,12 @@ import ( "time" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/integration/e2e" e2edb "github.com/cortexproject/cortex/integration/e2e/db" "github.com/cortexproject/cortex/integration/e2ecortex" - "github.com/prometheus/prometheus/prompb" ) func TestNativeHistogramIngestionAndQuery(t *testing.T) { From 1a9904bf3a0229ef344bc7a1b18b1ba976017a3d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 27 Jun 2024 14:04:16 -0700 Subject: [PATCH 3/5] add nil check for histogram sample Signed-off-by: Ben Ye --- integration/native_histogram_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/integration/native_histogram_test.go b/integration/native_histogram_test.go index 450c3f2e92..c8ad1f6a9c 100644 --- a/integration/native_histogram_test.go +++ b/integration/native_histogram_test.go @@ -99,10 +99,18 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { result, err := 
c.Query(`series_1`, series2Timestamp) require.NoError(t, err) require.Equal(t, model.ValVector, result.Type()) - require.Equal(t, 2, result.(model.Vector).Len()) + v := result.(model.Vector) + require.Equal(t, 2, v.Len()) + for _, s := range v { + require.NotNil(t, s.Histogram) + } result, err = c.Query(`series_2`, series2Timestamp) require.NoError(t, err) require.Equal(t, model.ValVector, result.Type()) - require.Equal(t, 2, result.(model.Vector).Len()) + v = result.(model.Vector) + require.Equal(t, 2, v.Len()) + for _, s := range v { + require.NotNil(t, s.Histogram) + } } From b48bd849f026db98a47e73f5224cb39f6488bbad Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 27 Jun 2024 16:36:30 -0700 Subject: [PATCH 4/5] add test coverage for range query API Signed-off-by: Ben Ye --- integration/native_histogram_test.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/integration/native_histogram_test.go b/integration/native_histogram_test.go index c8ad1f6a9c..eebc42356d 100644 --- a/integration/native_histogram_test.go +++ b/integration/native_histogram_test.go @@ -96,7 +96,27 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) - result, err := c.Query(`series_1`, series2Timestamp) + result, err := c.QueryRange(`series_1`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, result.Type()) + m := result.(model.Matrix) + require.Equal(t, 2, m.Len()) + for _, ss := range m { + require.Empty(t, ss.Values) + require.NotEmpty(t, ss.Histograms) + } + + result, err = c.QueryRange(`series_2`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, result.Type()) + m = result.(model.Matrix) + require.Equal(t, 2, m.Len()) + for _, ss := range m { + require.Empty(t, ss.Values) + 
require.NotEmpty(t, ss.Histograms) + } + + result, err = c.Query(`series_1`, series2Timestamp) require.NoError(t, err) require.Equal(t, model.ValVector, result.Type()) v := result.(model.Vector) From d5f44c0c158af8ca2eb74667a40fd39e8760a636 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Thu, 27 Jun 2024 16:39:28 -0700 Subject: [PATCH 5/5] check each histogram Signed-off-by: Ben Ye --- integration/native_histogram_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/integration/native_histogram_test.go b/integration/native_histogram_test.go index eebc42356d..1e27576c2e 100644 --- a/integration/native_histogram_test.go +++ b/integration/native_histogram_test.go @@ -104,6 +104,9 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { for _, ss := range m { require.Empty(t, ss.Values) require.NotEmpty(t, ss.Histograms) + for _, h := range ss.Histograms { + require.NotEmpty(t, h) + } } result, err = c.QueryRange(`series_2`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second) @@ -114,6 +117,9 @@ func TestNativeHistogramIngestionAndQuery(t *testing.T) { for _, ss := range m { require.Empty(t, ss.Values) require.NotEmpty(t, ss.Histograms) + for _, h := range ss.Histograms { + require.NotEmpty(t, h) + } } result, err = c.Query(`series_1`, series2Timestamp)