
Commit afcdf33

Add native histogram query fuzz test
Signed-off-by: SungJin1212 <[email protected]>
1 parent 3c98215 commit afcdf33

3 files changed: +225 −5 lines

integration/e2e/images/images.go

Lines changed: 1 addition & 1 deletion
@@ -11,5 +11,5 @@ var (
     Minio      = "minio/minio:RELEASE.2024-05-28T17-19-04Z"
     Consul     = "consul:1.8.4"
     ETCD       = "gcr.io/etcd-development/etcd:v3.4.7"
-    Prometheus = "quay.io/prometheus/prometheus:v2.51.0"
+    Prometheus = "quay.io/prometheus/prometheus:v3.2.1"
 )

integration/e2e/util.go

Lines changed: 87 additions & 0 deletions
@@ -24,6 +24,7 @@ import (
     "github.com/prometheus/prometheus/tsdb/tsdbutil"
     "github.com/thanos-io/thanos/pkg/block/metadata"
     "github.com/thanos-io/thanos/pkg/runutil"
+    "go.uber.org/atomic"
 
     cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 )
@@ -259,6 +260,92 @@ func RandRange(rnd *rand.Rand, min, max int64) int64 {
     return rnd.Int63n(max-min) + min
 }
 
+func CreateNHBlock(
+    ctx context.Context,
+    rnd *rand.Rand,
+    dir string,
+    series []labels.Labels,
+    numNHSamples int,
+    mint, maxt int64,
+    scrapeInterval int64,
+    seriesSize int64,
+) (id ulid.ULID, err error) {
+    headOpts := tsdb.DefaultHeadOptions()
+    headOpts.EnableNativeHistograms = *atomic.NewBool(true)
+    headOpts.ChunkDirRoot = filepath.Join(dir, "chunks")
+    headOpts.ChunkRange = 10000000000
+    random := rand.New(rand.NewSource(time.Now().UnixNano()))
+    h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
+    if err != nil {
+        return id, errors.Wrap(err, "create head block")
+    }
+    defer func() {
+        runutil.CloseWithErrCapture(&err, h, "TSDB Head")
+        if e := os.RemoveAll(headOpts.ChunkDirRoot); e != nil {
+            err = errors.Wrap(e, "delete chunks dir")
+        }
+    }()
+
+    app := h.Appender(ctx)
+    for i := 0; i < len(series); i++ {
+        num := random.Intn(i + 1)
+        var ref storage.SeriesRef
+        start := RandRange(rnd, mint, maxt)
+        for j := 0; j < numNHSamples; j++ {
+            if num%2 == 0 {
+                // append float histogram
+                ref, err = app.AppendHistogram(ref, series[i], start, nil, tsdbutil.GenerateTestFloatHistogram(int64(i+j)))
+            } else {
+                // append histogram
+                ref, err = app.AppendHistogram(ref, series[i], start, tsdbutil.GenerateTestHistogram(int64(i+j)), nil)
+            }
+            if err != nil {
+                if rerr := app.Rollback(); rerr != nil {
+                    err = errors.Wrapf(err, "rollback failed: %v", rerr)
+                }
+                return id, errors.Wrap(err, "add NH sample")
+            }
+            start += scrapeInterval
+            if start > maxt {
+                break
+            }
+        }
+    }
+    if err := app.Commit(); err != nil {
+        return id, errors.Wrap(err, "commit")
+    }
+
+    c, err := tsdb.NewLeveledCompactor(ctx, nil, promslog.NewNopLogger(), []int64{maxt - mint}, nil, nil)
+    if err != nil {
+        return id, errors.Wrap(err, "create compactor")
+    }
+
+    ids, err := c.Write(dir, h, mint, maxt, nil)
+    if err != nil {
+        return id, errors.Wrap(err, "write block")
+    }
+    if len(ids) == 0 {
+        return id, errors.Errorf("nothing to write, asked for %d samples", numNHSamples)
+    }
+    id = ids[0]
+
+    blockDir := filepath.Join(dir, id.String())
+    logger := log.NewNopLogger()
+
+    if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{
+        Labels: map[string]string{
+            cortex_tsdb.IngesterIDExternalLabel: "ingester-0",
+        },
+        Downsample: metadata.ThanosDownsample{Resolution: 0},
+        Source:     metadata.TestSource,
+        IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize},
+    }, nil); err != nil {
+        return id, errors.Wrap(err, "finalize block")
+    }
+
+    return id, nil
+}
+
 func CreateBlock(
     ctx context.Context,
     rnd *rand.Rand,
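For reference, a minimal sketch of how the new CreateNHBlock helper can be driven on its own: generate a block of native histogram series on disk and return the block directory to upload. The wrapper name buildNHBlock, the series labels, and the import paths are illustrative assumptions; only the CreateNHBlock call mirrors its usage in TestNativeHistogramFuzz below.

package example

import (
    "context"
    "math/rand"
    "path/filepath"
    "time"

    "github.com/prometheus/prometheus/model/labels"

    "github.com/cortexproject/cortex/integration/e2e"
)

// buildNHBlock is a hypothetical helper: it writes one block containing native
// histogram samples for two series covering the last hour at a 1m scrape interval.
func buildNHBlock(dir string) (string, error) {
    ctx := context.Background()
    rnd := rand.New(rand.NewSource(time.Now().Unix()))

    series := []labels.Labels{
        labels.FromStrings(labels.MetricName, "test_nh_series", "series", "0"),
        labels.FromStrings(labels.MetricName, "test_nh_series", "series", "1"),
    }
    end := time.Now()
    start := end.Add(-time.Hour)

    // 60 native histogram samples per series; seriesSize of 10 matches the test.
    id, err := e2e.CreateNHBlock(ctx, rnd, dir, series, 60,
        start.UnixMilli(), end.UnixMilli(), time.Minute.Milliseconds(), 10)
    if err != nil {
        return "", err
    }
    // The returned ULID names the block directory, which can then be uploaded
    // to object storage (the test uses thanos block.Upload for this).
    return filepath.Join(dir, id.String()), nil
}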

integration/query_fuzz_test.go

Lines changed: 137 additions & 4 deletions
@@ -39,6 +39,10 @@ import (
 
 var enabledFunctions []*parser.Function
 
+const (
+    prometheusLatestImage = "quay.io/prometheus/prometheus:v3.2.1"
+)
+
 func init() {
     for _, f := range parser.Functions {
         // Ignore native histogram functions for now as our test cases are only float samples.
@@ -52,8 +56,116 @@ func init() {
     }
 }
 
+func TestNativeHistogramFuzz(t *testing.T) {
+    s, err := e2e.NewScenario(networkName)
+    require.NoError(t, err)
+    defer s.Close()
+
+    // Start dependencies.
+    consul := e2edb.NewConsulWithName("consul")
+    require.NoError(t, s.StartAndWaitReady(consul))
+
+    baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
+    flags := mergeFlags(
+        baseFlags,
+        map[string]string{
+            "-blocks-storage.tsdb.head-compaction-interval":    "4m",
+            "-blocks-storage.tsdb.block-ranges-period":         "2h",
+            "-blocks-storage.tsdb.ship-interval":               "1h",
+            "-blocks-storage.bucket-store.sync-interval":       "1s",
+            "-blocks-storage.tsdb.retention-period":            "24h",
+            "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
+            "-querier.query-store-for-labels-enabled":          "true",
+            // Ingester.
+            "-ring.store":      "consul",
+            "-consul.hostname": consul.NetworkHTTPEndpoint(),
+            // Distributor.
+            "-distributor.replication-factor": "1",
+            // alert manager
+            "-alertmanager.web.external-url": "http://localhost/alertmanager",
+        },
+    )
+    // make alert manager config dir
+    require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
+
+    minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+    require.NoError(t, s.StartAndWaitReady(minio))
+
+    cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
+    require.NoError(t, s.StartAndWaitReady(cortex))
+
+    // Wait until Cortex replicas have updated the ring state.
+    require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
+
+    now := time.Now()
+    start := now.Add(-time.Hour * 2)
+    end := now.Add(-time.Hour)
+    numSeries := 10
+    numSamples := 60
+    lbls := make([]labels.Labels, 0, numSeries*2)
+    scrapeInterval := time.Minute
+    statusCodes := []string{"200", "400", "404", "500", "502"}
+    for i := 0; i < numSeries; i++ {
+        lbls = append(lbls, labels.Labels{
+            {Name: labels.MetricName, Value: "test_series_a"},
+            {Name: "job", Value: "test"},
+            {Name: "series", Value: strconv.Itoa(i % 3)},
+            {Name: "status_code", Value: statusCodes[i%5]},
+        })
+
+        lbls = append(lbls, labels.Labels{
+            {Name: labels.MetricName, Value: "test_series_b"},
+            {Name: "job", Value: "test"},
+            {Name: "series", Value: strconv.Itoa((i + 1) % 3)},
+            {Name: "status_code", Value: statusCodes[(i+1)%5]},
+        })
+    }
+
+    ctx := context.Background()
+    rnd := rand.New(rand.NewSource(time.Now().Unix()))
+
+    dir := filepath.Join(s.SharedDir(), "data")
+    err = os.MkdirAll(dir, os.ModePerm)
+    require.NoError(t, err)
+    storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
+    require.NoError(t, err)
+    bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
+    id, err := e2e.CreateNHBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
+    require.NoError(t, err)
+    err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
+    require.NoError(t, err)
+
+    // Wait for querier and store to sync blocks.
+    require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"))))
+    require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
+    require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics))
+
+    c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
+    require.NoError(t, err)
+
+    err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
+    require.NoError(t, err)
+    prom := e2edb.NewPrometheus("", nil)
+    require.NoError(t, s.StartAndWaitReady(prom))
+
+    c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
+    require.NoError(t, err)
+
+    waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)
+
+    opts := []promqlsmith.Option{
+        promqlsmith.WithEnableOffset(true),
+        promqlsmith.WithEnableAtModifier(true),
+        promqlsmith.WithEnabledAggrs([]parser.ItemType{
+            parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.COUNT_VALUES, parser.QUANTILE,
+        }),
+    }
+    ps := promqlsmith.New(rnd, lbls, opts...)
+
+    runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
+}
+
 func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
-    prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
     s, err := e2e.NewScenario(networkName)
     require.NoError(t, err)
     defer s.Close()
@@ -148,7 +260,7 @@ func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
 
     err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
     require.NoError(t, err)
-    prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{
+    prom := e2edb.NewPrometheus("", map[string]string{
        "--enable-feature": "promql-experimental-functions",
     })
     require.NoError(t, s.StartAndWaitReady(prom))
@@ -841,6 +953,10 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
         const fraction = 1.e-10 // 0.00000001%
         return cmp.Equal(l, r, cmpopts.EquateNaNs(), cmpopts.EquateApprox(fraction, epsilon))
     }
+    compareHistograms := func(l, r *model.SampleHistogram) bool {
+        return l.Equal(r)
+    }
+
     // count_values returns a metrics with one label {"value": "1.012321"}
     compareValueMetrics := func(l, r model.Metric) (valueMetric bool, equals bool) {
         lLabels := model.LabelSet(l).Clone()
@@ -906,6 +1022,9 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
             if !compareFloats(float64(vx[i].Value), float64(vy[i].Value)) {
                 return false
             }
+            if !compareHistograms(vx[i].Histogram, vy[i].Histogram) {
+                return false
+            }
         }
         return true
     }
@@ -942,6 +1061,21 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool {
                 return false
             }
         }
+
+        xhs := mxs.Histograms
+        yhs := mys.Histograms
+
+        if len(xhs) != len(yhs) {
+            return false
+        }
+        for j := 0; j < len(xhs); j++ {
+            if xhs[j].Timestamp != yhs[j].Timestamp {
+                return false
+            }
+            if !compareHistograms(xhs[j].Histogram, yhs[j].Histogram) {
+                return false
+            }
+        }
     }
     return true
 }
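The comparer changes above extend result comparison from float samples to histogram samples, delegating to model.SampleHistogram.Equal. A small self-contained sketch of what that comparison operates on; the sample values are made up for illustration, and the model types come from github.com/prometheus/common/model, which the test already uses.

package main

import (
    "fmt"

    "github.com/prometheus/common/model"
)

func main() {
    // Two identical histogram samples, as they would appear in a query response vector.
    h1 := &model.SampleHistogram{
        Count: 10,
        Sum:   99.5,
        Buckets: model.HistogramBuckets{
            {Boundaries: 0, Lower: 0, Upper: 1, Count: 6},
            {Boundaries: 0, Lower: 1, Upper: 2, Count: 4},
        },
    }
    h2 := &model.SampleHistogram{
        Count: 10,
        Sum:   99.5,
        Buckets: model.HistogramBuckets{
            {Boundaries: 0, Lower: 0, Upper: 1, Count: 6},
            {Boundaries: 0, Lower: 1, Upper: 2, Count: 4},
        },
    }

    // compareHistograms in the diff simply delegates to Equal, so identical
    // samples compare as equal; any difference in count, sum or buckets does not.
    fmt.Println(h1.Equal(h2)) // expected: true
}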
@@ -1423,7 +1557,6 @@ func TestBackwardCompatibilityQueryFuzz(t *testing.T) {
 
 // TestPrometheusCompatibilityQueryFuzz compares Cortex with latest Prometheus release.
 func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
-    prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
     s, err := e2e.NewScenario(networkName)
     require.NoError(t, err)
     defer s.Close()
@@ -1516,7 +1649,7 @@ func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
 
     err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
     require.NoError(t, err)
-    prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{})
+    prom := e2edb.NewPrometheus("", map[string]string{})
     require.NoError(t, s.StartAndWaitReady(prom))
 
     c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
