Skip to content

Commit 9a1c45a

Browse files
committed
api: Create a new Go module for API; with remote write client and handler.
Lot's of assumptions here: * We want separate module (we don't have processes on how to version it and release). * We want to expose generated protos, without gogo. Signed-off-by: bwplotka <[email protected]>
1 parent e1675ce commit 9a1c45a

23 files changed

+8979
-6
lines changed

.bingo/Variables.mk

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Auto generated binary variables helper managed by https://github.com/bwplotka/bingo v0.8. DO NOT EDIT.
1+
# Auto generated binary variables helper managed by https://github.com/bwplotka/bingo v0.9. DO NOT EDIT.
22
# All tools are designed to be build inside $GOBIN.
33
BINGO_DIR := $(dir $(lastword $(MAKEFILE_LIST)))
44
GOPATH ?= $(shell go env GOPATH)
@@ -7,16 +7,22 @@ GO ?= $(shell which go)
77

88
# Below generated variables ensure that every time a tool under each variable is invoked, the correct version
99
# will be used; reinstalling only if needed.
10-
# For example for goimports variable:
10+
# For example for buf variable:
1111
#
1212
# In your main Makefile (for non array binaries):
1313
#
1414
#include .bingo/Variables.mk # Assuming -dir was set to .bingo .
1515
#
16-
#command: $(GOIMPORTS)
17-
# @echo "Running goimports"
18-
# @$(GOIMPORTS) <flags/args..>
16+
#command: $(BUF)
17+
# @echo "Running buf"
18+
# @$(BUF) <flags/args..>
1919
#
20+
BUF := $(GOBIN)/buf-v1.39.0
21+
$(BUF): $(BINGO_DIR)/buf.mod
22+
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
23+
@echo "(re)installing $(GOBIN)/buf-v1.39.0"
24+
@cd $(BINGO_DIR) && GOWORK=off $(GO) build -mod=mod -modfile=buf.mod -o=$(GOBIN)/buf-v1.39.0 "github.com/bufbuild/buf/cmd/buf"
25+
2026
GOIMPORTS := $(GOBIN)/goimports-v0.9.3
2127
$(GOIMPORTS): $(BINGO_DIR)/goimports.mod
2228
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.

.bingo/buf.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module _ // Auto generated by https://github.com/bwplotka/bingo. DO NOT EDIT
2+
3+
go 1.22.6
4+
5+
require github.com/bufbuild/buf v1.39.0 // cmd/buf

.bingo/buf.sum

Lines changed: 336 additions & 0 deletions
Large diffs are not rendered by default.

.bingo/variables.env

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Auto generated binary variables helper managed by https://github.com/bwplotka/bingo v0.8. DO NOT EDIT.
1+
# Auto generated binary variables helper managed by https://github.com/bwplotka/bingo v0.9. DO NOT EDIT.
22
# All tools are designed to be build inside $GOBIN.
33
# Those variables will work only until 'bingo get' was invoked, or if tools were installed via Makefile's Variables.mk.
44
GOBIN=${GOBIN:=$(go env GOBIN)}
@@ -8,5 +8,7 @@ if [ -z "$GOBIN" ]; then
88
fi
99

1010

11+
BUF="${GOBIN}/buf-v1.39.0"
12+
1113
GOIMPORTS="${GOBIN}/goimports-v0.9.3"
1214

Makefile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,10 @@ generate-go-collector-test-files:
5050
.PHONY: fmt
5151
fmt: common-format
5252
$(GOIMPORTS) -local github.com/prometheus/client_golang -w .
53+
54+
RWMODULE = api/remotewrite
55+
56+
.PHONY: proto
57+
proto: ## Regenerate Go from proto.
58+
proto: $(BUF)
59+
@$(MAKE) -C api/remotewrite proto BUF=$(BUF)

