Skip to content

Commit ea53377

Browse files
authored
Fix a bug where sample in a single sample chunk might get skipped by batch iterator (#4218)
* Fix a bug where sample in a single sample chunk might get skipped by batch iterator Signed-off-by: Alvin Lin <[email protected]> * Add reslease notes Signed-off-by: Alvin Lin <[email protected]> * Address PR comment Signed-off-by: Alvin Lin <[email protected]> * Validate value from iterator in test too Signed-off-by: Alvin Lin <[email protected]> * Revert accidental change in ingester.go Signed-off-by: Alvin Lin <[email protected]> * Address CR comment Signed-off-by: Alvin Lin <[email protected]> * Address PR comments Signed-off-by: Alvin Lin <[email protected]> * Refactor optimization a bit such that the batch iterator don't go 'backwards' Signed-off-by: Alvin Lin <[email protected]>
1 parent 12d1cb1 commit ea53377

File tree

5 files changed

+41
-7
lines changed

5 files changed

+41
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
* [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263
5353
* [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271
5454
* [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269
55+
* [BUGFIX] Querier: Fix issue where samples in a chunk might get skipped by batch iterator. #4218
5556

5657
## Blocksconvert
5758

pkg/querier/batch/batch.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,21 @@ func newIteratorAdapter(underlying iterator) chunkenc.Iterator {
8383

8484
// Seek implements storage.SeriesIterator.
8585
func (a *iteratorAdapter) Seek(t int64) bool {
86-
// Optimisation: see if the seek is within the current batch.
87-
if a.curr.Length > 0 && t >= a.curr.Timestamps[0] && t <= a.curr.Timestamps[a.curr.Length-1] {
88-
a.curr.Index = 0
89-
for a.curr.Index < a.curr.Length && t > a.curr.Timestamps[a.curr.Index] {
90-
a.curr.Index++
86+
87+
// Optimisation: fulfill the seek using current batch if possible.
88+
if a.curr.Length > 0 && a.curr.Index < a.curr.Length {
89+
if t <= a.curr.Timestamps[a.curr.Index] {
90+
//In this case, the interface's requirement is met, so state of this
91+
//iterator does not need any change.
92+
return true
93+
} else if t <= a.curr.Timestamps[a.curr.Length-1] {
94+
//In this case, some timestamp between current sample and end of batch can fulfill
95+
//the seek. Let's find it.
96+
for a.curr.Index < a.curr.Length && t > a.curr.Timestamps[a.curr.Index] {
97+
a.curr.Index++
98+
}
99+
return true
91100
}
92-
return true
93101
}
94102

95103
a.curr.Length = -1

pkg/querier/batch/batch_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"time"
77

88
"github.com/prometheus/common/model"
9+
"github.com/stretchr/testify/require"
910

1011
"github.com/cortexproject/cortex/pkg/chunk"
1112
promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
@@ -55,6 +56,22 @@ func BenchmarkNewChunkMergeIterator_CreateAndIterate(b *testing.B) {
5556
}
5657
}
5758

59+
func TestSeekCorrectlyDealWithSinglePointChunks(t *testing.T) {
60+
chunkOne := mkChunk(t, model.Time(1*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
61+
chunkTwo := mkChunk(t, model.Time(10*step/time.Millisecond), 1, promchunk.PrometheusXorChunk)
62+
chunks := []chunk.Chunk{chunkOne, chunkTwo}
63+
64+
sut := NewChunkMergeIterator(chunks, 0, 0)
65+
66+
// Following calls mimics Prometheus's query engine behaviour for VectorSelector.
67+
require.True(t, sut.Next())
68+
require.True(t, sut.Seek(0))
69+
70+
actual, val := sut.At()
71+
require.Equal(t, float64(1*time.Second/time.Millisecond), val) // since mkChunk use ts as value.
72+
require.Equal(t, int64(1*time.Second/time.Millisecond), actual)
73+
}
74+
5875
func createChunks(b *testing.B, numChunks, numSamplesPerChunk, duplicationFactor int, enc promchunk.Encoding) []chunk.Chunk {
5976
result := make([]chunk.Chunk, 0, numChunks)
6077

pkg/querier/batch/chunk_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ func TestChunkIter(t *testing.T) {
2424
forEncodings(t, func(t *testing.T, enc promchunk.Encoding) {
2525
chunk := mkGenericChunk(t, 0, 100, enc)
2626
iter := &chunkIterator{}
27+
2728
iter.reset(chunk)
2829
testIter(t, 100, newIteratorAdapter(iter))
30+
31+
iter.reset(chunk)
2932
testSeek(t, 100, newIteratorAdapter(iter))
3033
})
3134
}
@@ -56,7 +59,7 @@ func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Enco
5659
require.Nil(t, npc)
5760
ts = ts.Add(step)
5861
}
59-
return chunk.NewChunk(userID, fp, metric, pc, model.Time(0), ts)
62+
return chunk.NewChunk(userID, fp, metric, pc, from, ts)
6063
}
6164

6265
func mkGenericChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) GenericChunk {

pkg/querier/batch/merge_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@ func TestMergeIter(t *testing.T) {
1616
chunk3 := mkGenericChunk(t, model.TimeFromUnix(50), 100, enc)
1717
chunk4 := mkGenericChunk(t, model.TimeFromUnix(75), 100, enc)
1818
chunk5 := mkGenericChunk(t, model.TimeFromUnix(100), 100, enc)
19+
1920
iter := newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
2021
testIter(t, 200, newIteratorAdapter(iter))
22+
23+
iter = newMergeIterator([]GenericChunk{chunk1, chunk2, chunk3, chunk4, chunk5})
2124
testSeek(t, 200, newIteratorAdapter(iter))
2225
})
2326
}
@@ -37,6 +40,8 @@ func TestMergeHarder(t *testing.T) {
3740
}
3841
iter := newMergeIterator(chunks)
3942
testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter))
43+
44+
iter = newMergeIterator(chunks)
4045
testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter))
4146
})
4247
}

0 commit comments

Comments
 (0)