Skip to content

Shrink bigchunk data structure #1649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 28 additions & 38 deletions pkg/chunk/encoding/bigchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@ const samplesPerChunk = 120
var errOutOfBounds = errors.New("out of bounds")

type smallChunk struct {
*chunkenc.XORChunk
chunkenc.XORChunk
start int64
end int64
}

// bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no
Expand Down Expand Up @@ -45,7 +44,6 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {

b.appender.Append(int64(sample.Timestamp), float64(sample.Value))
b.remainingSamples--
b.chunks[len(b.chunks)-1].end = int64(sample.Timestamp)
return []Chunk{b}, nil
}

Expand All @@ -63,22 +61,19 @@ func (b *bigchunk) addNextChunk(start model.Time) error {
if err != nil {
return err
}
b.chunks[l-1].XORChunk = compacted.(*chunkenc.XORChunk)
b.chunks[l-1].XORChunk = *compacted.(*chunkenc.XORChunk)
}
}

chunk := chunkenc.NewXORChunk()
appender, err := chunk.Appender()
if err != nil {
return err
}

b.chunks = append(b.chunks, smallChunk{
XORChunk: chunk,
XORChunk: *chunkenc.NewXORChunk(),
start: int64(start),
end: int64(start),
})

appender, err := b.chunks[len(b.chunks)-1].Appender()
if err != nil {
return err
}
b.appender = appender
b.remainingSamples = samplesPerChunk
return nil
Expand Down Expand Up @@ -131,16 +126,15 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
return err
}

var start, end int64
start, end, reuseIter, err = firstAndLastTimes(chunk, reuseIter)
var start int64
start, reuseIter, err = firstTime(chunk, reuseIter)
if err != nil {
return err
}

b.chunks = append(b.chunks, smallChunk{
XORChunk: chunk.(*chunkenc.XORChunk),
XORChunk: *chunk.(*chunkenc.XORChunk),
start: int64(start),
end: int64(end),
})
}
return nil
Expand Down Expand Up @@ -197,8 +191,8 @@ func (b *bigchunk) NewIterator(reuseIter Iterator) Iterator {
func (b *bigchunk) Slice(start, end model.Time) Chunk {
i, j := 0, len(b.chunks)
for k := 0; k < len(b.chunks); k++ {
if b.chunks[k].end < int64(start) {
i = k + 1
if b.chunks[k].start <= int64(start) {
i = k
}
if b.chunks[k].start > int64(end) {
j = k
Expand Down Expand Up @@ -258,16 +252,13 @@ func (it *bigchunkIterator) FindAtOrAfter(target model.Time) bool {

// If the seek is outside the current chunk, use the index to find the right
// chunk.
if int64(target) < it.chunks[it.i].start || int64(target) > it.chunks[it.i].end {
if int64(target) < it.chunks[it.i].start ||
(it.i+1 < len(it.chunks) && int64(target) >= it.chunks[it.i+1].start) {
it.curr = nil
for it.i = 0; it.i < len(it.chunks) && int64(target) > it.chunks[it.i].end; it.i++ {
for it.i = 0; it.i+1 < len(it.chunks) && int64(target) >= it.chunks[it.i+1].start; it.i++ {
}
}

if it.i >= len(it.chunks) {
return false
}

if it.curr == nil {
it.curr = it.chunks[it.i].Iterator(it.curr)
} else if t, _ := it.curr.At(); int64(target) <= t {
Expand All @@ -280,6 +271,14 @@ func (it *bigchunkIterator) FindAtOrAfter(target model.Time) bool {
return true
}
}
// Timestamp is after the end of that chunk - if there is another chunk
// then the position we need is at the beginning of it.
if it.i+1 < len(it.chunks) {
it.i++
it.curr = it.chunks[it.i].Iterator(it.curr)
it.curr.Next()
return true
}
return false
}

Expand Down Expand Up @@ -333,20 +332,11 @@ func (it *bigchunkIterator) Err() error {
return nil
}

func firstAndLastTimes(c chunkenc.Chunk, iter chunkenc.Iterator) (int64, int64, chunkenc.Iterator, error) {
var (
first int64
last int64
firstSet bool
)
func firstTime(c chunkenc.Chunk, iter chunkenc.Iterator) (int64, chunkenc.Iterator, error) {
var first int64
iter = c.Iterator(iter)
for iter.Next() {
t, _ := iter.At()
if !firstSet {
first = t
firstSet = true
}
last = t
if iter.Next() {
first, _ = iter.At()
}
return first, last, iter, iter.Err()
return first, iter, iter.Err()
}
7 changes: 7 additions & 0 deletions pkg/chunk/encoding/bigchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ func TestSliceBiggerChunk(t *testing.T) {
require.Equal(t, sample.Value, model.SampleValue(j))
require.True(t, iter.Scan())
}

// Now try via seek
iter = s.NewIterator(iter)
require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
sample := iter.Value()
require.Equal(t, sample.Timestamp, model.Time(i*step))
require.Equal(t, sample.Value, model.SampleValue(i))
}
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/chunk/encoding/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {
require.False(t, iter.Scan())
require.NoError(t, iter.Err())

// Check seek works after unmarshal
iter = chunk.NewIterator(iter)
for i := 0; i < samples; i += samples / 10 {
require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
}

// Check the byte representation after another Marshall is the same.
buf = bytes.Buffer{}
err = chunk.Marshal(&buf)
Expand All @@ -147,6 +153,14 @@ func testChunkSeek(t *testing.T, encoding Encoding, samples int) {

iter := chunk.NewIterator(nil)
for i := 0; i < samples; i += samples / 10 {
if i > 0 {
// Seek one millisecond before the actual time
require.True(t, iter.FindAtOrAfter(model.Time(i*step-1)), "1ms before step %d not found", i)
sample := iter.Value()
require.EqualValues(t, model.Time(i*step), sample.Timestamp)
require.EqualValues(t, model.SampleValue(i), sample.Value)
}
// Now seek to exactly the right time
require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
sample := iter.Value()
require.EqualValues(t, model.Time(i*step), sample.Timestamp)
Expand All @@ -162,6 +176,8 @@ func testChunkSeek(t *testing.T, encoding Encoding, samples int) {
require.False(t, iter.Scan())
require.NoError(t, iter.Err())
}
// Check seek past the end of the chunk returns failure
require.False(t, iter.FindAtOrAfter(model.Time(samples*step+1)))
}

func testChunkSeekForward(t *testing.T, encoding Encoding, samples int) {
Expand Down