Skip to content

Move iterators inside chunk store using MergeSeriesIterator #438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
32 changes: 31 additions & 1 deletion pkg/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local"
prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"

"github.com/weaveworks/common/errors"
Expand Down Expand Up @@ -278,6 +279,35 @@ func (c *Chunk) decode(input []byte) error {
})
}

// chunksToIterators groups chunks by series fingerprint, merges and
// dedupes their samples, and returns one SeriesIterator per series.
//
// Iterators are returned in order of each series' first appearance in
// chunks. Iterating the map directly would randomize the order on every
// call (Go map iteration order is unspecified), which would make callers
// and tests that compare the result slice non-deterministic.
func chunksToIterators(chunks []Chunk) ([]local.SeriesIterator, error) {
	// Group chunks by series, sort and dedupe samples.
	sampleStreams := map[model.Fingerprint]*model.SampleStream{}
	// fps records first-seen order of the fingerprints for deterministic output.
	fps := make([]model.Fingerprint, 0, len(chunks))
	for _, c := range chunks {
		fp := c.Metric.Fingerprint()
		ss, ok := sampleStreams[fp]
		if !ok {
			ss = &model.SampleStream{
				Metric: c.Metric,
			}
			sampleStreams[fp] = ss
			fps = append(fps, fp)
		}

		samples, err := c.samples()
		if err != nil {
			return nil, err
		}

		ss.Values = util.MergeSampleSets(ss.Values, samples)
	}

	iterators := make([]local.SeriesIterator, 0, len(sampleStreams))
	for _, fp := range fps {
		iterators = append(iterators, util.NewSampleStreamIterator(sampleStreams[fp]))
	}

	return iterators, nil
}

// ChunksToMatrix converts a slice of chunks into a model.Matrix.
func ChunksToMatrix(chunks []Chunk) (model.Matrix, error) {
// Group chunks by series, sort and dedupe samples.
Expand All @@ -297,7 +327,7 @@ func ChunksToMatrix(chunks []Chunk) (model.Matrix, error) {
return nil, err
}

ss.Values = util.MergeSamples(ss.Values, samples)
ss.Values = util.MergeSampleSets(ss.Values, samples)
}

matrix := make(model.Matrix, 0, len(sampleStreams))
Expand Down
11 changes: 10 additions & 1 deletion pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
"golang.org/x/net/context"

Expand Down Expand Up @@ -151,7 +152,15 @@ func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch
}

// Get implements ChunkStore
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error) {
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
	// Fetch the matching chunks, then hand them off for per-series merging.
	matched, err := c.getChunks(ctx, from, through, allMatchers...)
	if err != nil {
		return nil, err
	}
	return chunksToIterators(matched)
}

