Skip to content

Commit 77a09cc

Browse files
Adds v11 schema (#1538)
* Adds v11 schema to store label names within index. Stores only label names and not the entire metric. Storing entire metric will bloat the index by 30% and it doesn't really make sense to do it right now. Adding just label names adds a tolerable 7% to the index. Also, in Prometheus, we don't treat __name__ as a special label. We always return it when calling /labels API and we should do the same here. Signed-off-by: Cyril Tovena <[email protected]> Signed-off-by: Goutham Veeramachaneni <[email protected]> * Finishing touches fix lint issue Signed-off-by: Cyril Tovena <[email protected]> removes useless loop Signed-off-by: Cyril Tovena <[email protected]> This should be on v11 not v10. Signed-off-by: Cyril Tovena <[email protected]> s/metricConstRangeKeyV1/labelNamesRangeKeyV1/ The code was first written to store the entire series, but now changed to do just labelNames. Signed-off-by: Goutham Veeramachaneni <[email protected]> Add note about v11 being experimental. Signed-off-by: Goutham Veeramachaneni <[email protected]> Co-authored-by: cyriltovena <[email protected]> Co-authored-by: Goutham Veeramachaneni <[email protected]>
1 parent f59c54d commit 77a09cc

11 files changed

+252
-18
lines changed

docs/architecture.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,4 @@ The interface works somewhat differently across the supported databases:
145145

146146
A set of schemas are used to map the matchers and label sets used on reads and writes to the chunk store into appropriate operations on the index. Schemas have been added as Cortex has evolved, mainly in an attempt to better load balance writes and improve query performance.
147147

148-
> The current schema recommendation is the **v10 schema**.
148+
> The current schema recommendation is the **v10 schema**. v11 schema is an experimental schema.

docs/single-process-config.yaml

+1-2
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ ingester:
5151
# for the chunks.
5252
schema:
5353
configs:
54-
- from: 2019-03-25
54+
- from: 2019-07-29
5555
store: boltdb
5656
object_store: filesystem
5757
schema: v10
@@ -65,4 +65,3 @@ storage:
6565

6666
filesystem:
6767
directory: /tmp/cortex/chunks
68-

pkg/chunk/chunk_store_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ var schemas = []struct {
3737
{"v6", true},
3838
{"v9", true},
3939
{"v10", true},
40+
{"v11", true},
4041
}
4142

4243
var stores = []struct {
@@ -416,11 +417,11 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
416417
}{
417418
{
418419
`foo`,
419-
[]string{"bar", "flip", "toms"},
420+
[]string{labels.MetricName, "bar", "flip", "toms"},
420421
},
421422
{
422423
`bar`,
423-
[]string{"bar", "toms"},
424+
[]string{labels.MetricName, "bar", "toms"},
424425
},
425426
} {
426427
for _, schema := range schemas {

pkg/chunk/chunk_store_utils.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,9 @@ func labelNamesFromChunks(chunks []Chunk) []string {
4242
var result []string
4343
for _, c := range chunks {
4444
for _, l := range c.Metric {
45-
if l.Name != model.MetricNameLabel {
46-
if _, ok := keys[string(l.Name)]; !ok {
47-
keys[string(l.Name)] = struct{}{}
48-
result = append(result, string(l.Name))
49-
}
45+
if _, ok := keys[string(l.Name)]; !ok {
46+
keys[string(l.Name)] = struct{}{}
47+
result = append(result, string(l.Name))
5048
}
5149
}
5250
}

pkg/chunk/composite_store.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index
5555
var store Store
5656
var err error
5757
switch cfg.Schema {
58-
case "v9", "v10":
58+
case "v9", "v10", "v11":
5959
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits)
6060
default:
6161
store, err = newStore(storeCfg, schema, index, chunks, limits)

pkg/chunk/inmemory_storage_client.go

+1
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error {
165165
itemComponents := decodeRangeKey(items[i].rangeValue)
166166
if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) &&
167167
!bytes.Equal(itemComponents[3], seriesRangeKeyV1) &&
168+
!bytes.Equal(itemComponents[3], labelNamesRangeKeyV1) &&
168169
!bytes.Equal(itemComponents[3], labelSeriesRangeKeyV1) {
169170
return fmt.Errorf("Dupe write")
170171
}

pkg/chunk/schema.go

+109
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"strings"
99

10+
jsoniter "github.com/json-iterator/go"
1011
"github.com/prometheus/common/model"
1112
"github.com/prometheus/prometheus/pkg/labels"
1213
)
@@ -22,6 +23,8 @@ var (
2223
// For v9 schema
2324
seriesRangeKeyV1 = []byte{'7'}
2425
labelSeriesRangeKeyV1 = []byte{'8'}
26+
// For v11 schema
27+
labelNamesRangeKeyV1 = []byte{'9'}
2528

2629
// ErrNotSupported when a schema doesn't support that particular lookup.
2730
ErrNotSupported = errors.New("not supported")
@@ -45,6 +48,8 @@ type Schema interface {
4548

4649
// If the query resulted in series IDs, use this method to find chunks.
4750
GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
51+
// Returns queries to retrieve all label names of multiple series by id.
52+
GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
4853
}
4954

5055
// IndexQuery describes a query for entries
@@ -196,6 +201,20 @@ func (s schema) GetChunksForSeries(from, through model.Time, userID string, seri
196201
return result, nil
197202
}
198203

204+
func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
205+
var result []IndexQuery
206+
207+
buckets := s.buckets(from, through, userID)
208+
for _, bucket := range buckets {
209+
entries, err := s.entries.GetLabelNamesForSeries(bucket, seriesID)
210+
if err != nil {
211+
return nil, err
212+
}
213+
result = append(result, entries...)
214+
}
215+
return result, nil
216+
}
217+
199218
type entries interface {
200219
GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
201220
GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
@@ -205,6 +224,7 @@ type entries interface {
205224
GetReadMetricLabelQueries(bucket Bucket, metricName string, labelName string) ([]IndexQuery, error)
206225
GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
207226
GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
227+
GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
208228
}
209229

210230
// original entries:
@@ -276,6 +296,10 @@ func (originalEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, err
276296
return nil, ErrNotSupported
277297
}
278298

299+
func (originalEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
300+
return nil, ErrNotSupported
301+
}
302+
279303
// v3Schema went to base64 encoded label values & a version ID
280304
// - range key: <label name>\0<base64(label value)>\0<chunk name>\0<version 1>
281305

@@ -391,6 +415,10 @@ func (labelNameInHashKeyEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]Index
391415
return nil, ErrNotSupported
392416
}
393417

418+
func (labelNameInHashKeyEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
419+
return nil, ErrNotSupported
420+
}
421+
394422
// v5 schema is an extension of v4, with the chunk end time in the
395423
// range key to improve query latency. However, it did it wrong
396424
// so the chunk end times are ignored.
@@ -461,6 +489,10 @@ func (v5Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
461489
return nil, ErrNotSupported
462490
}
463491

492+
func (v5Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
493+
return nil, ErrNotSupported
494+
}
495+
464496
// v6Entries fixes issues with v5 time encoding being wrong (see #337), and
465497
// moves label value out of range key (see #199).
466498
type v6Entries struct{}
@@ -537,6 +569,10 @@ func (v6Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
537569
return nil, ErrNotSupported
538570
}
539571

572+
func (v6Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
573+
return nil, ErrNotSupported
574+
}
575+
540576
// v9Entries adds a layer of indirection between labels -> series -> chunks.
541577
type v9Entries struct {
542578
}
@@ -632,6 +668,10 @@ func (v9Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuer
632668
}, nil
633669
}
634670

671+
func (v9Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
672+
return nil, ErrNotSupported
673+
}
674+
635675
// v10Entries builds on v9 by sharding index rows to reduce their size.
636676
type v10Entries struct {
637677
rowShards uint32
@@ -736,3 +776,72 @@ func (v10Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQue
736776
},
737777
}, nil
738778
}
779+
780+
func (v10Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
781+
return nil, ErrNotSupported
782+
}
783+
784+
// v11Entries builds on v10 but adds index entries for each series to store respective labels.
785+
type v11Entries struct {
786+
v10Entries
787+
}
788+
789+
func (s v11Entries) GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
790+
seriesID := labelsSeriesID(labels)
791+
792+
// read first 32 bits of the hash and use this to calculate the shard
793+
shard := binary.BigEndian.Uint32(seriesID) % s.rowShards
794+
795+
labelNames := make([]string, 0, len(labels))
796+
for _, l := range labels {
797+
if l.Name == model.MetricNameLabel {
798+
continue
799+
}
800+
labelNames = append(labelNames, l.Name)
801+
}
802+
data, err := jsoniter.ConfigFastest.Marshal(labelNames)
803+
if err != nil {
804+
return nil, err
805+
}
806+
entries := []IndexEntry{
807+
// Entry for metricName -> seriesID
808+
{
809+
TableName: bucket.tableName,
810+
HashValue: fmt.Sprintf("%02d:%s:%s", shard, bucket.hashKey, metricName),
811+
RangeValue: encodeRangeKey(seriesID, nil, nil, seriesRangeKeyV1),
812+
},
813+
// Entry for seriesID -> label names
814+
{
815+
TableName: bucket.tableName,
816+
HashValue: string(seriesID),
817+
RangeValue: encodeRangeKey(nil, nil, nil, labelNamesRangeKeyV1),
818+
Value: data,
819+
},
820+
}
821+
822+
// Entries for metricName:labelName -> hash(value):seriesID
823+
// We use a hash of the value to limit its length.
824+
for _, v := range labels {
825+
if v.Name == model.MetricNameLabel {
826+
continue
827+
}
828+
valueHash := sha256bytes(v.Value)
829+
entries = append(entries, IndexEntry{
830+
TableName: bucket.tableName,
831+
HashValue: fmt.Sprintf("%02d:%s:%s:%s", shard, bucket.hashKey, metricName, v.Name),
832+
RangeValue: encodeRangeKey(valueHash, seriesID, nil, labelSeriesRangeKeyV1),
833+
Value: []byte(v.Value),
834+
})
835+
}
836+
837+
return entries, nil
838+
}
839+
840+
func (v11Entries) GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) {
841+
return []IndexQuery{
842+
{
843+
TableName: bucket.tableName,
844+
HashValue: string(seriesID),
845+
},
846+
}, nil
847+
}

