Skip to content

Commit a42d813

Browse files
committed
Change otlp attribute conversion to consist with prometheus
Signed-off-by: SungJin1212 <[email protected]>
1 parent c6347f0 commit a42d813

File tree

6 files changed

+232
-28
lines changed

6 files changed

+232
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [FEATURE] Ruler: Minimize chances of missed rule group evaluations that can occur due to OOM kills, bad underlying nodes, or due to an unhealthy ruler that appears in the ring as healthy. This feature is enabled via `-ruler.enable-ha-evaluation` flag. #6129
1010
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1111
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
12+
* [ENHANCEMENT] OTLP: Change otlp handler to consist with the Prometheus otlp handler. #6272
1213
* [ENHANCEMENT] Ingester: Add `blocks-storage.tsdb.wal-compression-type` to support zstd wal compression type. #6232
1314
* [ENHANCEMENT] Query Frontend: Add info field to query response. #6207
1415
* [ENHANCEMENT] Query Frontend: Add peakSample in query stats response. #6188

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2608,6 +2608,15 @@ instance_limits:
26082608
# unlimited.
26092609
# CLI flag: -distributor.instance-limits.max-inflight-push-requests
26102610
[max_inflight_push_requests: <int> | default = 0]
2611+
2612+
otlp:
2613+
# If enabled, all resource attributes are converted to labels.
2614+
# CLI flag: -distributor.otlp-config.convert-all-attributes
2615+
[convert_all_attributes: <boolean> | default = false]
2616+
2617+
# A list of resource attributes that should be converted to labels.
2618+
# CLI flag: -distributor.otlp-config.promote-resource-attributes
2619+
[promote_resource_attributes: <list of string> | default = []]
26112620
```
26122621

26132622
### `etcd_config`

pkg/api/api.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,8 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
275275
// RegisterDistributor registers the endpoints associated with the distributor.
276276
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) {
277277
distributorpb.RegisterDistributorServer(a.server.GRPC, d)
278-
279278
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
280-
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
279+
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
281280

282281
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
283282
a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics")

pkg/distributor/distributor.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"github.com/cortexproject/cortex/pkg/tenant"
3737
"github.com/cortexproject/cortex/pkg/util"
3838
"github.com/cortexproject/cortex/pkg/util/extract"
39+
"github.com/cortexproject/cortex/pkg/util/flagext"
3940
"github.com/cortexproject/cortex/pkg/util/limiter"
4041
util_log "github.com/cortexproject/cortex/pkg/util/log"
4142
util_math "github.com/cortexproject/cortex/pkg/util/math"
@@ -164,13 +165,21 @@ type Config struct {
164165

165166
// Limits for distributor
166167
InstanceLimits InstanceLimits `yaml:"instance_limits"`
168+
169+
// OTLPConfig
170+
OTLPConfig OTLPConfig `yaml:"otlp"`
167171
}
168172

169173
type InstanceLimits struct {
170174
MaxIngestionRate float64 `yaml:"max_ingestion_rate"`
171175
MaxInflightPushRequests int `yaml:"max_inflight_push_requests"`
172176
}
173177

178+
type OTLPConfig struct {
179+
ConvertAllAttributes bool `yaml:"convert_all_attributes"`
180+
PromoteResourceAttributes []string `yaml:"promote_resource_attributes"`
181+
}
182+
174183
// RegisterFlags adds the flags required to config this to the given FlagSet
175184
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
176185
cfg.PoolConfig.RegisterFlags(f)
@@ -188,6 +197,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
188197

189198
f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
190199
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
200+
201+
f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp-config.convert-all-attributes", false, "If enabled, all resource attributes are converted to labels.")
202+
f.Var((*flagext.StringSlice)(&cfg.OTLPConfig.PromoteResourceAttributes), "distributor.otlp-config.promote-resource-attributes", "A list of resource attributes that should be converted to labels.")
191203
}
192204

193205
// Validate config and returns error on failure

pkg/util/push/otlp.go

Lines changed: 44 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,36 @@
11
package push
22

33
import (
4+
"context"
45
"net/http"
56

7+
"github.com/cortexproject/cortex/pkg/cortexpb"
8+
"github.com/cortexproject/cortex/pkg/distributor"
9+
"github.com/cortexproject/cortex/pkg/util"
10+
util_log "github.com/cortexproject/cortex/pkg/util/log"
11+
"github.com/go-kit/log"
612
"github.com/go-kit/log/level"
713
"github.com/prometheus/prometheus/model/labels"
814
"github.com/prometheus/prometheus/prompb"
915
"github.com/prometheus/prometheus/storage/remote"
1016
"github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
17+
"github.com/prometheus/prometheus/util/annotations"
1118
"github.com/weaveworks/common/httpgrpc"
1219
"github.com/weaveworks/common/middleware"
1320
"go.opentelemetry.io/collector/pdata/pcommon"
1421
"go.opentelemetry.io/collector/pdata/pmetric"
15-
16-
"github.com/cortexproject/cortex/pkg/cortexpb"
17-
"github.com/cortexproject/cortex/pkg/util"
18-
"github.com/cortexproject/cortex/pkg/util/log"
1922
)
2023

2124
// OTLPHandler is a http.Handler which accepts OTLP metrics.
22-
func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
25+
func OTLPHandler(cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
2326
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2427
ctx := r.Context()
25-
logger := log.WithContext(ctx, log.Logger)
28+
logger := util_log.WithContext(ctx, util_log.Logger)
2629
if sourceIPs != nil {
2730
source := sourceIPs.Get(r)
2831
if source != "" {
2932
ctx = util.AddSourceIPsToOutgoingContext(ctx, source)
30-
logger = log.WithSourceIPs(source, logger)
33+
logger = util_log.WithSourceIPs(source, logger)
3134
}
3235
}
3336
req, err := remote.DecodeOTLPWriteRequest(r)
@@ -37,31 +40,22 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
3740
return
3841
}
3942

40-
promConverter := prometheusremotewrite.NewPrometheusConverter()
41-
setting := prometheusremotewrite.Settings{
42-
AddMetricSuffixes: true,
43-
DisableTargetInfo: true,
44-
}
45-
annots, err := promConverter.FromMetrics(ctx, convertToMetricsAttributes(req.Metrics()), setting)
46-
ws, _ := annots.AsStrings("", 0, 0)
47-
if len(ws) > 0 {
48-
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
43+
prwReq := cortexpb.WriteRequest{
44+
Source: cortexpb.API,
45+
Metadata: nil,
46+
SkipLabelNameValidation: false,
4947
}
5048

49+
// otlp to prompb TimeSeries
50+
promTsList, err := convertToPromTS(r.Context(), req.Metrics(), cfg, logger)
5151
if err != nil {
52-
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
5352
http.Error(w, err.Error(), http.StatusBadRequest)
5453
return
5554
}
5655

57-
prwReq := cortexpb.WriteRequest{
58-
Source: cortexpb.API,
59-
Metadata: nil,
60-
SkipLabelNameValidation: false,
61-
}
62-
56+
// convert prompb to cortexpb TimeSeries
6357
tsList := []cortexpb.PreallocTimeseries(nil)
64-
for _, v := range promConverter.TimeSeries() {
58+
for _, v := range promTsList {
6559
tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{
6660
Labels: makeLabels(v.Labels),
6761
Samples: makeSamples(v.Samples),
@@ -87,6 +81,32 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle
8781
})
8882
}
8983

84+
func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, logger log.Logger) ([]prompb.TimeSeries, error) {
85+
promConverter := prometheusremotewrite.NewPrometheusConverter()
86+
settings := prometheusremotewrite.Settings{
87+
AddMetricSuffixes: true,
88+
PromoteResourceAttributes: cfg.PromoteResourceAttributes,
89+
}
90+
var annots annotations.Annotations
91+
var err error
92+
if cfg.ConvertAllAttributes {
93+
annots, err = promConverter.FromMetrics(ctx, convertToMetricsAttributes(pmetrics), settings)
94+
} else {
95+
annots, err = promConverter.FromMetrics(ctx, pmetrics, settings)
96+
}
97+
98+
ws, _ := annots.AsStrings("", 0, 0)
99+
if len(ws) > 0 {
100+
level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws)
101+
}
102+
103+
if err != nil {
104+
level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
105+
return nil, err
106+
}
107+
return promConverter.TimeSeries(), nil
108+
}
109+
90110
func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter {
91111
out := make(labels.Labels, 0, len(in))
92112
for _, l := range in {

pkg/util/push/otlp_test.go

Lines changed: 165 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,179 @@ import (
88
"testing"
99
"time"
1010

11+
"github.com/go-kit/log"
12+
"github.com/prometheus/prometheus/prompb"
1113
"github.com/stretchr/testify/assert"
1214
"github.com/stretchr/testify/require"
1315
"go.opentelemetry.io/collector/pdata/pcommon"
1416
"go.opentelemetry.io/collector/pdata/pmetric"
1517
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
1618

1719
"github.com/cortexproject/cortex/pkg/cortexpb"
20+
"github.com/cortexproject/cortex/pkg/distributor"
1821
)
1922

23+
func TestOTLPConvertToPromTS(t *testing.T) {
24+
logger := log.NewNopLogger()
25+
ctx := context.Background()
26+
d := pmetric.NewMetrics()
27+
resourceMetric := d.ResourceMetrics().AppendEmpty()
28+
resourceMetric.Resource().Attributes().PutStr("service.name", "test-service") // converted to job, service_name
29+
resourceMetric.Resource().Attributes().PutStr("attr1", "value")
30+
resourceMetric.Resource().Attributes().PutStr("attr2", "value")
31+
resourceMetric.Resource().Attributes().PutStr("attr3", "value")
32+
33+
scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty()
34+
35+
//Generate One Counter
36+
timestamp := time.Now()
37+
counterMetric := scopeMetric.Metrics().AppendEmpty()
38+
counterMetric.SetName("test-counter")
39+
counterMetric.SetDescription("test-counter-description")
40+
counterMetric.SetEmptySum()
41+
counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
42+
counterMetric.Sum().SetIsMonotonic(true)
43+
44+
counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty()
45+
counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
46+
counterDataPoint.SetDoubleValue(10.0)
47+
48+
tests := []struct {
49+
description string
50+
cfg distributor.OTLPConfig
51+
expectedLabels []prompb.Label
52+
}{
53+
{
54+
description: "only attributes that exist in promote resource attributes should be converted",
55+
cfg: distributor.OTLPConfig{
56+
ConvertAllAttributes: false,
57+
PromoteResourceAttributes: []string{"attr1"},
58+
},
59+
expectedLabels: []prompb.Label{
60+
{
61+
Name: "__name__",
62+
Value: "test_counter_total",
63+
},
64+
{
65+
Name: "attr1",
66+
Value: "value",
67+
},
68+
{
69+
Name: "job",
70+
Value: "test-service",
71+
},
72+
},
73+
},
74+
{
75+
description: "not exist attribute is ignored",
76+
cfg: distributor.OTLPConfig{
77+
ConvertAllAttributes: false,
78+
PromoteResourceAttributes: []string{"dummy"},
79+
},
80+
expectedLabels: []prompb.Label{
81+
{
82+
Name: "__name__",
83+
Value: "test_counter_total",
84+
},
85+
{
86+
Name: "job",
87+
Value: "test-service",
88+
},
89+
},
90+
},
91+
{
92+
description: "should convert all attribute",
93+
cfg: distributor.OTLPConfig{
94+
ConvertAllAttributes: true,
95+
PromoteResourceAttributes: nil,
96+
},
97+
expectedLabels: []prompb.Label{
98+
{
99+
Name: "__name__",
100+
Value: "test_counter_total",
101+
},
102+
{
103+
Name: "attr1",
104+
Value: "value",
105+
},
106+
{
107+
Name: "attr2",
108+
Value: "value",
109+
},
110+
{
111+
Name: "attr3",
112+
Value: "value",
113+
},
114+
{
115+
Name: "job",
116+
Value: "test-service",
117+
},
118+
{
119+
Name: "service_name",
120+
Value: "test-service",
121+
},
122+
},
123+
},
124+
{
125+
description: "should convert all attribute regardless of promote resource attributes",
126+
cfg: distributor.OTLPConfig{
127+
ConvertAllAttributes: true,
128+
PromoteResourceAttributes: []string{"attr1", "attr2"},
129+
},
130+
expectedLabels: []prompb.Label{
131+
{
132+
Name: "__name__",
133+
Value: "test_counter_total",
134+
},
135+
{
136+
Name: "attr1",
137+
Value: "value",
138+
},
139+
{
140+
Name: "attr2",
141+
Value: "value",
142+
},
143+
{
144+
Name: "attr3",
145+
Value: "value",
146+
},
147+
{
148+
Name: "job",
149+
Value: "test-service",
150+
},
151+
{
152+
Name: "service_name",
153+
Value: "test-service",
154+
},
155+
},
156+
},
157+
}
158+
159+
for _, test := range tests {
160+
t.Run(test.description, func(t *testing.T) {
161+
tsList, err := convertToPromTS(ctx, d, test.cfg, logger)
162+
require.NoError(t, err)
163+
require.Equal(t, 2, len(tsList)) // target_info + test_counter_total
164+
165+
var counterTs prompb.TimeSeries
166+
for _, ts := range tsList {
167+
for _, label := range ts.Labels {
168+
if label.Name == "__name__" && label.Value == "test_counter_total" {
169+
// get counter ts
170+
counterTs = ts
171+
}
172+
}
173+
}
174+
require.ElementsMatch(t, test.expectedLabels, counterTs.Labels)
175+
})
176+
}
177+
}
178+
20179
func TestOTLPWriteHandler(t *testing.T) {
180+
cfg := distributor.OTLPConfig{
181+
PromoteResourceAttributes: []string{},
182+
}
183+
21184
exportRequest := generateOTLPWriteRequest(t)
22185

23186
buf, err := exportRequest.MarshalProto()
@@ -28,7 +191,7 @@ func TestOTLPWriteHandler(t *testing.T) {
28191
req.Header.Set("Content-Type", "application/x-protobuf")
29192

30193
push := verifyOTLPWriteRequestHandler(t, cortexpb.API)
31-
handler := OTLPHandler(nil, push)
194+
handler := OTLPHandler(cfg, nil, push)
32195

33196
recorder := httptest.NewRecorder()
34197
handler.ServeHTTP(recorder, req)
@@ -120,7 +283,7 @@ func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest {
120283
func verifyOTLPWriteRequestHandler(t *testing.T, expectSource cortexpb.WriteRequest_SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
121284
t.Helper()
122285
return func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
123-
assert.Len(t, request.Timeseries, 12) // 1 (counter) + 1 (gauge) + 7 (hist_bucket) + 2 (hist_sum, hist_count) + 1 (exponential histogram)
286+
assert.Len(t, request.Timeseries, 13) // 1 (target_info) + 1 (counter) + 1 (gauge) + 7 (hist_bucket) + 2 (hist_sum, hist_count) + 1 (exponential histogram)
124287
// TODO: test more things
125288
assert.Equal(t, expectSource, request.Source)
126289
assert.False(t, request.SkipLabelNameValidation)

0 commit comments

Comments
 (0)