Skip to content

Commit 07bbbee

Browse files
committed
Support remote write v2 by converting request
Signed-off-by: SungJin1212 <[email protected]>
1 parent 1d09628 commit 07bbbee

File tree

15 files changed

+1218
-108
lines changed

15 files changed

+1218
-108
lines changed

.github/workflows/test-build-deploy.yml

+1
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ jobs:
144144
- integration_querier
145145
- integration_ruler
146146
- integration_query_fuzz
147+
- integration_remote_write_v2
147148
steps:
148149
- name: Upgrade golang
149150
uses: actions/setup-go@41dfa10bad2bb2ae585af6ee5bb4d7d973ad74ed # v5.1.0

.golangci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,4 @@ run:
4949
- integration_querier
5050
- integration_ruler
5151
- integration_query_fuzz
52+
- integration_remote_write_v2

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [CHANGE] Change all max async concurrency default values `50` to `3` #6268
1010
* [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
1111
* [CHANGE] Enable Compactor and Alertmanager in target all. #6204
12+
* [FEATURE] Support Prometheus remote write 2.0 by converting v2 request to v1. #6330
1213
* [FEATURE] Ruler: Pagination support for List Rules API. #6299
1314
* [FEATURE] Query Frontend/Querier: Add protobuf codec `-api.querier-default-codec` and the option to choose response compression type `-querier.response-compression`. #5527
1415
* [FEATURE] Ruler: Experimental: Add `ruler.frontend-address` to allow query to query frontends instead of ingesters. #6151

integration/e2e/util.go

+76
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/prometheus/prometheus/model/histogram"
1919
"github.com/prometheus/prometheus/model/labels"
2020
"github.com/prometheus/prometheus/prompb"
21+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2122
"github.com/prometheus/prometheus/storage"
2223
"github.com/prometheus/prometheus/tsdb"
2324
"github.com/prometheus/prometheus/tsdb/tsdbutil"
@@ -334,3 +335,78 @@ func CreateBlock(
334335

335336
return id, nil
336337
}
338+
339+
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
340+
tsMillis := TimeToMilliseconds(ts)
341+
342+
st := writev2.NewSymbolTable()
343+
344+
lbs := labels.Labels{labels.Label{Name: "__name__", Value: name}}
345+
for _, lbl := range additionalLabels {
346+
lbs = append(lbs, labels.Label{Name: lbl.Name, Value: lbl.Value})
347+
}
348+
349+
var (
350+
h *histogram.Histogram
351+
fh *histogram.FloatHistogram
352+
ph writev2.Histogram
353+
)
354+
if floatHistogram {
355+
fh = tsdbutil.GenerateTestFloatHistogram(int(i))
356+
ph = writev2.FromFloatHistogram(tsMillis, fh)
357+
} else {
358+
h = tsdbutil.GenerateTestHistogram(int(i))
359+
ph = writev2.FromIntHistogram(tsMillis, h)
360+
}
361+
362+
// Generate the series
363+
series = append(series, writev2.TimeSeries{
364+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
365+
Histograms: []writev2.Histogram{ph},
366+
})
367+
368+
symbols = st.Symbols()
369+
370+
return
371+
}
372+
373+
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
374+
tsMillis := TimeToMilliseconds(ts)
375+
value := rand.Float64()
376+
377+
st := writev2.NewSymbolTable()
378+
lbs := labels.Labels{{Name: labels.MetricName, Value: name}}
379+
380+
for _, label := range additionalLabels {
381+
lbs = append(lbs, labels.Label{
382+
Name: label.Name,
383+
Value: label.Value,
384+
})
385+
}
386+
series = append(series, writev2.TimeSeries{
387+
// Generate the series
388+
LabelsRefs: st.SymbolizeLabels(lbs, nil),
389+
Samples: []writev2.Sample{
390+
{Value: value, Timestamp: tsMillis},
391+
},
392+
Metadata: writev2.Metadata{
393+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
394+
},
395+
})
396+
symbols = st.Symbols()
397+
398+
// Generate the expected vector when querying it
399+
metric := model.Metric{}
400+
metric[labels.MetricName] = model.LabelValue(name)
401+
for _, lbl := range additionalLabels {
402+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
403+
}
404+
405+
vector = append(vector, &model.Sample{
406+
Metric: metric,
407+
Value: model.SampleValue(value),
408+
Timestamp: model.Time(tsMillis),
409+
})
410+
411+
return
412+
}

integration/e2ecortex/client.go

+40
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/prometheus/prometheus/model/labels"
2525
"github.com/prometheus/prometheus/model/rulefmt"
2626
"github.com/prometheus/prometheus/prompb"
27+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2728
"github.com/prometheus/prometheus/storage"
2829
"github.com/prometheus/prometheus/storage/remote"
2930
yaml "gopkg.in/yaml.v3"
@@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
147148
return res, nil
148149
}
149150

