Skip to content

Commit 53f2043

Browse files
authored
Merge pull request #1649 from cortexproject/bigchunk-smaller
Shrink bigchunk data structure
2 parents 78e0607 + f0ba932 commit 53f2043

File tree

3 files changed

+51
-38
lines changed

3 files changed

+51
-38
lines changed

pkg/chunk/encoding/bigchunk.go

Lines changed: 28 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ const samplesPerChunk = 120
1515
var errOutOfBounds = errors.New("out of bounds")
1616

1717
type smallChunk struct {
18-
*chunkenc.XORChunk
18+
chunkenc.XORChunk
1919
start int64
20-
end int64
2120
}
2221

2322
// bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no
@@ -45,7 +44,6 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {
4544

4645
b.appender.Append(int64(sample.Timestamp), float64(sample.Value))
4746
b.remainingSamples--
48-
b.chunks[len(b.chunks)-1].end = int64(sample.Timestamp)
4947
return []Chunk{b}, nil
5048
}
5149

@@ -63,22 +61,19 @@ func (b *bigchunk) addNextChunk(start model.Time) error {
6361
if err != nil {
6462
return err
6563
}
66-
b.chunks[l-1].XORChunk = compacted.(*chunkenc.XORChunk)
64+
b.chunks[l-1].XORChunk = *compacted.(*chunkenc.XORChunk)
6765
}
6866
}
6967

70-
chunk := chunkenc.NewXORChunk()
71-
appender, err := chunk.Appender()
72-
if err != nil {
73-
return err
74-
}
75-
7668
b.chunks = append(b.chunks, smallChunk{
77-
XORChunk: chunk,
69+
XORChunk: *chunkenc.NewXORChunk(),
7870
start: int64(start),
79-
end: int64(start),
8071
})
8172

73+
appender, err := b.chunks[len(b.chunks)-1].Appender()
74+
if err != nil {
75+
return err
76+
}
8277
b.appender = appender
8378
b.remainingSamples = samplesPerChunk
8479
return nil
@@ -131,16 +126,15 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
131126
return err
132127
}
133128

134-
var start, end int64
135-
start, end, reuseIter, err = firstAndLastTimes(chunk, reuseIter)
129+
var start int64
130+
start, reuseIter, err = firstTime(chunk, reuseIter)
136131
if err != nil {
137132
return err
138133
}
139134

