diff --git a/pkg/chunk/encoding/bigchunk.go b/pkg/chunk/encoding/bigchunk.go index bd74b4eaba2..deea2b35ce6 100644 --- a/pkg/chunk/encoding/bigchunk.go +++ b/pkg/chunk/encoding/bigchunk.go @@ -15,9 +15,8 @@ const samplesPerChunk = 120 var errOutOfBounds = errors.New("out of bounds") type smallChunk struct { - *chunkenc.XORChunk + chunkenc.XORChunk start int64 - end int64 } // bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no @@ -45,7 +44,6 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { b.appender.Append(int64(sample.Timestamp), float64(sample.Value)) b.remainingSamples-- - b.chunks[len(b.chunks)-1].end = int64(sample.Timestamp) return []Chunk{b}, nil } @@ -63,22 +61,19 @@ func (b *bigchunk) addNextChunk(start model.Time) error { if err != nil { return err } - b.chunks[l-1].XORChunk = compacted.(*chunkenc.XORChunk) + b.chunks[l-1].XORChunk = *compacted.(*chunkenc.XORChunk) } } - chunk := chunkenc.NewXORChunk() - appender, err := chunk.Appender() - if err != nil { - return err - } - b.chunks = append(b.chunks, smallChunk{ - XORChunk: chunk, + XORChunk: *chunkenc.NewXORChunk(), start: int64(start), - end: int64(start), }) + appender, err := b.chunks[len(b.chunks)-1].Appender() + if err != nil { + return err + } b.appender = appender b.remainingSamples = samplesPerChunk return nil @@ -131,16 +126,15 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error { return err } - var start, end int64 - start, end, reuseIter, err = firstAndLastTimes(chunk, reuseIter) + var start int64 + start, reuseIter, err = firstTime(chunk, reuseIter) if err != nil { return err } b.chunks = append(b.chunks, smallChunk{ - XORChunk: chunk.(*chunkenc.XORChunk), + XORChunk: *chunk.(*chunkenc.XORChunk), start: int64(start), - end: int64(end), }) } return nil @@ -197,8 +191,8 @@ func (b *bigchunk) NewIterator(reuseIter Iterator) Iterator { func (b *bigchunk) Slice(start, end model.Time) Chunk { i, j := 0, len(b.chunks) for k := 0; k < len(b.chunks); k++ { - if b.chunks[k].end < int64(start) { - i = k + 1 + if b.chunks[k].start <= int64(start) { + i = k } if b.chunks[k].start > int64(end) { j = k @@ -258,16 +252,13 @@ func (it *bigchunkIterator) FindAtOrAfter(target model.Time) bool { // If the seek is outside the current chunk, use the index to find the right // chunk. - if int64(target) < it.chunks[it.i].start || int64(target) > it.chunks[it.i].end { + if int64(target) < it.chunks[it.i].start || + (it.i+1 < len(it.chunks) && int64(target) >= it.chunks[it.i+1].start) { it.curr = nil - for it.i = 0; it.i < len(it.chunks) && int64(target) > it.chunks[it.i].end; it.i++ { + for it.i = 0; it.i+1 < len(it.chunks) && int64(target) >= it.chunks[it.i+1].start; it.i++ { } } - if it.i >= len(it.chunks) { - return false - } - if it.curr == nil { it.curr = it.chunks[it.i].Iterator(it.curr) } else if t, _ := it.curr.At(); int64(target) <= t { @@ -280,6 +271,14 @@ func (it *bigchunkIterator) FindAtOrAfter(target model.Time) bool { return true } } + // Timestamp is after the end of that chunk - if there is another chunk + // then the position we need is at the beginning of it. + if it.i+1 < len(it.chunks) { + it.i++ + it.curr = it.chunks[it.i].Iterator(it.curr) + it.curr.Next() + return true + } return false } @@ -333,20 +332,11 @@ func (it *bigchunkIterator) Err() error { return nil } -func firstAndLastTimes(c chunkenc.Chunk, iter chunkenc.Iterator) (int64, int64, chunkenc.Iterator, error) { - var ( - first int64 - last int64 - firstSet bool - ) +func firstTime(c chunkenc.Chunk, iter chunkenc.Iterator) (int64, chunkenc.Iterator, error) { + var first int64 iter = c.Iterator(iter) - for iter.Next() { - t, _ := iter.At() - if !firstSet { - first = t - firstSet = true - } - last = t + if iter.Next() { + first, _ = iter.At() } - return first, last, iter, iter.Err() + return first, iter, iter.Err() } diff --git a/pkg/chunk/encoding/bigchunk_test.go b/pkg/chunk/encoding/bigchunk_test.go index c8910c53f82..7e5e6e8761c 100644 --- a/pkg/chunk/encoding/bigchunk_test.go +++ b/pkg/chunk/encoding/bigchunk_test.go @@ -55,6 +55,13 @@ func TestSliceBiggerChunk(t *testing.T) { require.Equal(t, sample.Value, model.SampleValue(j)) require.True(t, iter.Scan()) } + + // Now try via seek + iter = s.NewIterator(iter) + require.True(t, iter.FindAtOrAfter(model.Time(i*step))) + sample := iter.Value() + require.Equal(t, sample.Timestamp, model.Time(i*step)) + require.Equal(t, sample.Value, model.SampleValue(i)) } } diff --git a/pkg/chunk/encoding/chunk_test.go b/pkg/chunk/encoding/chunk_test.go index e0dbd383c6f..c7bf8a3c0f1 100644 --- a/pkg/chunk/encoding/chunk_test.go +++ b/pkg/chunk/encoding/chunk_test.go @@ -131,6 +131,12 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) { require.False(t, iter.Scan()) require.NoError(t, iter.Err()) + // Check seek works after unmarshal + iter = chunk.NewIterator(iter) + for i := 0; i < samples; i += samples / 10 { + require.True(t, iter.FindAtOrAfter(model.Time(i*step))) + } + // Check the byte representation after another Marshall is the same. buf = bytes.Buffer{} err = chunk.Marshal(&buf) @@ -147,6 +153,14 @@ func testChunkSeek(t *testing.T, encoding Encoding, samples int) { iter := chunk.NewIterator(nil) for i := 0; i < samples; i += samples / 10 { + if i > 0 { + // Seek one millisecond before the actual time + require.True(t, iter.FindAtOrAfter(model.Time(i*step-1)), "1ms before step %d not found", i) + sample := iter.Value() + require.EqualValues(t, model.Time(i*step), sample.Timestamp) + require.EqualValues(t, model.SampleValue(i), sample.Value) + } + // Now seek to exactly the right time require.True(t, iter.FindAtOrAfter(model.Time(i*step))) sample := iter.Value() require.EqualValues(t, model.Time(i*step), sample.Timestamp) @@ -162,6 +176,8 @@ func testChunkSeek(t *testing.T, encoding Encoding, samples int) { require.False(t, iter.Scan()) require.NoError(t, iter.Err()) } + // Check seek past the end of the chunk returns failure + require.False(t, iter.FindAtOrAfter(model.Time(samples*step+1))) } func testChunkSeekForward(t *testing.T, encoding Encoding, samples int) {