diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go
index 97d63930abf..0602441b5e5 100644
--- a/pkg/chunk/chunk_store_test.go
+++ b/pkg/chunk/chunk_store_test.go
@@ -47,9 +47,7 @@ var stores = []struct {
 	{
 		name: "store",
 		configFn: func() StoreConfig {
-			var (
-				storeCfg StoreConfig
-			)
+			var storeCfg StoreConfig
 			flagext.DefaultValues(&storeCfg)
 			return storeCfg
 		},
@@ -57,15 +55,11 @@ var stores = []struct {
 	{
 		name: "cached_store",
 		configFn: func() StoreConfig {
-			var (
-				storeCfg StoreConfig
-			)
+			var storeCfg StoreConfig
 			flagext.DefaultValues(&storeCfg)
-
 			storeCfg.WriteDedupeCacheConfig.Cache = cache.NewFifoCache("test", cache.FifoCacheConfig{
 				Size: 500,
 			})
-
 			return storeCfg
 		},
 	},
@@ -73,9 +67,7 @@ var stores = []struct {
 
 // newTestStore creates a new Store for testing.
 func newTestChunkStore(t *testing.T, schemaName string) Store {
-	var (
-		storeCfg StoreConfig
-	)
+	var storeCfg StoreConfig
 	flagext.DefaultValues(&storeCfg)
 	return newTestChunkStoreConfig(t, schemaName, storeCfg)
 }
diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go
index 88c546751ea..e7dde42a33a 100644
--- a/pkg/chunk/series_store.go
+++ b/pkg/chunk/series_store.go
@@ -7,7 +7,6 @@ import (
 	"net/http"
 
 	"github.com/go-kit/kit/log/level"
-	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
@@ -22,11 +21,19 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/validation"
 )
 
-var (
-	// ErrCardinalityExceeded is returned when the user reads a row that
-	// is too large.
-	ErrCardinalityExceeded = errors.New("cardinality limit exceeded")
+// CardinalityExceededError is returned when the user reads a row that
+// is too large.
+type CardinalityExceededError struct {
+	MetricName, LabelName string
+	Size, Limit           int32
+}
+
+func (e CardinalityExceededError) Error() string {
+	return fmt.Sprintf("cardinality limit exceeded for %s{%s}; %d entries, more than limit of %d",
+		e.MetricName, e.LabelName, e.Size, e.Limit)
+}
 
+var (
 	indexLookupsPerQuery = promauto.NewHistogram(prometheus.HistogramOpts{
 		Namespace: "cortex",
 		Name:      "chunk_store_index_lookups_per_query",
@@ -196,6 +203,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
 	var preIntersectionCount int
 	var lastErr error
 	var cardinalityExceededErrors int
+	var cardinalityExceededError CardinalityExceededError
 	for i := 0; i < len(matchers); i++ {
 		select {
 		case incoming := <-incomingIDs:
@@ -210,8 +218,9 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
 			// series and the other returns only 10 (a few), we don't lookup the first one at all.
 			// We just manually filter through the 10 series again using "filterChunksByMatchers",
 			// saving us from looking up and intersecting a lot of series.
-			if err == ErrCardinalityExceeded {
+			if e, ok := err.(CardinalityExceededError); ok {
 				cardinalityExceededErrors++
+				cardinalityExceededError = e
 			} else {
 				lastErr = err
 			}
@@ -220,7 +229,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
 
 	// But if every single matcher returns a lot of series, then it makes sense to abort the query.
 	if cardinalityExceededErrors == len(matchers) {
-		return nil, ErrCardinalityExceeded
+		return nil, cardinalityExceededError
 	} else if lastErr != nil {
 		return nil, lastErr
 	}
@@ -241,11 +250,14 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
 	}
 
 	var queries []IndexQuery
+	var labelName string
 	if matcher == nil {
 		queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, model.LabelValue(metricName))
 	} else if matcher.Type != labels.MatchEqual {
+		labelName = matcher.Name
 		queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name))
 	} else {
+		labelName = matcher.Name
 		queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name), model.LabelValue(matcher.Value))
 	}
 	if err != nil {
@@ -254,7 +266,11 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
 	level.Debug(log).Log("queries", len(queries))
 
 	entries, err := c.lookupEntriesByQueries(ctx, queries)
-	if err != nil {
+	if e, ok := err.(CardinalityExceededError); ok {
+		e.MetricName = metricName
+		e.LabelName = labelName
+		return nil, e
+	} else if err != nil {
 		return nil, err
 	}
 	level.Debug(log).Log("entries", len(entries))
diff --git a/pkg/chunk/storage/caching_fixtures.go b/pkg/chunk/storage/caching_fixtures.go
index 1469e92d75e..47e90fe44e5 100644
--- a/pkg/chunk/storage/caching_fixtures.go
+++ b/pkg/chunk/storage/caching_fixtures.go
@@ -40,5 +40,6 @@ var Fixtures = []testutils.Fixture{
 func defaultLimits() (*validation.Overrides, error) {
 	var defaults validation.Limits
 	flagext.DefaultValues(&defaults)
+	defaults.CardinalityLimit = 5
 	return validation.NewOverrides(defaults)
 }
diff --git a/pkg/chunk/storage/caching_index_client.go b/pkg/chunk/storage/caching_index_client.go
index 50db6868ea3..38381b297ee 100644
--- a/pkg/chunk/storage/caching_index_client.go
+++ b/pkg/chunk/storage/caching_index_client.go
@@ -88,7 +88,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind
 	batches, misses := s.cacheFetch(ctx, keys)
 	for _, batch := range batches {
 		if cardinalityLimit > 0 && batch.Cardinality > cardinalityLimit {
-			return chunk.ErrCardinalityExceeded
+			return chunk.CardinalityExceededError{
+				Size:  batch.Cardinality,
+				Limit: cardinalityLimit,
+			}
 		}
 
 		queries := queriesByKey[batch.Key]
@@ -156,7 +159,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind
 		if cardinalityLimit > 0 && cardinality > cardinalityLimit {
 			batch.Cardinality = cardinality
 			batch.Entries = nil
-			cardinalityErr = chunk.ErrCardinalityExceeded
+			cardinalityErr = chunk.CardinalityExceededError{
+				Size:  cardinality,
+				Limit: cardinalityLimit,
+			}
 		}
 
 		keys = append(keys, key)
diff --git a/pkg/chunk/storage/index_client_test.go b/pkg/chunk/storage/index_client_test.go
index dc4731f528e..8f88721ecc4 100644
--- a/pkg/chunk/storage/index_client_test.go
+++ b/pkg/chunk/storage/index_client_test.go
@@ -2,10 +2,14 @@ package storage
 
 import (
 	"fmt"
+	"strconv"
 	"testing"
+	"time"
 
-	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/stretchr/testify/require"
+
+	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/cache"
 )
 
 func TestIndexBasic(t *testing.T) {
@@ -194,3 +198,32 @@ func TestQueryPages(t *testing.T) {
 		}
 	})
 }
+
+func TestCardinalityLimit(t *testing.T) {
+	forAllFixtures(t, func(t *testing.T, client chunk.IndexClient, _ chunk.ObjectClient) {
+		limits, err := defaultLimits()
+		require.NoError(t, err)
+
+		client = newCachingIndexClient(client, cache.NewMockCache(), time.Minute, limits)
+		batch := client.NewWriteBatch()
+		for i := 0; i < 10; i++ {
+			batch.Add(tableName, "bar", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
+		}
+		err = client.BatchWrite(ctx, batch)
+		require.NoError(t, err)
+
+		var have int
+		err = client.QueryPages(ctx, []chunk.IndexQuery{{
+			TableName: tableName,
+			HashValue: "bar",
+		}}, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool {
+			iter := read.Iterator()
+			for iter.Next() {
+				have++
+			}
+			return true
+		})
+		require.EqualError(t, err, "cardinality limit exceeded for {}; 10 entries, more than limit of 5")
+		require.Equal(t, 0, have)
+	})
+}
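
Since this diff replaces the `ErrCardinalityExceeded` sentinel with a struct type, any caller that previously compared `err == chunk.ErrCardinalityExceeded` must switch to a type assertion, mirroring the check the diff itself adds in `lookupSeriesByMetricNameMatchers`. A minimal sketch of that caller-side pattern, assuming only the `CardinalityExceededError` type introduced above (the `classify` helper and the sample field values are hypothetical):

```go
package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/chunk"
)

// classify is a hypothetical caller demonstrating the new error handling:
// a type assertion on chunk.CardinalityExceededError replaces the old
// sentinel comparison err == chunk.ErrCardinalityExceeded.
func classify(err error) string {
	if e, ok := err.(chunk.CardinalityExceededError); ok {
		// The struct carries context the sentinel could not:
		// which metric/label overflowed, by how much, and the limit.
		return fmt.Sprintf("cardinality: %s{%s} has %d entries, limit %d",
			e.MetricName, e.LabelName, e.Size, e.Limit)
	}
	if err != nil {
		return "other error: " + err.Error()
	}
	return "ok"
}

func main() {
	// Sample values only; in practice the store populates these fields.
	var err error = chunk.CardinalityExceededError{
		MetricName: "http_requests_total",
		LabelName:  "path",
		Size:       10,
		Limit:      5,
	}
	fmt.Println(classify(err))
	// Output: cardinality: http_requests_total{path} has 10 entries, limit 5
}
```

Because `Error()` is declared on the value receiver, the assertion targets the value type `chunk.CardinalityExceededError`, exactly as the diff does; on Go 1.13+, `errors.As(err, &e)` would also work should the error ever be wrapped.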