func (c *Store) getChunks(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error) {
if through < from {
return nil, fmt.Errorf("invalid query, through < from (%d < %d)", through, from)
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestChunkStore(t *testing.T) {
{"v5 schema", v5Schema},
{"v6 schema", v6Schema},
{"v7 schema", v7Schema},
{"v8 schema", v8Schema},
}

nameMatcher := mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo")
Expand Down Expand Up @@ -122,7 +123,7 @@ func TestChunkStore(t *testing.T) {
t.Fatal(err)
}

chunks, err := store.Get(ctx, now.Add(-time.Hour), now, tc.matchers...)
chunks, err := store.getChunks(ctx, now.Add(-time.Hour), now, tc.matchers...)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, chunks) {
Expand Down Expand Up @@ -249,7 +250,7 @@ func TestChunkStoreMetricNames(t *testing.T) {
t.Fatal(err)
}

chunks, err := store.Get(ctx, now.Add(-time.Hour), now, tc.matchers...)
chunks, err := store.getChunks(ctx, now.Add(-time.Hour), now, tc.matchers...)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, chunks) {
Expand Down Expand Up @@ -282,6 +283,7 @@ func TestChunkStoreRandom(t *testing.T) {
{name: "v5 schema", fn: v5Schema},
{name: "v6 schema", fn: v6Schema},
{name: "v7 schema", fn: v7Schema},
{name: "v8 schema", fn: v8Schema},
}

for i := range schemas {
Expand Down Expand Up @@ -325,7 +327,7 @@ func TestChunkStoreRandom(t *testing.T) {
endTime := model.TimeFromUnix(end)

for _, s := range schemas {
chunks, err := s.store.Get(ctx, startTime, endTime,
chunks, err := s.store.getChunks(ctx, startTime, endTime,
mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo"),
mustNewLabelMatcher(metric.Equal, "bar", "baz"),
)
Expand Down Expand Up @@ -388,7 +390,7 @@ func TestChunkStoreLeastRead(t *testing.T) {
startTime := model.TimeFromUnix(start)
endTime := model.TimeFromUnix(end)

chunks, err := store.Get(ctx, startTime, endTime,
chunks, err := store.getChunks(ctx, startTime, endTime,
mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo"),
mustNewLabelMatcher(metric.Equal, "bar", "baz"),
)
Expand Down
61 changes: 61 additions & 0 deletions pkg/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/local/chunk"
"github.com/stretchr/testify/require"
"github.com/weaveworks/cortex/pkg/util"
)

const userID = "userID"
Expand Down Expand Up @@ -126,3 +128,62 @@ func TestParseExternalKey(t *testing.T) {
require.Equal(t, c.chunk, chunk)
}
}

// TestChunksToIterators checks that chunks sharing a metric collapse
// into a single merged iterator, while chunks for distinct metrics
// yield separate iterators.
func TestChunksToIterators(t *testing.T) {
	// Two chunks with an identical metric should merge into one series.
	sharedMetric := model.Metric{
		model.MetricNameLabel: "foo",
		"bar":                 "baz",
		"toms":                "code",
	}
	first := dummyChunkFor(sharedMetric)
	firstSamples, err := first.samples()
	require.NoError(t, err)
	second := dummyChunkFor(sharedMetric)
	secondSamples, err := second.samples()
	require.NoError(t, err)

	mergedIterator := util.NewSampleStreamIterator(&model.SampleStream{
		Metric: first.Metric,
		Values: util.MergeSampleSets(firstSamples, secondSamples),
	})

	// A chunk with a different metric should get its own iterator.
	distinctMetric := model.Metric{
		model.MetricNameLabel: "foo2",
		"bar":                 "baz",
		"toms":                "code",
	}
	third := dummyChunkFor(distinctMetric)
	thirdSamples, err := third.samples()
	require.NoError(t, err)

	separateIterator := util.NewSampleStreamIterator(&model.SampleStream{
		Metric: third.Metric,
		Values: thirdSamples,
	})

	cases := []struct {
		chunks            []Chunk
		expectedIterators []local.SeriesIterator
	}{
		{
			chunks:            []Chunk{},
			expectedIterators: []local.SeriesIterator{},
		},
		{
			chunks:            []Chunk{first, second, third},
			expectedIterators: []local.SeriesIterator{mergedIterator, separateIterator},
		},
	}
	for _, tc := range cases {
		// NOTE(review): this comparison assumes chunksToIterators returns
		// series in a deterministic order — confirm, since plain map
		// iteration order in Go is randomized.
		iterators, err := chunksToIterators(tc.chunks)
		require.NoError(t, err)
		require.Equal(t, tc.expectedIterators, iterators)
	}
}
5 changes: 3 additions & 2 deletions pkg/chunk/inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ func (m *MockStorage) BatchWrite(_ context.Context, batch WriteBatch) error {
items = append(items, mockItem{})
copy(items[i+1:], items[i:])
} else {
// Return error if duplicate write and not metric name entry
// Return error if duplicate write and not metric name entry or series entry
itemComponents := decodeRangeKey(items[i].rangeValue)
if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) {
if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) &&
!bytes.Equal(itemComponents[3], seriesRangeKeyV1) {
return fmt.Errorf("Dupe write")
}
}
Expand Down
47 changes: 47 additions & 0 deletions pkg/chunk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chunk

import (
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"strings"
Expand All @@ -17,6 +18,7 @@ var (
chunkTimeRangeKeyV4 = []byte{'4'}
chunkTimeRangeKeyV5 = []byte{'5'}
metricNameRangeKeyV1 = []byte{'6'}
seriesRangeKeyV1 = []byte{'7'}
)

// Errors
Expand Down Expand Up @@ -132,6 +134,14 @@ func v7Schema(cfg SchemaConfig) Schema {
}
}

// v8Schema is an extension of the v6 schema: it uses daily buckets and
// v8Entries, which add a labelset/series index entry on top of the v6
// write entries (see v8Entries.GetWriteEntries).
func v8Schema(cfg SchemaConfig) Schema {
	return schema{
		cfg.dailyBuckets,
		v8Entries{},
	}
}

// schema implements Schema given a bucketing function and and set of range key callbacks
type schema struct {
buckets func(from, through model.Time, userID string) []Bucket
Expand Down Expand Up @@ -536,3 +546,40 @@ func (v7Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) {
},
}, nil
}

// v8Entries embeds v6Entries, inheriting its write entries, and adds a
// per-series labelset index entry plus a whole-bucket read query.
// NOTE(review): the original comment described this as "the same as
// v7Entries", but the embedded implementation here is v6Entries —
// confirm which base is intended.
type v8Entries struct {
	v6Entries
}

// GetWriteEntries returns the v6 write entries plus one extra entry
// recording the series' labelset (JSON-encoded) under a
// userID:bigBucket hash key with a series-ID range key.
func (entries v8Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) {
	base, err := entries.v6Entries.GetWriteEntries(bucket, metricName, labels, chunkID)
	if err != nil {
		return nil, err
	}

	labelsJSON, err := json.Marshal(labels)
	if err != nil {
		return nil, err
	}

	// Index entry for the series, keyed by userID:bigBucket.
	seriesEntry := IndexEntry{
		TableName:  bucket.tableName,
		HashValue:  bucket.hashKey,
		RangeValue: encodeRangeKey([]byte(metricSeriesID(labels)), nil, nil, seriesRangeKeyV1),
		Value:      labelsJSON,
	}
	return append(base, seriesEntry), nil
}

// GetReadQueries returns a single query spanning the bucket's entire
// hash key (no range-key prefix restriction).
func (v8Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) {
	query := IndexQuery{
		TableName: bucket.tableName,
		HashValue: bucket.hashKey,
	}
	return []IndexQuery{query}, nil
}
8 changes: 8 additions & 0 deletions pkg/chunk/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type SchemaConfig struct {

// After this time, we will read and write v7 schemas.
V7SchemaFrom util.DayValue

// After this time, we will read and write v8 schemas.
V8SchemaFrom util.DayValue
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -53,6 +56,7 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.V5SchemaFrom, "dynamodb.v5-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v5 schema.")
f.Var(&cfg.V6SchemaFrom, "dynamodb.v6-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v6 schema.")
f.Var(&cfg.V7SchemaFrom, "dynamodb.v7-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v7 schema.")
f.Var(&cfg.V8SchemaFrom, "dynamodb.v8-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v8 schema.")
}

func (cfg *SchemaConfig) tableForBucket(bucketStart int64) string {
Expand Down Expand Up @@ -177,6 +181,10 @@ func newCompositeSchema(cfg SchemaConfig) (Schema, error) {
schemas = append(schemas, compositeSchemaEntry{cfg.V7SchemaFrom.Time, v7Schema(cfg)})
}

if cfg.V8SchemaFrom.IsSet() {
schemas = append(schemas, compositeSchemaEntry{cfg.V8SchemaFrom.Time, v8Schema(cfg)})
}

if !sort.IsSorted(byStart(schemas)) {
return nil, fmt.Errorf("schemas not in time-sorted order")
}
Expand Down
Loading