Skip to content

Change otlp attribute conversion to be consistent with prometheus #6272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## master / unreleased

* [CHANGE] OTLP: Change OTLP handler to be consistent with the Prometheus OTLP handler. #6272
- `target_info` metric is enabled by default and can be disabled via `-distributor.otlp.disable-target-info=true` flag
- Convert all attributes to labels is disabled by default and can be enabled via `-distributor.otlp.convert-all-attributes=true` flag
- You can specify the attributes converted to labels via `-distributor.promote-resource-attributes` flag. Supported only if `-distributor.otlp.convert-all-attributes=false`
* [CHANGE] Change all max async concurrency default values `50` to `3` #6268
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
Expand Down
15 changes: 15 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -2640,6 +2640,16 @@ instance_limits:
# unlimited.
# CLI flag: -distributor.instance-limits.max-inflight-push-requests
[max_inflight_push_requests: <int> | default = 0]

otlp:
# If true, all resource attributes are converted to labels.
# CLI flag: -distributor.otlp.convert-all-attributes
[convert_all_attributes: <boolean> | default = false]

# If true, a target_info metric is not ingested. (refer to:
# https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
# CLI flag: -distributor.otlp.disable-target-info
[disable_target_info: <boolean> | default = false]
```

### `etcd_config`
Expand Down Expand Up @@ -3285,6 +3295,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -validation.max-native-histogram-buckets
[max_native_histogram_buckets: <int> | default = 0]

# Comma separated list of resource attributes that should be converted to
# labels.
# CLI flag: -distributor.promote-resource-attributes
[promote_resource_attributes: <list of string> | default = ]

# The maximum number of active series per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-series-per-user
[max_series_per_user: <int> | default = 5000000]
Expand Down
10 changes: 6 additions & 4 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics
return metrics
}

func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportRequest {
d := pmetric.NewMetrics()

// Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram
Expand All @@ -244,6 +244,9 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service")
resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance")
resourceMetric.Resource().Attributes().PutStr("host.name", "test-host")
for _, label := range labels {
resourceMetric.Resource().Attributes().PutStr(label.Name, label.Value)
}

scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()

Expand All @@ -258,7 +261,6 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
counterDataPoint.SetDoubleValue(10.0)
counterDataPoint.Attributes().PutStr("foo.bar", "baz")

counterExemplar := counterDataPoint.Exemplars().AppendEmpty()
counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
Expand All @@ -269,8 +271,8 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest {
return pmetricotlp.NewExportRequestFromMetrics(d)
}

func (c *Client) OTLPPushExemplar(name string) (*http.Response, error) {
data, err := otlpWriteRequest(name).MarshalProto()
func (c *Client) OTLPPushExemplar(name string, labels ...prompb.Label) (*http.Response, error) {
data, err := otlpWriteRequest(name, labels...).MarshalProto()
if err != nil {
return nil, err
}
Expand Down
110 changes: 110 additions & 0 deletions integration/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package integration

import (
"bytes"
"context"
"fmt"
"math/rand"
"path/filepath"
Expand All @@ -15,6 +17,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore/providers/s3"

"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
Expand Down Expand Up @@ -144,3 +147,110 @@ func TestOTLPIngestExemplar(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(exemplars))
}

func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) {
configFileName := "runtime-config.yaml"

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
minio := e2edb.NewMinio(9000, bucketName)
require.NoError(t, s.StartAndWaitReady(minio))

// Configure the blocks storage to frequently compact TSDB head
// and ship blocks to the storage.
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
"-auth.enabled": "true",
"-runtime-config.backend": "s3",
"-runtime-config.s3.access-key-id": e2edb.MinioAccessKey,
"-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey,
"-runtime-config.s3.bucket-name": bucketName,
"-runtime-config.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName),
"-runtime-config.s3.insecure": "true",
"-runtime-config.file": configFileName,
"-runtime-config.reload-period": "1s",

// Distributor
"-distributor.otlp.convert-all-attributes": "false",
"-distributor.promote-resource-attributes": "attr1,attr2,attr3",

// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-alertmanager-storage.backend": "local",
"-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"),
})

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

client, err := s3.NewBucketWithConfig(nil, s3.Config{
Endpoint: minio.HTTPEndpoint(),
Insecure: true,
Bucket: bucketName,
AccessKey: e2edb.MinioAccessKey,
SecretKey: e2edb.MinioSecretKey,
}, "runtime-config-test", nil)

require.NoError(t, err)

// update runtime config
newRuntimeConfig := []byte(`overrides:
user-1:
promote_resource_attributes: ["attr1"]
user-2:
promote_resource_attributes: ["attr1", "attr2"]
`)
require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig)))
time.Sleep(2 * time.Second)

require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile))

// start cortex and assert runtime-config is loaded correctly
cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, flags, "", 9009, 9095)
require.NoError(t, s.StartAndWaitReady(cortex))

c1, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

c2, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-2")
require.NoError(t, err)

c3, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-3")
require.NoError(t, err)

// Push some series to Cortex.
now := time.Now()

labels := []prompb.Label{
{Name: "service.name", Value: "test-service"},
{Name: "attr1", Value: "value"},
{Name: "attr2", Value: "value"},
{Name: "attr3", Value: "value"},
}

res, err := c1.OTLPPushExemplar("series_1", labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c2.OTLPPushExemplar("series_1", labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

res, err = c3.OTLPPushExemplar("series_1", labels...)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

labelSet1, err := c1.LabelNames(now.Add(-time.Minute*5), now, "series_1")
require.NoError(t, err)
require.Equal(t, labelSet1, []string{"__name__", "attr1", "instance", "job"})

labelSet2, err := c2.LabelNames(now.Add(-time.Minute*5), now, "series_1")
require.NoError(t, err)
require.Equal(t, labelSet2, []string{"__name__", "attr1", "attr2", "instance", "job"})

labelSet3, err := c3.LabelNames(now.Add(-time.Minute*5), now, "series_1")
require.NoError(t, err)
require.Equal(t, labelSet3, []string{"__name__", "attr1", "attr2", "attr3", "instance", "job"})
}
5 changes: 3 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/push"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// DistributorPushWrapper wraps around a push. It is similar to middleware.Interface.
Expand Down Expand Up @@ -273,11 +274,11 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
}

// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) {
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) {
}

func (t *Cortex) initDistributor() (serv services.Service, err error) {
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor)
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides)

return nil, nil
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,21 @@ type Config struct {

// Limits for distributor
InstanceLimits InstanceLimits `yaml:"instance_limits"`

// OTLPConfig
OTLPConfig OTLPConfig `yaml:"otlp"`
}

type InstanceLimits struct {
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
}

type OTLPConfig struct {
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
DisableTargetInfo bool `yaml:"disable_target_info"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.PoolConfig.RegisterFlags(f)
Expand All @@ -188,6 +196,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")

