Skip to content

Add native histogram query fuzz test #6654

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integration/e2e/images/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ var (
Minio = "minio/minio:RELEASE.2024-05-28T17-19-04Z"
Consul = "consul:1.8.4"
ETCD = "gcr.io/etcd-development/etcd:v3.4.7"
Prometheus = "quay.io/prometheus/prometheus:v2.51.0"
Prometheus = "quay.io/prometheus/prometheus:v3.2.1"
)
87 changes: 87 additions & 0 deletions integration/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"
"go.uber.org/atomic"

cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)
Expand Down Expand Up @@ -259,6 +260,92 @@ func RandRange(rnd *rand.Rand, min, max int64) int64 {
return rnd.Int63n(max-min) + min
}

func CreateNHBlock(
ctx context.Context,
rnd *rand.Rand,
dir string,
series []labels.Labels,
numNHSamples int,
mint, maxt int64,
scrapeInterval int64,
seriesSize int64,
) (id ulid.ULID, err error) {
headOpts := tsdb.DefaultHeadOptions()
headOpts.EnableNativeHistograms = *atomic.NewBool(true)
headOpts.ChunkDirRoot = filepath.Join(dir, "chunks")
headOpts.ChunkRange = 10000000000
random := rand.New(rand.NewSource(time.Now().UnixNano()))
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
if err != nil {
return id, errors.Wrap(err, "create head block")
}
defer func() {
runutil.CloseWithErrCapture(&err, h, "TSDB Head")
if e := os.RemoveAll(headOpts.ChunkDirRoot); e != nil {
err = errors.Wrap(e, "delete chunks dir")
}
}()

app := h.Appender(ctx)
for i := 0; i < len(series); i++ {
num := random.Intn(i + 1)
var ref storage.SeriesRef
start := RandRange(rnd, mint, maxt)
for j := 0; j < numNHSamples; j++ {
if num%2 == 0 {
// append float histogram
ref, err = app.AppendHistogram(ref, series[i], start, nil, tsdbutil.GenerateTestFloatHistogram(int64(i+j)))
} else {
// append histogram
ref, err = app.AppendHistogram(ref, series[i], start, tsdbutil.GenerateTestHistogram(int64(i+j)), nil)
}
if err != nil {
if rerr := app.Rollback(); rerr != nil {
err = errors.Wrapf(err, "rollback failed: %v", rerr)
}
return id, errors.Wrap(err, "add NH sample")
}
start += scrapeInterval
if start > maxt {
break
}
}
}
if err := app.Commit(); err != nil {
return id, errors.Wrap(err, "commit")
}

c, err := tsdb.NewLeveledCompactor(ctx, nil, promslog.NewNopLogger(), []int64{maxt - mint}, nil, nil)
if err != nil {
return id, errors.Wrap(err, "create compactor")
}

ids, err := c.Write(dir, h, mint, maxt, nil)
if err != nil {
return id, errors.Wrap(err, "write block")
}
if len(ids) == 0 {
return id, errors.Errorf("nothing to write, asked for %d samples", numNHSamples)
}
id = ids[0]

blockDir := filepath.Join(dir, id.String())
logger := log.NewNopLogger()

if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{
Labels: map[string]string{
cortex_tsdb.IngesterIDExternalLabel: "ingester-0",
},
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize},
}, nil); err != nil {
return id, errors.Wrap(err, "finalize block")
}

return id, nil
}

func CreateBlock(
ctx context.Context,
rnd *rand.Rand,
Expand Down
137 changes: 133 additions & 4 deletions integration/query_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,116 @@ func init() {
}
}

func TestNativeHistogramFuzz(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "24h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-querier.query-store-for-labels-enabled": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)
// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

now := time.Now()
start := now.Add(-time.Hour * 2)
end := now.Add(-time.Hour)
numSeries := 10
numSamples := 60
lbls := make([]labels.Labels, 0, numSeries*2)
scrapeInterval := time.Minute
statusCodes := []string{"200", "400", "404", "500", "502"}
for i := 0; i < numSeries; i++ {
lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_a"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa(i % 3)},
{Name: "status_code", Value: statusCodes[i%5]},
})

lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_b"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
{Name: "status_code", Value: statusCodes[(i+1)%5]},
})
}

ctx := context.Background()
rnd := rand.New(rand.NewSource(time.Now().Unix()))

dir := filepath.Join(s.SharedDir(), "data")
err = os.MkdirAll(dir, os.ModePerm)
require.NoError(t, err)
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
id, err := e2e.CreateNHBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
require.NoError(t, err)
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
require.NoError(t, err)

// Wait for querier and store to sync blocks.
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"))))
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics))

c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
require.NoError(t, err)
prom := e2edb.NewPrometheus("", nil)
require.NoError(t, s.StartAndWaitReady(prom))

c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
require.NoError(t, err)

waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)

opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(true),
promqlsmith.WithEnableAtModifier(true),
promqlsmith.WithEnabledAggrs([]parser.ItemType{
parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.COUNT_VALUES, parser.QUANTILE,
}),
}
ps := promqlsmith.New(rnd, lbls, opts...)

runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
}

func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
Expand Down Expand Up @@ -148,7 +256,7 @@ func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {

err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
require.NoError(t, err)
prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{
prom := e2edb.NewPrometheus("", map[string]string{
"--enable-feature": "promql-experimental-functions",
})
require.NoError(t, s.StartAndWaitReady(prom))
Expand Down Expand Up @@ -841,6 +949,10 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
const fraction = 1.e-10 // 0.00000001%
return cmp.Equal(l, r, cmpopts.EquateNaNs(), cmpopts.EquateApprox(fraction, epsilon))
}
compareHistograms := func(l, r *model.SampleHistogram) bool {
return l.Equal(r)
}

// count_values returns a metrics with one label {"value": "1.012321"}
compareValueMetrics := func(l, r model.Metric) (valueMetric bool, equals bool) {
lLabels := model.LabelSet(l).Clone()
Expand Down Expand Up @@ -906,6 +1018,9 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
if !compareFloats(float64(vx[i].Value), float64(vy[i].Value)) {
return false
}
if !compareHistograms(vx[i].Histogram, vy[i].Histogram) {
return false
}
}
return true
}
Expand Down Expand Up @@ -942,6 +1057,21 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
return false
}
}

xhs := mxs.Histograms
yhs := mys.Histograms

if len(xhs) != len(yhs) {
return false
}
for j := 0; j < len(xhs); j++ {
if xhs[j].Timestamp != yhs[j].Timestamp {
return false
}
if !compareHistograms(xhs[j].Histogram, yhs[j].Histogram) {
return false
}
}
}
return true
}
Expand Down Expand Up @@ -1423,7 +1553,6 @@ func TestBackwardCompatibilityQueryFuzz(t *testing.T) {

// TestPrometheusCompatibilityQueryFuzz compares Cortex with latest Prometheus release.
func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()
Expand Down Expand Up @@ -1516,7 +1645,7 @@ func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {

err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
require.NoError(t, err)
prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{})
prom := e2edb.NewPrometheus("", map[string]string{})
require.NoError(t, s.StartAndWaitReady(prom))

c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
Expand Down
Loading