Skip to content

Commit 6e95a3d

Browse files
committed
Simplified API, added tests.
Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent abaeafe commit 6e95a3d

File tree

2 files changed

+239
-57
lines changed

2 files changed

+239
-57
lines changed

prometheus/cache.go

Lines changed: 97 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -26,67 +26,84 @@ import (
2626
dto "github.com/prometheus/client_model/go"
2727
)
2828

29-
var _ rawCollector = &CachedCollector{}
29+
var _ TransactionalGatherer = &CachedTGatherer{}
3030

31-
// CachedCollector allows creating allocation friendly metrics which change less frequently than scrape time, yet
32-
// label values can are changing over time. This collector
31+
// CachedTGatherer is a transactional gatherer that allows maintaining set of metrics which
32+
// change less frequently than scrape time, yet label values and values change over time.
3333
//
3434
// If you happen to use NewDesc, NewConstMetric or MustNewConstMetric inside Collector.Collect routine, consider
35-
// using CachedCollector instead.
36-
type CachedCollector struct {
35+
// using CachedTGatherer instead.
36+
//
37+
// Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers.
38+
// TODO(bwplotka): Add non-session update API if useful for watcher-like mechanic.
39+
type CachedTGatherer struct {
3740
metrics map[uint64]*dto.Metric
3841
metricFamilyByName map[string]*dto.MetricFamily
42+
mMu sync.RWMutex
3943

4044
pendingSession bool
45+
psMu sync.Mutex
4146
}
4247

43-
func NewCachedCollector() *CachedCollector {
44-
return &CachedCollector{
48+
func NewCachedTGatherer() *CachedTGatherer {
49+
return &CachedTGatherer{
4550
metrics: make(map[uint64]*dto.Metric),
4651
metricFamilyByName: map[string]*dto.MetricFamily{},
4752
}
4853
}
4954

50-
func (c *CachedCollector) Collect() []*dto.MetricFamily {
51-
// TODO(bwplotka): Optimize potential penalty here.
52-
return internal.NormalizeMetricFamilies(c.metricFamilyByName)
55+
// Gather implements TransactionalGatherer interface.
56+
func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
57+
c.mMu.RLock()
58+
// TODO(bwplotka): Consider caching slice and normalizing on write.
59+
return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil
5360
}
5461

55-
// NewSession allows to collect all metrics in one go and update cache as much in-place
56-
// as possible to save allocations.
57-
// NOTE: Not concurrency safe and only one allowed at the time (until commit).
58-
func (c *CachedCollector) NewSession() *CollectSession {
62+
// NewSession allows to recreate state of all metrics in CachedTGatherer in
63+
// one go and update cache in-place to save allocations.
64+
// Only one session is allowed at the time.
65+
//
66+
// Session is not concurrency safe.
67+
func (c *CachedTGatherer) NewSession() (*CollectSession, error) {
68+
c.psMu.Lock()
69+
if c.pendingSession {
70+
c.psMu.Unlock()
71+
return nil, errors.New("only one session allowed, one already pending")
72+
}
5973
c.pendingSession = true
74+
c.psMu.Unlock()
75+
6076
return &CollectSession{
6177
c: c,
6278
currentMetrics: make(map[uint64]*dto.Metric, len(c.metrics)),
6379
currentByName: make(map[string]*dto.MetricFamily, len(c.metricFamilyByName)),
64-
}
80+
}, nil
6581
}
6682

6783
type CollectSession struct {
6884
closed bool
6985

70-
c *CachedCollector
86+
c *CachedTGatherer
7187
currentMetrics map[uint64]*dto.Metric
7288
currentByName map[string]*dto.MetricFamily
7389
}
7490

91+
// MustAddMetric is an AddMetric that panics on error.
7592
func (s *CollectSession) MustAddMetric(fqName, help string, labelNames, labelValues []string, valueType ValueType, value float64, ts *time.Time) {
7693
if err := s.AddMetric(fqName, help, labelNames, labelValues, valueType, value, ts); err != nil {
7794
panic(err)
7895
}
7996
}
8097

81-
// AddMetric ...
98+
// AddMetric adds metrics to current session. No changes will be updated in CachedTGatherer until Commit.
8299
// TODO(bwplotka): Add validation.
83100
func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues []string, valueType ValueType, value float64, ts *time.Time) error {
84101
if s.closed {
85102
return errors.New("new metric: collect session is closed, but was attempted to be used")
86103
}
87104

88-
// Label names can be unsorted, will be sorting them later. The only implication is cachability if
89-
// consumer provide non-deterministic order of those (unlikely since label values has to be matched).
105+
// Label names can be unsorted, we will be sorting them later. The only implication is cachability if
106+
// consumer provide non-deterministic order of those.
90107

91108
if len(labelNames) != len(labelValues) {
92109
return errors.New("new metric: label name has different len than values")
@@ -116,7 +133,8 @@ func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues
116133
h := xxhash.New()
117134
h.WriteString(fqName)
118135
h.Write(separatorByteSlice)
119-
for i := range labelNames { // Ofc not in the same order...
136+
137+
for i := range labelNames {
120138
h.WriteString(labelNames[i])
121139
h.Write(separatorByteSlice)
122140
h.WriteString(labelValues[i])
@@ -178,75 +196,96 @@ func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues
178196
}
179197
s.currentMetrics[hSum] = m
180198

181-
// Will be sorted later.
199+
// Will be sorted later anyway, skip for now.
182200
d.Metric = append(d.Metric, m)
183201
return nil
184202
}
185203

186204
// Commit installs the metrics collected in this session into the
// CachedTGatherer, closes the session and allows a new session to be started.
//
// Installing takes the gatherer's write lock, so Commit blocks while a Gather
// transaction is still open (its done function has not been called yet).
//
// Commit on an already committed session is a no-op. Without that guard a
// repeated call would overwrite the cache with this session's stale maps and
// clear the pending flag of a newer session.
func (s *CollectSession) Commit() {
	if s.closed {
		return
	}

	s.c.mMu.Lock()
	// TODO(bwplotka): Sort metrics within family?
	s.c.metricFamilyByName = s.currentByName
	s.c.metrics = s.currentMetrics
	s.c.mMu.Unlock()

	s.c.psMu.Lock()
	s.closed = true
	s.c.pendingSession = false
	s.c.psMu.Unlock()
}
194216

195-
type BlockingRegistry struct {
196-
*Registry
197-
198-
// rawCollector represents special collectors which requires blocking collect for the whole duration
199-
// of returned dto.MetricFamily usage.
200-
rawCollectors []rawCollector
201-
mu sync.Mutex
202-
}
203-
204-
func NewBlockingRegistry() *BlockingRegistry {
205-
return &BlockingRegistry{
206-
Registry: NewRegistry(),
207-
}
208-
}
217+
var _ TransactionalGatherer = &MultiTRegistry{}
209218

210-
type rawCollector interface {
211-
Collect() []*dto.MetricFamily
212-
}
213-
214-
func (b *BlockingRegistry) RegisterRaw(r rawCollector) error {
215-
// TODO(bwplotka): Register, I guess for dups/check purposes?
216-
b.rawCollectors = append(b.rawCollectors, r)
217-
return nil
219+
// MultiTRegistry is a TransactionalGatherer that joins gathered metrics from multiple
220+
// transactional gatherers.
221+
//
222+
// It is caller responsibility to ensure two registries have mutually exclusive metric families,
223+
// no deduplication will happen.
224+
type MultiTRegistry struct {
225+
tGatherers []TransactionalGatherer
218226
}
219227

220-
func (b *BlockingRegistry) MustRegisterRaw(r rawCollector) {
221-
if err := b.RegisterRaw(r); err != nil {
222-
panic(err)
228+
// NewMultiTRegistry creates MultiTRegistry.
229+
func NewMultiTRegistry(tGatherers ...TransactionalGatherer) *MultiTRegistry {
230+
return &MultiTRegistry{
231+
tGatherers: tGatherers,
223232
}
224233
}
225234

226-
func (b *BlockingRegistry) Gather() (_ []*dto.MetricFamily, done func(), err error) {
227-
b.mu.Lock()
228-
mfs, err := b.Registry.Gather()
235+
// Gather implements TransactionalGatherer interface.
236+
func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err error) {
237+
errs := MultiError{}
229238

230-
// TODO(bwplotka): Returned mfs are sorted, so sort raw ones and inject?
239+
dFns := make([]func(), 0, len(r.tGatherers))
231240
// TODO(bwplotka): Implement concurrency for those?
232-
for _, r := range b.rawCollectors {
233-
// TODO(bwplotka): Check for duplicates.
234-
mfs = append(mfs, r.Collect()...)
241+
for _, g := range r.tGatherers {
242+
// TODO(bwplotka): Check for duplicates?
243+
m, d, err := g.Gather()
244+
errs.Append(err)
245+
246+
mfs = append(mfs, m...)
247+
dFns = append(dFns, d)
235248
}
236249

237250
// TODO(bwplotka): Consider sort in place, given metric family in gather is sorted already.
238251
sort.Slice(mfs, func(i, j int) bool {
239252
return *mfs[i].Name < *mfs[j].Name
240253
})
241-
return mfs, func() { b.mu.Unlock() }, err
254+
return mfs, func() {
255+
for _, d := range dFns {
256+
d()
257+
}
258+
}, errs.MaybeUnwrap()
242259
}
243260

244-
// TransactionalGatherer ...
261+
// TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory
262+
// used by metric family is no longer used by a caller. This allows implementations with cache.
245263
type TransactionalGatherer interface {
246-
// Gather ...
264+
// Gather returns metrics in a lexicographically sorted slice
265+
// of uniquely named MetricFamily protobufs. Gather ensures that the
266+
// returned slice is valid and self-consistent so that it can be used
267+
// for valid exposition. As an exception to the strict consistency
268+
// requirements described for metric.Desc, Gather will tolerate
269+
// different sets of label names for metrics of the same metric family.
270+
//
271+
// Even if an error occurs, Gather attempts to gather as many metrics as
272+
// possible. Hence, if a non-nil error is returned, the returned
273+
// MetricFamily slice could be nil (in case of a fatal error that
274+
// prevented any meaningful metric collection) or contain a number of
275+
// MetricFamily protobufs, some of which might be incomplete, and some
276+
// might be missing altogether. The returned error (which might be a
277+
// MultiError) explains the details. Note that this is mostly useful for
278+
// debugging purposes. If the gathered protobufs are to be used for
279+
// exposition in actual monitoring, it is almost always better to not
280+
// expose an incomplete result and instead disregard the returned
281+
// MetricFamily protobufs in case the returned error is non-nil.
282+
//
283+
// Important: done is expected to be triggered (even if the error occurs!)
284+
// once caller does not need returned slice of dto.MetricFamily.
247285
Gather() (_ []*dto.MetricFamily, done func(), err error)
248286
}
249287

288+
// ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function.
250289
func ToTransactionalGatherer(g Gatherer) TransactionalGatherer {
251290
return &noTransactionGatherer{g: g}
252291
}
@@ -255,6 +294,7 @@ type noTransactionGatherer struct {
255294
g Gatherer
256295
}
257296

297+
// Gather implements TransactionalGatherer interface.
258298
func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
259299
mfs, err := g.g.Gather()
260300
return mfs, func() {}, err

prometheus/cache_test.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package prometheus
2+
3+
import (
4+
"errors"
5+
"strings"
6+
"testing"
7+
8+
dto "github.com/prometheus/client_model/go"
9+
)
10+
11+
func TestCachedTGatherer(t *testing.T) {
12+
c := NewCachedTGatherer()
13+
mfs, done, err := c.Gather()
14+
if err != nil {
15+
t.Error("gather failed:", err)
16+
}
17+
done()
18+
if got := mfsToString(mfs); got != "" {
19+
t.Error("unexpected metric family", got)
20+
}
21+
22+
s, err := c.NewSession()
23+
if err != nil {
24+
t.Error("session failed:", err)
25+
}
26+
27+
_, err = c.NewSession()
28+
if err == nil {
29+
t.Error("second session expected to fail, got nil")
30+
}
31+
32+
if err := s.AddMetric("a", "help a", []string{"b", "c"}, []string{"valb", "valc"}, GaugeValue, 1, nil); err != nil {
33+
t.Error("add metric:", err)
34+
}
35+
36+
// Without commit we should see still empty list.
37+
mfs, done, err = c.Gather()
38+
if err != nil {
39+
t.Error("gather failed:", err)
40+
}
41+
done()
42+
if got := mfsToString(mfs); got != "" {
43+
t.Error("unexpected metric family", got)
44+
}
45+
46+
s.Commit()
47+
mfs, done, err = c.Gather()
48+
if err != nil {
49+
t.Error("gather failed:", err)
50+
}
51+
done()
52+
if got := mfsToString(mfs); got != "name:\"a\" help:\"help a\" type:GAUGE metric:<label:<name:\"b\" value:\"valb\" > label:<name:\"c\" value:\"valc\" > gauge:<value:1 > > " {
53+
t.Error("unexpected metric family, got", got)
54+
}
55+
}
56+
57+
func mfsToString(mfs []*dto.MetricFamily) string {
58+
ret := make([]string, 0, len(mfs))
59+
for _, m := range mfs {
60+
ret = append(ret, m.String())
61+
}
62+
return strings.Join(ret, ",")
63+
}
64+
65+
type tGatherer struct {
66+
done bool
67+
err error
68+
}
69+
70+
func (g *tGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
71+
name := "g1"
72+
val := 1.0
73+
return []*dto.MetricFamily{
74+
{Name: &name, Metric: []*dto.Metric{{Gauge: &dto.Gauge{Value: &val}}}},
75+
}, func() { g.done = true }, g.err
76+
}
77+
78+
func TestNewMultiTRegistry(t *testing.T) {
79+
treg := &tGatherer{}
80+
81+
t.Run("one registry", func(t *testing.T) {
82+
m := NewMultiTRegistry(treg)
83+
ret, done, err := m.Gather()
84+
if err != nil {
85+
t.Error("gather failed:", err)
86+
}
87+
done()
88+
if len(ret) != 1 {
89+
t.Error("unexpected number of metric families, expected 1, got", ret)
90+
}
91+
if !treg.done {
92+
t.Error("inner transactional registry not marked as done")
93+
}
94+
})
95+
96+
reg := NewRegistry()
97+
if err := reg.Register(NewCounter(CounterOpts{Name: "c1", Help: "help c1"})); err != nil {
98+
t.Error("registration failed:", err)
99+
}
100+
101+
// Note on purpose two registries will have exactly same metric family name (but with different string).
102+
// This behaviour is undefined at the moment.
103+
if err := reg.Register(NewGauge(GaugeOpts{Name: "g1", Help: "help g1"})); err != nil {
104+
t.Error("registration failed:", err)
105+
}
106+
treg.done = false
107+
108+
t.Run("two registries", func(t *testing.T) {
109+
m := NewMultiTRegistry(ToTransactionalGatherer(reg), treg)
110+
ret, done, err := m.Gather()
111+
if err != nil {
112+
t.Error("gather failed:", err)
113+
}
114+
done()
115+
if len(ret) != 3 {
116+
t.Error("unexpected number of metric families, expected 3, got", ret)
117+
}
118+
if !treg.done {
119+
t.Error("inner transactional registry not marked as done")
120+
}
121+
})
122+
123+
treg.done = false
124+
// Inject error.
125+
treg.err = errors.New("test err")
126+
127+
t.Run("two registries, one with error", func(t *testing.T) {
128+
m := NewMultiTRegistry(ToTransactionalGatherer(reg), treg)
129+
ret, done, err := m.Gather()
130+
if err != treg.err {
131+
t.Error("unexpected error:", err)
132+
}
133+
done()
134+
if len(ret) != 3 {
135+
t.Error("unexpected number of metric families, expected 3, got", ret)
136+
}
137+
// Still on error, we expect done to be triggered.
138+
if !treg.done {
139+
t.Error("inner transactional registry not marked as done")
140+
}
141+
})
142+
}

0 commit comments

Comments
 (0)