Reuse iterators #1574


Merged: 13 commits, Sep 12, 2019

52 changes: 0 additions & 52 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/chunk/chunk.go
@@ -336,7 +336,7 @@ func equalByKey(a, b Chunk) bool {

// Samples returns all SamplePairs for the chunk.
func (c *Chunk) Samples(from, through model.Time) ([]model.SamplePair, error) {
- it := c.Data.NewIterator()
+ it := c.Data.NewIterator(nil)
interval := metric.Interval{OldestInclusive: from, NewestInclusive: through}
return prom_chunk.RangeValues(it, interval)
}
24 changes: 18 additions & 6 deletions pkg/chunk/encoding/bigchunk.go
@@ -114,6 +114,7 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
}

b.chunks = make([]smallChunk, 0, numChunks+1) // allow one extra space in case we want to add new data
+ var reuseIter chunkenc.Iterator
for i := uint16(0); i < numChunks; i++ {
chunkLen, err := r.ReadUint16()
if err != nil {
@@ -130,7 +131,8 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
return err
}

- start, end, err := firstAndLastTimes(chunk)
+ var start, end int64
+ start, end, reuseIter, err = firstAndLastTimes(chunk, reuseIter)
if err != nil {
return err
}
@@ -169,10 +171,20 @@ func (b *bigchunk) Size() int {
return sum
}

- func (b *bigchunk) NewIterator() Iterator {
+ func (b *bigchunk) NewIterator(reuseIter Iterator) Iterator {
+ if bci, ok := reuseIter.(*bigchunkIterator); ok {
+ bci.bigchunk = b
+ bci.i = 0
+ if len(b.chunks) > 0 {
+ bci.curr = b.chunks[0].Iterator(bci.curr)
+ } else {
+ bci.curr = chunkenc.NewNopIterator()
+ }
+ return bci
+ }
var it chunkenc.Iterator
if len(b.chunks) > 0 {
- it = b.chunks[0].Iterator(nil)
+ it = b.chunks[0].Iterator(it)
} else {
it = chunkenc.NewNopIterator()
}
@@ -321,13 +333,13 @@ func (it *bigchunkIterator) Err() error {
return nil
}

- func firstAndLastTimes(c chunkenc.Chunk) (int64, int64, error) {
+ func firstAndLastTimes(c chunkenc.Chunk, iter chunkenc.Iterator) (int64, int64, chunkenc.Iterator, error) {
var (
first int64
last int64
firstSet bool
- iter = c.Iterator(nil)
)
+ iter = c.Iterator(iter)
for iter.Next() {
t, _ := iter.At()
if !firstSet {
@@ -336,5 +348,5 @@ func firstAndLastTimes(c chunkenc.Chunk) (int64, int64, error) {
}
last = t
}
- return first, last, iter.Err()
+ return first, last, iter, iter.Err()
}
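
The reuseIter threading above relies on the upstream chunkenc convention that Chunk.Iterator accepts a previous iterator to recycle. Below is a minimal sketch of that pattern, assuming the chunkenc iterator API of the TSDB version vendored at the time (Next() bool, At() (int64, float64)); the import path and the walkChunks helper are illustrative assumptions, not code from this PR.

```go
package example

import "github.com/prometheus/prometheus/tsdb/chunkenc"

// walkChunks (hypothetical helper) threads one chunkenc.Iterator through
// every chunk, mirroring UnmarshalFromBuf/firstAndLastTimes above: each
// Iterator(it) call may recycle the previous iterator instead of allocating.
func walkChunks(chunks []chunkenc.Chunk) (first, last int64, err error) {
	var (
		it       chunkenc.Iterator
		firstSet bool
	)
	for _, c := range chunks {
		it = c.Iterator(it) // reuse hint; implementations may ignore it
		for it.Next() {
			t, _ := it.At()
			if !firstSet {
				first, firstSet = t, true
			}
			last = t
		}
		if err = it.Err(); err != nil {
			return 0, 0, err
		}
	}
	return first, last, nil
}
```
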
4 changes: 2 additions & 2 deletions pkg/chunk/encoding/bigchunk_test.go
@@ -24,7 +24,7 @@ func TestSliceBiggerChunk(t *testing.T) {

for i := 0; i < (12*3600/15)-480; i += 120 {
s := c.Slice(model.Time(i*step), model.Time((i+479)*step))
- iter := s.NewIterator()
+ iter := s.NewIterator(nil)
for j := i; j < i+480; j++ {
require.True(t, iter.Scan())
sample := iter.Value()
@@ -38,7 +38,7 @@ func TestSliceBiggerChunk(t *testing.T) {
// Test for when the slice does not align perfectly with the sub-chunk boundaries.
for i := 0; i < (12*3600/15)-500; i += 100 {
s := c.Slice(model.Time(i*step), model.Time((i+500)*step))
- iter := s.NewIterator()
+ iter := s.NewIterator(nil)

// Consume some samples until we get to where we want to be.
for {
7 changes: 5 additions & 2 deletions pkg/chunk/encoding/chunk.go
@@ -44,7 +44,7 @@ type Chunk interface {
// or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the original chunk.
Add(sample model.SamplePair) ([]Chunk, error)
- NewIterator() Iterator
+ // NewIterator returns an iterator for the chunks.
+ // The iterator passed as argument is for re-use. Depending on implementation,
+ // the iterator can be re-used or a new iterator can be allocated.
+ NewIterator(Iterator) Iterator
Marshal(io.Writer) error
UnmarshalFromBuf([]byte) error
Encoding() Encoding
@@ -141,7 +144,7 @@ func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error)
err error
)

- it := src.NewIterator()
+ it := src.NewIterator(nil)
for it.Scan() {
if NewChunks, err = head.Add(it.Value()); err != nil {
return nil, err
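The new contract is deliberately loose: the argument is only a reuse hint, and each encoding decides what to do with it. A minimal, self-contained sketch of the two options, using hypothetical fancyChunk/fancyIterator types rather than any real Cortex encoding:

```go
package example

// Iterator is a stripped-down stand-in for encoding.Iterator.
type Iterator interface {
	Scan() bool
}

// fancyIterator carries state that is worth recycling between chunks.
type fancyIterator struct {
	data []float64
	pos  int
}

func (it *fancyIterator) Scan() bool {
	if it.pos >= len(it.data) {
		return false
	}
	it.pos++
	return true
}

func (it *fancyIterator) reset(data []float64) {
	it.data = data
	it.pos = 0
}

// fancyChunk recycles the hint when it has the expected concrete type,
// the same shape as bigchunk's NewIterator above; otherwise it allocates.
type fancyChunk struct{ data []float64 }

func (c *fancyChunk) NewIterator(reuse Iterator) Iterator {
	if it, ok := reuse.(*fancyIterator); ok {
		it.reset(c.data) // reuse the existing allocation
		return it
	}
	return &fancyIterator{data: c.data}
}
```

Encodings with nothing worth recycling, such as delta, doubledelta and varbit below, simply ignore the argument (`_ Iterator`).
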
8 changes: 4 additions & 4 deletions pkg/chunk/encoding/chunk_test.go
@@ -121,7 +121,7 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {
require.NoError(t, err)

// Check all the samples are in there.
- iter := chunk.NewIterator()
+ iter := chunk.NewIterator(nil)
for i := 0; i < samples; i++ {
require.True(t, iter.Scan())
sample := iter.Value()
@@ -145,7 +145,7 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {
func testChunkSeek(t *testing.T, encoding Encoding, samples int) {
chunk := mkChunk(t, encoding, samples)

- iter := chunk.NewIterator()
+ iter := chunk.NewIterator(nil)
for i := 0; i < samples; i += samples / 10 {
require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
sample := iter.Value()
@@ -167,7 +167,7 @@ func testChunkSeek(t *testing.T, encoding Encoding, samples int) {
func testChunkSeekForward(t *testing.T, encoding Encoding, samples int) {
chunk := mkChunk(t, encoding, samples)

- iter := chunk.NewIterator()
+ iter := chunk.NewIterator(nil)
for i := 0; i < samples; i += samples / 10 {
require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
sample := iter.Value()
@@ -190,7 +190,7 @@ func testChunkBatch(t *testing.T, encoding Encoding, samples int) {
chunk := mkChunk(t, encoding, samples)

// Check all the samples are in there.
- iter := chunk.NewIterator()
+ iter := chunk.NewIterator(nil)
for i := 0; i < samples; {
require.True(t, iter.Scan())
batch := iter.Batch(BatchSize)
2 changes: 1 addition & 1 deletion pkg/chunk/encoding/delta.go
@@ -185,7 +185,7 @@ func (c *deltaEncodedChunk) Slice(_, _ model.Time) Chunk {
}

// NewIterator implements chunk.
- func (c *deltaEncodedChunk) NewIterator() Iterator {
+ func (c *deltaEncodedChunk) NewIterator(_ Iterator) Iterator {
return newIndexAccessingChunkIterator(c.Len(), &deltaEncodedIndexAccessor{
c: *c,
baseT: c.baseTime(),
4 changes: 2 additions & 2 deletions pkg/chunk/encoding/doubledelta.go
@@ -192,8 +192,8 @@ func (c doubleDeltaEncodedChunk) FirstTime() model.Time {
return c.baseTime()
}

- // NewIterator( implements chunk.
- func (c *doubleDeltaEncodedChunk) NewIterator() Iterator {
+ // NewIterator implements chunk.
+ func (c *doubleDeltaEncodedChunk) NewIterator(_ Iterator) Iterator {
return newIndexAccessingChunkIterator(c.Len(), &doubleDeltaEncodedIndexAccessor{
c: *c,
baseT: c.baseTime(),
4 changes: 2 additions & 2 deletions pkg/chunk/encoding/varbit.go
@@ -276,7 +276,7 @@ func (c *varbitChunk) Add(s model.SamplePair) ([]Chunk, error) {
}

// NewIterator implements chunk.
- func (c varbitChunk) NewIterator() Iterator {
+ func (c varbitChunk) NewIterator(_ Iterator) Iterator {
return newVarbitChunkIterator(c)
}

@@ -329,7 +329,7 @@ func (c varbitChunk) marshalLen() int {

// Len implements chunk. Runs in O(n).
func (c varbitChunk) Len() int {
- it := c.NewIterator()
+ it := c.NewIterator(nil)
i := 0
for ; it.Scan(); i++ {
}
6 changes: 4 additions & 2 deletions pkg/ingester/series.go
@@ -131,7 +131,7 @@ func firstAndLastTimes(c encoding.Chunk) (model.Time, model.Time, error) {
first model.Time
last model.Time
firstSet bool
- iter = c.NewIterator()
+ iter = c.NewIterator(nil)
)
for iter.Scan() {
sample := iter.Value()
@@ -190,9 +190,11 @@ func (s *memorySeries) samplesForRange(from, through model.Time) ([]model.Sample
OldestInclusive: from,
NewestInclusive: through,
}
+ var reuseIter encoding.Iterator
for idx := fromIdx; idx <= throughIdx; idx++ {
cd := s.chunkDescs[idx]
- chValues, err := encoding.RangeValues(cd.C.NewIterator(), in)
+ reuseIter = cd.C.NewIterator(reuseIter)
+ chValues, err := encoding.RangeValues(reuseIter, in)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/querier/batch/chunk.go
@@ -16,7 +16,7 @@ type chunkIterator struct {

func (i *chunkIterator) reset(chunk chunk.Chunk) {
i.chunk = chunk
- i.it = chunk.Data.NewIterator()
+ i.it = chunk.Data.NewIterator(i.it)
i.batch.Length = 0
i.batch.Index = 0
}
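The reset method above shows the other common shape of the same optimization: the iterator lives on the wrapping struct and is offered back to NewIterator every time the wrapper is pointed at a new chunk, so a long-running query reuses one allocation across all the chunks it visits. A rough, self-contained sketch with hypothetical names, not the actual querier types:

```go
package example

// Iterator and chunkData are stand-ins for encoding.Iterator and chunk.Data.
type Iterator interface {
	Scan() bool
}

type chunkData interface {
	NewIterator(Iterator) Iterator
}

// chunkWrapper keeps the iterator between resets so the next NewIterator
// call can recycle it, as chunkIterator.reset does above.
type chunkWrapper struct {
	data chunkData
	it   Iterator
}

func (w *chunkWrapper) reset(data chunkData) {
	w.data = data
	w.it = data.NewIterator(w.it) // previous iterator offered for reuse
}
```
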
2 changes: 1 addition & 1 deletion pkg/querier/iterators/chunk_merge_iterator.go
@@ -48,7 +48,7 @@ func buildIterators(cs []chunk.Chunk) []*nonOverlappingIterator {
for i := range cs {
chunks[i] = &chunkIterator{
Chunk: cs[i],
- it: cs[i].Data.NewIterator(),
+ it: cs[i].Data.NewIterator(nil),
}
}
sort.Sort(byFrom(chunks))