Skip to content

Commit fe07735

Browse files
committed
Add native histogram query fuzz test
Signed-off-by: SungJin1212 <[email protected]>
1 parent 3c98215 commit fe07735

File tree

3 files changed

+221
-5
lines changed

3 files changed

+221
-5
lines changed

integration/e2e/images/images.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ var (
1111
Minio = "minio/minio:RELEASE.2024-05-28T17-19-04Z"
1212
Consul = "consul:1.8.4"
1313
ETCD = "gcr.io/etcd-development/etcd:v3.4.7"
14-
Prometheus = "quay.io/prometheus/prometheus:v2.51.0"
14+
Prometheus = "quay.io/prometheus/prometheus:v3.2.1"
1515
)

integration/e2e/util.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/prometheus/prometheus/tsdb/tsdbutil"
2525
"github.com/thanos-io/thanos/pkg/block/metadata"
2626
"github.com/thanos-io/thanos/pkg/runutil"
27+
"go.uber.org/atomic"
2728

2829
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2930
)
@@ -259,6 +260,92 @@ func RandRange(rnd *rand.Rand, min, max int64) int64 {
259260
return rnd.Int63n(max-min) + min
260261
}
261262

263+
func CreateNHBlock(
264+
ctx context.Context,
265+
rnd *rand.Rand,
266+
dir string,
267+
series []labels.Labels,
268+
numNHSamples int,
269+
mint, maxt int64,
270+
scrapeInterval int64,
271+
seriesSize int64,
272+
) (id ulid.ULID, err error) {
273+
headOpts := tsdb.DefaultHeadOptions()
274+
headOpts.EnableNativeHistograms = *atomic.NewBool(true)
275+
headOpts.ChunkDirRoot = filepath.Join(dir, "chunks")
276+
headOpts.ChunkRange = 10000000000
277+
random := rand.New(rand.NewSource(time.Now().UnixNano()))
278+
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
279+
if err != nil {
280+
return id, errors.Wrap(err, "create head block")
281+
}
282+
defer func() {
283+
runutil.CloseWithErrCapture(&err, h, "TSDB Head")
284+
if e := os.RemoveAll(headOpts.ChunkDirRoot); e != nil {
285+
err = errors.Wrap(e, "delete chunks dir")
286+
}
287+
}()
288+
289+
app := h.Appender(ctx)
290+
for i := 0; i < len(series); i++ {
291+
num := random.Intn(i + 1)
292+
var ref storage.SeriesRef
293+
start := RandRange(rnd, mint, maxt)
294+
for j := 0; j < numNHSamples; j++ {
295+
if num%2 == 0 {
296+
// append float histogram
297+
ref, err = app.AppendHistogram(ref, series[i], start, nil, tsdbutil.GenerateTestFloatHistogram(int64(i+j)))
298+
} else {
299+
// append histogram
300+
ref, err = app.AppendHistogram(ref, series[i], start, tsdbutil.GenerateTestHistogram(int64(i+j)), nil)
301+
}
302+
if err != nil {
303+
if rerr := app.Rollback(); rerr != nil {
304+
err = errors.Wrapf(err, "rollback failed: %v", rerr)
305+
}
306+
return id, errors.Wrap(err, "add NH sample")
307+
}
308+
start += scrapeInterval
309+
if start > maxt {
310+
break
311+
}
312+
}
313+
}
314+
if err := app.Commit(); err != nil {
315+
return id, errors.Wrap(err, "commit")
316+
}
317+
318+
c, err := tsdb.NewLeveledCompactor(ctx, nil, promslog.NewNopLogger(), []int64{maxt - mint}, nil, nil)
319+
if err != nil {
320+
return id, errors.Wrap(err, "create compactor")
321+
}
322+
323+
ids, err := c.Write(dir, h, mint, maxt, nil)
324+
if err != nil {
325+
return id, errors.Wrap(err, "write block")
326+
}
327+
if len(ids) == 0 {
328+
return id, errors.Errorf("nothing to write, asked for %d samples", numNHSamples)
329+
}
330+
id = ids[0]
331+
332+
blockDir := filepath.Join(dir, id.String())
333+
logger := log.NewNopLogger()
334+
335+
if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{
336+
Labels: map[string]string{
337+
cortex_tsdb.IngesterIDExternalLabel: "ingester-0",
338+
},
339+
Downsample: metadata.ThanosDownsample{Resolution: 0},
340+
Source: metadata.TestSource,
341+
IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize},
342+
}, nil); err != nil {
343+
return id, errors.Wrap(err, "finalize block")
344+
}
345+
346+
return id, nil
347+
}
348+
262349
func CreateBlock(
263350
ctx context.Context,
264351
rnd *rand.Rand,

integration/query_fuzz_test.go

Lines changed: 133 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,116 @@ func init() {
5252
}
5353
}
5454

