Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* [FEATURE] Ruler: Add support for group labels. #6665
* [FEATURE] Support Parquet format: Implement parquet converter service to convert a TSDB block into Parquet. #6716
* [ENHANCEMENT] Query Frontend: Change to return 400 when tenant resolving fails. #6715
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. #6681
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. Add a `-ingester.skip-metadata-limits` flag (enabled by default) so the metadata API keeps returning all metadata during a rolling deployment with older queriers. Set this flag to `false` to enforce the metadata limits. #6681 #6744
* [ENHANCEMENT] Ingester: Add a `cortex_ingester_active_native_histogram_series` metric to track # of active NH series. #6695
* [ENHANCEMENT] Query Frontend: Add new limit `-frontend.max-query-response-size` for total query response size after decompression in query frontend. #6607
* [ENHANCEMENT] Alertmanager: Add nflog and silences maintenance metrics. #6659
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3267,6 +3267,10 @@ instance_limits:
# Maximum number of entries in the regex matchers cache. 0 to disable.
# CLI flag: -ingester.matchers-cache-max-items
[matchers_cache_max_items: <int> | default = 0]

# If enabled, the metadata API returns all metadata regardless of the limits.
# CLI flag: -ingester.skip-metadata-limits
[skip_metadata_limits: <boolean> | default = true]
```

### `ingester_client_config`
Expand Down
77 changes: 77 additions & 0 deletions integration/backward_compatibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,83 @@ func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) {
}
}

// Regression test for #6744: during a rolling deployment a newer ingester may be
// paired with an older querier; in that configuration the ingester must return
// the full metadata set regardless of the limits forwarded in the request.
func TestMetadataAPIWhenDeployment(t *testing.T) {
	oldImage := "quay.io/cortexproject/cortex:v1.19.0"

	scen, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer scen.Close()

	// Dependencies: consul for the ring, minio for block storage.
	consul := e2edb.NewConsulWithName("consul")
	require.NoError(t, scen.StartAndWaitReady(consul))

	baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())

	minio := e2edb.NewMinio(9000, baseFlags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, scen.StartAndWaitReady(minio))

	oldFlags := mergeFlags(baseFlags, map[string]string{
		// alert manager
		"-alertmanager.web.external-url": "http://localhost/alertmanager",
		// consul
		"-ring.store":      "consul",
		"-consul.hostname": consul.NetworkHTTPEndpoint(),
	})

	newFlags := mergeFlags(oldFlags, map[string]string{
		// ingester
		"-ingester.skip-metadata-limits": "true",
	})

	// Current-version distributor and ingester, old-version querier.
	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), newFlags, "")
	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), newFlags, "")
	querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), oldFlags, oldImage)
	require.NoError(t, scen.StartAndWaitReady(distributor, ingester, querier))

	// The distributor must see the ingester as ACTIVE in the ring.
	require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

	// So must the querier.
	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

	client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
	require.NoError(t, err)

	// Push metadataPerMetric metadata entries for each of numMetrics metric families.
	numMetrics := 5
	metadataPerMetric := 2
	payload := make([]prompb.MetricMetadata, 0, numMetrics*metadataPerMetric)
	for metric := 0; metric < numMetrics; metric++ {
		for idx := 0; idx < metadataPerMetric; idx++ {
			payload = append(payload, prompb.MetricMetadata{
				MetricFamilyName: fmt.Sprintf("metadata_name_%d", metric),
				Help:             fmt.Sprintf("metadata_help_%d_%d", metric, idx),
				Unit:             fmt.Sprintf("metadata_unit_%d_%d", metric, idx),
			})
		}
	}
	res, err := client.Push(nil, payload...)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// Whatever limit is forwarded — including 0 and negative values — the
	// ingester must hand back every metadata entry.
	total := numMetrics * metadataPerMetric
	for limit := -1; limit <= total; limit++ {
		result, err := client.Metadata("", strconv.Itoa(limit))
		require.NoError(t, err)
		require.Equal(t, numMetrics, len(result))
		for _, familyMetadata := range result {
			require.Equal(t, metadataPerMetric, len(familyMetadata))
		}
	}
}

// Test cortex which uses Prometheus v3.x can support holt_winters function
func TestCanSupportHoltWintersFunc(t *testing.T) {
s, err := e2e.NewScenario(networkName)
Expand Down
6 changes: 5 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ type Config struct {

// Maximum number of entries in the matchers cache. 0 to disable.
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`

