Skip to content

Add active series limit for native histogram samples #6796

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Changelog

## master / unreleased
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
* [CHANGE] StoreGateway/Alertmanager: Add default 5s connection timeout on client. #6603
* [CHANGE] Ingester: Remove EnableNativeHistograms config flag and instead gate keep through new per-tenant limit at ingestion. #6718
* [CHANGE] Validate a tenantID when to use a single tenant resolver. #6727
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
Expand Down Expand Up @@ -38,10 +38,11 @@
* [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676
* [ENHANCEMENT] Ingester: Push request should fail when label set is out of order #6746
* [ENHANCEMENT] Querier: Add `querier.ingester-query-max-attempts` to retry on partial data. #6714
* [ENHANCEMENT] Distributor: Add min/max schema validation for NativeHistograms. #6766
* [ENHANCEMENT] Distributor: Add min/max schema validation for Native Histogram. #6766
* [ENHANCEMENT] Ingester: Handle runtime errors in query path #6769
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histograms. #6794
* [ENHANCEMENT] Distributor: Add ingestion rate limit for Native Histogram. #6794
* [ENHANCEMENT] Ingester: Add active series limit specifically for Native Histogram. #6796
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809 #6821
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3606,6 +3606,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.max-series-per-metric
[max_series_per_metric: <int> | default = 50000]

# The maximum number of active native histogram series per user, per ingester. 0
# to disable. Supported only if -ingester.active-series-metrics-enabled is true.
# CLI flag: -ingester.max-native-histogram-series-per-user
[max_native_histogram_series_per_user: <int> | default = 0]

# The maximum number of active series per user, across the cluster before
# replication. 0 to disable. Supported only if -distributor.shard-by-all-labels
# is true.
Expand All @@ -3617,6 +3622,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -ingester.max-global-series-per-metric
[max_global_series_per_metric: <int> | default = 0]

# The maximum number of active native histogram series per user, across the
# cluster before replication. 0 to disable. Supported only if
# -distributor.shard-by-all-labels and -ingester.active-series-metrics-enabled
# are true.
# CLI flag: -ingester.max-global-native-histogram-series-per-user
[max_global_native_histogram_series_per_user: <int> | default = 0]

# [Experimental] Enable limits per LabelSet. Supported limits per labelSet:
# [max_series]
[limits_per_label_set: <list of LimitsPerLabelSet> | default = []]
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.BlocksStorage.Validate(); err != nil {
return errors.Wrap(err, "invalid TSDB config")
}
if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels, c.Ingester.ActiveSeriesMetricsEnabled); err != nil {
return errors.Wrap(err, "invalid limits config")
}
if err := c.ResourceMonitor.Validate(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/runtime_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (l runtimeConfigLoader) load(r io.Reader) (interface{}, error) {
}

for _, ul := range overrides.TenantLimits {
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels); err != nil {
if err := ul.Validate(l.cfg.Distributor.ShardByAllLabels, l.cfg.Ingester.ActiveSeriesMetricsEnabled); err != nil {
return nil, err
}
}
Expand Down
45 changes: 30 additions & 15 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,11 @@ func (u *userTSDB) PreCreation(metric labels.Labels) error {
return err
}

// Total native histogram series limit.
if err := u.limiter.AssertMaxNativeHistogramSeriesPerUser(u.userID, u.activeSeries.ActiveNativeHistogram()); err != nil {
return err
}

// Series per metric name limit.
metricName, err := extract.MetricNameFromLabels(metric)
if err != nil {
Expand Down Expand Up @@ -1220,21 +1225,22 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
// Keep track of some stats which are tracked only if the samples will be
// successfully committed
var (
succeededSamplesCount = 0
failedSamplesCount = 0
succeededHistogramsCount = 0
failedHistogramsCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perLabelSetSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
discardedNativeHistogramCount = 0
succeededSamplesCount = 0
failedSamplesCount = 0
succeededHistogramsCount = 0
failedHistogramsCount = 0
succeededExemplarsCount = 0
failedExemplarsCount = 0
startAppend = time.Now()
sampleOutOfBoundsCount = 0
sampleOutOfOrderCount = 0
sampleTooOldCount = 0
newValueForTimestampCount = 0
perUserSeriesLimitCount = 0
perUserNativeHistogramSeriesLimitCount = 0
perLabelSetSeriesLimitCount = 0
perMetricSeriesLimitCount = 0
discardedNativeHistogramCount = 0

updateFirstPartial = func(errFn func() error) {
if firstPartialErr == nil {
Expand Down Expand Up @@ -1270,6 +1276,12 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
})

case errors.Is(cause, errMaxNativeHistogramSeriesPerUserLimitExceeded):
perUserNativeHistogramSeriesLimitCount++
updateFirstPartial(func() error {
return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
})

case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
perMetricSeriesLimitCount++
updateFirstPartial(func() error {
Expand Down Expand Up @@ -1513,6 +1525,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
if perUserSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserSeriesLimit, userID).Add(float64(perUserSeriesLimitCount))
}
if perUserNativeHistogramSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perUserNativeHistogramSeriesLimit, userID).Add(float64(perUserNativeHistogramSeriesLimitCount))
}
if perMetricSeriesLimitCount > 0 {
i.validateMetrics.DiscardedSamples.WithLabelValues(perMetricSeriesLimit, userID).Add(float64(perMetricSeriesLimitCount))
}
Expand Down
89 changes: 89 additions & 0 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ func TestIngesterUserLimitExceeded(t *testing.T) {
limits := defaultLimitsTestConfig()
limits.EnableNativeHistograms = true
limits.MaxLocalSeriesPerUser = 1
limits.MaxLocalNativeHistogramSeriesPerUser = 1
limits.MaxLocalMetricsWithMetadataPerUser = 1

userID := "1"
Expand Down Expand Up @@ -868,6 +869,93 @@ func TestIngesterUserLimitExceeded(t *testing.T) {

}

// TestIngesterUserLimitExceededForNativeHistogram verifies that the per-user
// native histogram active series limit is enforced independently of the
// overall per-user series limit, and that the limit still holds after the
// ingester is restarted on top of the same blocks directory.
func TestIngesterUserLimitExceededForNativeHistogram(t *testing.T) {
	limits := defaultLimitsTestConfig()
	limits.EnableNativeHistograms = true
	// The native histogram limit (1) is stricter than the overall series
	// limit (2), so a second native histogram series must be rejected by the
	// native histogram limit, not the generic one.
	limits.MaxLocalNativeHistogramSeriesPerUser = 1
	limits.MaxLocalSeriesPerUser = 2
	limits.MaxLocalMetricsWithMetadataPerUser = 1

	userID := "1"
	// Series: two distinct label sets sharing the metric name "testmetric".
	labels1 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "bar"}}
	labels3 := labels.Labels{{Name: labels.MetricName, Value: "testmetric"}, {Name: "foo", Value: "biz"}}
	sampleNativeHistogram1 := cortexpb.HistogramToHistogramProto(0, tsdbutil.GenerateTestHistogram(1))
	sampleNativeHistogram2 := cortexpb.HistogramToHistogramProto(1, tsdbutil.GenerateTestHistogram(2))
	sampleNativeHistogram3 := cortexpb.HistogramToHistogramProto(0, tsdbutil.GenerateTestHistogram(3))

	// Metadata
	metadata1 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric", Help: "a help for testmetric", Type: cortexpb.COUNTER}
	metadata2 := &cortexpb.MetricMetadata{MetricFamilyName: "testmetric2", Help: "a help for testmetric2", Type: cortexpb.COUNTER}

	dir := t.TempDir()

	chunksDir := filepath.Join(dir, "chunks")
	blocksDir := filepath.Join(dir, "blocks")
	require.NoError(t, os.Mkdir(chunksDir, os.ModePerm))
	require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))

	// Builds and starts a blocks-storage ingester over blocksDir; reusing the
	// same directory across invocations is what makes the restart check below
	// exercise state recovered from disk.
	blocksIngesterGenerator := func(reg prometheus.Registerer) *Ingester {
		ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, nil, blocksDir, reg)
		require.NoError(t, err)
		require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
		// Wait until it's ACTIVE
		test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
			return ing.lifecycler.GetState()
		})

		return ing
	}

	// Single "blocks" variant today; the parallel slices look like a leftover
	// of the removed chunks storage — NOTE(review): could be collapsed.
	tests := []string{"blocks"}
	for i, ingGenerator := range []func(reg prometheus.Registerer) *Ingester{blocksIngesterGenerator} {
		t.Run(tests[i], func(t *testing.T) {
			reg := prometheus.NewRegistry()
			ing := ingGenerator(reg)

			// Append only one series and one metadata first, expect no error.
			ctx := user.InjectOrgID(context.Background(), userID)
			_, err := ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1}, nil, []*cortexpb.MetricMetadata{metadata1}, []cortexpb.Histogram{sampleNativeHistogram1}, cortexpb.API))
			require.NoError(t, err)

			testLimits := func(reg prometheus.Gatherer) {
				// Append to two series, expect series-exceeded error. The second
				// (new) series trips the per-user native histogram limit and the
				// whole push reports HTTP 400 with the formatted limit error.
				_, err = ing.Push(ctx, cortexpb.ToWriteRequest([]labels.Labels{labels1, labels3}, nil, nil, []cortexpb.Histogram{sampleNativeHistogram2, sampleNativeHistogram3}, cortexpb.API))
				httpResp, ok := httpgrpc.HTTPResponseFromError(err)
				require.True(t, ok, "returned error is not an httpgrpc response")
				assert.Equal(t, http.StatusBadRequest, int(httpResp.Code))
				assert.Equal(t, wrapWithUser(makeLimitError(perUserNativeHistogramSeriesLimit, ing.limiter.FormatError(userID, errMaxNativeHistogramSeriesPerUserLimitExceeded, labels1)), userID).Error(), string(httpResp.Body))

				// Append two metadata, expect no error since metadata is a best effort approach.
				_, err = ing.Push(ctx, cortexpb.ToWriteRequest(nil, nil, []*cortexpb.MetricMetadata{metadata1, metadata2}, nil, cortexpb.API))
				require.NoError(t, err)

				// Read samples back via ingester queries.
				res, _, err := runTestQuery(ctx, t, ing, labels.MatchEqual, model.MetricNameLabel, "testmetric")
				require.NoError(t, err)
				require.NotNil(t, res)

				// Verify metadata
				m, err := ing.MetricsMetadata(ctx, &client.MetricsMetadataRequest{Limit: -1, LimitPerMetric: -1, Metric: ""})
				require.NoError(t, err)
				assert.Equal(t, []*cortexpb.MetricMetadata{metadata1}, m.Metadata)
			}

			testLimits(reg)

			// Limits should hold after restart.
			services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
			// Use new registry to prevent metrics registration panic.
			reg = prometheus.NewRegistry()
			ing = ingGenerator(reg)
			defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck

			testLimits(reg)
		})
	}

}

