Skip to content

Commit 1f81b3e

Browse files
authored
Added Transactional Gatherer allowed cached solutions (#989)
* Added cached collector. Signed-off-by: Bartlomiej Plotka <[email protected]> update. Signed-off-by: Bartlomiej Plotka <[email protected]> Attempt 2 Signed-off-by: Bartlomiej Plotka <[email protected]> Added blocking registry, with raw collector and transactional handler. Signed-off-by: Bartlomiej Plotka <[email protected]> Added fast path to normal (empty) registry to save 8 allocs and 3K5B per Gather. Signed-off-by: Bartlomiej Plotka <[email protected]> Simplified API, added tests. Signed-off-by: Bartlomiej Plotka <[email protected]> Fix. Signed-off-by: Bartlomiej Plotka <[email protected]> Simplified implementation. Signed-off-by: Bartlomiej Plotka <[email protected]> Added benchmark. Signed-off-by: Bartlomiej Plotka <[email protected]> Optimized. Signed-off-by: Bartlomiej Plotka <[email protected]> * Optimization attempt. Signed-off-by: Bartlomiej Plotka <[email protected]> * Revert "Optimization attempt." This reverts commit 2fcaf51. Optimization was not worth it: benchstat v1.txt v2.txt name old time/op new time/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 2.64µs ± 0% 4.05µs ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Update_of_all_elements_with_reset-12 701ms ± 0% 358ms ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Gather-12 535µs ± 0% 703934µs ± 0% ~ (p=1.000 n=1+1) name old alloc/op new alloc/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 208B ± 0% 208B ± 0% ~ (all equal) CachedTGatherer_Update/Update_of_all_elements_with_reset-12 40.2MB ± 0% 41.1MB ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Gather-12 48.6kB ± 0% 84.3kB ± 0% ~ (p=1.000 n=1+1) name old allocs/op new allocs/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 3.00 ± 0% 3.00 ± 0% ~ (all equal) CachedTGatherer_Update/Update_of_all_elements_with_reset-12 6.00 ± 0% 4003.00 ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Gather-12 1.00k ± 0% 2.01k ± 0% ~ (p=1.000 n=1+1) * nit. 
Signed-off-by: Bartlomiej Plotka <[email protected]> * Another optimization attempt. Signed-off-by: Bartlomiej Plotka <[email protected]> * rename and further optimization. Signed-off-by: Bartlomiej Plotka <[email protected]> * Hopefully final optimization. benchstat -delta-test=none v6.txt v9.txt name old time/op new time/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 13.1ms ± 0% 0.0ms ± 0% -99.81% CachedTGatherer_Update/Update_of_all_elements_with_reset-12 309ms ± 0% 282ms ± 0% -8.77% CachedTGatherer_Update/Gather-12 422ms ± 0% 0ms ± 0% -99.95% name old alloc/op new alloc/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 208B ± 0% 208B ± 0% 0.00% CachedTGatherer_Update/Update_of_all_elements_with_reset-12 2.47kB ± 0% 1.67kB ± 0% -32.56% CachedTGatherer_Update/Gather-12 52.8kB ± 0% 24.6kB ± 0% -53.34% name old allocs/op new allocs/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 3.00 ± 0% 3.00 ± 0% 0.00% CachedTGatherer_Update/Update_of_all_elements_with_reset-12 0.00 0.00 0.00% CachedTGatherer_Update/Gather-12 1.00k ± 0% 0.00k ± 0% -99.60% Signed-off-by: Bartlomiej Plotka <[email protected]> * Removed obsolete comment Signed-off-by: Bartlomiej Plotka <[email protected]> * Fixed tests. Signed-off-by: Bartlomiej Plotka <[email protected]> * Removed cache. Signed-off-by: Bartlomiej Plotka <[email protected]> * Fixed tests. Signed-off-by: Bartlomiej Plotka <[email protected]> * Re-add cache. Signed-off-by: Bartlomiej Plotka <[email protected]> * Removed cache. Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent f3021b0 commit 1f81b3e

File tree

10 files changed

+340
-64
lines changed

10 files changed

+340
-64
lines changed

prometheus/desc.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ import (
2020
"strings"
2121

2222
"github.com/cespare/xxhash/v2"
23+
"github.com/prometheus/client_golang/prometheus/internal"
24+
2325
//nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
2426
"github.com/golang/protobuf/proto"
2527
"github.com/prometheus/common/model"
@@ -154,7 +156,7 @@ func NewDesc(fqName, help string, variableLabels []string, constLabels Labels) *
154156
Value: proto.String(v),
155157
})
156158
}
157-
sort.Sort(labelPairSorter(d.constLabelPairs))
159+
sort.Sort(internal.LabelPairSorter(d.constLabelPairs))
158160
return d
159161
}
160162

