Include metric name, label name, number of entries and limit in cardinality errors. #1328

Merged 3 commits on Apr 29, 2019
14 changes: 3 additions & 11 deletions pkg/chunk/chunk_store_test.go
@@ -47,35 +47,27 @@ var stores = []struct {
 	{
 		name: "store",
 		configFn: func() StoreConfig {
-			var (
-				storeCfg StoreConfig
-			)
+			var storeCfg StoreConfig
 			flagext.DefaultValues(&storeCfg)
 			return storeCfg
 		},
 	},
 	{
 		name: "cached_store",
 		configFn: func() StoreConfig {
-			var (
-				storeCfg StoreConfig
-			)
+			var storeCfg StoreConfig
 			flagext.DefaultValues(&storeCfg)
 
 			storeCfg.WriteDedupeCacheConfig.Cache = cache.NewFifoCache("test", cache.FifoCacheConfig{
 				Size: 500,
 			})
 
 			return storeCfg
 		},
 	},
 }
 
 // newTestStore creates a new Store for testing.
 func newTestChunkStore(t *testing.T, schemaName string) Store {
-	var (
-		storeCfg StoreConfig
-	)
+	var storeCfg StoreConfig
 	flagext.DefaultValues(&storeCfg)
 	return newTestChunkStoreConfig(t, schemaName, storeCfg)
 }
32 changes: 24 additions & 8 deletions pkg/chunk/series_store.go
@@ -7,7 +7,6 @@ import (
 	"net/http"
 
 	"github.com/go-kit/kit/log/level"
-	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
@@ -22,11 +21,19 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/validation"
 )
 
-var (
-	// ErrCardinalityExceeded is returned when the user reads a row that
-	// is too large.
-	ErrCardinalityExceeded = errors.New("cardinality limit exceeded")
+// CardinalityExceededError is returned when the user reads a row that
+// is too large.
+type CardinalityExceededError struct {
+	MetricName, LabelName string
+	Size, Limit           int32
+}
+
+func (e CardinalityExceededError) Error() string {
+	return fmt.Sprintf("cardinality limit exceeded for %s{%s}; %d entries, more than limit of %d",
+		e.MetricName, e.LabelName, e.Size, e.Limit)
+}
 
+var (
 	indexLookupsPerQuery = promauto.NewHistogram(prometheus.HistogramOpts{
 		Namespace: "cortex",
 		Name:      "chunk_store_index_lookups_per_query",
@@ -196,6 +203,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
 	var preIntersectionCount int
 	var lastErr error
 	var cardinalityExceededErrors int
+	var cardinalityExceededError CardinalityExceededError
 	for i := 0; i < len(matchers); i++ {
 		select {
 		case incoming := <-incomingIDs:
@@ -210,8 +218,9 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
 			// series and the other returns only 10 (a few), we don't lookup the first one at all.
 			// We just manually filter through the 10 series again using "filterChunksByMatchers",
 			// saving us from looking up and intersecting a lot of series.
-			if err == ErrCardinalityExceeded {
+			if e, ok := err.(CardinalityExceededError); ok {
 				cardinalityExceededErrors++
+				cardinalityExceededError = e
 			} else {
 				lastErr = err
 			}
@@ -220,7 +229,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
 
 	// But if every single matcher returns a lot of series, then it makes sense to abort the query.
 	if cardinalityExceededErrors == len(matchers) {
-		return nil, ErrCardinalityExceeded
+		return nil, cardinalityExceededError
 	} else if lastErr != nil {
 		return nil, lastErr
 	}
@@ -241,11 +250,14 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
 	}
 
 	var queries []IndexQuery
+	var labelName string
 	if matcher == nil {
 		queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, model.LabelValue(metricName))
 	} else if matcher.Type != labels.MatchEqual {
+		labelName = matcher.Name
 		queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name))
 	} else {
+		labelName = matcher.Name
 		queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name), model.LabelValue(matcher.Value))
 	}
 	if err != nil {
@@ -254,7 +266,11 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
 	level.Debug(log).Log("queries", len(queries))
 
 	entries, err := c.lookupEntriesByQueries(ctx, queries)
-	if err != nil {
+	if e, ok := err.(CardinalityExceededError); ok {
+		e.MetricName = metricName
+		e.LabelName = labelName
+		return nil, e
+	} else if err != nil {
 		return nil, err
 	}
 	level.Debug(log).Log("entries", len(entries))
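The new error is a struct passed by value, so a lower layer can return it partially filled and each caller can enrich the copy it receives before passing it upward. A minimal, self-contained sketch of that flow; the queryIndex and lookupForMetric helpers and the example metric and label names are illustrative stand-ins, not code from this PR:

package main

import "fmt"

// CardinalityExceededError mirrors the type added in series_store.go.
type CardinalityExceededError struct {
	MetricName, LabelName string
	Size, Limit           int32
}

func (e CardinalityExceededError) Error() string {
	return fmt.Sprintf("cardinality limit exceeded for %s{%s}; %d entries, more than limit of %d",
		e.MetricName, e.LabelName, e.Size, e.Limit)
}

// queryIndex stands in for the index layer, which knows the entry count
// and the configured limit but not which metric or label was queried.
func queryIndex() error {
	return CardinalityExceededError{Size: 10, Limit: 5}
}

// lookupForMetric stands in for lookupSeriesByMetricNameMatcher: the type
// assertion yields a copy of the error value, which is enriched with the
// names in scope and returned upward.
func lookupForMetric(metricName, labelName string) error {
	err := queryIndex()
	if e, ok := err.(CardinalityExceededError); ok {
		e.MetricName = metricName
		e.LabelName = labelName
		return e
	}
	return err
}

func main() {
	// Prints: cardinality limit exceeded for up{instance}; 10 entries, more than limit of 5
	fmt.Println(lookupForMetric("up", "instance"))
}

Because the error is used by value rather than as a shared sentinel, mutating the copy in one lookup cannot race with or affect errors seen by other concurrent matcher lookups.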
1 change: 1 addition & 0 deletions pkg/chunk/storage/caching_fixtures.go
@@ -40,5 +40,6 @@ var Fixtures = []testutils.Fixture{
 func defaultLimits() (*validation.Overrides, error) {
 	var defaults validation.Limits
 	flagext.DefaultValues(&defaults)
+	defaults.CardinalityLimit = 5
 	return validation.NewOverrides(defaults)
 }
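Setting CardinalityLimit to 5 in these shared test fixtures is what allows the new TestCardinalityLimit below to trip the limit by writing 10 entries under a single hash key.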
10 changes: 8 additions & 2 deletions pkg/chunk/storage/caching_index_client.go
@@ -88,7 +88,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind
 	batches, misses := s.cacheFetch(ctx, keys)
 	for _, batch := range batches {
 		if cardinalityLimit > 0 && batch.Cardinality > cardinalityLimit {
-			return chunk.ErrCardinalityExceeded
+			return chunk.CardinalityExceededError{
+				Size:  batch.Cardinality,
+				Limit: cardinalityLimit,
+			}
 		}
 
 		queries := queriesByKey[batch.Key]
@@ -156,7 +159,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind
 		if cardinalityLimit > 0 && cardinality > cardinalityLimit {
 			batch.Cardinality = cardinality
 			batch.Entries = nil
-			cardinalityErr = chunk.ErrCardinalityExceeded
+			cardinalityErr = chunk.CardinalityExceededError{
+				Size:  cardinality,
+				Limit: cardinalityLimit,
+			}
 		}
 
 		keys = append(keys, key)
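Note that at this layer only Size and Limit can be populated: the caching index client sees raw index rows and the configured limit, but not which metric or label produced the query. The metric and label names are attached one level up, in lookupSeriesByMetricNameMatcher.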
35 changes: 34 additions & 1 deletion pkg/chunk/storage/index_client_test.go
@@ -2,10 +2,14 @@ package storage
 
 import (
 	"fmt"
+	"strconv"
 	"testing"
 	"time"
 
-	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/stretchr/testify/require"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/cache"
 )
 
 func TestIndexBasic(t *testing.T) {
@@ -194,3 +198,32 @@ func TestQueryPages(t *testing.T) {
 		}
 	})
 }
+
+func TestCardinalityLimit(t *testing.T) {
+	forAllFixtures(t, func(t *testing.T, client chunk.IndexClient, _ chunk.ObjectClient) {
+		limits, err := defaultLimits()
+		require.NoError(t, err)
+
+		client = newCachingIndexClient(client, cache.NewMockCache(), time.Minute, limits)
+		batch := client.NewWriteBatch()
+		for i := 0; i < 10; i++ {
+			batch.Add(tableName, "bar", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
+		}
+		err = client.BatchWrite(ctx, batch)
+		require.NoError(t, err)
+
+		var have int
+		err = client.QueryPages(ctx, []chunk.IndexQuery{{
+			TableName: tableName,
+			HashValue: "bar",
+		}}, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool {
+			iter := read.Iterator()
+			for iter.Next() {
+				have++
+			}
+			return true
+		})
+		require.EqualError(t, err, "cardinality limit exceeded for {}; 10 entries, more than limit of 5")
+		require.Equal(t, 0, have)
+	})
+}
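The selector in the expected message is empty ("for {}") because the test drives QueryPages directly, below the series-store layer that fills in MetricName and LabelName, so only Size (10) and Limit (5) are set on the error.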