func benchmarkData(nSeries int) (allLabels []labels.Labels, allSamples []cortexpb.Sample) {
for j := 0; j < nSeries; j++ {
labels := chunk.BenchmarkLabels.Copy()
Expand All @@ -886,6 +974,7 @@ func TestIngesterMetricLimitExceeded(t *testing.T) {
limits := defaultLimitsTestConfig()
limits.EnableNativeHistograms = true
limits.MaxLocalSeriesPerMetric = 1
limits.MaxLocalNativeHistogramSeriesPerUser = 1
limits.MaxLocalMetadataPerMetric = 1

userID := "1"
Expand Down
38 changes: 34 additions & 4 deletions pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
)

var (
errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded")
errMaxMetadataPerMetricLimitExceeded = errors.New("per-metric metadata limit exceeded")
errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded")
errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded")
errMaxSeriesPerMetricLimitExceeded = errors.New("per-metric series limit exceeded")
errMaxMetadataPerMetricLimitExceeded = errors.New("per-metric metadata limit exceeded")
errMaxSeriesPerUserLimitExceeded = errors.New("per-user series limit exceeded")
errMaxNativeHistogramSeriesPerUserLimitExceeded = errors.New("per-user native histogram series limit exceeded")
errMaxMetadataPerUserLimitExceeded = errors.New("per-user metric metadata limit exceeded")
)

type errMaxSeriesPerLabelSetLimitExceeded struct {
Expand Down Expand Up @@ -95,6 +96,16 @@ func (l *Limiter) AssertMaxSeriesPerUser(userID string, series int) error {
return errMaxSeriesPerUserLimitExceeded
}

// AssertMaxNativeHistogramSeriesPerUser limit has not been reached compared to the current
// number of native histogram series in input and returns an error if so.
// AssertMaxNativeHistogramSeriesPerUser checks the given current number of
// active native histogram series for a user against the effective per-user
// native histogram series limit, returning an error when the limit has been
// reached and nil otherwise.
func (l *Limiter) AssertMaxNativeHistogramSeriesPerUser(userID string, series int) error {
	limit := l.maxNativeHistogramSeriesPerUser(userID)
	if series >= limit {
		return errMaxNativeHistogramSeriesPerUserLimitExceeded
	}

	return nil
}

// AssertMaxMetricsWithMetadataPerUser limit has not been reached compared to the current
// number of metrics with metadata in input and returns an error if so.
func (l *Limiter) AssertMaxMetricsWithMetadataPerUser(userID string, metrics int) error {
Expand Down Expand Up @@ -134,6 +145,8 @@ func (l *Limiter) FormatError(userID string, err error, lbls labels.Labels) erro
switch {
case errors.Is(err, errMaxSeriesPerUserLimitExceeded):
return l.formatMaxSeriesPerUserError(userID)
case errors.Is(err, errMaxNativeHistogramSeriesPerUserLimitExceeded):
return l.formatMaxNativeHistogramsSeriesPerUserError(userID)
case errors.Is(err, errMaxSeriesPerMetricLimitExceeded):
return l.formatMaxSeriesPerMetricError(userID, lbls.Get(labels.MetricName))
case errors.Is(err, errMaxMetadataPerUserLimitExceeded):
Expand All @@ -158,6 +171,15 @@ func (l *Limiter) formatMaxSeriesPerUserError(userID string) error {
minNonZero(localLimit, globalLimit), l.AdminLimitMessage, localLimit, globalLimit, actualLimit)
}

// formatMaxNativeHistogramsSeriesPerUserError builds the user-facing error for
// an exceeded per-user native histogram series limit, reporting the effective
// limit together with the configured local and global values.
func (l *Limiter) formatMaxNativeHistogramsSeriesPerUserError(userID string) error {
	var (
		localLimit  = l.limits.MaxLocalNativeHistogramSeriesPerUser(userID)
		globalLimit = l.limits.MaxGlobalNativeHistogramSeriesPerUser(userID)
		actualLimit = l.maxNativeHistogramSeriesPerUser(userID)
	)

	return fmt.Errorf("per-user native histogram series limit of %d exceeded, %s (local limit: %d global limit: %d actual local limit: %d)",
		minNonZero(localLimit, globalLimit), l.AdminLimitMessage, localLimit, globalLimit, actualLimit)
}

func (l *Limiter) formatMaxSeriesPerMetricError(userID string, metric string) error {
actualLimit := l.maxSeriesPerMetric(userID)
localLimit := l.limits.MaxLocalSeriesPerMetric(userID)
Expand Down Expand Up @@ -248,6 +270,14 @@ func (l *Limiter) maxSeriesPerUser(userID string) int {
)
}

// maxNativeHistogramSeriesPerUser resolves the effective per-user native
// histogram series limit from the configured local and global tenant limits.
func (l *Limiter) maxNativeHistogramSeriesPerUser(userID string) int {
	localLimitFn := l.limits.MaxLocalNativeHistogramSeriesPerUser
	globalLimitFn := l.limits.MaxGlobalNativeHistogramSeriesPerUser

	return l.maxByLocalAndGlobal(userID, localLimitFn, globalLimitFn)
}

func (l *Limiter) maxMetadataPerUser(userID string) int {
return l.maxByLocalAndGlobal(
userID,
Expand Down
Loading
Loading