151+
// PushV2 the input timeseries to the remote endpoint
152+
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
153+
// Create write request
154+
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
// Create HTTP request
160+
compressed := snappy.Encode(nil, data)
161+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
162+
if err != nil {
163+
return nil, err
164+
}
165+
166+
req.Header.Add("Content-Encoding", "snappy")
167+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
168+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
169+
req.Header.Set("X-Scope-OrgID", c.orgID)
170+
171+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
172+
defer cancel()
173+
174+
// Execute HTTP request
175+
res, err := c.httpClient.Do(req.WithContext(ctx))
176+
if err != nil {
177+
return nil, err
178+
}
179+
180+
defer res.Body.Close()
181+
return res, nil
182+
}
183+
150184
func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
151185
var metricName string
152186
attributes := make(map[string]any)
@@ -356,6 +390,12 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
356390
return value, err
357391
}
358392

393+
// Metadata runs a metadata query
394+
func (c *Client) Metadata(name, limit string) (map[string][]promv1.Metadata, error) {
395+
metadata, err := c.querierClient.Metadata(context.Background(), name, limit)
396+
return metadata, err
397+
}
398+
359399
// QueryExemplars runs an exemplars query
360400
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
361401
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)

integration/remote_write_v2_test.go

+211
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
//go:build integration_remote_write_v2
2+
// +build integration_remote_write_v2
3+
4+
package integration
5+
6+
import (
7+
"math/rand"
8+
"net/http"
9+
"path"
10+
"testing"
11+
"time"
12+
13+
"github.com/prometheus/common/model"
14+
"github.com/prometheus/prometheus/prompb"
15+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
16+
"github.com/prometheus/prometheus/tsdb/tsdbutil"
17+
"github.com/stretchr/testify/assert"
18+
"github.com/stretchr/testify/require"
19+
20+
"github.com/cortexproject/cortex/integration/e2e"
21+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
22+
"github.com/cortexproject/cortex/integration/e2ecortex"
23+
"github.com/cortexproject/cortex/pkg/storage/tsdb"
24+
)
25+
26+
func TestIngest(t *testing.T) {
27+
const blockRangePeriod = 5 * time.Second
28+
29+
s, err := e2e.NewScenario(networkName)
30+
require.NoError(t, err)
31+
defer s.Close()
32+
33+
// Start dependencies.
34+
consul := e2edb.NewConsulWithName("consul")
35+
require.NoError(t, s.StartAndWaitReady(consul))
36+
37+
flags := mergeFlags(
38+
AlertmanagerLocalFlags(),
39+
map[string]string{
40+
"-store.engine": blocksStorageEngine,
41+
"-blocks-storage.backend": "filesystem",
42+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
43+
"-blocks-storage.bucket-store.sync-interval": "15m",
44+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
45+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
46+
"-querier.query-store-for-labels-enabled": "true",
47+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
48+
"-blocks-storage.tsdb.ship-interval": "1s",
49+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
50+
"-blocks-storage.tsdb.enable-native-histograms": "true",
51+
// Ingester.
52+
"-ring.store": "consul",
53+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
54+
// Distributor.
55+
"-distributor.replication-factor": "1",
56+
// Store-gateway.
57+
"-store-gateway.sharding-enabled": "false",
58+
// alert manager
59+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
60+
},
61+
)
62+
63+
// make alert manager config dir
64+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
65+
66+
path := path.Join(s.SharedDir(), "cortex-1")
67+
68+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
69+
// Start Cortex replicas.
70+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
71+
require.NoError(t, s.StartAndWaitReady(cortex))
72+
73+
// Wait until Cortex replicas have updated the ring state.
74+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
75+
76+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
77+
require.NoError(t, err)
78+
79+
now := time.Now()
80+
81+
// series push
82+
symbols1, series, expectedVector := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
83+
res, err := c.PushV2(symbols1, series)
84+
require.NoError(t, err)
85+
require.Equal(t, 200, res.StatusCode)
86+
testPushHeader(t, res.Header, "1", "1", "0")
87+
88+
// sample
89+
result, err := c.Query("test_series", now)
90+
require.NoError(t, err)
91+
assert.Equal(t, expectedVector, result.(model.Vector))
92+
93+
// metadata
94+
metadata, err := c.Metadata("test_series", "")
95+
require.NoError(t, err)
96+
require.Equal(t, 1, len(metadata["test_series"]))
97+
98+
// histogram
99+
histogramIdx := rand.Uint32()
100+
symbols2, histogramSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
101+
res, err = c.PushV2(symbols2, histogramSeries)
102+
require.NoError(t, err)
103+
require.Equal(t, 200, res.StatusCode)
104+
testPushHeader(t, res.Header, "1", "1", "0")
105+
106+
symbols3, histogramFloatSeries := e2e.GenerateHistogramSeriesV2("test_histogram", now, histogramIdx, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
107+
res, err = c.PushV2(symbols3, histogramFloatSeries)
108+
require.NoError(t, err)
109+
require.Equal(t, 200, res.StatusCode)
110+
testPushHeader(t, res.Header, "1", "1", "0")
111+
112+
testHistogramTimestamp := now.Add(blockRangePeriod * 2)
113+
expectedHistogram := tsdbutil.GenerateTestHistogram(int(histogramIdx))
114+
result, err = c.Query(`test_histogram`, testHistogramTimestamp)
115+
require.NoError(t, err)
116+
require.Equal(t, model.ValVector, result.Type())
117+
v := result.(model.Vector)
118+
require.Equal(t, 2, v.Len())
119+
for _, s := range v {
120+
require.NotNil(t, s.Histogram)
121+
require.Equal(t, float64(expectedHistogram.Count), float64(s.Histogram.Count))
122+
require.Equal(t, float64(expectedHistogram.Sum), float64(s.Histogram.Sum))
123+
}
124+
}
125+
126+
func TestExemplar(t *testing.T) {
127+
s, err := e2e.NewScenario(networkName)
128+
require.NoError(t, err)
129+
defer s.Close()
130+
131+
// Start dependencies.
132+
consul := e2edb.NewConsulWithName("consul")
133+
require.NoError(t, s.StartAndWaitReady(consul))
134+
135+
flags := mergeFlags(
136+
AlertmanagerLocalFlags(),
137+
map[string]string{
138+
"-store.engine": blocksStorageEngine,
139+
"-blocks-storage.backend": "filesystem",
140+
"-blocks-storage.tsdb.head-compaction-interval": "4m",
141+
"-blocks-storage.bucket-store.sync-interval": "15m",
142+
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
143+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
144+
"-querier.query-store-for-labels-enabled": "true",
145+
"-blocks-storage.tsdb.ship-interval": "1s",
146+
"-blocks-storage.tsdb.enable-native-histograms": "true",
147+
// Ingester.
148+
"-ring.store": "consul",
149+
"-consul.hostname": consul.NetworkHTTPEndpoint(),
150+
"-ingester.max-exemplars": "100",
151+
// Distributor.
152+
"-distributor.replication-factor": "1",
153+
// Store-gateway.
154+
"-store-gateway.sharding-enabled": "false",
155+
// alert manager
156+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
157+
},
158+
)
159+
160+
// make alert manager config dir
161+
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))
162+
163+
path := path.Join(s.SharedDir(), "cortex-1")
164+
165+
flags = mergeFlags(flags, map[string]string{"-blocks-storage.filesystem.dir": path})
166+
// Start Cortex replicas.
167+
cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
168+
require.NoError(t, s.StartAndWaitReady(cortex))
169+
170+
// Wait until Cortex replicas have updated the ring state.
171+
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
172+
173+
c, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1")
174+
require.NoError(t, err)
175+
176+
now := time.Now()
177+
tsMillis := e2e.TimeToMilliseconds(now)
178+
179+
symbols := []string{"", "__name__", "test_metric", "b", "c", "baz", "qux", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}
180+
timeseries := []writev2.TimeSeries{
181+
{
182+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
183+
Metadata: writev2.Metadata{
184+
Type: writev2.Metadata_METRIC_TYPE_COUNTER, // writeV2RequestSeries1Metadata.Type.
185+
186+
HelpRef: 15, // Symbolized writeV2RequestSeries1Metadata.Help.
187+
UnitRef: 16, // Symbolized writeV2RequestSeries1Metadata.Unit.
188+
},
189+
Samples: []writev2.Sample{{Value: 1, Timestamp: tsMillis}},
190+
Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{11, 12}, Value: 1, Timestamp: tsMillis}},
191+
},
192+
}
193+
194+
res, err := c.PushV2(symbols, timeseries)
195+
require.NoError(t, err)
196+
require.Equal(t, 200, res.StatusCode)
197+
testPushHeader(t, res.Header, "1", "1", "1")
198+
199+
start := time.Now().Add(-time.Minute)
200+
end := now.Add(time.Minute)
201+
202+
exemplars, err := c.QueryExemplars("test_metric", start, end)
203+
require.NoError(t, err)
204+
require.Equal(t, 1, len(exemplars))
205+
}
206+
207+
func testPushHeader(t *testing.T, header http.Header, expectedSamples, expectedHistogram, expectedExemplars string) {
208+
require.Equal(t, expectedSamples, header.Get("X-Prometheus-Remote-Write-Samples-Written"))
209+
require.Equal(t, expectedHistogram, header.Get("X-Prometheus-Remote-Write-Histograms-Written"))
210+
require.Equal(t, expectedExemplars, header.Get("X-Prometheus-Remote-Write-Exemplars-Written"))
211+
}

0 commit comments

Comments
 (0)