diff --git a/integration/e2e/images/images.go b/integration/e2e/images/images.go
index b6c113298e..2c90eb5bd5 100644
--- a/integration/e2e/images/images.go
+++ b/integration/e2e/images/images.go
@@ -11,5 +11,5 @@ var (
 	Minio = "minio/minio:RELEASE.2024-05-28T17-19-04Z"
 	Consul = "consul:1.8.4"
 	ETCD = "gcr.io/etcd-development/etcd:v3.4.7"
-	Prometheus = "quay.io/prometheus/prometheus:v2.51.0"
+	Prometheus = "quay.io/prometheus/prometheus:v3.2.1"
 )
diff --git a/integration/e2e/util.go b/integration/e2e/util.go
index 3ea03d5bdd..1f0cb7a570 100644
--- a/integration/e2e/util.go
+++ b/integration/e2e/util.go
@@ -24,6 +24,7 @@ import (
 	"github.com/prometheus/prometheus/tsdb/tsdbutil"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/runutil"
+	"go.uber.org/atomic"
 
 	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 )
@@ -259,6 +260,100 @@ func RandRange(rnd *rand.Rand, min, max int64) int64 {
 	return rnd.Int63n(max-min) + min
 }
 
+// CreateNHBlock writes a TSDB block under dir that contains only native
+// histogram samples and returns the ULID of the new block.
+//
+// Each series gets up to numNHSamples samples spaced scrapeInterval apart,
+// starting at a random timestamp in [mint, maxt) and stopping once the
+// timestamp passes maxt. A series holds either integer or float histograms,
+// never both. All randomness is drawn from rnd, so a fixed seed reproduces the
+// same block. seriesSize is recorded as IndexStats.SeriesMaxSize.
+func CreateNHBlock(
+	ctx context.Context,
+	rnd *rand.Rand,
+	dir string,
+	series []labels.Labels,
+	numNHSamples int,
+	mint, maxt int64,
+	scrapeInterval int64,
+	seriesSize int64,
+) (id ulid.ULID, err error) {
+	headOpts := tsdb.DefaultHeadOptions()
+	headOpts.EnableNativeHistograms = *atomic.NewBool(true)
+	headOpts.ChunkDirRoot = filepath.Join(dir, "chunks")
+	headOpts.ChunkRange = 10000000000
+	h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
+	if err != nil {
+		return id, errors.Wrap(err, "create head block")
+	}
+	defer func() {
+		runutil.CloseWithErrCapture(&err, h, "TSDB Head")
+		// Report a cleanup failure only when nothing else went wrong, so it
+		// never hides the error that actually broke block creation.
+		if e := os.RemoveAll(headOpts.ChunkDirRoot); e != nil && err == nil {
+			err = errors.Wrap(e, "delete chunks dir")
+		}
+	}()
+
+	app := h.Appender(ctx)
+	for i := 0; i < len(series); i++ {
+		// Choose the histogram flavour once per series.
+		useFloat := rnd.Intn(i+1)%2 == 0
+		var ref storage.SeriesRef
+		start := RandRange(rnd, mint, maxt)
+		for j := 0; j < numNHSamples; j++ {
+			if useFloat {
+				ref, err = app.AppendHistogram(ref, series[i], start, nil, tsdbutil.GenerateTestFloatHistogram(int64(i+j)))
+			} else {
+				ref, err = app.AppendHistogram(ref, series[i], start, tsdbutil.GenerateTestHistogram(int64(i+j)), nil)
+			}
+			if err != nil {
+				if rerr := app.Rollback(); rerr != nil {
+					err = errors.Wrapf(err, "rollback failed: %v", rerr)
+				}
+				return id, errors.Wrap(err, "add NH sample")
+			}
+			start += scrapeInterval
+			if start > maxt {
+				break
+			}
+		}
+	}
+	if err := app.Commit(); err != nil {
+		return id, errors.Wrap(err, "commit")
+	}
+
+	c, err := tsdb.NewLeveledCompactor(ctx, nil, promslog.NewNopLogger(), []int64{maxt - mint}, nil, nil)
+	if err != nil {
+		return id, errors.Wrap(err, "create compactor")
+	}
+
+	ids, err := c.Write(dir, h, mint, maxt, nil)
+	if err != nil {
+		return id, errors.Wrap(err, "write block")
+	}
+	if len(ids) == 0 {
+		return id, errors.Errorf("nothing to write, asked for %d samples", numNHSamples)
+	}
+	id = ids[0]
+
+	blockDir := filepath.Join(dir, id.String())
+	logger := log.NewNopLogger()
+
+	if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{
+		Labels: map[string]string{
+			cortex_tsdb.IngesterIDExternalLabel: "ingester-0",
+		},
+		Downsample: metadata.ThanosDownsample{Resolution: 0},
+		Source:     metadata.TestSource,
+		IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize},
+	}, nil); err != nil {
+		return id, errors.Wrap(err, "finalize block")
+	}
+
+	return id, nil
+}
+
 func CreateBlock(
 	ctx context.Context,
 	rnd *rand.Rand,
diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go
index b1d5fbba26..04d2aeff3c 100644
--- a/integration/query_fuzz_test.go
+++ b/integration/query_fuzz_test.go
@@ -52,8 +52,119 @@ func init() {
 	}
 }
 