api/remotewrite/Makefile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
2+
.PHONY: proto
3+
proto: ## Regenerate Go from proto.
4+
proto: $(BUF)
5+
@echo ">> regenerating Prometheus proto"
6+
@$(BUF) generate
7+
# TODO(bwplotka): Is there a way to configure buf for this?
8+
@find genproto/ -type f -exec sed -i '' 's/package prompb/package writev1/g' {} \;
9+
# For some reasons buf generates this unused import, kill it manually for now and reformat.
10+
@find genproto/ -type f -exec sed -i '' 's/_ "github.com\/gogo\/protobuf\/gogoproto"//g' {} \;
11+
@go fmt ./genproto/...

api/remotewrite/buf.gen.yaml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# buf.gen.yaml
2+
version: v2
3+
4+
plugins:
5+
- remote: buf.build/protocolbuffers/go:v1.31.0
6+
out: .
7+
opt:
8+
- Mio/prometheus/write/v2/types.proto=genproto/v2
9+
- Mtypes.proto=genproto/v1
10+
- Mremote.proto=genproto/v1
11+
12+
# vtproto for efficiency utilities like pooling etc.
13+
# https://buf.build/community/planetscale-vtprotobuf?version=v0.6.0
14+
- remote: buf.build/community/planetscale-vtprotobuf:v0.6.0
15+
out: .
16+
opt:
17+
- Mio/prometheus/write/v2/types.proto=genproto/v2
18+
- Mtypes.proto=genproto/v1
19+
- Mremote.proto=genproto/v1
20+
- features=marshal+unmarshal+size+clone
21+
22+
inputs:
23+
- module: buf.build/prometheus/prometheus:5b212ab78fb7460e831cf7ff2d83e385
24+
types:
25+
- "io.prometheus.write.v2.Request"
26+
- "prometheus.WriteRequest"