// If enabled, the metadata API returns all metadata regardless of the limits.
SkipMetadataLimits bool `yaml:"skip_metadata_limits"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -176,6 +179,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.")
f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
f.BoolVar(&cfg.SkipMetadataLimits, "ingester.skip-metadata-limits", true, "If enabled, the metadata API returns all metadata regardless of the limits.")

cfg.DefaultLimits.RegisterFlagsWithPrefix(f, "ingester.")
}
Expand Down Expand Up @@ -3057,7 +3061,7 @@ func (i *Ingester) getOrCreateUserMetadata(userID string) *userMetricsMetadata {
// Ensure it was not created between switching locks.
userMetadata, ok := i.usersMetadata[userID]
if !ok {
userMetadata = newMetadataMap(i.limiter, i.metrics, i.validateMetrics, userID)
userMetadata = newMetadataMap(i.limiter, i.metrics, i.validateMetrics, userID, i.cfg.SkipMetadataLimits)
i.usersMetadata[userID] = userMetadata
}
return userMetadata
Expand Down
44 changes: 30 additions & 14 deletions pkg/ingester/user_metrics_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,33 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

// Sentinel values substituted for the request's limits when the ingester is
// configured to skip the metadata limits. A non-positive limit is never
// enforced by toClientMetadata (its checks are `limit > 0`/`limitPerMetric > 0`
// style), so -1 effectively means "unlimited".
const (
	defaultLimit          = -1
	defaultLimitPerMetric = -1
)

// userMetricsMetadata allows metric metadata of a tenant to be held by the ingester.
// Metadata is kept as a set as it can come from multiple targets that Prometheus scrapes
// with the same metric name.
type userMetricsMetadata struct {
limiter *Limiter
metrics *ingesterMetrics
validateMetrics *validation.ValidateMetrics
userID string
limiter *Limiter
metrics *ingesterMetrics
validateMetrics *validation.ValidateMetrics
userID string
skipMetadataLimits bool

mtx sync.RWMutex
metricToMetadata map[string]metricMetadataSet
}

func newMetadataMap(l *Limiter, m *ingesterMetrics, v *validation.ValidateMetrics, userID string) *userMetricsMetadata {
func newMetadataMap(l *Limiter, m *ingesterMetrics, v *validation.ValidateMetrics, userID string, skipMetadataLimits bool) *userMetricsMetadata {
return &userMetricsMetadata{
metricToMetadata: map[string]metricMetadataSet{},
limiter: l,
metrics: m,
validateMetrics: v,
userID: userID,
metricToMetadata: map[string]metricMetadataSet{},
limiter: l,
metrics: m,
validateMetrics: v,
userID: userID,
skipMetadataLimits: skipMetadataLimits,
}
}

Expand Down Expand Up @@ -88,8 +95,17 @@ func (mm *userMetricsMetadata) purge(deadline time.Time) {
func (mm *userMetricsMetadata) toClientMetadata(req *client.MetricsMetadataRequest) []*cortexpb.MetricMetadata {
mm.mtx.RLock()
defer mm.mtx.RUnlock()

r := make([]*cortexpb.MetricMetadata, 0, len(mm.metricToMetadata))
if req.Limit == 0 {
limit := req.Limit
limitPerMetric := req.LimitPerMetric

if mm.skipMetadataLimits {
// set limit and limitPerMetric to default
limit = defaultLimit
limitPerMetric = defaultLimitPerMetric
}
if limit == 0 {
return r
}

Expand All @@ -99,16 +115,16 @@ func (mm *userMetricsMetadata) toClientMetadata(req *client.MetricsMetadataReque
return r
}

metadataSet.add(req.LimitPerMetric, &r)
metadataSet.add(limitPerMetric, &r)
return r
}

var metrics int64
for _, set := range mm.metricToMetadata {
if req.Limit > 0 && metrics >= req.Limit {
if limit > 0 && metrics >= limit {
break
}
set.add(req.LimitPerMetric, &r)
set.add(limitPerMetric, &r)
metrics++
}
return r
Expand Down
65 changes: 34 additions & 31 deletions pkg/ingester/user_metrics_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
defaultLimit = -1
defaultLimitPerMetric = -1
)

func Test_UserMetricsMetadata(t *testing.T) {
userId := "user-1"

Expand All @@ -43,33 +38,13 @@ func Test_UserMetricsMetadata(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(overrides, nil, util.ShardingStrategyDefault, true, 1, false, "")

userMetricsMetadata := newMetadataMap(limiter, m, validation.NewValidateMetrics(reg), userId)

addMetricMetadata := func(name string, i int) {
metadata := &cortexpb.MetricMetadata{
MetricFamilyName: fmt.Sprintf("%s_%d", name, i),
Type: cortexpb.GAUGE,
Help: fmt.Sprintf("a help for %s", name),
Unit: fmt.Sprintf("a unit for %s", name),
}

err := userMetricsMetadata.add(name, metadata)
require.NoError(t, err)
}

metadataNumPerMetric := 3
for _, m := range []string{"metric1", "metric2"} {
for i := range metadataNumPerMetric {
addMetricMetadata(m, i)
}
}

tests := []struct {
description string
limit int64
limitPerMetric int64
metric string
expectedLength int
description string
limit int64
limitPerMetric int64
metric string
expectedLength int
skipMetadataLimits bool
}{
{
description: "limit: 1",
Expand Down Expand Up @@ -122,10 +97,38 @@ func Test_UserMetricsMetadata(t *testing.T) {
metric: "dummy",
expectedLength: 0,
},
{
description: "enable skipMetadataLimits",
limit: 1,
limitPerMetric: 2,
expectedLength: 2 * 3, // # of metric * metadataNumPerMetric
skipMetadataLimits: true,
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
userMetricsMetadata := newMetadataMap(limiter, m, validation.NewValidateMetrics(reg), userId, test.skipMetadataLimits)

addMetricMetadata := func(name string, i int) {
metadata := &cortexpb.MetricMetadata{
MetricFamilyName: fmt.Sprintf("%s_%d", name, i),
Type: cortexpb.GAUGE,
Help: fmt.Sprintf("a help for %s", name),
Unit: fmt.Sprintf("a unit for %s", name),
}

err := userMetricsMetadata.add(name, metadata)
require.NoError(t, err)
}

metadataNumPerMetric := 3
for _, m := range []string{"metric1", "metric2"} {
for i := range metadataNumPerMetric {
addMetricMetadata(m, i)
}
}

req := &client.MetricsMetadataRequest{Limit: test.limit, LimitPerMetric: test.limitPerMetric, Metric: test.metric}

r := userMetricsMetadata.toClientMetadata(req)
Expand Down
Loading