140135
b.chunks = append(b.chunks, smallChunk{
141-
XORChunk: chunk.(*chunkenc.XORChunk),
136+
XORChunk: *chunk.(*chunkenc.XORChunk),
142137
start: int64(start),
143-
end: int64(end),
144138
})
145139
}
146140
return nil
@@ -197,8 +191,8 @@ func (b *bigchunk) NewIterator(reuseIter Iterator) Iterator {
197191
func (b *bigchunk) Slice(start, end model.Time) Chunk {
198192
i, j := 0, len(b.chunks)
199193
for k := 0; k < len(b.chunks); k++ {
200-
if b.chunks[k].end < int64(start) {
201-
i = k + 1
194+
if b.chunks[k].start <= int64(start) {
195+
i = k
202196
}
203197
if b.chunks[k].start > int64(end) {
204198
j = k
@@ -258,16 +252,13 @@ func (it *bigchunkIterator) FindAtOrAfter(target model.Time) bool {
258252

259253
// If the seek is outside the current chunk, use the index to find the right
260254
// chunk.
261-
if int64(target) < it.chunks[it.i].start || int64(target) > it.chunks[it.i].end {
255+
if int64(target) < it.chunks[it.i].start ||
256+
(it.i+1 < len(it.chunks) && int64(target) >= it.chunks[it.i+1].start) {
262257
it.curr = nil
263-
for it.i = 0; it.i < len(it.chunks) && int64(target) > it.chunks[it.i].end; it.i++ {
258+
for it.i = 0; it.i+1 < len(it.chunks) && int64(target) >= it.chunks[it.i+1].start; it.i++ {
264259
}
265260
}
266261

267-
if it.i >= len(it.chunks) {
268-
return false
269-
}
270-
271262
if it.curr == nil {
272263
it.curr = it.chunks[it.i].Iterator(it.curr)
273264
} else if t, _ := it.curr.At(); int64(target) <= t {
@@ -280,6 +271,14 @@ func (it *bigchunkIterator) FindAtOrAfter(target model.Time) bool {
280271
return true
281272
}
282273
}
274+
// Timestamp is after the end of that chunk - if there is another chunk
275+
// then the position we need is at the beginning of it.
276+
if it.i+1 < len(it.chunks) {
277+
it.i++
278+
it.curr = it.chunks[it.i].Iterator(it.curr)
279+
it.curr.Next()
280+
return true
281+
}
283282
return false
284283
}
285284

@@ -333,20 +332,11 @@ func (it *bigchunkIterator) Err() error {
333332
return nil
334333
}
335334

336-
func firstAndLastTimes(c chunkenc.Chunk, iter chunkenc.Iterator) (int64, int64, chunkenc.Iterator, error) {
337-
var (
338-
first int64
339-
last int64
340-
firstSet bool
341-
)
335+
func firstTime(c chunkenc.Chunk, iter chunkenc.Iterator) (int64, chunkenc.Iterator, error) {
336+
var first int64
342337
iter = c.Iterator(iter)
343-
for iter.Next() {
344-
t, _ := iter.At()
345-
if !firstSet {
346-
first = t
347-
firstSet = true
348-
}
349-
last = t
338+
if iter.Next() {
339+
first, _ = iter.At()
350340
}
351-
return first, last, iter, iter.Err()
341+
return first, iter, iter.Err()
352342
}

pkg/chunk/encoding/bigchunk_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,13 @@ func TestSliceBiggerChunk(t *testing.T) {
5555
require.Equal(t, sample.Value, model.SampleValue(j))
5656
require.True(t, iter.Scan())
5757
}
58+
59+
// Now try via seek
60+
iter = s.NewIterator(iter)
61+
require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
62+
sample := iter.Value()
63+
require.Equal(t, sample.Timestamp, model.Time(i*step))
64+
require.Equal(t, sample.Value, model.SampleValue(i))
5865
}
5966
}
6067

pkg/chunk/encoding/chunk_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,12 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {
131131
require.False(t, iter.Scan())
132132
require.NoError(t, iter.Err())
133133

134+
// Check seek works after unmarshal
135+
iter = chunk.NewIterator(iter)
136+
for i := 0; i < samples; i += samples / 10 {
137+
require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
138+
}
139+
134140
// Check the byte representation after another Marshal is the same.
135141
buf = bytes.Buffer{}
136142
err = chunk.Marshal(&buf)
@@ -147,6 +153,14 @@ func testChunkSeek(t *testing.T, encoding Encoding, samples int) {
147153

148154
iter := chunk.NewIterator(nil)
149155
for i := 0; i < samples; i += samples / 10 {
156+
if i > 0 {
157+
// Seek one millisecond before the actual time
158+
require.True(t, iter.FindAtOrAfter(model.Time(i*step-1)), "1ms before step %d not found", i)
159+
sample := iter.Value()
160+
require.EqualValues(t, model.Time(i*step), sample.Timestamp)
161+
require.EqualValues(t, model.SampleValue(i), sample.Value)
162+
}
163+
// Now seek to exactly the right time
150164
require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
151165
sample := iter.Value()
152166
require.EqualValues(t, model.Time(i*step), sample.Timestamp)
@@ -162,6 +176,8 @@ func testChunkSeek(t *testing.T, encoding Encoding, samples int) {
162176
require.False(t, iter.Scan())
163177
require.NoError(t, iter.Err())
164178
}
179+
// Check seek past the end of the chunk returns failure
180+
require.False(t, iter.FindAtOrAfter(model.Time(samples*step+1)))
165181
}
166182

167183
func testChunkSeekForward(t *testing.T, encoding Encoding, samples int) {

0 commit comments

Comments
 (0)