f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.")
f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)")
}

// Validate config and returns error on failure
Expand Down
72 changes: 52 additions & 20 deletions pkg/util/push/otlp.go
Original file line number Diff line number Diff line change
@@ -1,55 +1,50 @@
package push

import (
"context"
"net/http"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
"github.com/prometheus/prometheus/util/annotations"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/log"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// OTLPHandler is a http.Handler which accepts OTLP metrics.
func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
logger := util_log.WithContext(ctx, util_log.Logger)
if sourceIPs != nil {
source := sourceIPs.Get(r)
if source != "" {
ctx = util.AddSourceIPsToOutgoingContext(ctx, source)
logger = log.WithSourceIPs(source, logger)
logger = util_log.WithSourceIPs(source, logger)
}
}
req, err := remote.DecodeOTLPWriteRequest(r)

userID, err := tenant.TenantID(ctx)
if err != nil {
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

promConverter := prometheusremotewrite.NewPrometheusConverter()
setting := prometheusremotewrite.Settings{
AddMetricSuffixes: true,
DisableTargetInfo: true,
}
annots, err := promConverter.FromMetrics(ctx, convertToMetricsAttributes(req.Metrics()), setting)
ws, _ := annots.AsStrings("", 0, 0)
if len(ws) > 0 {
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
}

req, err := remote.DecodeOTLPWriteRequest(r)
if err != nil {
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
level.Error(logger).Log("err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand All @@ -60,8 +55,16 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
SkipLabelNameValidation: false,
}

// otlp to prompb TimeSeries
promTsList, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

// convert prompb to cortexpb TimeSeries
tsList := []cortexpb.PreallocTimeseries(nil)
for _, v := range promConverter.TimeSeries() {
for _, v := range promTsList {
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
Labels: makeLabels(v.Labels),
Samples: makeSamples(v.Samples),
Expand All @@ -87,6 +90,35 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
})
}

func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, error) {
promConverter := prometheusremotewrite.NewPrometheusConverter()
settings := prometheusremotewrite.Settings{
AddMetricSuffixes: true,
DisableTargetInfo: cfg.DisableTargetInfo,
}

var annots annotations.Annotations
var err error

if cfg.ConvertAllAttributes {
annots, err = promConverter.FromMetrics(ctx, convertToMetricsAttributes(pmetrics), settings)
} else {
settings.PromoteResourceAttributes = overrides.PromoteResourceAttributes(userID)
annots, err = promConverter.FromMetrics(ctx, pmetrics, settings)
}

ws, _ := annots.AsStrings("", 0, 0)
if len(ws) > 0 {
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
}

if err != nil {
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
return nil, err
}
return promConverter.TimeSeries(), nil
}

func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {
out := make(labels.Labels, 0, len(in))
for _, l := range in {
Expand Down
Loading
Loading