Skip to content

Commit 9d5935e

Browse files
committed
Simplified API, added tests.
Signed-off-by: Bartlomiej Plotka <[email protected]>
1 parent abaeafe commit 9d5935e

File tree

2 files changed

+222
-57
lines changed

2 files changed

+222
-57
lines changed

prometheus/cache.go

Lines changed: 97 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -26,67 +26,84 @@ import (
2626
dto "github.com/prometheus/client_model/go"
2727
)
2828

29-
var _ rawCollector = &CachedCollector{}
29+
var _ TransactionalGatherer = &CachedTGatherer{}
3030

31-
// CachedCollector allows creating allocation friendly metrics which change less frequently than scrape time, yet
32-
// label values can are changing over time. This collector
31+
// CachedTGatherer is a transactional gatherer that allows maintaining set of metrics which
32+
// change less frequently than scrape time, yet label values and values change over time.
3333
//
3434
// If you happen to use NewDesc, NewConstMetric or MustNewConstMetric inside Collector.Collect routine, consider
35-
// using CachedCollector instead.
36-
type CachedCollector struct {
35+
// using CachedTGatherer instead.
36+
//
37+
// Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers.
38+
// TODO(bwplotka): Add non-session update API if useful for watcher-like mechanic.
39+
type CachedTGatherer struct {
3740
metrics map[uint64]*dto.Metric
3841
metricFamilyByName map[string]*dto.MetricFamily
42+
mMu sync.RWMutex
3943

4044
pendingSession bool
45+
psMu sync.Mutex
4146
}
4247

43-
func NewCachedCollector() *CachedCollector {
44-
return &CachedCollector{
48+
func NewCachedTGatherer() *CachedTGatherer {
49+
return &CachedTGatherer{
4550
metrics: make(map[uint64]*dto.Metric),
4651
metricFamilyByName: map[string]*dto.MetricFamily{},
4752
}
4853
}
4954

50-
func (c *CachedCollector) Collect() []*dto.MetricFamily {
51-
// TODO(bwplotka): Optimize potential penalty here.
52-
return internal.NormalizeMetricFamilies(c.metricFamilyByName)
55+
// Gather implements TransactionalGatherer interface.
56+
func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
57+
c.mMu.RLock()
58+
// TODO(bwplotka): Consider caching slice and normalizing on write.
59+
return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil
5360
}
5461

55-
// NewSession allows to collect all metrics in one go and update cache as much in-place
56-
// as possible to save allocations.
57-
// NOTE: Not concurrency safe and only one allowed at the time (until commit).
58-
func (c *CachedCollector) NewSession() *CollectSession {
62+
// NewSession allows to recreate state of all metrics in CachedTGatherer in
63+
// one go and update cache in-place to save allocations.
64+
// Only one session is allowed at the time.
65+
//
66+
// Session is not concurrency safe.
67+
func (c *CachedTGatherer) NewSession() (*CollectSession, error) {
68+
c.psMu.Lock()
69+
if c.pendingSession {
70+
c.psMu.Unlock()
71+
return nil, errors.New("only one session allowed, one already pending")
72+
}
5973
c.pendingSession = true
74+
c.psMu.Unlock()
75+
6076
return &CollectSession{
6177
c: c,
6278
currentMetrics: make(map[uint64]*dto.Metric, len(c.metrics)),
6379
currentByName: make(map[string]*dto.MetricFamily, len(c.metricFamilyByName)),
64-
}
80+
}, nil
6581
}
6682

6783
type CollectSession struct {
6884
closed bool
6985

70-
c *CachedCollector
86+
c *CachedTGatherer
7187
currentMetrics map[uint64]*dto.Metric
7288
currentByName map[string]*dto.MetricFamily
7389
}
7490

91+
// MustAddMetric is an AddMetric that panics on error.
7592
func (s *CollectSession) MustAddMetric(fqName, help string, labelNames, labelValues []string, valueType ValueType, value float64, ts *time.Time) {
7693
if err := s.AddMetric(fqName, help, labelNames, labelValues, valueType, value, ts); err != nil {
7794
panic(err)
7895
}
7996
}
8097

81-
// AddMetric ...
98+
// AddMetric adds metrics to current session. No changes will be updated in CachedTGatherer until Commit.
8299
// TODO(bwplotka): Add validation.
83100
func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues []string, valueType ValueType, value float64, ts *time.Time) error {
84101
if s.closed {
85102
return errors.New("new metric: collect session is closed, but was attempted to be used")
86103
}
87104

88-
// Label names can be unsorted, will be sorting them later. The only implication is cachability if
89-
// consumer provide non-deterministic order of those (unlikely since label values has to be matched).
105+
// Label names can be unsorted, we will be sorting them later. The only implication is cachability if
106+
// consumer provide non-deterministic order of those.
90107

91108
if len(labelNames) != len(labelValues) {
92109
return errors.New("new metric: label name has different len than values")
@@ -116,7 +133,8 @@ func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues
116133
h := xxhash.New()
117134
h.WriteString(fqName)
118135
h.Write(separatorByteSlice)
119-
for i := range labelNames { // Ofc not in the same order...
136+
137+
for i := range labelNames {
120138
h.WriteString(labelNames[i])
121139
h.Write(separatorByteSlice)
122140
h.WriteString(labelValues[i])
@@ -178,75 +196,96 @@ func (s *CollectSession) AddMetric(fqName, help string, labelNames, labelValues
178196
}
179197
s.currentMetrics[hSum] = m
180198

181-
// Will be sorted later.
199+
// Will be sorted later anyway, skip for now.
182200
d.Metric = append(d.Metric, m)
183201
return nil
184202
}
185203

186204
func (s *CollectSession) Commit() {
187-
// TODO(bwplotka): Sort metrics within family.
205+
s.c.mMu.Lock()
206+
// TODO(bwplotka): Sort metrics within family?
188207
s.c.metricFamilyByName = s.currentByName
189208
s.c.metrics = s.currentMetrics
209+
s.c.mMu.Unlock()
190210

211+
s.c.psMu.Lock()
191212
s.closed = true
192213
s.c.pendingSession = false
214+
s.c.psMu.Unlock()
193215
}
194216

195-
type BlockingRegistry struct {
196-
*Registry
197-
198-
// rawCollector represents special collectors which requires blocking collect for the whole duration
199-
// of returned dto.MetricFamily usage.
200-
rawCollectors []rawCollector
201-
mu sync.Mutex
202-
}
203-
204-
func NewBlockingRegistry() *BlockingRegistry {
205-
return &BlockingRegistry{
206-
Registry: NewRegistry(),
207-
}
208-
}
217+
var _ TransactionalGatherer = &MultiTRegistry{}
209218

210-
type rawCollector interface {
211-
Collect() []*dto.MetricFamily
212-
}
213-
214-
func (b *BlockingRegistry) RegisterRaw(r rawCollector) error {
215-
// TODO(bwplotka): Register, I guess for dups/check purposes?
216-
b.rawCollectors = append(b.rawCollectors, r)
217-
return nil
219+
// MultiTRegistry is a TransactionalGatherer that joins gathered metrics from multiple
220+
// transactional gatherers.
221+
//
222+
// It is caller responsibility to ensure two registries have mutually exclusive metric families,
223+
// no deduplication will happen.
224+
type MultiTRegistry struct {
225+
tGatherers []TransactionalGatherer
218226
}
219227

220-
func (b *BlockingRegistry) MustRegisterRaw(r rawCollector) {
221-
if err := b.RegisterRaw(r); err != nil {
222-
panic(err)
228+
// NewMultiTRegistry creates MultiTRegistry.
229+
func NewMultiTRegistry(tGatherers ...TransactionalGatherer) *MultiTRegistry {
230+
return &MultiTRegistry{
231+
tGatherers: tGatherers,
223232
}
224233
}
225234

226-
func (b *BlockingRegistry) Gather() (_ []*dto.MetricFamily, done func(), err error) {
227-
b.mu.Lock()
228-
mfs, err := b.Registry.Gather()
235+
// Gather implements TransactionalGatherer interface.
236+
func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err error) {
237+
errs := MultiError{}
229238

230-
// TODO(bwplotka): Returned mfs are sorted, so sort raw ones and inject?
239+
dFns := make([]func(), 0, len(r.tGatherers))
231240
// TODO(bwplotka): Implement concurrency for those?
232-
for _, r := range b.rawCollectors {
233-
// TODO(bwplotka): Check for duplicates.
234-
mfs = append(mfs, r.Collect()...)
241+
for _, g := range r.tGatherers {
242+
// TODO(bwplotka): Check for duplicates?
243+
m, d, err := g.Gather()
244+
errs.Append(err)
245+
246+
mfs = append(mfs, m...)
247+
dFns = append(dFns, d)
235248
}
236249

237250
// TODO(bwplotka): Consider sort in place, given metric family in gather is sorted already.
238251
sort.Slice(mfs, func(i, j int) bool {
239252
return *mfs[i].Name < *mfs[j].Name
240253
})
241-
return mfs, func() { b.mu.Unlock() }, err
254+
return mfs, func() {
255+
for _, d := range dFns {
256+
d()
257+
}
258+
}, errs.MaybeUnwrap()
242259
}
243260

244-
// TransactionalGatherer ...
261+
// TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory
262+
// used by metric family is no longer used by a caller. This allows implementations with cache.
245263
type TransactionalGatherer interface {
246-
// Gather ...
264+
// Gather returns metrics in a lexicographically sorted slice
265+
// of uniquely named MetricFamily protobufs. Gather ensures that the
266+
// returned slice is valid and self-consistent so that it can be used
267+
// for valid exposition. As an exception to the strict consistency
268+
// requirements described for metric.Desc, Gather will tolerate
269+
// different sets of label names for metrics of the same metric family.
270+
//
271+
// Even if an error occurs, Gather attempts to gather as many metrics as
272+
// possible. Hence, if a non-nil error is returned, the returned
273+
// MetricFamily slice could be nil (in case of a fatal error that
274+
// prevented any meaningful metric collection) or contain a number of
275+
// MetricFamily protobufs, some of which might be incomplete, and some
276+
// might be missing altogether. The returned error (which might be a
277+
// MultiError) explains the details. Note that this is mostly useful for
278+
// debugging purposes. If the gathered protobufs are to be used for
279+
// exposition in actual monitoring, it is almost always better to not
280+
// expose an incomplete result and instead disregard the returned
281+
// MetricFamily protobufs in case the returned error is non-nil.
282+
//
283+
// Important: done is expected to be triggered (even if the error occurs!)
284+
// once caller does not need returned slice of dto.MetricFamily.
247285
Gather() (_ []*dto.MetricFamily, done func(), err error)
248286
}
249287

288+
// ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function.
250289
func ToTransactionalGatherer(g Gatherer) TransactionalGatherer {
251290
return &noTransactionGatherer{g: g}
252291
}
@@ -255,6 +294,7 @@ type noTransactionGatherer struct {
255294
g Gatherer
256295
}
257296

297+
// Gather implements TransactionalGatherer interface.
258298
func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
259299
mfs, err := g.g.Gather()
260300
return mfs, func() {}, err

prometheus/cache_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package prometheus
2+
3+
import (
4+
"errors"
5+
"strings"
6+
"testing"
7+
"time"
8+
9+
dto "github.com/prometheus/client_model/go"
10+
)
11+
12+
func TestCachedTGatherer(t *testing.T) {
13+
c := NewCachedTGatherer()
14+
mfs, done, err := c.Gather()
15+
if err != nil {
16+
t.Error("gather failed:", err)
17+
}
18+
done()
19+
if got := mfsToString(mfs); got != "" {
20+
t.Error("unexpected metric family", got)
21+
}
22+
23+
s, err := c.NewSession()
24+
if err != nil {
25+
t.Error("session failed:", err)
26+
}
27+
28+
_, err = c.NewSession()
29+
if err == nil {
30+
t.Error("second session expected to fail, got nil")
31+
}
32+
33+
// WIP.
34+
time.Parse()
35+
36+
s.AddMetric("a", "help a", []string{"b", "c"}, []string{"valb", "valc"}, GaugeValue, 1)
37+
38+
}
39+
40+
func mfsToString(mfs []*dto.MetricFamily) string {
41+
ret := make([]string, 0, len(mfs))
42+
for _, m := range mfs {
43+
ret = append(ret, m.String())
44+
}
45+
return strings.Join(ret, ",")
46+
}
47+
48+
type tGatherer struct {
49+
done bool
50+
err error
51+
}
52+
53+
func (g *tGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) {
54+
name := "g1"
55+
val := 1.0
56+
return []*dto.MetricFamily{
57+
{Name: &name, Metric: []*dto.Metric{{Gauge: &dto.Gauge{Value: &val}}}},
58+
}, func() { g.done = true }, g.err
59+
}
60+
61+
func TestNewMultiTRegistry(t *testing.T) {
62+
treg := &tGatherer{}
63+
64+
t.Run("one registry", func(t *testing.T) {
65+
m := NewMultiTRegistry(treg)
66+
ret, done, err := m.Gather()
67+
if err != nil {
68+
t.Error("gather failed:", err)
69+
}
70+
done()
71+
if len(ret) != 1 {
72+
t.Error("unexpected number of metric families, expected 1, got", ret)
73+
}
74+
if !treg.done {
75+
t.Error("inner transactional registry not marked as done")
76+
}
77+
})
78+
79+
reg := NewRegistry()
80+
if err := reg.Register(NewCounter(CounterOpts{Name: "c1", Help: "help c1"})); err != nil {
81+
t.Error("registration failed:", err)
82+
}
83+
84+
// Note on purpose two registries will have exactly same metric family name (but with different string).
85+
// This behaviour is undefined at the moment.
86+
if err := reg.Register(NewGauge(GaugeOpts{Name: "g1", Help: "help g1"})); err != nil {
87+
t.Error("registration failed:", err)
88+
}
89+
treg.done = false
90+
91+
t.Run("two registries", func(t *testing.T) {
92+
m := NewMultiTRegistry(ToTransactionalGatherer(reg), treg)
93+
ret, done, err := m.Gather()
94+
if err != nil {
95+
t.Error("gather failed:", err)
96+
}
97+
done()
98+
if len(ret) != 3 {
99+
t.Error("unexpected number of metric families, expected 3, got", ret)
100+
}
101+
if !treg.done {
102+
t.Error("inner transactional registry not marked as done")
103+
}
104+
})
105+
106+
treg.done = false
107+
// Inject error.
108+
treg.err = errors.New("test err")
109+
110+
t.Run("two registries, one with error", func(t *testing.T) {
111+
m := NewMultiTRegistry(ToTransactionalGatherer(reg), treg)
112+
ret, done, err := m.Gather()
113+
if err != treg.err {
114+
t.Error("unexpected error:", err)
115+
}
116+
done()
117+
if len(ret) != 3 {
118+
t.Error("unexpected number of metric families, expected 3, got", ret)
119+
}
120+
// Still on error, we expect done to be triggered.
121+
if !treg.done {
122+
t.Error("inner transactional registry not marked as done")
123+
}
124+
})
125+
}

0 commit comments

Comments
 (0)