55+
func TestNativeHistogramFuzz(t *testing.T) {
56+
s, err := e2e.NewScenario(networkName)
57+
require.NoError(t, err)
58+
defer s.Close()
59+
60+
// Start dependencies.
61+
consul := e2edb.NewConsulWithName("consul")
62+
require.NoError(t, s.StartAndWaitReady(consul))
63+
64+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
65+
flags := mergeFlags(
66+
baseFlags,
67+
map[string]string{
68+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
69+
"-blocks-storage.tsdb.block-ranges-period": "2h",
70+
"-blocks-storage.tsdb.ship-interval": "1h",
71+
"-blocks-storage.bucket-store.sync-interval": "1s",
72+
"-blocks-storage.tsdb.retention-period": "24h",
73+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
74+
"-querier.query-store-for-labels-enabled": "true",
75+
// Ingester.
76+
"-ring.store": "consul",
77+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
78+
// Distributor.
79+
"-distributor.replication-factor": "1",
80+
// alert manager
81+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
82+
},
83+
)
84+
// make alert manager config dir
85+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
86+
87+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
88+
require.NoError(t, s.StartAndWaitReady(minio))
89+
90+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
91+
require.NoError(t, s.StartAndWaitReady(cortex))
92+
93+
// Wait until Cortex replicas have updated the ring state.
94+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
95+
96+
now := time.Now()
97+
start := now.Add(-time.Hour * 2)
98+
end := now.Add(-time.Hour)
99+
numSeries := 10
100+
numSamples := 60
101+
lbls := make([]labels.Labels, 0, numSeries*2)
102+
scrapeInterval := time.Minute
103+
statusCodes := []string{"200", "400", "404", "500", "502"}
104+
for i := 0; i < numSeries; i++ {
105+
lbls = append(lbls, labels.Labels{
106+
{Name: labels.MetricName, Value: "test_series_a"},
107+
{Name: "job", Value: "test"},
108+
{Name: "series", Value: strconv.Itoa(i % 3)},
109+
{Name: "status_code", Value: statusCodes[i%5]},
110+
})
111+
112+
lbls = append(lbls, labels.Labels{
113+
{Name: labels.MetricName, Value: "test_series_b"},
114+
{Name: "job", Value: "test"},
115+
{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
116+
{Name: "status_code", Value: statusCodes[(i+1)%5]},
117+
})
118+
}
119+
120+
ctx := context.Background()
121+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
122+
123+
dir := filepath.Join(s.SharedDir(), "data")
124+
err = os.MkdirAll(dir, os.ModePerm)
125+
require.NoError(t, err)
126+
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
127+
require.NoError(t, err)
128+
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
129+
id, err := e2e.CreateNHBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
130+
require.NoError(t, err)
131+
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
132+
require.NoError(t, err)
133+
134+
// Wait for querier and store to sync blocks.
135+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"))))
136+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
137+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics))
138+
139+
c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
140+
require.NoError(t, err)
141+
142+
err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
143+
require.NoError(t, err)
144+
prom := e2edb.NewPrometheus("", nil)
145+
require.NoError(t, s.StartAndWaitReady(prom))
146+
147+
c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
148+
require.NoError(t, err)
149+
150+
waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)
151+
152+
opts := []promqlsmith.Option{
153+
promqlsmith.WithEnableOffset(true),
154+
promqlsmith.WithEnableAtModifier(true),
155+
promqlsmith.WithEnabledAggrs([]parser.ItemType{
156+
parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.COUNT_VALUES, parser.QUANTILE,
157+
}),
158+
}
159+
ps := promqlsmith.New(rnd, lbls, opts...)
160+
161+
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
162+
}
163+
55164
func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
56-
prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
57165
s, err := e2e.NewScenario(networkName)
58166
require.NoError(t, err)
59167
defer s.Close()
@@ -148,7 +256,7 @@ func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
148256

149257
err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
150258
require.NoError(t, err)
151-
prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{
259+
prom := e2edb.NewPrometheus("", map[string]string{
152260
"--enable-feature": "promql-experimental-functions",
153261
})
154262
require.NoError(t, s.StartAndWaitReady(prom))
@@ -841,6 +949,10 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
841949
const fraction = 1.e-10 // 0.00000001%
842950
return cmp.Equal(l, r, cmpopts.EquateNaNs(), cmpopts.EquateApprox(fraction, epsilon))
843951
}
952+
compareHistograms := func(l, r *model.SampleHistogram) bool {
953+
return l.Equal(r)
954+
}
955+
844956
// count_values returns a metrics with one label {"value": "1.012321"}
845957
compareValueMetrics := func(l, r model.Metric) (valueMetric bool, equals bool) {
846958
lLabels := model.LabelSet(l).Clone()
@@ -906,6 +1018,9 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
9061018
if !compareFloats(float64(vx[i].Value), float64(vy[i].Value)) {
9071019
return false
9081020
}
1021+
if !compareHistograms(vx[i].Histogram, vy[i].Histogram) {
1022+
return false
1023+
}
9091024
}
9101025
return true
9111026
}
@@ -942,6 +1057,21 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
9421057
return false
9431058
}
9441059
}
1060+
1061+
xhs := mxs.Histograms
1062+
yhs := mys.Histograms
1063+
1064+
if len(xhs) != len(yhs) {
1065+
return false
1066+
}
1067+
for j := 0; j < len(xhs); j++ {
1068+
if xhs[j].Timestamp != yhs[j].Timestamp {
1069+
return false
1070+
}
1071+
if !compareHistograms(xhs[j].Histogram, yhs[j].Histogram) {
1072+
return false
1073+
}
1074+
}
9451075
}
9461076
return true
9471077
}
@@ -1423,7 +1553,6 @@ func TestBackwardCompatibilityQueryFuzz(t *testing.T) {
14231553

14241554
// TestPrometheusCompatibilityQueryFuzz compares Cortex with latest Prometheus release.
14251555
func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
1426-
prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
14271556
s, err := e2e.NewScenario(networkName)
14281557
require.NoError(t, err)
14291558
defer s.Close()
@@ -1516,7 +1645,7 @@ func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
15161645

15171646
err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
15181647
require.NoError(t, err)
1519-
prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{})
1648+
prom := e2edb.NewPrometheus("", map[string]string{})
15201649
require.NoError(t, s.StartAndWaitReady(prom))
15211650

15221651
c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())

0 commit comments

Comments
 (0)