Skip to content

Commit 0b0281b

Browse files
committed
Disable chunk trimming in ingester
Signed-off-by: SungJin1212 <[email protected]>
1 parent c6347f0 commit 0b0281b

File tree

3 files changed

+171
-2
lines changed

3 files changed

+171
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1010
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1111
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
12+
* [ENHANCEMENT] Ingester: Disable chunk trimming. #6270
1213
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
1314
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
1415
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188

integration/query_fuzz_test.go

Lines changed: 164 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package integration
55

66
import (
77
"context"
8+
"fmt"
89
"math/rand"
910
"os"
1011
"path"
@@ -35,6 +36,169 @@ import (
3536
"github.com/cortexproject/cortex/pkg/util/log"
3637
)
3738

39+
func TestDisableChunkTrimmingFuzz(t *testing.T) {
40+
noneChunkTrimmingImage := "quay.io/cortexproject/cortex:v1.18.0"
41+
s, err := e2e.NewScenario(networkName)
42+
require.NoError(t, err)
43+
defer s.Close()
44+
45+
// Start dependencies.
46+
consul1 := e2edb.NewConsulWithName("consul1")
47+
consul2 := e2edb.NewConsulWithName("consul2")
48+
require.NoError(t, s.StartAndWaitReady(consul1, consul2))
49+
50+
flags1 := mergeFlags(
51+
AlertmanagerLocalFlags(),
52+
map[string]string{
53+
"-store.engine": blocksStorageEngine,
54+
"-blocks-storage.backend": "filesystem",
55+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
56+
"-blocks-storage.tsdb.block-ranges-period": "2h",
57+
"-blocks-storage.tsdb.ship-interval": "1h",
58+
"-blocks-storage.bucket-store.sync-interval": "15m",
59+
"-blocks-storage.tsdb.retention-period": "2h",
60+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
61+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
62+
"-querier.query-store-for-labels-enabled": "true",
63+
// Ingester.
64+
"-ring.store": "consul",
65+
"-consul.hostname": consul1.NetworkHTTPEndpoint(),
66+
// Distributor.
67+
"-distributor.replication-factor": "1",
68+
// Store-gateway.
69+
"-store-gateway.sharding-enabled": "false",
70+
// alert manager
71+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
72+
},
73+
)
74+
flags2 := mergeFlags(
75+
AlertmanagerLocalFlags(),
76+
map[string]string{
77+
"-store.engine": blocksStorageEngine,
78+
"-blocks-storage.backend": "filesystem",
79+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
80+
"-blocks-storage.tsdb.block-ranges-period": "2h",
81+
"-blocks-storage.tsdb.ship-interval": "1h",
82+
"-blocks-storage.bucket-store.sync-interval": "15m",
83+
"-blocks-storage.tsdb.retention-period": "2h",
84+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
85+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
86+
"-querier.query-store-for-labels-enabled": "true",
87+
// Ingester.
88+
"-ring.store": "consul",
89+
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
90+
// Distributor.
91+
"-distributor.replication-factor": "1",
92+
// Store-gateway.
93+
"-store-gateway.sharding-enabled": "false",
94+
// alert manager
95+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
96+
},
97+
)
98+
// make alert manager config dir
99+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
100+
101+
path1 := path.Join(s.SharedDir(), "cortex-1")
102+
path2 := path.Join(s.SharedDir(), "cortex-2")
103+
104+
flags1 = mergeFlags(flags1, map[string]string{"-blocks-storage.filesystem.dir": path1})
105+
flags2 = mergeFlags(flags2, map[string]string{"-blocks-storage.filesystem.dir": path2})
106+
// Start Cortex replicas.
107+
cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, "")
108+
cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, noneChunkTrimmingImage)
109+
require.NoError(t, s.StartAndWaitReady(cortex1, cortex2))
110+
111+
// Wait until Cortex replicas have updated the ring state.
112+
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
113+
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
114+
115+
c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1")
116+
require.NoError(t, err)
117+
c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1")
118+
require.NoError(t, err)
119+
120+
now := time.Now()
121+
// Push some series to Cortex.
122+
start := now.Add(-time.Minute * 120)
123+
end := now
124+
scrapeInterval := 30 * time.Second
125+
126+
numSeries := 10
127+
numSamples := 240
128+
serieses := make([]prompb.TimeSeries, numSeries)
129+
lbls := make([]labels.Labels, numSeries)
130+
for i := 0; i < numSeries; i++ {
131+
series := e2e.GenerateSeriesWithSamples(fmt.Sprintf("test_series_%d", i), start, scrapeInterval, i*numSamples, numSamples, prompb.Label{Name: "foo", Value: "bar"})
132+
serieses[i] = series
133+
134+
builder := labels.NewBuilder(labels.EmptyLabels())
135+
for _, lbl := range series.Labels {
136+
builder.Set(lbl.Name, lbl.Value)
137+
}
138+
lbls[i] = builder.Labels()
139+
}
140+
141+
res, err := c1.Push(serieses)
142+
require.NoError(t, err)
143+
require.Equal(t, 200, res.StatusCode)
144+
145+
res, err = c2.Push(serieses)
146+
require.NoError(t, err)
147+
require.Equal(t, 200, res.StatusCode)
148+
149+
rnd := rand.New(rand.NewSource(now.Unix()))
150+
opts := []promqlsmith.Option{
151+
promqlsmith.WithEnableOffset(true),
152+
promqlsmith.WithEnableAtModifier(true),
153+
}
154+
ps := promqlsmith.New(rnd, lbls, opts...)
155+
156+
type testCase struct {
157+
query string
158+
res1, res2 model.Value
159+
err1, err2 error
160+
instantQuery bool
161+
}
162+
163+
now = time.Now()
164+
cases := make([]*testCase, 0, 200)
165+
166+
for i := 0; i < 200; i++ {
167+
expr := ps.WalkRangeQuery()
168+
query := expr.Pretty(0)
169+
res1, err1 := c1.QueryRange(query, start, end, scrapeInterval)
170+
res2, err2 := c2.QueryRange(query, start, end, scrapeInterval)
171+
cases = append(cases, &testCase{
172+
query: query,
173+
res1: res1,
174+
res2: res2,
175+
err1: err1,
176+
err2: err2,
177+
instantQuery: false,
178+
})
179+
}
180+
181+
failures := 0
182+
for i, tc := range cases {
183+
qt := "instant query"
184+
if !tc.instantQuery {
185+
qt = "range query"
186+
}
187+
if tc.err1 != nil || tc.err2 != nil {
188+
if !cmp.Equal(tc.err1, tc.err2) {
189+
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
190+
failures++
191+
}
192+
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
193+
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
194+
failures++
195+
}
196+
}
197+
if failures > 0 {
198+
require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures)
199+
}
200+
}
201+
38202
func TestVerticalShardingFuzz(t *testing.T) {
39203
s, err := e2e.NewScenario(networkName)
40204
require.NoError(t, err)
@@ -159,7 +323,6 @@ func TestVerticalShardingFuzz(t *testing.T) {
159323
instantQuery bool
160324
}
161325

162-
now = time.Now()
163326
cases := make([]*testCase, 0, 200)
164327
for i := 0; i < 100; i++ {
165328
expr := ps.WalkInstantQuery()

pkg/ingester/ingester.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1982,8 +1982,13 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
19821982
if err != nil {
19831983
return 0, 0, 0, err
19841984
}
1985+
hints := &storage.SelectHints{
1986+
Start: from,
1987+
End: through,
1988+
DisableTrimming: true,
1989+
}
19851990
// It's not required to return sorted series because series are sorted by the Cortex querier.
1986-
ss := q.Select(ctx, false, nil, matchers...)
1991+
ss := q.Select(ctx, false, hints, matchers...)
19871992
c()
19881993
if ss.Err() != nil {
19891994
return 0, 0, 0, ss.Err()

0 commit comments

Comments
 (0)