Adds v11 schema #1538

Merged (2 commits) on Oct 25, 2019
2 changes: 1 addition & 1 deletion docs/architecture.md
@@ -145,4 +145,4 @@ The interface works somewhat differently across the supported databases:

A set of schemas is used to map the matchers and label sets used on reads and writes to the chunk store into appropriate operations on the index. Schemas have been added as Cortex has evolved, mainly in an attempt to better load-balance writes and improve query performance.

- > The current schema recommendation is the **v10 schema**.
+ > The current schema recommendation is the **v10 schema**. The v11 schema is experimental.
3 changes: 1 addition & 2 deletions docs/single-process-config.yaml
@@ -51,7 +51,7 @@ ingester:
# for the chunks.
schema:
configs:
-    - from: 2019-03-25
+    - from: 2019-07-29
store: boltdb
object_store: filesystem
schema: v10
@@ -65,4 +65,3 @@ storage:

filesystem:
directory: /tmp/cortex/chunks
-
5 changes: 3 additions & 2 deletions pkg/chunk/chunk_store_test.go
@@ -37,6 +37,7 @@ var schemas = []struct {
{"v6", true},
{"v9", true},
{"v10", true},
{"v11", true},
}

var stores = []struct {
@@ -416,11 +417,11 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
}{
{
`foo`,
[]string{"bar", "flip", "toms"},
[]string{labels.MetricName, "bar", "flip", "toms"},
},
{
`bar`,
[]string{"bar", "toms"},
[]string{labels.MetricName, "bar", "toms"},
},
} {
for _, schema := range schemas {
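These expectation changes mirror the `labelNamesFromChunks` change below: label-name lookups now return the metric name itself. `labels.MetricName` is the Prometheus `labels` package constant for the reserved name `__name__`, so, for instance, the expected names for `foo` become:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

func main() {
	// labels.MetricName == "__name__", the reserved label under which
	// Prometheus stores the metric name.
	expected := []string{labels.MetricName, "bar", "flip", "toms"}
	fmt.Println(expected) // [__name__ bar flip toms]
}
```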
8 changes: 3 additions & 5 deletions pkg/chunk/chunk_store_utils.go
@@ -42,11 +42,9 @@ func labelNamesFromChunks(chunks []Chunk) []string {
var result []string
for _, c := range chunks {
for _, l := range c.Metric {
-        if l.Name != model.MetricNameLabel {
-            if _, ok := keys[string(l.Name)]; !ok {
-                keys[string(l.Name)] = struct{}{}
-                result = append(result, string(l.Name))
-            }
+        if _, ok := keys[string(l.Name)]; !ok {
+            keys[string(l.Name)] = struct{}{}
+            result = append(result, string(l.Name))
}
}
}
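With the `model.MetricNameLabel` guard removed, `labelNamesFromChunks` dedups every label name it sees, `__name__` included, which is exactly what the test changes above assert. A self-contained sketch of the new loop, substituting plain `labels.Labels` for chunk metrics:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

// labelNames mirrors the new logic: collect each label name once, in
// first-seen order, without special-casing the metric name.
func labelNames(metrics []labels.Labels) []string {
	keys := map[string]struct{}{}
	var result []string
	for _, m := range metrics {
		for _, l := range m {
			if _, ok := keys[l.Name]; !ok {
				keys[l.Name] = struct{}{}
				result = append(result, l.Name)
			}
		}
	}
	return result
}

func main() {
	m := labels.FromStrings("__name__", "foo", "bar", "baz")
	fmt.Println(labelNames([]labels.Labels{m})) // [__name__ bar baz]
}
```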
2 changes: 1 addition & 1 deletion pkg/chunk/composite_store.go
@@ -55,7 +55,7 @@ func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index
var store Store
var err error
switch cfg.Schema {
case "v9", "v10":
case "v9", "v10", "v11":
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits)
default:
store, err = newStore(storeCfg, schema, index, chunks, limits)
1 change: 1 addition & 0 deletions pkg/chunk/inmemory_storage_client.go
@@ -165,6 +165,7 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error {
itemComponents := decodeRangeKey(items[i].rangeValue)
if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) &&
!bytes.Equal(itemComponents[3], seriesRangeKeyV1) &&
!bytes.Equal(itemComponents[3], labelNamesRangeKeyV1) &&
!bytes.Equal(itemComponents[3], labelSeriesRangeKeyV1) {
return fmt.Errorf("Dupe write")
}
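The mock store rejects rewrites of an existing item unless the range key's type byte marks an entry kind that is legitimately written more than once; `labelNamesRangeKeyV1` joins that allow-list because v11 writes an identical per-series label-names entry for every chunk of a series. A rough standalone sketch of the check, assuming range keys are `\0`-separated components with the type byte in the fourth slot (per the key-layout comments in `schema.go`); `entryType` here is a hypothetical stand-in for the package's `decodeRangeKey`:

```go
package main

import (
	"bytes"
	"fmt"
)

// Entry-type bytes as declared in pkg/chunk/schema.go (only those shown in this PR).
var (
	seriesRangeKeyV1      = []byte{'7'}
	labelSeriesRangeKeyV1 = []byte{'8'}
	labelNamesRangeKeyV1  = []byte{'9'} // new in v11
)

// entryType returns the fourth \0-separated component of a range key,
// which carries the entry-type byte (assumed layout).
func entryType(rangeValue []byte) []byte {
	parts := bytes.Split(rangeValue, []byte{0})
	if len(parts) < 4 {
		return nil
	}
	return parts[3]
}

func main() {
	// A v11 label-names key: empty leading components, then the '9' type byte.
	key := bytes.Join([][]byte{nil, nil, nil, labelNamesRangeKeyV1}, []byte{0})
	t := entryType(key)
	dupeAllowed := bytes.Equal(t, seriesRangeKeyV1) ||
		bytes.Equal(t, labelSeriesRangeKeyV1) ||
		bytes.Equal(t, labelNamesRangeKeyV1)
	fmt.Println(dupeAllowed) // true
}
```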
109 changes: 109 additions & 0 deletions pkg/chunk/schema.go
@@ -7,6 +7,7 @@ import (
"fmt"
"strings"

jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
@@ -22,6 +23,8 @@ var (
// For v9 schema
seriesRangeKeyV1 = []byte{'7'}
labelSeriesRangeKeyV1 = []byte{'8'}
// For v11 schema
labelNamesRangeKeyV1 = []byte{'9'}

// ErrNotSupported when a schema doesn't support that particular lookup.
ErrNotSupported = errors.New("not supported")
@@ -45,6 +48,8 @@ type Schema interface {

// If the query resulted in series IDs, use this method to find chunks.
GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
// Returns queries to retrieve all label names of a series, looked up by its ID.
GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
}

// IndexQuery describes a query for entries
@@ -196,6 +201,20 @@ func (s schema) GetChunksForSeries(from, through model.Time, userID string, seri
return result, nil
}

func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
var result []IndexQuery

buckets := s.buckets(from, through, userID)
for _, bucket := range buckets {
entries, err := s.entries.GetLabelNamesForSeries(bucket, seriesID)
if err != nil {
return nil, err
}
result = append(result, entries...)
}
return result, nil
}

type entries interface {
GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
@@ -205,6 +224,7 @@ type entries interface {
GetReadMetricLabelQueries(bucket Bucket, metricName string, labelName string) ([]IndexQuery, error)
GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
}

// original entries:
@@ -276,6 +296,10 @@ func (originalEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, err
return nil, ErrNotSupported
}

func (originalEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}

// v3Schema went to base64 encoded label values & a version ID
// - range key: <label name>\0<base64(label value)>\0<chunk name>\0<version 1>

@@ -391,6 +415,10 @@ func (labelNameInHashKeyEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]Index
return nil, ErrNotSupported
}

func (labelNameInHashKeyEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}

// v5 schema is an extension of v4, with the chunk end time in the
// range key to improve query latency. However, it did it wrong
// so the chunk end times are ignored.
@@ -461,6 +489,10 @@ func (v5Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}

func (v5Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}

// v6Entries fixes issues with v5 time encoding being wrong (see #337), and
// moves label value out of range key (see #199).
type v6Entries struct{}
@@ -537,6 +569,10 @@ func (v6Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}

func (v6Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}

// v9Entries adds a layer of indirection between labels -> series -> chunks.
type v9Entries struct {
}
@@ -632,6 +668,10 @@ func (v9Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuer
}, nil
}

func (v9Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}

// v10Entries builds on v9 by sharding index rows to reduce their size.
type v10Entries struct {
rowShards uint32
@@ -736,3 +776,72 @@ func (v10Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQue
},
}, nil
}

func (v10Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}

// v11Entries builds on v10, adding an index entry per series that stores the series' label names.
type v11Entries struct {
v10Entries
}

func (s v11Entries) GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
seriesID := labelsSeriesID(labels)

// read first 32 bits of the hash and use this to calculate the shard
shard := binary.BigEndian.Uint32(seriesID) % s.rowShards

labelNames := make([]string, 0, len(labels))
for _, l := range labels {
if l.Name == model.MetricNameLabel {
continue
}
labelNames = append(labelNames, l.Name)
}
data, err := jsoniter.ConfigFastest.Marshal(labelNames)
if err != nil {
return nil, err
}
entries := []IndexEntry{
// Entry for metricName -> seriesID
{
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%02d:%s:%s", shard, bucket.hashKey, metricName),
RangeValue: encodeRangeKey(seriesID, nil, nil, seriesRangeKeyV1),
},
// Entry for seriesID -> label names
{
TableName: bucket.tableName,
HashValue: string(seriesID),
RangeValue: encodeRangeKey(nil, nil, nil, labelNamesRangeKeyV1),
Value: data,
},
}

// Entries for metricName:labelName -> hash(value):seriesID
// We use a hash of the value to limit its length.
for _, v := range labels {
if v.Name == model.MetricNameLabel {
continue
}
valueHash := sha256bytes(v.Value)
entries = append(entries, IndexEntry{
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%02d:%s:%s:%s", shard, bucket.hashKey, metricName, v.Name),
RangeValue: encodeRangeKey(valueHash, seriesID, nil, labelSeriesRangeKeyV1),
Value: []byte(v.Value),
})
}

return entries, nil
}

func (v11Entries) GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) {
return []IndexQuery{
{
TableName: bucket.tableName,
HashValue: string(seriesID),
},
}, nil
}
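On the write side, v11's `GetLabelWriteEntries` serializes the series' label names (metric name excluded) with `jsoniter` and stores them as the value of the new `'9'`-typed row keyed by series ID; `GetLabelNamesForSeries` then only needs a point lookup on that hash value, and the read path can decode the value back into a `[]string`. A minimal round-trip sketch of that encoding:

```go
package main

import (
	"fmt"

	jsoniter "github.com/json-iterator/go"
)

func main() {
	json := jsoniter.ConfigFastest

	// What GetLabelWriteEntries stores as the entry's Value:
	// the series' label names, metric name excluded.
	labelNames := []string{"bar", "flip", "toms"}
	data, err := json.Marshal(labelNames)
	if err != nil {
		panic(err)
	}

	// What a reader of the label-names row would do with that Value.
	var decoded []string
	if err := json.Unmarshal(data, &decoded); err != nil {
		panic(err)
	}
	fmt.Printf("%s %v\n", data, decoded) // ["bar","flip","toms"] [bar flip toms]
}
```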
8 changes: 8 additions & 0 deletions pkg/chunk/schema_caching.go
@@ -46,6 +46,14 @@ func (s *schemaCaching) GetChunksForSeries(from, through model.Time, userID stri
return s.setImmutability(from, through, queries), nil
}

func (s *schemaCaching) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
queries, err := s.Schema.GetLabelNamesForSeries(from, through, userID, seriesID)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}

func (s *schemaCaching) setImmutability(from, through model.Time, queries []IndexQuery) []IndexQuery {
cacheBefore := model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())

17 changes: 11 additions & 6 deletions pkg/chunk/schema_config.go
@@ -219,8 +219,12 @@ func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)

// CreateSchema returns the schema defined by the PeriodConfig
func (cfg PeriodConfig) CreateSchema() Schema {
-    var e entries
+    rowShards := uint32(16)
+    if cfg.RowShards > 0 {
+        rowShards = cfg.RowShards
+    }

+    var e entries
switch cfg.Schema {
case "v1":
e = originalEntries{}
@@ -237,14 +241,15 @@ func (cfg PeriodConfig) CreateSchema() Schema {
case "v9":
e = v9Entries{}
case "v10":
-        rowShards := uint32(16)
-        if cfg.RowShards > 0 {
-            rowShards = cfg.RowShards
-        }
-
e = v10Entries{
rowShards: rowShards,
}
case "v11":
e = v11Entries{
v10Entries: v10Entries{
rowShards: rowShards,
},
}
default:
return nil
}
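With the shard-count computation hoisted above the switch, v10 and v11 share the same `rowShards` default, and opting into the new schema is only a matter of the period's schema value. A usage sketch (the import path assumes the Cortex repository layout; values are illustrative):

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/chunk"
)

func main() {
	// Only the fields exercised by CreateSchema in this diff are set;
	// a real PeriodConfig also carries the period's start date, store, etc.
	cfg := chunk.PeriodConfig{
		Schema:    "v11",
		RowShards: 32, // falls back to 16 when left at zero
	}

	s := cfg.CreateSchema() // a Schema backed by v11Entries; nil for unknown versions
	fmt.Printf("%T\n", s)
}
```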
55 changes: 55 additions & 0 deletions pkg/chunk/schema_test.go
@@ -8,8 +8,10 @@ import (
"sort"
"testing"

jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/test"
)
@@ -333,3 +335,56 @@ func TestSchemaRangeKey(t *testing.T) {
})
}
}

func BenchmarkEncodeLabelsJson(b *testing.B) {
decoded := &labels.Labels{}
lbs := labels.FromMap(map[string]string{
"foo": "bar",
"fuzz": "buzz",
"cluster": "test",
"test": "test1",
"instance": "cortex-01",
"bar": "foo",
"version": "0.1",
})
json := jsoniter.ConfigFastest
var data []byte
var err error
for n := 0; n < b.N; n++ {
data, err = json.Marshal(lbs)
if err != nil {
panic(err)
}
err = json.Unmarshal(data, decoded)
if err != nil {
panic(err)
}
}
b.Log("data size", len(data))
b.Log("decode", decoded)
}

func BenchmarkEncodeLabelsString(b *testing.B) {
var decoded labels.Labels
lbs := labels.FromMap(map[string]string{
"foo": "bar",
"fuzz": "buzz",
"cluster": "test",
"test": "test1",
"instance": "cortex-01",
"bar": "foo",
"version": "0.1",
})
var data []byte
var err error
for n := 0; n < b.N; n++ {
data = []byte(lbs.String())
decoded, err = promql.ParseMetric(string(data))
if err != nil {
panic(err)
}
}
b.Log("data size", len(data))
b.Log("decode", decoded)

}
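Both benchmarks round-trip the same seven-label set, once through `jsoniter` and once through Prometheus' string form via `promql.ParseMetric`, and log the encoded size for comparison; they can be run with the standard tooling, e.g. `go test -bench=BenchmarkEncodeLabels -benchmem ./pkg/chunk/`.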