Skip to content

Commit c73d573

Browse files
committed
Creating test parquet fuzz integration test
Signed-off-by: alanprot <[email protected]>
1 parent 4b91167 commit c73d573

File tree

2 files changed

+175
-1
lines changed

2 files changed

+175
-1
lines changed
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
//go:build integration
2+
// +build integration
3+
4+
package integration
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"github.com/cortexproject/cortex/integration/e2e"
10+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
11+
"github.com/cortexproject/cortex/integration/e2ecortex"
12+
"github.com/cortexproject/cortex/pkg/storage/bucket"
13+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
14+
"github.com/cortexproject/cortex/pkg/util/log"
15+
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
16+
"github.com/cortexproject/promqlsmith"
17+
"github.com/prometheus/prometheus/model/labels"
18+
"github.com/stretchr/testify/require"
19+
"github.com/thanos-io/objstore"
20+
"github.com/thanos-io/thanos/pkg/block"
21+
"github.com/thanos-io/thanos/pkg/block/metadata"
22+
"math/rand"
23+
"path/filepath"
24+
"strconv"
25+
"testing"
26+
"time"
27+
)
28+
29+
func TestParquetFuzz(t *testing.T) {
30+
31+
s, err := e2e.NewScenario(networkName)
32+
require.NoError(t, err)
33+
defer s.Close()
34+
35+
consul := e2edb.NewConsulWithName("consul")
36+
require.NoError(t, s.StartAndWaitReady(consul))
37+
38+
baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
39+
flags := mergeFlags(
40+
baseFlags,
41+
map[string]string{
42+
"-target": "all,parquet-converter",
43+
"-blocks-storage.tsdb.ship-interval": "1s",
44+
"-blocks-storage.bucket-store.sync-interval": "1s",
45+
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
46+
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
47+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
48+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
49+
"-querier.query-store-for-labels-enabled": "true",
50+
// compactor
51+
"-compactor.cleanup-interval": "1s",
52+
// Ingester.
53+
"-ring.store": "consul",
54+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
55+
// Distributor.
56+
"-distributor.replication-factor": "1",
57+
// Store-gateway.
58+
"-store-gateway.sharding-enabled": "false",
59+
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
60+
// alert manager
61+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
62+
"-frontend.query-vertical-shard-size": "1",
63+
"-frontend.max-cache-freshness": "1m",
64+
// enable experimental promQL funcs
65+
"-querier.enable-promql-experimental-functions": "true",
66+
// parquet-converter
67+
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
68+
"-parquet-converter.conversion-interval": "1s",
69+
"-parquet-converter.enabled": "true",
70+
// Querier
71+
"-querier.query-parquet-files": "true",
72+
},
73+
)
74+
75+
// make alert manager config dir
76+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
77+
78+
ctx := context.Background()
79+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
80+
dir := filepath.Join(s.SharedDir(), "data")
81+
numSeries := 10
82+
numSamples := 60
83+
lbls := make([]labels.Labels, 0, numSeries*2)
84+
scrapeInterval := time.Minute
85+
statusCodes := []string{"200", "400", "404", "500", "502"}
86+
now := time.Now()
87+
start := now.Add(-time.Hour * 24)
88+
end := now.Add(-time.Hour)
89+
90+
for i := 0; i < numSeries; i++ {
91+
lbls = append(lbls, labels.Labels{
92+
{Name: labels.MetricName, Value: "test_series_a"},
93+
{Name: "job", Value: "test"},
94+
{Name: "series", Value: strconv.Itoa(i % 3)},
95+
{Name: "status_code", Value: statusCodes[i%5]},
96+
})
97+
98+
lbls = append(lbls, labels.Labels{
99+
{Name: labels.MetricName, Value: "test_series_b"},
100+
{Name: "job", Value: "test"},
101+
{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
102+
{Name: "status_code", Value: statusCodes[(i+1)%5]},
103+
})
104+
}
105+
id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
106+
require.NoError(t, err)
107+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
108+
require.NoError(t, s.StartAndWaitReady(minio))
109+
110+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
111+
require.NoError(t, s.StartAndWaitReady(cortex))
112+
113+
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
114+
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
115+
116+
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
117+
require.NoError(t, err)
118+
119+
// Wait until we convert the blocks
120+
cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
121+
found := false
122+
123+
bkt.Iter(context.Background(), "", func(name string) error {
124+
fmt.Println(name)
125+
if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
126+
found = true
127+
}
128+
return nil
129+
}, objstore.WithRecursiveIter())
130+
return found
131+
})
132+
133+
att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz")
134+
require.NoError(t, err)
135+
numberOfIndexesUpdate := 0
136+
lastUpdate := att.LastModified
137+
138+
cortex_testutil.Poll(t, 30*time.Second, 5, func() interface{} {
139+
att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz")
140+
require.NoError(t, err)
141+
if lastUpdate != att.LastModified {
142+
lastUpdate = att.LastModified
143+
numberOfIndexesUpdate++
144+
}
145+
return numberOfIndexesUpdate
146+
})
147+
148+
c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
149+
require.NoError(t, err)
150+
151+
err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
152+
require.NoError(t, err)
153+
prom := e2edb.NewPrometheus("", map[string]string{
154+
"--enable-feature": "promql-experimental-functions",
155+
})
156+
require.NoError(t, s.StartAndWaitReady(prom))
157+
158+
c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
159+
require.NoError(t, err)
160+
waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)
161+
162+
opts := []promqlsmith.Option{
163+
promqlsmith.WithEnableOffset(true),
164+
promqlsmith.WithEnableAtModifier(true),
165+
promqlsmith.WithEnabledFunctions(enabledFunctions),
166+
}
167+
ps := promqlsmith.New(rnd, lbls, opts...)
168+
169+
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 500, false)
170+
fmt.Println(cortex.Metrics())
171+
172+
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
173+
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))
174+
}

pkg/querier/parquet_queryable.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i
349349

350350
switch {
351351
case len(remaining) > 0 && len(parquetBlocks) > 0:
352-
q.metrics.selectCount.WithLabelValues("tsdb+parquet").Inc()
352+
q.metrics.selectCount.WithLabelValues("mixed").Inc()
353353
case len(remaining) > 0 && len(parquetBlocks) == 0:
354354
q.metrics.selectCount.WithLabelValues("tsdb").Inc()
355355
case len(remaining) == 0 && len(parquetBlocks) > 0:

0 commit comments

Comments
 (0)