Skip to content

Commit d4a9c2c

Browse files
committed
Add native histogram query fuzz test
Signed-off-by: SungJin1212 <[email protected]>
1 parent c6dc6b4 commit d4a9c2c

File tree

2 files changed

+192
-2
lines changed

2 files changed

+192
-2
lines changed

integration/e2e/util.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/prometheus/prometheus/tsdb/tsdbutil"
2525
"github.com/thanos-io/thanos/pkg/block/metadata"
2626
"github.com/thanos-io/thanos/pkg/runutil"
27+
"go.uber.org/atomic"
2728

2829
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2930
)
@@ -259,6 +260,84 @@ func RandRange(rnd *rand.Rand, min, max int64) int64 {
259260
return rnd.Int63n(max-min) + min
260261
}
261262

263+
func CreateNHBlock(
264+
ctx context.Context,
265+
rnd *rand.Rand,
266+
dir string,
267+
series []labels.Labels,
268+
numNHSamples int,
269+
mint, maxt int64,
270+
scrapeInterval int64,
271+
seriesSize int64,
272+
) (id ulid.ULID, err error) {
273+
headOpts := tsdb.DefaultHeadOptions()
274+
headOpts.EnableNativeHistograms = *atomic.NewBool(true)
275+
headOpts.ChunkDirRoot = filepath.Join(dir, "chunks")
276+
headOpts.ChunkRange = 10000000000
277+
h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil)
278+
if err != nil {
279+
return id, errors.Wrap(err, "create head block")
280+
}
281+
defer func() {
282+
runutil.CloseWithErrCapture(&err, h, "TSDB Head")
283+
if e := os.RemoveAll(headOpts.ChunkDirRoot); e != nil {
284+
err = errors.Wrap(e, "delete chunks dir")
285+
}
286+
}()
287+
288+
app := h.Appender(ctx)
289+
for i := 0; i < len(series); i++ {
290+
var ref storage.SeriesRef
291+
start := RandRange(rnd, mint, maxt)
292+
for j := 0; j < numNHSamples; j++ {
293+
ref, err = app.AppendHistogram(ref, series[i], start, nil, tsdbutil.GenerateTestFloatHistogram(int64(i+j)))
294+
if err != nil {
295+
if rerr := app.Rollback(); rerr != nil {
296+
err = errors.Wrapf(err, "rollback failed: %v", rerr)
297+
}
298+
return id, errors.Wrap(err, "add NH sample")
299+
}
300+
start += scrapeInterval
301+
if start > maxt {
302+
break
303+
}
304+
}
305+
}
306+
if err := app.Commit(); err != nil {
307+
return id, errors.Wrap(err, "commit")
308+
}
309+
310+
c, err := tsdb.NewLeveledCompactor(ctx, nil, promslog.NewNopLogger(), []int64{maxt - mint}, nil, nil)
311+
if err != nil {
312+
return id, errors.Wrap(err, "create compactor")
313+
}
314+
315+
ids, err := c.Write(dir, h, mint, maxt, nil)
316+
if err != nil {
317+
return id, errors.Wrap(err, "write block")
318+
}
319+
if len(ids) == 0 {
320+
return id, errors.Errorf("nothing to write, asked for %d samples", numNHSamples)
321+
}
322+
id = ids[0]
323+
324+
blockDir := filepath.Join(dir, id.String())
325+
logger := log.NewNopLogger()
326+
327+
if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{
328+
Labels: map[string]string{
329+
cortex_tsdb.IngesterIDExternalLabel: "ingester-0",
330+
},
331+
Downsample: metadata.ThanosDownsample{Resolution: 0},
332+
Source: metadata.TestSource,
333+
IndexStats: metadata.IndexStats{SeriesMaxSize: seriesSize},
334+
}, nil); err != nil {
335+
return id, errors.Wrap(err, "finalize block")
336+
}
337+
338+
return id, nil
339+
}
340+
262341
func CreateBlock(
263342
ctx context.Context,
264343
rnd *rand.Rand,

integration/query_fuzz_test.go

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ import (
3939

4040
var enabledFunctions []*parser.Function
4141

42+
const (
43+
prometheusLatestImage = "quay.io/prometheus/prometheus:v3.2.1"
44+
)
45+
4246
func init() {
4347
for _, f := range parser.Functions {
4448
// Ignore native histogram functions for now as our test cases are only float samples.
@@ -52,8 +56,116 @@ func init() {
5256
}
5357
}
5458

59+
func TestNativeHistogramFuzz(t *testing.T) {
60+
s, err := e2e.NewScenario(networkName)
61+
require.NoError(t, err)
62+
defer s.Close()
63+
64+
// Start dependencies.
65+
consul := e2edb.NewConsulWithName("consul")
66+
require.NoError(t, s.StartAndWaitReady(consul))
67+
68+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
69+
flags := mergeFlags(
70+
baseFlags,
71+
map[string]string{
72+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
73+
"-blocks-storage.tsdb.block-ranges-period": "2h",
74+
"-blocks-storage.tsdb.ship-interval": "1h",
75+
"-blocks-storage.bucket-store.sync-interval": "1s",
76+
"-blocks-storage.tsdb.retention-period": "24h",
77+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
78+
"-querier.query-store-for-labels-enabled": "true",
79+
// Ingester.
80+
"-ring.store": "consul",
81+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
82+
// Distributor.
83+
"-distributor.replication-factor": "1",
84+
// alert manager
85+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
86+
},
87+
)
88+
// make alert manager config dir
89+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
90+
91+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
92+
require.NoError(t, s.StartAndWaitReady(minio))
93+
94+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
95+
require.NoError(t, s.StartAndWaitReady(cortex))
96+
97+
// Wait until Cortex replicas have updated the ring state.
98+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
99+
100+
now := time.Now()
101+
start := now.Add(-time.Hour * 2)
102+
end := now.Add(-time.Hour)
103+
numSeries := 10
104+
numSamples := 60
105+
lbls := make([]labels.Labels, 0, numSeries*2)
106+
scrapeInterval := time.Minute
107+
statusCodes := []string{"200", "400", "404", "500", "502"}
108+
for i := 0; i < numSeries; i++ {
109+
lbls = append(lbls, labels.Labels{
110+
{Name: labels.MetricName, Value: "test_series_a"},
111+
{Name: "job", Value: "test"},
112+
{Name: "series", Value: strconv.Itoa(i % 3)},
113+
{Name: "status_code", Value: statusCodes[i%5]},
114+
})
115+
116+
lbls = append(lbls, labels.Labels{
117+
{Name: labels.MetricName, Value: "test_series_b"},
118+
{Name: "job", Value: "test"},
119+
{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
120+
{Name: "status_code", Value: statusCodes[(i+1)%5]},
121+
})
122+
}
123+
124+
ctx := context.Background()
125+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
126+
127+
dir := filepath.Join(s.SharedDir(), "data")
128+
err = os.MkdirAll(dir, os.ModePerm)
129+
require.NoError(t, err)
130+
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
131+
require.NoError(t, err)
132+
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
133+
id, err := e2e.CreateNHBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
134+
require.NoError(t, err)
135+
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
136+
require.NoError(t, err)
137+
138+
// Wait for querier and store to sync blocks.
139+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"))))
140+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
141+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics))
142+
143+
c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
144+
require.NoError(t, err)
145+
146+
err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
147+
require.NoError(t, err)
148+
prom := e2edb.NewPrometheus(prometheusLatestImage, nil)
149+
require.NoError(t, s.StartAndWaitReady(prom))
150+
151+
c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
152+
require.NoError(t, err)
153+
154+
waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)
155+
156+
opts := []promqlsmith.Option{
157+
promqlsmith.WithEnableOffset(true),
158+
promqlsmith.WithEnableAtModifier(true),
159+
promqlsmith.WithEnabledAggrs([]parser.ItemType{
160+
parser.SUM, parser.MIN, parser.MAX, parser.AVG, parser.GROUP, parser.COUNT, parser.COUNT_VALUES, parser.QUANTILE,
161+
}),
162+
}
163+
ps := promqlsmith.New(rnd, lbls, opts...)
164+
165+
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
166+
}
167+
55168
func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
56-
prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
57169
s, err := e2e.NewScenario(networkName)
58170
require.NoError(t, err)
59171
defer s.Close()
@@ -1423,7 +1535,6 @@ func TestBackwardCompatibilityQueryFuzz(t *testing.T) {
14231535

14241536
// TestPrometheusCompatibilityQueryFuzz compares Cortex with latest Prometheus release.
14251537
func TestPrometheusCompatibilityQueryFuzz(t *testing.T) {
1426-
prometheusLatestImage := "quay.io/prometheus/prometheus:v3.2.1"
14271538
s, err := e2e.NewScenario(networkName)
14281539
require.NoError(t, err)
14291540
defer s.Close()

0 commit comments

Comments
 (0)