api/remotewrite/client.go

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
// Copyright 2024 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package remotewrite
15+
16+
import (
17+
"bytes"
18+
"context"
19+
"errors"
20+
"fmt"
21+
"io"
22+
"log/slog"
23+
"net/http"
24+
"strconv"
25+
"time"
26+
27+
"github.com/efficientgo/core/backoff"
28+
"github.com/klauspost/compress/snappy"
29+
writev1 "github.com/prometheus/client_golang/api/remotewrite/genproto/v1"
30+
writev2 "github.com/prometheus/client_golang/api/remotewrite/genproto/v2"
31+
)
32+
33+
const (
34+
defaultBackoff = 0
35+
maxErrMsgLen = 1024
36+
)
37+
38+
type Client struct {
39+
logger *slog.Logger
40+
url string
41+
client *http.Client
42+
43+
userAgent string
44+
retryOnRateLimit bool
45+
46+
compr Compression
47+
comprBuf []byte
48+
49+
b *backoff.Backoff
50+
}
51+
52+
type EncodingClient struct {
53+
client *Client
54+
55+
buf []byte
56+
}
57+
58+
func NewEncodingClient(client *Client) *EncodingClient {
59+
return &EncodingClient{client: client}
60+
}
61+
62+
func (c *EncodingClient) WriteV1(ctx context.Context, req *writev1.WriteRequest, opts *ClientWriteOpts) (WriteResponseStats, error) {
63+
size := req.SizeVT()
64+
if len(c.buf) < size {
65+
c.buf = make([]byte, size)
66+
}
67+
if _, err := req.MarshalToSizedBufferVT(c.buf[:size]); err != nil {
68+
return WriteResponseStats{}, fmt.Errorf("encoding v1 request %w", err)
69+
}
70+
return c.client.Write(ctx, ProtoMsgV1, c.buf[:size], opts)
71+
}
72+
73+
func (c *EncodingClient) WriteV2(ctx context.Context, req *writev2.Request, opts *ClientWriteOpts) (WriteResponseStats, error) {
74+
size := req.SizeVT()
75+
if len(c.buf) < size {
76+
c.buf = make([]byte, size)
77+
}
78+
if _, err := req.MarshalToSizedBufferVT(c.buf[:size]); err != nil {
79+
return WriteResponseStats{}, fmt.Errorf("encoding v2 request %w", err)
80+
}
81+
stats, err := c.client.Write(ctx, ProtoMsgV2, c.buf[:size], opts)
82+
if err != nil {
83+
return stats, err
84+
}
85+
86+
// Check the case mentioned in PRW 2.0.
87+
// https://prometheus.io/docs/specs/remote_write_spec_2_0/#required-written-response-headers.
88+
if !stats.Confirmed && stats.NoDataWritten() {
89+
cStats := WriteResponseStats{}.AddV2(req)
90+
if !cStats.NoDataWritten() {
91+
return stats, fmt.Errorf("sent v2 request with %v samples %v histograms %v exemplars; "+
92+
"got 2xx, but PRW 2.0 response header statistics indicate %v samples, %v histograms "+
93+
"and %v exemplars were accepted; assumining failure e.g. the target only supports "+
94+
"PRW 1.0 prometheus.WriteRequest, but does not check the Content-Type header correctly",
95+
cStats.Samples, cStats.Histograms, cStats.Exemplars,
96+
stats.Samples, stats.Histograms, stats.Exemplars,
97+
)
98+
}
99+
}
100+
return stats, nil
101+
}
102+
103+
// NewClient returns client.
104+
// TODO(bwplotka): Add variadic options.
105+
func NewClient(logger *slog.Logger, url string, hc *http.Client, compr Compression, ua string, retryOnRateLimit bool) *Client {
106+
if hc == nil {
107+
hc = &http.Client{Timeout: 1 * time.Minute}
108+
}
109+
return &Client{
110+
logger: logger,
111+
url: url,
112+
client: hc,
113+
compr: compr,
114+
userAgent: ua,
115+
retryOnRateLimit: retryOnRateLimit,
116+
}
117+
}
118+
119+
type RetryableError struct {
120+
error
121+
retryAfter time.Duration
122+
}
123+
124+
func (r RetryableError) RetryAfter() time.Duration {
125+
return r.retryAfter
126+
}
127+
128+
type ClientWriteOpts struct {
129+
Backoff backoff.Config
130+
}
131+
132+
var defaultOpts = &ClientWriteOpts{
133+
Backoff: backoff.Config{
134+
Min: 1 * time.Second,
135+
Max: 10 * time.Second,
136+
MaxRetries: 10,
137+
},
138+
}
139+
140+
// TODO(bwplotka): Support variadic options allowing too old sample handling, tracing, metrics
141+
func (c *Client) Write(ctx context.Context, proto ProtoMsg, serializedRequest []byte, opts *ClientWriteOpts) (WriteResponseStats, error) {
142+
o := *defaultOpts
143+
if opts != nil {
144+
o = *opts
145+
}
146+
payload, err := compressPayload(&c.comprBuf, c.compr, serializedRequest)
147+
if err != nil {
148+
return WriteResponseStats{}, fmt.Errorf("compressing %w", err)
149+
}
150+
151+
// Since we retry writes we need to track the total amount of accepted data
152+
// across the various attempts.
153+
accumulatedStats := WriteResponseStats{}
154+
155+
b := backoff.New(ctx, o.Backoff)
156+
for {
157+
rs, err := c.write(ctx, proto, payload, b.NumRetries())
158+
accumulatedStats = accumulatedStats.Add(rs)
159+
if err == nil {
160+
// Success!
161+
// TODO(bwplotka): Debug log with retry summary?
162+
return accumulatedStats, nil
163+
}
164+
165+
var retryableErr RetryableError
166+
if !errors.As(err, &retryableErr) {
167+
// TODO(bwplotka): More context in the error e.g. about retries.
168+
return accumulatedStats, err
169+
}
170+
171+
if !b.Ongoing() {
172+
// TODO(bwplotka): More context in the error e.g. about retries.
173+
return accumulatedStats, err
174+
}
175+
176+
backoffDelay := b.NextDelay() + retryableErr.RetryAfter()
177+
c.logger.Error("failed to send remote write request; retrying after backoff", "err", err, "backoff", backoffDelay)
178+
select {
179+
case <-ctx.Done():
180+
// TODO(bwplotka): More context in the error e.g. about retries.
181+
return WriteResponseStats{}, ctx.Err()
182+
case <-time.After(backoffDelay):
183+
// Retry.
184+
}
185+
}
186+
}
187+
188+
func compressPayload(tmpbuf *[]byte, enc Compression, inp []byte) (compressed []byte, _ error) {
189+
switch enc {
190+
case SnappyBlockCompression:
191+
compressed = snappy.Encode(*tmpbuf, inp)
192+
if n := snappy.MaxEncodedLen(len(inp)); n > len(*tmpbuf) {
193+
// grow the buffer for the next time.
194+
*tmpbuf = make([]byte, n)
195+
}
196+
return compressed, nil
197+
default:
198+
return compressed, fmt.Errorf("Unknown compression scheme [%v]", enc)
199+
}
200+
}
201+
202+
func (c *Client) write(ctx context.Context, proto ProtoMsg, payload []byte, attempt int) (WriteResponseStats, error) {
203+
httpReq, err := http.NewRequest(http.MethodPost, c.url, bytes.NewReader(payload))
204+
if err != nil {
205+
// Errors from NewRequest are from unparsable URLs, so are not
206+
// recoverable.
207+
return WriteResponseStats{}, err
208+
}
209+
210+
httpReq.Header.Add("Content-Encoding", string(c.compr))
211+
httpReq.Header.Set("Content-Type", ContentTypeHeader(proto))
212+
httpReq.Header.Set("User-Agent", c.userAgent)
213+
if proto == ProtoMsgV1 {
214+
// Compatibility mode for 1.0.
215+
httpReq.Header.Set(VersionHeader, Version1HeaderValue)
216+
} else {
217+
httpReq.Header.Set(VersionHeader, Version20HeaderValue)
218+
}
219+
220+
if attempt > 0 {
221+
httpReq.Header.Set("Retry-Attempt", strconv.Itoa(attempt))
222+
}
223+
224+
httpResp, err := c.client.Do(httpReq.WithContext(ctx))
225+
if err != nil {
226+
// Errors from Client.Do are likely network errors, so recoverable.
227+
return WriteResponseStats{}, RetryableError{err, defaultBackoff}
228+
}
229+
defer func() {
230+
_, _ = io.Copy(io.Discard, httpResp.Body)
231+
_ = httpResp.Body.Close()
232+
}()
233+
234+
rs, err := parseWriteResponseStats(httpResp)
235+
if err != nil {
236+
c.logger.Warn("parsing rw write statistics failed; partial or no stats", "err", err)
237+
}
238+
239+
if httpResp.StatusCode/100 == 2 {
240+
return rs, nil
241+
}
242+
243+
body, err := io.ReadAll(io.LimitReader(httpResp.Body, maxErrMsgLen))
244+
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, body)
245+
246+
if httpResp.StatusCode/100 == 5 ||
247+
(c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) {
248+
return rs, RetryableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
249+
}
250+
return rs, err
251+
}
252+
253+
// retryAfterDuration returns the duration for the Retry-After header. In case of any errors, it
254+
// returns 0 as if the header was never supplied.
255+
func retryAfterDuration(t string) time.Duration {
256+
parsedDuration, err := time.Parse(http.TimeFormat, t)
257+
if err == nil {
258+
return time.Until(parsedDuration)
259+
}
260+
// The duration can be in seconds.
261+
d, err := strconv.Atoi(t)
262+
if err != nil {
263+
return 0
264+
}
265+
return time.Duration(d) * time.Second
266+
}

0 commit comments

Comments
 (0)