+// TestNativeHistogramFuzz builds a block of native histogram series, uploads
+// it to Cortex for user-1, and runs promqlsmith-generated queries through
+// runQueryFuzzTestCases against both Cortex and a Prometheus container.
+func TestNativeHistogramFuzz(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Start dependencies.
+	consul := e2edb.NewConsulWithName("consul")
+	require.NoError(t, s.StartAndWaitReady(consul))
+
+	baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
+	flags := mergeFlags(
+		baseFlags,
+		map[string]string{
+			"-blocks-storage.tsdb.head-compaction-interval":    "4m",
+			"-blocks-storage.tsdb.block-ranges-period":         "2h",
+			"-blocks-storage.tsdb.ship-interval":               "1h",
+			"-blocks-storage.bucket-store.sync-interval":       "1s",
+			"-blocks-storage.tsdb.retention-period":            "24h",
+			"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
+			"-querier.query-store-for-labels-enabled":          "true",
+			// Ingester.
+			"-ring.store":      "consul",
+			"-consul.hostname": consul.NetworkHTTPEndpoint(),
+			// Distributor.
+			"-distributor.replication-factor": "1",
+			// alert manager
+			"-alertmanager.web.external-url": "http://localhost/alertmanager",
+		},
+	)
+	// make alert manager config dir
+	require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
+
+	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+	require.NoError(t, s.StartAndWaitReady(minio))
+
+	cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
+	require.NoError(t, s.StartAndWaitReady(cortex))
+
+	// Wait until Cortex replicas have updated the ring state.
+	require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
+
+	now := time.Now()
+	start := now.Add(-time.Hour * 2)
+	end := now.Add(-time.Hour)
+	numSeries := 10
+	numSamples := 60
+	lbls := make([]labels.Labels, 0, numSeries*2)
+	scrapeInterval := time.Minute
+	statusCodes := []string{"200", "400", "404", "500", "502"}
+	for i := 0; i < numSeries; i++ {
+		lbls = append(lbls, labels.Labels{
+			{Name: labels.MetricName, Value: "test_series_a"},
+			{Name: "job", Value: "test"},
+			{Name: "series", Value: strconv.Itoa(i % 3)},
+			{Name: "status_code", Value: statusCodes[i%5]},
+		})
+
+		lbls = append(lbls, labels.Labels{
+			{Name: labels.MetricName, Value: "test_series_b"},
+			{Name: "job", Value: "test"},
+			{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
+			{Name: "status_code", Value: statusCodes[(i+1)%5]},
+		})
+	}
+
+	ctx := context.Background()
+	rnd := rand.New(rand.NewSource(time.Now().Unix()))
+
+	dir := filepath.Join(s.SharedDir(), "data")
+	err = os.MkdirAll(dir, os.ModePerm)
+	require.NoError(t, err)
+	storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
+	require.NoError(t, err)
+	bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
+	id, err := e2e.CreateNHBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
+	require.NoError(t, err)
+	err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
+	require.NoError(t, err)
+
+	// Wait for querier and store to sync blocks.
+	require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"))))
+	require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
+	require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics))
+
+	c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
+	require.NoError(t, err)
+
+	err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
+	require.NoError(t, err)
+	prom := e2edb.NewPrometheus("", nil)
+	require.NoError(t, s.StartAndWaitReady(prom))
+
+	c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
+	require.NoError(t, err)
+
+	waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)
+
+	opts := []promqlsmith.Option{
+		promqlsmith.WithEnableOffset(true),
+		promqlsmith.WithEnableAtModifier(true),
+		promqlsmith.WithEnabledAggrs([]parser.ItemType{
+			parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.COUNT_VALUES, parser.QUANTILE,
+		}),
+	}
+	ps := promqlsmith.New(rnd, lbls, opts...)
+
+	runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
+}
+
 func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