prometheus/internal/metric.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,34 @@ import (
1919
dto "github.com/prometheus/client_model/go"
2020
)
2121

22-
// metricSorter is a sortable slice of *dto.Metric.
23-
type metricSorter []*dto.Metric
22+
// LabelPairSorter implements sort.Interface. It is used to sort a slice of
23+
// dto.LabelPair pointers.
24+
type LabelPairSorter []*dto.LabelPair
2425

25-
func (s metricSorter) Len() int {
26+
func (s LabelPairSorter) Len() int {
2627
return len(s)
2728
}
2829

29-
func (s metricSorter) Swap(i, j int) {
30+
func (s LabelPairSorter) Swap(i, j int) {
3031
s[i], s[j] = s[j], s[i]
3132
}
3233

33-
func (s metricSorter) Less(i, j int) bool {
34+
func (s LabelPairSorter) Less(i, j int) bool {
35+
return s[i].GetName() < s[j].GetName()
36+
}
37+
38+
// MetricSorter is a sortable slice of *dto.Metric.
39+
type MetricSorter []*dto.Metric
40+
41+
func (s MetricSorter) Len() int {
42+
return len(s)
43+
}
44+
45+
func (s MetricSorter) Swap(i, j int) {
46+
s[i], s[j] = s[j], s[i]
47+
}
48+
49+
func (s MetricSorter) Less(i, j int) bool {
3450
if len(s[i].Label) != len(s[j].Label) {
3551
// This should not happen. The metrics are
3652
// inconsistent. However, we have to deal with the fact, as
@@ -68,7 +84,7 @@ func (s metricSorter) Less(i, j int) bool {
6884
// the slice, with the contained Metrics sorted within each MetricFamily.
6985
func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily {
7086
for _, mf := range metricFamiliesByName {
71-
sort.Sort(metricSorter(mf.Metric))
87+
sort.Sort(MetricSorter(mf.Metric))
7288
}
7389
names := make([]string, 0, len(metricFamiliesByName))
7490
for name, mf := range metricFamiliesByName {

prometheus/metric.go

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,22 +115,6 @@ func BuildFQName(namespace, subsystem, name string) string {
115115
return name
116116
}
117117

118-
// labelPairSorter implements sort.Interface. It is used to sort a slice of
119-
// dto.LabelPair pointers.
120-
type labelPairSorter []*dto.LabelPair
121-
122-
func (s labelPairSorter) Len() int {
123-
return len(s)
124-
}
125-
126-
func (s labelPairSorter) Swap(i, j int) {
127-
s[i], s[j] = s[j], s[i]
128-
}
129-
130-
func (s labelPairSorter) Less(i, j int) bool {
131-
return s[i].GetName() < s[j].GetName()
132-
}
133-
134118
type invalidMetric struct {
135119
desc *Desc
136120
err error

prometheus/promhttp/http.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ func Handler() http.Handler {
8484
// instrumentation. Use the InstrumentMetricHandler function to apply the same
8585
// kind of instrumentation as it is used by the Handler function.
8686
func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
87+
return HandlerForTransactional(prometheus.ToTransactionalGatherer(reg), opts)
88+
}
89+
90+
// HandlerForTransactional is like HandlerFor, but it uses a transactional gatherer, which
91+
// may safely modify the returned *dto.MetricFamily in place before a call to `Gather` and
92+
// after the call to the `done` function returned by that `Gather`.
93+
func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler {
8794
var (
8895
inFlightSem chan struct{}
8996
errCnt = prometheus.NewCounterVec(
@@ -123,7 +130,8 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler {
123130
return
124131
}
125132
}
126-
mfs, err := reg.Gather()
133+
mfs, done, err := reg.Gather()
134+
defer done()
127135
if err != nil {
128136
if opts.ErrorLog != nil {
129137
opts.ErrorLog.Println("error gathering metrics:", err)

prometheus/promhttp/http_test.go

Lines changed: 83 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package promhttp
1616
import (
1717
"bytes"
1818
"errors"
19+
"fmt"
1920
"log"
2021
"net/http"
2122
"net/http/httptest"
@@ -24,6 +25,7 @@ import (
2425
"time"
2526

2627
"github.com/prometheus/client_golang/prometheus"
28+
dto "github.com/prometheus/client_model/go"
2729
)
2830

2931
type errorCollector struct{}
@@ -56,8 +58,19 @@ func (b blockingCollector) Collect(ch chan<- prometheus.Metric) {
5658
<-b.Block
5759
}
5860

59-
func TestHandlerErrorHandling(t *testing.T) {
61+
type mockTransactionGatherer struct {
62+
g prometheus.Gatherer
63+
gatherInvoked int
64+
doneInvoked int
65+
}
6066

67+
func (g *mockTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
68+
g.gatherInvoked++
69+
mfs, err := g.g.Gather()
70+
return mfs, func() { g.doneInvoked++ }, err
71+
}
72+
73+
func TestHandlerErrorHandling(t *testing.T) {
6174
// Create a registry that collects a MetricFamily with two elements,
6275
// another with one, and reports an error. Further down, we'll use the
6376
// same registry in the HandlerOpts.
@@ -90,21 +103,30 @@ func TestHandlerErrorHandling(t *testing.T) {
90103
request, _ := http.NewRequest("GET", "/", nil)
91104
request.Header.Add("Accept", "test/plain")
92105

93-
errorHandler := HandlerFor(reg, HandlerOpts{
106+
mReg := &mockTransactionGatherer{g: reg}
107+
errorHandler := HandlerForTransactional(mReg, HandlerOpts{
94108
ErrorLog: logger,
95109
ErrorHandling: HTTPErrorOnError,
96110
Registry: reg,
97111
})
98-
continueHandler := HandlerFor(reg, HandlerOpts{
112+
continueHandler := HandlerForTransactional(mReg, HandlerOpts{
99113
ErrorLog: logger,
100114
ErrorHandling: ContinueOnError,
101115
Registry: reg,
102116
})
103-
panicHandler := HandlerFor(reg, HandlerOpts{
117+
panicHandler := HandlerForTransactional(mReg, HandlerOpts{
104118
ErrorLog: logger,
105119
ErrorHandling: PanicOnError,
106120
Registry: reg,
107121
})
122+
// Expect the gatherer to be untouched: constructing handlers must not invoke Gather or done.
123+
if got := mReg.gatherInvoked; got != 0 {
124+
t.Fatalf("unexpected number of gather invokes, want 0, got %d", got)
125+
}
126+
if got := mReg.doneInvoked; got != 0 {
127+
t.Fatalf("unexpected number of done invokes, want 0, got %d", got)
128+
}
129+
108130
wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error
109131
`
110132
wantErrorBody := `An error has occurred while serving metrics:
@@ -140,25 +162,39 @@ the_count 0
140162
`
141163

142164
errorHandler.ServeHTTP(writer, request)
165+
if got := mReg.gatherInvoked; got != 1 {
166+
t.Fatalf("unexpected number of gather invokes, want 1, got %d", got)
167+
}
168+
if got := mReg.doneInvoked; got != 1 {
169+
t.Fatalf("unexpected number of done invokes, want 1, got %d", got)
170+
}
143171
if got, want := writer.Code, http.StatusInternalServerError; got != want {
144172
t.Errorf("got HTTP status code %d, want %d", got, want)
145173
}
146-
if got := logBuf.String(); got != wantMsg {
147-
t.Errorf("got log message:\n%s\nwant log message:\n%s\n", got, wantMsg)
174+
if got, want := logBuf.String(), wantMsg; got != want {
175+
t.Errorf("got log buf %q, want %q", got, want)
148176
}
149-
if got := writer.Body.String(); got != wantErrorBody {
150-
t.Errorf("got body:\n%s\nwant body:\n%s\n", got, wantErrorBody)
177+
if got, want := writer.Body.String(), wantErrorBody; got != want {
178+
t.Errorf("got body %q, want %q", got, want)
151179
}
180+
152181
logBuf.Reset()
153182
writer.Body.Reset()
154183
writer.Code = http.StatusOK
155184

156185
continueHandler.ServeHTTP(writer, request)
186+
187+
if got := mReg.gatherInvoked; got != 2 {
188+
t.Fatalf("unexpected number of gather invokes, want 2, got %d", got)
189+
}
190+
if got := mReg.doneInvoked; got != 2 {
191+
t.Fatalf("unexpected number of done invokes, want 2, got %d", got)
192+
}
157193
if got, want := writer.Code, http.StatusOK; got != want {
158194
t.Errorf("got HTTP status code %d, want %d", got, want)
159195
}
160-
if got := logBuf.String(); got != wantMsg {
161-
t.Errorf("got log message %q, want %q", got, wantMsg)
196+
if got, want := logBuf.String(), wantMsg; got != want {
197+
t.Errorf("got log buf %q, want %q", got, want)
162198
}
163199
if got := writer.Body.String(); got != wantOKBody1 && got != wantOKBody2 {
164200
t.Errorf("got body %q, want either %q or %q", got, wantOKBody1, wantOKBody2)
@@ -168,20 +204,34 @@ the_count 0
168204
if err := recover(); err == nil {
169205
t.Error("expected panic from panicHandler")
170206
}
207+
if got := mReg.gatherInvoked; got != 3 {
208+
t.Fatalf("unexpected number of gather invokes, want 3, got %d", got)
209+
}
210+
if got := mReg.doneInvoked; got != 3 {
211+
t.Fatalf("unexpected number of done invokes, want 3, got %d", got)
212+
}
171213
}()
172214
panicHandler.ServeHTTP(writer, request)
173215
}
174216

175217
func TestInstrumentMetricHandler(t *testing.T) {
176218
reg := prometheus.NewRegistry()
177-
handler := InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{}))
219+
mReg := &mockTransactionGatherer{g: reg}
220+
handler := InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{}))
178221
// Do it again to test idempotency.
179-
InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{}))
222+
InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{}))
180223
writer := httptest.NewRecorder()
181224
request, _ := http.NewRequest("GET", "/", nil)
182225
request.Header.Add("Accept", "test/plain")
183226

184227
handler.ServeHTTP(writer, request)
228+
if got := mReg.gatherInvoked; got != 1 {
229+
t.Fatalf("unexpected number of gather invokes, want 1, got %d", got)
230+
}
231+
if got := mReg.doneInvoked; got != 1 {
232+
t.Fatalf("unexpected number of done invokes, want 1, got %d", got)
233+
}
234+
185235
if got, want := writer.Code, http.StatusOK; got != want {
186236
t.Errorf("got HTTP status code %d, want %d", got, want)
187237
}
@@ -195,19 +245,28 @@ func TestInstrumentMetricHandler(t *testing.T) {
195245
t.Errorf("got body %q, does not contain %q", got, want)
196246
}
197247

198-
writer.Body.Reset()
199-
handler.ServeHTTP(writer, request)
200-
if got, want := writer.Code, http.StatusOK; got != want {
201-
t.Errorf("got HTTP status code %d, want %d", got, want)
202-
}
248+
for i := 0; i < 100; i++ {
249+
writer.Body.Reset()
250+
handler.ServeHTTP(writer, request)
203251

204-
want = "promhttp_metric_handler_requests_in_flight 1\n"
205-
if got := writer.Body.String(); !strings.Contains(got, want) {
206-
t.Errorf("got body %q, does not contain %q", got, want)
207-
}
208-
want = "promhttp_metric_handler_requests_total{code=\"200\"} 1\n"
209-
if got := writer.Body.String(); !strings.Contains(got, want) {
210-
t.Errorf("got body %q, does not contain %q", got, want)
252+
if got, want := mReg.gatherInvoked, i+2; got != want {
253+
t.Fatalf("unexpected number of gather invokes, want %d, got %d", want, got)
254+
}
255+
if got, want := mReg.doneInvoked, i+2; got != want {
256+
t.Fatalf("unexpected number of done invokes, want %d, got %d", want, got)
257+
}
258+
if got, want := writer.Code, http.StatusOK; got != want {
259+
t.Errorf("got HTTP status code %d, want %d", got, want)
260+
}
261+
262+
want := "promhttp_metric_handler_requests_in_flight 1\n"
263+
if got := writer.Body.String(); !strings.Contains(got, want) {
264+
t.Errorf("got body %q, does not contain %q", got, want)
265+
}
266+
want = fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1)
267+
if got := writer.Body.String(); !strings.Contains(got, want) {
268+
t.Errorf("got body %q, does not contain %q", got, want)
269+
}
211270
}
212271
}
213272

0 commit comments

Comments
 (0)