Skip to content

Commit f63e219

Browse files
authored
Make the Go 1.17 collector thread-safe (#969)
1 parent 0108796 commit f63e219

File tree

3 files changed

+55
-7
lines changed

3 files changed

+55
-7
lines changed

prometheus/collector.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,3 +118,11 @@ func (c *selfCollector) Describe(ch chan<- *Desc) {
118118
func (c *selfCollector) Collect(ch chan<- Metric) {
119119
ch <- c.self
120120
}
121+
122+
// collectorMetric is a metric that is also a collector.
123+
// Because of selfCollector, most (if not all) Metrics in
124+
// this package are also collectors.
125+
type collectorMetric interface {
126+
Metric
127+
Collector
128+
}

prometheus/go_collector_go117.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ type goCollector struct {
3232
base baseGoCollector
3333

3434
// rm... fields all pertain to the runtime/metrics package.
35+
rmSampleMu sync.Mutex
3536
rmSampleBuf []metrics.Sample
3637
rmSampleMap map[string]*metrics.Sample
37-
rmMetrics []Metric
38+
rmMetrics []collectorMetric
3839

3940
// With Go 1.17, the runtime/metrics package was introduced.
4041
// From that point on, metric names produced by the runtime/metrics
@@ -58,7 +59,7 @@ func NewGoCollector() Collector {
5859
}
5960

6061
// Generate a Desc and ValueType for each runtime/metrics metric.
61-
metricSet := make([]Metric, 0, len(descriptions))
62+
metricSet := make([]collectorMetric, 0, len(descriptions))
6263
sampleBuf := make([]metrics.Sample, 0, len(descriptions))
6364
sampleMap := make(map[string]*metrics.Sample, len(descriptions))
6465
for i := range descriptions {
@@ -76,7 +77,7 @@ func NewGoCollector() Collector {
7677
sampleBuf = append(sampleBuf, metrics.Sample{Name: d.Name})
7778
sampleMap[d.Name] = &sampleBuf[len(sampleBuf)-1]
7879

79-
var m Metric
80+
var m collectorMetric
8081
if d.Kind == metrics.KindFloat64Histogram {
8182
_, hasSum := rmExactSumMap[d.Name]
8283
m = newBatchHistogram(
@@ -130,9 +131,19 @@ func (c *goCollector) Collect(ch chan<- Metric) {
130131
// Collect base non-memory metrics.
131132
c.base.Collect(ch)
132133

134+
// Collect must be thread-safe, so prevent concurrent use of
135+
// rmSampleBuf. Just read into rmSampleBuf but write all the data
136+
// we get into our Metrics or MemStats.
137+
//
138+
// Note that we cannot simply read and then clone rmSampleBuf
139+
// because we'd need to perform a deep clone of it, which is likely
140+
// not worth it.
141+
c.rmSampleMu.Lock()
142+
133143
// Populate runtime/metrics sample buffer.
134144
metrics.Read(c.rmSampleBuf)
135145

146+
// Update all our metrics from rmSampleBuf.
136147
for i, sample := range c.rmSampleBuf {
137148
// N.B. switch on concrete type because it's significantly more efficient
138149
// than checking for the Counter and Gauge interface implementations. In
@@ -146,22 +157,29 @@ func (c *goCollector) Collect(ch chan<- Metric) {
146157
if v1 > v0 {
147158
m.Add(unwrapScalarRMValue(sample.Value) - m.get())
148159
}
149-
m.Collect(ch)
150160
case *gauge:
151161
m.Set(unwrapScalarRMValue(sample.Value))
152-
m.Collect(ch)
153162
case *batchHistogram:
154163
m.update(sample.Value.Float64Histogram(), c.exactSumFor(sample.Name))
155-
m.Collect(ch)
156164
default:
157165
panic("unexpected metric type")
158166
}
159167
}
160-
161168
// ms is a dummy MemStats that we populate ourselves so that we can
162169
// populate the old metrics from it.
163170
var ms runtime.MemStats
164171
memStatsFromRM(&ms, c.rmSampleMap)
172+
173+
c.rmSampleMu.Unlock()
174+
175+
// Export all the metrics to ch.
176+
// At this point we must not access rmSampleBuf or rmSampleMap, because
177+
// a concurrent caller could use it. It's safe to Collect all our Metrics,
178+
// however, because they're updated in a thread-safe way while MemStats
179+
// is local to this call of Collect.
180+
for _, m := range c.rmMetrics {
181+
m.Collect(ch)
182+
}
165183
for _, i := range c.msMetrics {
166184
ch <- MustNewConstMetric(i.desc, i.valType, i.eval(&ms))
167185
}

prometheus/go_collector_go117_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,3 +280,25 @@ func TestExpectedRuntimeMetrics(t *testing.T) {
280280
t.Log("where X is the Go version you are currently using")
281281
}
282282
}
283+
284+
func TestGoCollectorConcurrency(t *testing.T) {
285+
c := NewGoCollector().(*goCollector)
286+
287+
// Set up multiple goroutines to Collect from the
288+
// same GoCollector. In race mode with GOMAXPROCS > 1,
289+
// this test should fail often if Collect is not
290+
// concurrent-safe.
291+
for i := 0; i < 4; i++ {
292+
go func() {
293+
ch := make(chan Metric)
294+
go func() {
295+
// Drain all metrics recieved until the
296+
// channel is closed.
297+
for range ch {
298+
}
299+
}()
300+
c.Collect(ch)
301+
close(ch)
302+
}()
303+
}
304+
}

0 commit comments

Comments
 (0)