pkg/chunk/schema_caching.go

+8
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ func (s *schemaCaching) GetChunksForSeries(from, through model.Time, userID stri
4646
return s.setImmutability(from, through, queries), nil
4747
}
4848

49+
func (s *schemaCaching) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
50+
queries, err := s.Schema.GetLabelNamesForSeries(from, through, userID, seriesID)
51+
if err != nil {
52+
return nil, err
53+
}
54+
return s.setImmutability(from, through, queries), nil
55+
}
56+
4957
func (s *schemaCaching) setImmutability(from, through model.Time, queries []IndexQuery) []IndexQuery {
5058
cacheBefore := model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
5159

pkg/chunk/schema_config.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,12 @@ func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)
219219

220220
// CreateSchema returns the schema defined by the PeriodConfig
221221
func (cfg PeriodConfig) CreateSchema() Schema {
222-
var e entries
222+
rowShards := uint32(16)
223+
if cfg.RowShards > 0 {
224+
rowShards = cfg.RowShards
225+
}
223226

227+
var e entries
224228
switch cfg.Schema {
225229
case "v1":
226230
e = originalEntries{}
@@ -237,14 +241,15 @@ func (cfg PeriodConfig) CreateSchema() Schema {
237241
case "v9":
238242
e = v9Entries{}
239243
case "v10":
240-
rowShards := uint32(16)
241-
if cfg.RowShards > 0 {
242-
rowShards = cfg.RowShards
243-
}
244-
245244
e = v10Entries{
246245
rowShards: rowShards,
247246
}
247+
case "v11":
248+
e = v11Entries{
249+
v10Entries: v10Entries{
250+
rowShards: rowShards,
251+
},
252+
}
248253
default:
249254
return nil
250255
}

pkg/chunk/schema_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88
"sort"
99
"testing"
1010

11+
jsoniter "github.com/json-iterator/go"
1112
"github.com/prometheus/common/model"
1213
"github.com/prometheus/prometheus/pkg/labels"
14+
"github.com/prometheus/prometheus/promql"
1315
"github.com/stretchr/testify/require"
1416
"github.com/weaveworks/common/test"
1517
)
@@ -333,3 +335,56 @@ func TestSchemaRangeKey(t *testing.T) {
333335
})
334336
}
335337
}
338+
339+
func BenchmarkEncodeLabelsJson(b *testing.B) {
340+
decoded := &labels.Labels{}
341+
lbs := labels.FromMap(map[string]string{
342+
"foo": "bar",
343+
"fuzz": "buzz",
344+
"cluster": "test",
345+
"test": "test1",
346+
"instance": "cortex-01",
347+
"bar": "foo",
348+
"version": "0.1",
349+
})
350+
json := jsoniter.ConfigFastest
351+
var data []byte
352+
var err error
353+
for n := 0; n < b.N; n++ {
354+
data, err = json.Marshal(lbs)
355+
if err != nil {
356+
panic(err)
357+
}
358+
err = json.Unmarshal(data, decoded)
359+
if err != nil {
360+
panic(err)
361+
}
362+
}
363+
b.Log("data size", len(data))
364+
b.Log("decode", decoded)
365+
}
366+
367+
func BenchmarkEncodeLabelsString(b *testing.B) {
368+
var decoded labels.Labels
369+
lbs := labels.FromMap(map[string]string{
370+
"foo": "bar",
371+
"fuzz": "buzz",
372+
"cluster": "test",
373+
"test": "test1",
374+
"instance": "cortex-01",
375+
"bar": "foo",
376+
"version": "0.1",
377+
})
378+
var data []byte
379+
var err error
380+
for n := 0; n < b.N; n++ {
381+
data = []byte(lbs.String())
382+
decoded, err = promql.ParseMetric(string(data))
383+
if err != nil {
384+
panic(err)
385+
}
386+
}
387+
b.Log("data size", len(data))
388+
b.Log("decode", decoded)
389+
390+
}

0 commit comments

Comments
 (0)