-	prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)
 	defer s.Close()
@@ -148,7 +259,7 @@ func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
 
 	err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
 	require.NoError(t, err)
-	prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{
+	prom := e2edb.NewPrometheus("", map[string]string{
 		"--enable-feature": "promql-experimental-functions",
 	})
 	require.NoError(t, s.StartAndWaitReady(prom))
@@ -841,6 +952,16 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
 		const fraction = 1.e-10 // 0.00000001%
 		return cmp.Equal(l, r, cmpopts.EquateNaNs(), cmpopts.EquateApprox(fraction, epsilon))
 	}
+	// compareHistograms reports whether two native histogram samples are
+	// equal. Nil is checked here so a float sample on one side and a histogram
+	// sample on the other is a mismatch without relying on Equal being nil-safe.
+	compareHistograms := func(l, r *model.SampleHistogram) bool {
+		if l == nil || r == nil {
+			return l == r
+		}
+		return l.Equal(r)
+	}
+
 	// count_values returns a metrics with one label {"value": "1.012321"}
 	compareValueMetrics := func(l, r model.Metric) (valueMetric bool, equals bool) {
 		lLabels := model.LabelSet(l).Clone()
@@ -906,6 +1027,9 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
 			if !compareFloats(float64(vx[i].Value), float64(vy[i].Value)) {
 				return false
 			}
+			if !compareHistograms(vx[i].Histogram, vy[i].Histogram) {
+				return false
+			}
 		}
 		return true
 	}
@@ -942,6 +1066,21 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
 					return false
 				}
 			}
+
+			xhs := mxs.Histograms
+			yhs := mys.Histograms
+
+			if len(xhs) != len(yhs) {
+				return false
+			}
+			for j := 0; j < len(xhs); j++ {
+				if xhs[j].Timestamp != yhs[j].Timestamp {
+					return false
+				}
+				if !compareHistograms(xhs[j].Histogram, yhs[j].Histogram) {
+					return false
+				}
+			}
 		}
 		return true
 	}
@@ -1423,7 +1562,6 @@ func TestBackwardCompatibilityQueryFuzz(t *testing.T) {
 
 // TestPrometheusCompatibilityQueryFuzz compares Cortex with latest Prometheus release.
 func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
-	prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
 	s, err := e2e.NewScenario(networkName)
 	require.NoError(t, err)
 	defer s.Close()
@@ -1516,7 +1654,7 @@ func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
 
 	err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
 	require.NoError(t, err)
-	prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{})
+	prom := e2edb.NewPrometheus("", map[string]string{})
 	require.NoError(t, s.StartAndWaitReady(prom))
 
 	c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())