From a1c9be45cf9add3d346624ab04a28bf69a1e0efd Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Sat, 23 Oct 2021 13:58:26 +0200 Subject: [PATCH 01/12] Added cached collector. Signed-off-by: Bartlomiej Plotka update. Signed-off-by: Bartlomiej Plotka Attempt 2 Signed-off-by: Bartlomiej Plotka Added blocking registry, with raw collector and transactional handler. Signed-off-by: Bartlomiej Plotka Added fast path to normal (empty) registry to save 8 allocs and 3K5B per Gather. Signed-off-by: Bartlomiej Plotka Simplified API, added tests. Signed-off-by: Bartlomiej Plotka Fix. Signed-off-by: Bartlomiej Plotka Simplified implementation. Signed-off-by: Bartlomiej Plotka Added benchmark. Signed-off-by: Bartlomiej Plotka Optimized. Signed-off-by: Bartlomiej Plotka --- prometheus/cache/cache.go | 261 ++++++++++++++++++++++++++++++ prometheus/cache/cache_test.go | 275 ++++++++++++++++++++++++++++++++ prometheus/desc.go | 4 +- prometheus/internal/metric.go | 16 ++ prometheus/metric.go | 16 -- prometheus/promhttp/http.go | 8 +- prometheus/registry.go | 101 +++++++++++- prometheus/registry_test.go | 80 ++++++++++ prometheus/testutil/testutil.go | 11 +- prometheus/value.go | 44 +++-- prometheus/wrap.go | 3 +- 11 files changed, 785 insertions(+), 34 deletions(-) create mode 100644 prometheus/cache/cache.go create mode 100644 prometheus/cache/cache_test.go diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go new file mode 100644 index 000000000..db0df82c4 --- /dev/null +++ b/prometheus/cache/cache.go @@ -0,0 +1,261 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "errors" + "fmt" + "sort" + "sync" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + + //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. + "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/internal" + dto "github.com/prometheus/client_model/go" +) + +var _ prometheus.TransactionalGatherer = &CachedTGatherer{} + +var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with xxhash. + +// CachedTGatherer is a transactional gatherer that allows maintaining a set of metrics which +// change less frequently than scrape time, yet label values and values change over time. +// +// If you happen to use NewDesc, NewConstMetric or MustNewConstMetric inside Collector.Collect routine, consider +// using CachedTGatherer instead. +// +// Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers. +// NOTE(bwplotka): Experimental, API and behaviour can change. +type CachedTGatherer struct { + metrics map[uint64]*dto.Metric + metricFamilyByName map[string]*dto.MetricFamily + mMu sync.RWMutex +} + +func NewCachedTGatherer() *CachedTGatherer { + return &CachedTGatherer{ + metrics: make(map[uint64]*dto.Metric), + metricFamilyByName: map[string]*dto.MetricFamily{}, + } +} + +// Gather implements TransactionalGatherer interface. +func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + c.mMu.RLock() + + // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics with 1000 families + // this is efficient enough (~300µs and ~50 kB per op), no need to cache it for now. + return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil +} + +type Key struct { + FQName string // __name__ + + // Label names can be unsorted, we will be sorting them later. The only implication is cachability if + // consumer provide non-deterministic order of those. + LabelNames []string + LabelValues []string +} + +func (k Key) isValid() error { + if k.FQName == "" { + return errors.New("FQName cannot be empty") + } + if len(k.LabelNames) != len(k.LabelValues) { + return errors.New("new metric: label name has different length than values") + } + + return nil +} + +// hash returns unique hash for this key. +func (k Key) hash() uint64 { + h := xxhash.New() + h.WriteString(k.FQName) + h.Write(separatorByteSlice) + + for i := range k.LabelNames { + h.WriteString(k.LabelNames[i]) + h.Write(separatorByteSlice) + h.WriteString(k.LabelValues[i]) + h.Write(separatorByteSlice) + } + return h.Sum64() +} + +// Insert represents record to set in cache. +type Insert struct { + Key + + Help string + ValueType prometheus.ValueType + Value float64 + + // Timestamp is optional. Pass nil for no explicit timestamp. + Timestamp *time.Time +} + +// Update goes through inserts and deletions and updates current cache in concurrency safe manner. +// If reset is set to true, all inserts and deletions are working on empty cache. In such case +// this implementation tries to reuse memory from existing cached item when possible. +// +// Update reuses insert struct memory, so after use, Insert slice and its elements cannot be reused +// outside of this method. +// TODO(bwplotka): Lack of copying can pose memory safety problems if insert variables are reused. Consider copying if value +// is different. Yet it gives significant allocation gains. +func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) error { + c.mMu.Lock() + defer c.mMu.Unlock() + + currMetrics := c.metrics + currMetricFamilies := c.metricFamilyByName + if reset { + currMetrics = make(map[uint64]*dto.Metric, len(c.metrics)) + currMetricFamilies = make(map[string]*dto.MetricFamily, len(c.metricFamilyByName)) + } + + errs := prometheus.MultiError{} + for i := range inserts { + // TODO(bwplotka): Validate more about this insert? + if err := inserts[i].isValid(); err != nil { + errs.Append(err) + continue + } + + // Update metric family. + mf, ok := c.metricFamilyByName[inserts[i].FQName] + if !ok { + mf = &dto.MetricFamily{} + mf.Name = &inserts[i].FQName + } else if reset { + // Reset metric slice, since we want to start from scratch. + mf.Metric = mf.Metric[:0] + } + mf.Type = inserts[i].ValueType.ToDTO() + mf.Help = &inserts[i].Help + + currMetricFamilies[inserts[i].FQName] = mf + + // Update metric pointer. + hSum := inserts[i].hash() + m, ok := c.metrics[hSum] + if !ok { + m = &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))} + for j := range inserts[i].LabelNames { + m.Label = append(m.Label, &dto.LabelPair{ + Name: &inserts[i].LabelNames[j], + Value: &inserts[i].LabelValues[j], + }) + } + sort.Sort(internal.LabelPairSorter(m.Label)) + } + + switch inserts[i].ValueType { + case prometheus.CounterValue: + v := m.Counter + if v == nil { + v = &dto.Counter{} + } + v.Value = &inserts[i].Value + m.Counter = v + m.Gauge = nil + m.Untyped = nil + case prometheus.GaugeValue: + v := m.Gauge + if v == nil { + v = &dto.Gauge{} + } + v.Value = &inserts[i].Value + m.Counter = nil + m.Gauge = v + m.Untyped = nil + case prometheus.UntypedValue: + v := m.Untyped + if v == nil { + v = &dto.Untyped{} + } + v.Value = &inserts[i].Value + m.Counter = nil + m.Gauge = nil + m.Untyped = v + default: + return fmt.Errorf("unsupported value type %v", inserts[i].ValueType) + } + + m.TimestampMs = nil + if inserts[i].Timestamp != nil { + m.TimestampMs = proto.Int64(inserts[i].Timestamp.Unix()*1000 + int64(inserts[i].Timestamp.Nanosecond()/1000000)) + } + currMetrics[hSum] = m + + if !reset && ok { + // If we did update without reset and we found metric in previous + // map, we know metric pointer exists in metric family map, so just continue. + continue + } + + // Will be sorted later anyway, so just append. + mf.Metric = append(mf.Metric, m) + } + + for _, del := range deletions { + if err := del.isValid(); err != nil { + errs.Append(err) + continue + } + + hSum := del.hash() + m, ok := currMetrics[hSum] + if !ok { + continue + } + delete(currMetrics, hSum) + + mf, ok := currMetricFamilies[del.FQName] + if !ok { + // Impossible, but well... + errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not exists", del.FQName, del.LabelValues)) + continue + } + + toDel := -1 + for i := range mf.Metric { + if mf.Metric[i] == m { + toDel = i + break + } + } + + if toDel == -1 { + errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not have such metric", del.FQName, del.LabelValues)) + continue + } + + if len(mf.Metric) == 1 { + delete(currMetricFamilies, del.FQName) + continue + } + + mf.Metric = append(mf.Metric[:toDel], mf.Metric[toDel+1:]...) + } + + c.metrics = currMetrics + c.metricFamilyByName = currMetricFamilies + return errs.MaybeUnwrap() +} diff --git a/prometheus/cache/cache_test.go b/prometheus/cache/cache_test.go new file mode 100644 index 000000000..f5281172e --- /dev/null +++ b/prometheus/cache/cache_test.go @@ -0,0 +1,275 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "fmt" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +func TestCachedTGatherer(t *testing.T) { + c := NewCachedTGatherer() + mfs, done, err := c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if got := mfsToString(mfs); got != "" { + t.Error("unexpected metric family", got) + } + + if err := c.Update(false, []Insert{ + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help a", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help b", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + }, []Key{ + {FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected. + }); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + const expected = "name:\"a\" help:\"help a2\" type:COUNTER metric: label: " + + "gauge: > metric: label: counter: > ,name:\"b\" help:\"help b\" " + + "type:GAUGE metric: label: gauge: > " + if got := mfsToString(mfs); got != expected { + t.Error("unexpected metric family, got", got) + } + + // Update with exactly same insertion should have the same effect. + if err := c.Update(false, []Insert{ + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help a", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help b", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + }, []Key{ + {FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected. + }); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + if got := mfsToString(mfs); got != expected { + t.Error("unexpected metric family, got", got) + } + + // Update one element. + if err := c.Update(false, []Insert{ + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help a12321", + ValueType: prometheus.CounterValue, + Value: 9999, + }, + }, nil); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + if got := mfsToString(mfs); got != "name:\"a\" help:\"help a12321\" type:COUNTER metric: label:"+ + " counter: > metric: label: counter: > ,name:\"b\" help:\"help b\" "+ + "type:GAUGE metric: label: gauge: > " { + t.Error("unexpected metric family, got", got) + } + + // Rebuild cache and insert only 2 elements. + if err := c.Update(true, []Insert{ + { + Key: Key{FQName: "ax", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help ax", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "bx", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help bx", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + }, nil); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + if got := mfsToString(mfs); got != "name:\"ax\" help:\"help ax\" type:GAUGE metric: label:"+ + " gauge: > ,name:\"bx\" help:\"help bx\" type:GAUGE metric: label: gauge: > " { + t.Error("unexpected metric family, got", got) + } + + if err := c.Update(true, nil, nil); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if got := mfsToString(mfs); got != "" { + t.Error("unexpected metric family", got) + } +} + +func mfsToString(mfs []*dto.MetricFamily) string { + ret := make([]string, 0, len(mfs)) + for _, m := range mfs { + ret = append(ret, m.String()) + } + return strings.Join(ret, ",") +} + +// export var=v1 && go test -count 5 -benchtime 100x -run '^$' -bench . -memprofile=${var}.mem.pprof -cpuprofile=${var}.cpu.pprof > ${var}.txt +func BenchmarkCachedTGatherer_Update(b *testing.B) { + c := NewCachedTGatherer() + + // Generate larger metric payload. + inserts := make([]Insert, 0, 1e6) + + // 1000 metrics in 1000 families. + for i := 0; i < 1e3; i++ { + for j := 0; j < 1e3; j++ { + inserts = append(inserts, Insert{ + Key: Key{ + FQName: fmt.Sprintf("realistic_longer_name_%d", i), + LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"}, + LabelValues: []string{"realistic_label_value1", "realistic_label_value2", fmt.Sprintf("realistic_label_value3_%d", j)}}, + Help: "help string is usually quite large, so let's make it a bit realistic.", + ValueType: prometheus.GaugeValue, + Value: float64(j), + }) + } + } + + if err := c.Update(false, inserts, nil); err != nil { + b.Error("update:", err) + } + + if len(c.metricFamilyByName) != 1e3 || len(c.metrics) != 1e6 { + // Ensure we did not generate duplicates. + panic("generated data set gave wrong numbers") + } + + b.Run("Update of one element without reset", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := c.Update(false, []Insert{ + { + Key: Key{ + FQName: "realistic_longer_name_334", + LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"}, + LabelValues: []string{"realistic_label_value1", "realistic_label_value2", "realistic_label_value3_2345"}}, + Help: "CUSTOM help string is usually quite large, so let's make it a bit realistic.", + ValueType: prometheus.CounterValue, + Value: 1929495, + }, + }, nil); err != nil { + b.Error("update:", err) + } + } + }) + + b.Run("Update of all elements with reset", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := c.Update(true, inserts, nil); err != nil { + b.Error("update:", err) + } + } + }) + + b.Run("Gather", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mfs, done, err := c.Gather() + done() + if err != nil { + b.Error("update:", err) + } + testMfs = mfs + } + }) +} + +var testMfs []*dto.MetricFamily diff --git a/prometheus/desc.go b/prometheus/desc.go index 4bb816ab7..ee81107c8 100644 --- a/prometheus/desc.go +++ b/prometheus/desc.go @@ -20,6 +20,8 @@ import ( "strings" "github.com/cespare/xxhash/v2" + "github.com/prometheus/client_golang/prometheus/internal" + //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. "github.com/golang/protobuf/proto" "github.com/prometheus/common/model" @@ -154,7 +156,7 @@ func NewDesc(fqName, help string, variableLabels []string, constLabels Labels) * Value: proto.String(v), }) } - sort.Sort(labelPairSorter(d.constLabelPairs)) + sort.Sort(internal.LabelPairSorter(d.constLabelPairs)) return d } diff --git a/prometheus/internal/metric.go b/prometheus/internal/metric.go index 351c26e1a..089ab5b97 100644 --- a/prometheus/internal/metric.go +++ b/prometheus/internal/metric.go @@ -19,6 +19,22 @@ import ( dto "github.com/prometheus/client_model/go" ) +// LabelPairSorter implements sort.Interface. It is used to sort a slice of +// dto.LabelPair pointers. +type LabelPairSorter []*dto.LabelPair + +func (s LabelPairSorter) Len() int { + return len(s) +} + +func (s LabelPairSorter) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +func (s LabelPairSorter) Less(i, j int) bool { + return s[i].GetName() < s[j].GetName() +} + // metricSorter is a sortable slice of *dto.Metric. type metricSorter []*dto.Metric diff --git a/prometheus/metric.go b/prometheus/metric.go index dc121910a..118a54e84 100644 --- a/prometheus/metric.go +++ b/prometheus/metric.go @@ -115,22 +115,6 @@ func BuildFQName(namespace, subsystem, name string) string { return name } -// labelPairSorter implements sort.Interface. It is used to sort a slice of -// dto.LabelPair pointers. -type labelPairSorter []*dto.LabelPair - -func (s labelPairSorter) Len() int { - return len(s) -} - -func (s labelPairSorter) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - -func (s labelPairSorter) Less(i, j int) bool { - return s[i].GetName() < s[j].GetName() -} - type invalidMetric struct { desc *Desc err error diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index d86d0cf4b..b463a747f 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -84,6 +84,10 @@ func Handler() http.Handler { // instrumentation. Use the InstrumentMetricHandler function to apply the same // kind of instrumentation as it is used by the Handler function. func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { + return HandlerForTransactional(prometheus.ToTransactionalGatherer(reg), opts) +} + +func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler { var ( inFlightSem chan struct{} errCnt = prometheus.NewCounterVec( @@ -113,6 +117,7 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { if inFlightSem != nil { + // TODO(bwplotka): Implement single-flight which is essential for blocking TransactionalGatherer. select { case inFlightSem <- struct{}{}: // All good, carry on. defer func() { <-inFlightSem }() @@ -123,7 +128,8 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { return } } - mfs, err := reg.Gather() + mfs, done, err := reg.Gather() + defer done() if err != nil { if opts.ErrorLog != nil { opts.ErrorLog.Println("error gathering metrics:", err) diff --git a/prometheus/registry.go b/prometheus/registry.go index 383a7f594..5046f7e2f 100644 --- a/prometheus/registry.go +++ b/prometheus/registry.go @@ -407,6 +407,14 @@ func (r *Registry) MustRegister(cs ...Collector) { // Gather implements Gatherer. func (r *Registry) Gather() ([]*dto.MetricFamily, error) { + r.mtx.RLock() + + if len(r.collectorsByID) == 0 && len(r.uncheckedCollectors) == 0 { + // Fast path. + r.mtx.RUnlock() + return nil, nil + } + var ( checkedMetricChan = make(chan Metric, capMetricChan) uncheckedMetricChan = make(chan Metric, capMetricChan) @@ -416,7 +424,6 @@ func (r *Registry) Gather() ([]*dto.MetricFamily, error) { registeredDescIDs map[uint64]struct{} // Only used for pedantic checks ) - r.mtx.RLock() goroutineBudget := len(r.collectorsByID) + len(r.uncheckedCollectors) metricFamiliesByName := make(map[string]*dto.MetricFamily, len(r.dimHashesByName)) checkedCollectors := make(chan Collector, len(r.collectorsByID)) @@ -884,11 +891,11 @@ func checkMetricConsistency( h.Write(separatorByteSlice) // Make sure label pairs are sorted. We depend on it for the consistency // check. - if !sort.IsSorted(labelPairSorter(dtoMetric.Label)) { + if !sort.IsSorted(internal.LabelPairSorter(dtoMetric.Label)) { // We cannot sort dtoMetric.Label in place as it is immutable by contract. copiedLabels := make([]*dto.LabelPair, len(dtoMetric.Label)) copy(copiedLabels, dtoMetric.Label) - sort.Sort(labelPairSorter(copiedLabels)) + sort.Sort(internal.LabelPairSorter(copiedLabels)) dtoMetric.Label = copiedLabels } for _, lp := range dtoMetric.Label { @@ -935,7 +942,7 @@ func checkDescConsistency( metricFamily.GetName(), dtoMetric, desc, ) } - sort.Sort(labelPairSorter(lpsFromDesc)) + sort.Sort(internal.LabelPairSorter(lpsFromDesc)) for i, lpFromDesc := range lpsFromDesc { lpFromMetric := dtoMetric.Label[i] if lpFromDesc.GetName() != lpFromMetric.GetName() || @@ -948,3 +955,89 @@ func checkDescConsistency( } return nil } + +var _ TransactionalGatherer = &MultiTRegistry{} + +// MultiTRegistry is a TransactionalGatherer that joins gathered metrics from multiple +// transactional gatherers. +// +// It is caller responsibility to ensure two registries have mutually exclusive metric families, +// no deduplication will happen. +type MultiTRegistry struct { + tGatherers []TransactionalGatherer +} + +// NewMultiTRegistry creates MultiTRegistry. +func NewMultiTRegistry(tGatherers ...TransactionalGatherer) *MultiTRegistry { + return &MultiTRegistry{ + tGatherers: tGatherers, + } +} + +// Gather implements TransactionalGatherer interface. +func (r *MultiTRegistry) Gather() (mfs []*dto.MetricFamily, done func(), err error) { + errs := MultiError{} + + dFns := make([]func(), 0, len(r.tGatherers)) + // TODO(bwplotka): Implement concurrency for those? + for _, g := range r.tGatherers { + // TODO(bwplotka): Check for duplicates? + m, d, err := g.Gather() + errs.Append(err) + + mfs = append(mfs, m...) + dFns = append(dFns, d) + } + + // TODO(bwplotka): Consider sort in place, given metric family in gather is sorted already. + sort.Slice(mfs, func(i, j int) bool { + return *mfs[i].Name < *mfs[j].Name + }) + return mfs, func() { + for _, d := range dFns { + d() + } + }, errs.MaybeUnwrap() +} + +// TransactionalGatherer represents transactional gatherer that can be triggered to notify gatherer that memory +// used by metric family is no longer used by a caller. This allows implementations with cache. +type TransactionalGatherer interface { + // Gather returns metrics in a lexicographically sorted slice + // of uniquely named MetricFamily protobufs. Gather ensures that the + // returned slice is valid and self-consistent so that it can be used + // for valid exposition. As an exception to the strict consistency + // requirements described for metric.Desc, Gather will tolerate + // different sets of label names for metrics of the same metric family. + // + // Even if an error occurs, Gather attempts to gather as many metrics as + // possible. Hence, if a non-nil error is returned, the returned + // MetricFamily slice could be nil (in case of a fatal error that + // prevented any meaningful metric collection) or contain a number of + // MetricFamily protobufs, some of which might be incomplete, and some + // might be missing altogether. The returned error (which might be a + // MultiError) explains the details. Note that this is mostly useful for + // debugging purposes. If the gathered protobufs are to be used for + // exposition in actual monitoring, it is almost always better to not + // expose an incomplete result and instead disregard the returned + // MetricFamily protobufs in case the returned error is non-nil. + // + // Important: done is expected to be triggered (even if the error occurs!) + // once caller does not need returned slice of dto.MetricFamily. + Gather() (_ []*dto.MetricFamily, done func(), err error) +} + +// ToTransactionalGatherer transforms Gatherer to transactional one with noop as done function. +func ToTransactionalGatherer(g Gatherer) TransactionalGatherer { + return &noTransactionGatherer{g: g} +} + +type noTransactionGatherer struct { + g Gatherer +} + +// Gather implements TransactionalGatherer interface. +func (g *noTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + mfs, err := g.g.Gather() + return mfs, func() {}, err +} diff --git a/prometheus/registry_test.go b/prometheus/registry_test.go index 0ff7a644a..7a959da47 100644 --- a/prometheus/registry_test.go +++ b/prometheus/registry_test.go @@ -21,6 +21,7 @@ package prometheus_test import ( "bytes" + "errors" "fmt" "io/ioutil" "math/rand" @@ -1175,3 +1176,82 @@ func TestAlreadyRegisteredCollision(t *testing.T) { } } } + +type tGatherer struct { + done bool + err error +} + +func (g *tGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + name := "g1" + val := 1.0 + return []*dto.MetricFamily{ + {Name: &name, Metric: []*dto.Metric{{Gauge: &dto.Gauge{Value: &val}}}}, + }, func() { g.done = true }, g.err +} + +func TestNewMultiTRegistry(t *testing.T) { + treg := &tGatherer{} + + t.Run("one registry", func(t *testing.T) { + m := prometheus.NewMultiTRegistry(treg) + ret, done, err := m.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if len(ret) != 1 { + t.Error("unexpected number of metric families, expected 1, got", ret) + } + if !treg.done { + t.Error("inner transactional registry not marked as done") + } + }) + + reg := prometheus.NewRegistry() + if err := reg.Register(prometheus.NewCounter(prometheus.CounterOpts{Name: "c1", Help: "help c1"})); err != nil { + t.Error("registration failed:", err) + } + + // Note on purpose two registries will have exactly same metric family name (but with different string). + // This behaviour is undefined at the moment. + if err := reg.Register(prometheus.NewGauge(prometheus.GaugeOpts{Name: "g1", Help: "help g1"})); err != nil { + t.Error("registration failed:", err) + } + treg.done = false + + t.Run("two registries", func(t *testing.T) { + m := prometheus.NewMultiTRegistry(prometheus.ToTransactionalGatherer(reg), treg) + ret, done, err := m.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if len(ret) != 3 { + t.Error("unexpected number of metric families, expected 3, got", ret) + } + if !treg.done { + t.Error("inner transactional registry not marked as done") + } + }) + + treg.done = false + // Inject error. + treg.err = errors.New("test err") + + t.Run("two registries, one with error", func(t *testing.T) { + m := prometheus.NewMultiTRegistry(prometheus.ToTransactionalGatherer(reg), treg) + ret, done, err := m.Gather() + if err != treg.err { + t.Error("unexpected error:", err) + } + done() + if len(ret) != 3 { + t.Error("unexpected number of metric families, expected 3, got", ret) + } + // Still on error, we expect done to be triggered. + if !treg.done { + t.Error("inner transactional registry not marked as done") + } + }) +} diff --git a/prometheus/testutil/testutil.go b/prometheus/testutil/testutil.go index 9af60ce1d..bf95beaf7 100644 --- a/prometheus/testutil/testutil.go +++ b/prometheus/testutil/testutil.go @@ -167,7 +167,16 @@ func CollectAndCompare(c prometheus.Collector, expected io.Reader, metricNames . // exposition format. If any metricNames are provided, only metrics with those // names are compared. func GatherAndCompare(g prometheus.Gatherer, expected io.Reader, metricNames ...string) error { - got, err := g.Gather() + return TransactionalGatherAndCompare(prometheus.ToTransactionalGatherer(g), expected, metricNames...) +} + +// TransactionalGatherAndCompare gathers all metrics from the provided Gatherer and compares +// it to an expected output read from the provided Reader in the Prometheus text +// exposition format. If any metricNames are provided, only metrics with those +// names are compared. +func TransactionalGatherAndCompare(g prometheus.TransactionalGatherer, expected io.Reader, metricNames ...string) error { + got, done, err := g.Gather() + defer done() if err != nil { return fmt.Errorf("gathering metrics failed: %s", err) } diff --git a/prometheus/value.go b/prometheus/value.go index b4e0ae11c..9f106952d 100644 --- a/prometheus/value.go +++ b/prometheus/value.go @@ -21,6 +21,7 @@ import ( //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/internal" "google.golang.org/protobuf/types/known/timestamppb" dto "github.com/prometheus/client_model/go" @@ -38,6 +39,23 @@ const ( UntypedValue ) +var ( + CounterMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_COUNTER; return &d }() + GaugeMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_GAUGE; return &d }() + UntypedMetricTypePtr = func() *dto.MetricType { d := dto.MetricType_UNTYPED; return &d }() +) + +func (v ValueType) ToDTO() *dto.MetricType { + switch v { + case CounterValue: + return CounterMetricTypePtr + case GaugeValue: + return GaugeMetricTypePtr + default: + return UntypedMetricTypePtr + } +} + // valueFunc is a generic metric for simple values retrieved on collect time // from a function. It implements Metric and Collector. Its effective type is // determined by ValueType. This is a low-level building block used by the @@ -91,11 +109,15 @@ func NewConstMetric(desc *Desc, valueType ValueType, value float64, labelValues if err := validateLabelValues(labelValues, len(desc.variableLabels)); err != nil { return nil, err } + + metric := &dto.Metric{} + if err := populateMetric(valueType, value, MakeLabelPairs(desc, labelValues), nil, metric); err != nil { + return nil, err + } + return &constMetric{ - desc: desc, - valType: valueType, - val: value, - labelPairs: MakeLabelPairs(desc, labelValues), + desc: desc, + metric: metric, }, nil } @@ -110,10 +132,8 @@ func MustNewConstMetric(desc *Desc, valueType ValueType, value float64, labelVal } type constMetric struct { - desc *Desc - valType ValueType - val float64 - labelPairs []*dto.LabelPair + desc *Desc + metric *dto.Metric } func (m *constMetric) Desc() *Desc { @@ -121,7 +141,11 @@ func (m *constMetric) Desc() *Desc { } func (m *constMetric) Write(out *dto.Metric) error { - return populateMetric(m.valType, m.val, m.labelPairs, nil, out) + out.Label = m.metric.Label + out.Counter = m.metric.Counter + out.Gauge = m.metric.Gauge + out.Untyped = m.metric.Untyped + return nil } func populateMetric( @@ -170,7 +194,7 @@ func MakeLabelPairs(desc *Desc, labelValues []string) []*dto.LabelPair { }) } labelPairs = append(labelPairs, desc.constLabelPairs...) - sort.Sort(labelPairSorter(labelPairs)) + sort.Sort(internal.LabelPairSorter(labelPairs)) return labelPairs } diff --git a/prometheus/wrap.go b/prometheus/wrap.go index 74ee93280..c29f94b72 100644 --- a/prometheus/wrap.go +++ b/prometheus/wrap.go @@ -20,6 +20,7 @@ import ( //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/internal" dto "github.com/prometheus/client_model/go" ) @@ -182,7 +183,7 @@ func (m *wrappingMetric) Write(out *dto.Metric) error { Value: proto.String(lv), }) } - sort.Sort(labelPairSorter(out.Label)) + sort.Sort(internal.LabelPairSorter(out.Label)) return nil } From 2fcaf51be9a12b4b95413b6b3e0c13fabfaaf73f Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 26 Jan 2022 00:27:38 +0100 Subject: [PATCH 02/12] Optimization attempt. Signed-off-by: Bartlomiej Plotka --- prometheus/cache/cache.go | 128 ++++++++++++++++++--------------- prometheus/cache/cache_test.go | 2 +- prometheus/internal/metric.go | 12 ++-- 3 files changed, 78 insertions(+), 64 deletions(-) diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go index db0df82c4..64ad96ac1 100644 --- a/prometheus/cache/cache.go +++ b/prometheus/cache/cache.go @@ -43,25 +43,61 @@ var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with // Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers. // NOTE(bwplotka): Experimental, API and behaviour can change. type CachedTGatherer struct { - metrics map[uint64]*dto.Metric - metricFamilyByName map[string]*dto.MetricFamily + metricFamilyByName map[string]*family mMu sync.RWMutex } func NewCachedTGatherer() *CachedTGatherer { return &CachedTGatherer{ - metrics: make(map[uint64]*dto.Metric), - metricFamilyByName: map[string]*dto.MetricFamily{}, + metricFamilyByName: map[string]*family{}, } } +type family struct { + *dto.MetricFamily + + metricsByHash map[uint64]*dto.Metric +} + +// normalizeMetricFamilies returns a MetricFamily slice with empty +// MetricFamilies pruned and the remaining MetricFamilies sorted by name within +// the slice, with the contained Metrics sorted within each MetricFamily. +func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.MetricFamily { + for _, mf := range metricFamiliesByName { + if cap(mf.Metric) < len(mf.metricsByHash) { + mf.Metric = make([]*dto.Metric, 0, len(mf.metricsByHash)) + } + mf.Metric = mf.Metric[:0] + for _, m := range mf.metricsByHash { + mf.Metric = append(mf.Metric, m) + } + sort.Sort(internal.MetricSorter(mf.Metric)) + } + + for _, mf := range metricFamiliesByName { + sort.Sort(internal.MetricSorter(mf.Metric)) + } + names := make([]string, 0, len(metricFamiliesByName)) + for name, mf := range metricFamiliesByName { + if len(mf.Metric) > 0 { + names = append(names, name) + } + } + sort.Strings(names) + result := make([]*dto.MetricFamily, 0, len(names)) + for _, name := range names { + result = append(result, metricFamiliesByName[name].MetricFamily) + } + return result +} + // Gather implements TransactionalGatherer interface. func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { c.mMu.RLock() - // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics with 1000 families + // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics among 1000 families // this is efficient enough (~300µs and ~50 kB per op), no need to cache it for now. - return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil + return normalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil } type Key struct { @@ -123,11 +159,9 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) c.mMu.Lock() defer c.mMu.Unlock() - currMetrics := c.metrics - currMetricFamilies := c.metricFamilyByName + currMetricFamilyByName := c.metricFamilyByName if reset { - currMetrics = make(map[uint64]*dto.Metric, len(c.metrics)) - currMetricFamilies = make(map[string]*dto.MetricFamily, len(c.metricFamilyByName)) + currMetricFamilyByName = make(map[string]*family, len(c.metricFamilyByName)) } errs := prometheus.MultiError{} @@ -139,22 +173,35 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) } // Update metric family. - mf, ok := c.metricFamilyByName[inserts[i].FQName] + mf, ok := currMetricFamilyByName[inserts[i].FQName] + oldMf, oldOk := c.metricFamilyByName[inserts[i].FQName] if !ok { - mf = &dto.MetricFamily{} - mf.Name = &inserts[i].FQName - } else if reset { - // Reset metric slice, since we want to start from scratch. - mf.Metric = mf.Metric[:0] + if !oldOk { + mf = &family{ + MetricFamily: &dto.MetricFamily{}, + metricsByHash: map[uint64]*dto.Metric{}, + } + mf.Name = &inserts[i].FQName + } else if reset { + mf = &family{ + MetricFamily: oldMf.MetricFamily, + metricsByHash: make(map[uint64]*dto.Metric, len(oldMf.metricsByHash)), + } + } } + mf.Type = inserts[i].ValueType.ToDTO() mf.Help = &inserts[i].Help - currMetricFamilies[inserts[i].FQName] = mf + currMetricFamilyByName[inserts[i].FQName] = mf // Update metric pointer. hSum := inserts[i].hash() - m, ok := c.metrics[hSum] + m, ok := mf.metricsByHash[hSum] + if !ok && reset && oldOk { + m, ok = oldMf.metricsByHash[hSum] + } + if !ok { m = &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))} for j := range inserts[i].LabelNames { @@ -202,16 +249,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) if inserts[i].Timestamp != nil { m.TimestampMs = proto.Int64(inserts[i].Timestamp.Unix()*1000 + int64(inserts[i].Timestamp.Nanosecond()/1000000)) } - currMetrics[hSum] = m - - if !reset && ok { - // If we did update without reset and we found metric in previous - // map, we know metric pointer exists in metric family map, so just continue. - continue - } - - // Will be sorted later anyway, so just append. - mf.Metric = append(mf.Metric, m) + mf.metricsByHash[hSum] = m } for _, del := range deletions { @@ -220,42 +258,18 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) continue } - hSum := del.hash() - m, ok := currMetrics[hSum] - if !ok { - continue - } - delete(currMetrics, hSum) - - mf, ok := currMetricFamilies[del.FQName] + mf, ok := currMetricFamilyByName[del.FQName] if !ok { - // Impossible, but well... - errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not exists", del.FQName, del.LabelValues)) continue } - toDel := -1 - for i := range mf.Metric { - if mf.Metric[i] == m { - toDel = i - break - } - } - - if toDel == -1 { - errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not have such metric", del.FQName, del.LabelValues)) - continue - } - - if len(mf.Metric) == 1 { - delete(currMetricFamilies, del.FQName) + hSum := del.hash() + if _, ok := mf.metricsByHash[hSum]; !ok { continue } - - mf.Metric = append(mf.Metric[:toDel], mf.Metric[toDel+1:]...) + delete(mf.metricsByHash, hSum) } - c.metrics = currMetrics - c.metricFamilyByName = currMetricFamilies + c.metricFamilyByName = currMetricFamilyByName return errs.MaybeUnwrap() } diff --git a/prometheus/cache/cache_test.go b/prometheus/cache/cache_test.go index f5281172e..df04b8804 100644 --- a/prometheus/cache/cache_test.go +++ b/prometheus/cache/cache_test.go @@ -220,7 +220,7 @@ func BenchmarkCachedTGatherer_Update(b *testing.B) { b.Error("update:", err) } - if len(c.metricFamilyByName) != 1e3 || len(c.metrics) != 1e6 { + if len(c.metricFamilyByName) != 1e3 || len(c.metricFamilyByName["realistic_longer_name_123"].metricsByHash) != 1e3 { // Ensure we did not generate duplicates. panic("generated data set gave wrong numbers") } diff --git a/prometheus/internal/metric.go b/prometheus/internal/metric.go index 089ab5b97..6515c1148 100644 --- a/prometheus/internal/metric.go +++ b/prometheus/internal/metric.go @@ -35,18 +35,18 @@ func (s LabelPairSorter) Less(i, j int) bool { return s[i].GetName() < s[j].GetName() } -// metricSorter is a sortable slice of *dto.Metric. -type metricSorter []*dto.Metric +// MetricSorter is a sortable slice of *dto.Metric. +type MetricSorter []*dto.Metric -func (s metricSorter) Len() int { +func (s MetricSorter) Len() int { return len(s) } -func (s metricSorter) Swap(i, j int) { +func (s MetricSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s metricSorter) Less(i, j int) bool { +func (s MetricSorter) Less(i, j int) bool { if len(s[i].Label) != len(s[j].Label) { // This should not happen. The metrics are // inconsistent. However, we have to deal with the fact, as @@ -84,7 +84,7 @@ func (s metricSorter) Less(i, j int) bool { // the slice, with the contained Metrics sorted within each MetricFamily. func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily { for _, mf := range metricFamiliesByName { - sort.Sort(metricSorter(mf.Metric)) + sort.Sort(MetricSorter(mf.Metric)) } names := make([]string, 0, len(metricFamiliesByName)) for name, mf := range metricFamiliesByName { From 10f1d94aaf3200baac40aca727a6e43251a094e4 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 26 Jan 2022 00:29:17 +0100 Subject: [PATCH 03/12] Revert "Optimization attempt." MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 2fcaf51be9a12b4b95413b6b3e0c13fabfaaf73f. Optimization was not worth it: benchstat v1.txt v2.txt name old time/op new time/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 2.64µs ± 0% 4.05µs ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Update_of_all_elements_with_reset-12 701ms ± 0% 358ms ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Gather-12 535µs ± 0% 703934µs ± 0% ~ (p=1.000 n=1+1) name old alloc/op new alloc/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 208B ± 0% 208B ± 0% ~ (all equal) CachedTGatherer_Update/Update_of_all_elements_with_reset-12 40.2MB ± 0% 41.1MB ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Gather-12 48.6kB ± 0% 84.3kB ± 0% ~ (p=1.000 n=1+1) name old allocs/op new allocs/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 3.00 ± 0% 3.00 ± 0% ~ (all equal) CachedTGatherer_Update/Update_of_all_elements_with_reset-12 6.00 ± 0% 4003.00 ± 0% ~ (p=1.000 n=1+1) CachedTGatherer_Update/Gather-12 1.00k ± 0% 2.01k ± 0% ~ (p=1.000 n=1+1) --- prometheus/cache/cache.go | 128 +++++++++++++++------------------ prometheus/cache/cache_test.go | 2 +- prometheus/internal/metric.go | 12 ++-- 3 files changed, 64 insertions(+), 78 deletions(-) diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go index 64ad96ac1..db0df82c4 100644 --- a/prometheus/cache/cache.go +++ b/prometheus/cache/cache.go @@ -43,61 +43,25 @@ var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with // Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers. // NOTE(bwplotka): Experimental, API and behaviour can change. type CachedTGatherer struct { - metricFamilyByName map[string]*family + metrics map[uint64]*dto.Metric + metricFamilyByName map[string]*dto.MetricFamily mMu sync.RWMutex } func NewCachedTGatherer() *CachedTGatherer { return &CachedTGatherer{ - metricFamilyByName: map[string]*family{}, + metrics: make(map[uint64]*dto.Metric), + metricFamilyByName: map[string]*dto.MetricFamily{}, } } -type family struct { - *dto.MetricFamily - - metricsByHash map[uint64]*dto.Metric -} - -// normalizeMetricFamilies returns a MetricFamily slice with empty -// MetricFamilies pruned and the remaining MetricFamilies sorted by name within -// the slice, with the contained Metrics sorted within each MetricFamily. -func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.MetricFamily { - for _, mf := range metricFamiliesByName { - if cap(mf.Metric) < len(mf.metricsByHash) { - mf.Metric = make([]*dto.Metric, 0, len(mf.metricsByHash)) - } - mf.Metric = mf.Metric[:0] - for _, m := range mf.metricsByHash { - mf.Metric = append(mf.Metric, m) - } - sort.Sort(internal.MetricSorter(mf.Metric)) - } - - for _, mf := range metricFamiliesByName { - sort.Sort(internal.MetricSorter(mf.Metric)) - } - names := make([]string, 0, len(metricFamiliesByName)) - for name, mf := range metricFamiliesByName { - if len(mf.Metric) > 0 { - names = append(names, name) - } - } - sort.Strings(names) - result := make([]*dto.MetricFamily, 0, len(names)) - for _, name := range names { - result = append(result, metricFamiliesByName[name].MetricFamily) - } - return result -} - // Gather implements TransactionalGatherer interface. func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { c.mMu.RLock() - // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics among 1000 families + // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics with 1000 families // this is efficient enough (~300µs and ~50 kB per op), no need to cache it for now. - return normalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil + return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil } type Key struct { @@ -159,9 +123,11 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) c.mMu.Lock() defer c.mMu.Unlock() - currMetricFamilyByName := c.metricFamilyByName + currMetrics := c.metrics + currMetricFamilies := c.metricFamilyByName if reset { - currMetricFamilyByName = make(map[string]*family, len(c.metricFamilyByName)) + currMetrics = make(map[uint64]*dto.Metric, len(c.metrics)) + currMetricFamilies = make(map[string]*dto.MetricFamily, len(c.metricFamilyByName)) } errs := prometheus.MultiError{} @@ -173,35 +139,22 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) } // Update metric family. - mf, ok := currMetricFamilyByName[inserts[i].FQName] - oldMf, oldOk := c.metricFamilyByName[inserts[i].FQName] + mf, ok := c.metricFamilyByName[inserts[i].FQName] if !ok { - if !oldOk { - mf = &family{ - MetricFamily: &dto.MetricFamily{}, - metricsByHash: map[uint64]*dto.Metric{}, - } - mf.Name = &inserts[i].FQName - } else if reset { - mf = &family{ - MetricFamily: oldMf.MetricFamily, - metricsByHash: make(map[uint64]*dto.Metric, len(oldMf.metricsByHash)), - } - } + mf = &dto.MetricFamily{} + mf.Name = &inserts[i].FQName + } else if reset { + // Reset metric slice, since we want to start from scratch. + mf.Metric = mf.Metric[:0] } - mf.Type = inserts[i].ValueType.ToDTO() mf.Help = &inserts[i].Help - currMetricFamilyByName[inserts[i].FQName] = mf + currMetricFamilies[inserts[i].FQName] = mf // Update metric pointer. hSum := inserts[i].hash() - m, ok := mf.metricsByHash[hSum] - if !ok && reset && oldOk { - m, ok = oldMf.metricsByHash[hSum] - } - + m, ok := c.metrics[hSum] if !ok { m = &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))} for j := range inserts[i].LabelNames { @@ -249,7 +202,16 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) if inserts[i].Timestamp != nil { m.TimestampMs = proto.Int64(inserts[i].Timestamp.Unix()*1000 + int64(inserts[i].Timestamp.Nanosecond()/1000000)) } - mf.metricsByHash[hSum] = m + currMetrics[hSum] = m + + if !reset && ok { + // If we did update without reset and we found metric in previous + // map, we know metric pointer exists in metric family map, so just continue. + continue + } + + // Will be sorted later anyway, so just append. + mf.Metric = append(mf.Metric, m) } for _, del := range deletions { @@ -258,18 +220,42 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) continue } - mf, ok := currMetricFamilyByName[del.FQName] + hSum := del.hash() + m, ok := currMetrics[hSum] if !ok { continue } + delete(currMetrics, hSum) - hSum := del.hash() - if _, ok := mf.metricsByHash[hSum]; !ok { + mf, ok := currMetricFamilies[del.FQName] + if !ok { + // Impossible, but well... + errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not exists", del.FQName, del.LabelValues)) continue } - delete(mf.metricsByHash, hSum) + + toDel := -1 + for i := range mf.Metric { + if mf.Metric[i] == m { + toDel = i + break + } + } + + if toDel == -1 { + errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not have such metric", del.FQName, del.LabelValues)) + continue + } + + if len(mf.Metric) == 1 { + delete(currMetricFamilies, del.FQName) + continue + } + + mf.Metric = append(mf.Metric[:toDel], mf.Metric[toDel+1:]...) } - c.metricFamilyByName = currMetricFamilyByName + c.metrics = currMetrics + c.metricFamilyByName = currMetricFamilies return errs.MaybeUnwrap() } diff --git a/prometheus/cache/cache_test.go b/prometheus/cache/cache_test.go index df04b8804..f5281172e 100644 --- a/prometheus/cache/cache_test.go +++ b/prometheus/cache/cache_test.go @@ -220,7 +220,7 @@ func BenchmarkCachedTGatherer_Update(b *testing.B) { b.Error("update:", err) } - if len(c.metricFamilyByName) != 1e3 || len(c.metricFamilyByName["realistic_longer_name_123"].metricsByHash) != 1e3 { + if len(c.metricFamilyByName) != 1e3 || len(c.metrics) != 1e6 { // Ensure we did not generate duplicates. panic("generated data set gave wrong numbers") } diff --git a/prometheus/internal/metric.go b/prometheus/internal/metric.go index 6515c1148..089ab5b97 100644 --- a/prometheus/internal/metric.go +++ b/prometheus/internal/metric.go @@ -35,18 +35,18 @@ func (s LabelPairSorter) Less(i, j int) bool { return s[i].GetName() < s[j].GetName() } -// MetricSorter is a sortable slice of *dto.Metric. -type MetricSorter []*dto.Metric +// metricSorter is a sortable slice of *dto.Metric. +type metricSorter []*dto.Metric -func (s MetricSorter) Len() int { +func (s metricSorter) Len() int { return len(s) } -func (s MetricSorter) Swap(i, j int) { +func (s metricSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s MetricSorter) Less(i, j int) bool { +func (s metricSorter) Less(i, j int) bool { if len(s[i].Label) != len(s[j].Label) { // This should not happen. The metrics are // inconsistent. However, we have to deal with the fact, as @@ -84,7 +84,7 @@ func (s MetricSorter) Less(i, j int) bool { // the slice, with the contained Metrics sorted within each MetricFamily. func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily { for _, mf := range metricFamiliesByName { - sort.Sort(MetricSorter(mf.Metric)) + sort.Sort(metricSorter(mf.Metric)) } names := make([]string, 0, len(metricFamiliesByName)) for name, mf := range metricFamiliesByName { From 273cb92f53cbfa3aa52555717da93f1787f3a304 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 26 Jan 2022 00:51:54 +0100 Subject: [PATCH 04/12] nit. Signed-off-by: Bartlomiej Plotka --- prometheus/cache/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go index db0df82c4..20afc7ea8 100644 --- a/prometheus/cache/cache.go +++ b/prometheus/cache/cache.go @@ -126,7 +126,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) currMetrics := c.metrics currMetricFamilies := c.metricFamilyByName if reset { - currMetrics = make(map[uint64]*dto.Metric, len(c.metrics)) + currMetrics = make(map[uint64]*dto.Metric, len(inserts)) currMetricFamilies = make(map[string]*dto.MetricFamily, len(c.metricFamilyByName)) } From 870e237c900178cbbd62cc7bd27c0a36190de9da Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 26 Jan 2022 01:11:19 +0100 Subject: [PATCH 05/12] Another optimization attempt. Signed-off-by: Bartlomiej Plotka --- prometheus/cache/cache.go | 142 +++++++++++++++++++++------------ prometheus/cache/cache_test.go | 2 +- prometheus/internal/metric.go | 12 +-- 3 files changed, 96 insertions(+), 60 deletions(-) diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go index 20afc7ea8..385fc1f28 100644 --- a/prometheus/cache/cache.go +++ b/prometheus/cache/cache.go @@ -43,25 +43,67 @@ var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with // Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers. // NOTE(bwplotka): Experimental, API and behaviour can change. type CachedTGatherer struct { - metrics map[uint64]*dto.Metric - metricFamilyByName map[string]*dto.MetricFamily + metricFamilyByName map[string]*family mMu sync.RWMutex } func NewCachedTGatherer() *CachedTGatherer { return &CachedTGatherer{ - metrics: make(map[uint64]*dto.Metric), - metricFamilyByName: map[string]*dto.MetricFamily{}, + metricFamilyByName: map[string]*family{}, } } +type family struct { + *dto.MetricFamily + + metricsByHash map[uint64]*metric + touched bool +} + +type metric struct { + *dto.Metric + touched bool +} + +// normalizeMetricFamilies returns a MetricFamily slice with empty +// MetricFamilies pruned and the remaining MetricFamilies sorted by name within +// the slice, with the contained Metrics sorted within each MetricFamily. +func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.MetricFamily { + for _, mf := range metricFamiliesByName { + if cap(mf.Metric) < len(mf.metricsByHash) { + mf.Metric = make([]*dto.Metric, 0, len(mf.metricsByHash)) + } + mf.Metric = mf.Metric[:0] + for _, m := range mf.metricsByHash { + mf.Metric = append(mf.Metric, m.Metric) + } + sort.Sort(internal.MetricSorter(mf.Metric)) + } + + for _, mf := range metricFamiliesByName { + sort.Sort(internal.MetricSorter(mf.Metric)) + } + names := make([]string, 0, len(metricFamiliesByName)) + for name, mf := range metricFamiliesByName { + if len(mf.Metric) > 0 { + names = append(names, name) + } + } + sort.Strings(names) + result := make([]*dto.MetricFamily, 0, len(names)) + for _, name := range names { + result = append(result, metricFamiliesByName[name].MetricFamily) + } + return result +} + // Gather implements TransactionalGatherer interface. func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { c.mMu.RLock() - // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics with 1000 families + // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics among 1000 families // this is efficient enough (~300µs and ~50 kB per op), no need to cache it for now. - return internal.NormalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil + return normalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil } type Key struct { @@ -123,13 +165,6 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) c.mMu.Lock() defer c.mMu.Unlock() - currMetrics := c.metrics - currMetricFamilies := c.metricFamilyByName - if reset { - currMetrics = make(map[uint64]*dto.Metric, len(inserts)) - currMetricFamilies = make(map[string]*dto.MetricFamily, len(c.metricFamilyByName)) - } - errs := prometheus.MultiError{} for i := range inserts { // TODO(bwplotka): Validate more about this insert? @@ -141,22 +176,25 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) // Update metric family. mf, ok := c.metricFamilyByName[inserts[i].FQName] if !ok { - mf = &dto.MetricFamily{} + mf = &family{ + MetricFamily: &dto.MetricFamily{}, + metricsByHash: map[uint64]*metric{}, + } mf.Name = &inserts[i].FQName - } else if reset { - // Reset metric slice, since we want to start from scratch. - mf.Metric = mf.Metric[:0] } + mf.touched = true mf.Type = inserts[i].ValueType.ToDTO() mf.Help = &inserts[i].Help - currMetricFamilies[inserts[i].FQName] = mf + c.metricFamilyByName[inserts[i].FQName] = mf // Update metric pointer. hSum := inserts[i].hash() - m, ok := c.metrics[hSum] + m, ok := mf.metricsByHash[hSum] if !ok { - m = &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))} + m = &metric{ + Metric: &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))}, + } for j := range inserts[i].LabelNames { m.Label = append(m.Label, &dto.LabelPair{ Name: &inserts[i].LabelNames[j], @@ -165,6 +203,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) } sort.Sort(internal.LabelPairSorter(m.Label)) } + m.touched = true switch inserts[i].ValueType { case prometheus.CounterValue: @@ -202,16 +241,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) if inserts[i].Timestamp != nil { m.TimestampMs = proto.Int64(inserts[i].Timestamp.Unix()*1000 + int64(inserts[i].Timestamp.Nanosecond()/1000000)) } - currMetrics[hSum] = m - - if !reset && ok { - // If we did update without reset and we found metric in previous - // map, we know metric pointer exists in metric family map, so just continue. - continue - } - - // Will be sorted later anyway, so just append. - mf.Metric = append(mf.Metric, m) + mf.metricsByHash[hSum] = m } for _, del := range deletions { @@ -220,42 +250,48 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) continue } - hSum := del.hash() - m, ok := currMetrics[hSum] + mf, ok := c.metricFamilyByName[del.FQName] if !ok { continue } - delete(currMetrics, hSum) - mf, ok := currMetricFamilies[del.FQName] - if !ok { - // Impossible, but well... - errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not exists", del.FQName, del.LabelValues)) + hSum := del.hash() + if _, ok := mf.metricsByHash[hSum]; !ok { continue } - toDel := -1 - for i := range mf.Metric { - if mf.Metric[i] == m { - toDel = i - break - } - } - - if toDel == -1 { - errs.Append(fmt.Errorf("could not remove metric %s(%s) from metric family, metric family does not have such metric", del.FQName, del.LabelValues)) + if len(mf.metricsByHash) == 1 { + delete(c.metricFamilyByName, del.FQName) continue } - if len(mf.Metric) == 1 { - delete(currMetricFamilies, del.FQName) - continue + delete(mf.metricsByHash, hSum) + } + + if reset { + for name, mf := range c.metricFamilyByName { + if !mf.touched { + delete(c.metricFamilyByName, name) + continue + } + for hash, m := range mf.metricsByHash { + if !m.touched { + delete(mf.metricsByHash, hash) + continue + } + } + if len(mf.metricsByHash) == 0 { + delete(c.metricFamilyByName, name) + } } + } - mf.Metric = append(mf.Metric[:toDel], mf.Metric[toDel+1:]...) + for _, mf := range c.metricFamilyByName { + mf.touched = false + for _, m := range mf.metricsByHash { + m.touched = false + } } - c.metrics = currMetrics - c.metricFamilyByName = currMetricFamilies return errs.MaybeUnwrap() } diff --git a/prometheus/cache/cache_test.go b/prometheus/cache/cache_test.go index f5281172e..df04b8804 100644 --- a/prometheus/cache/cache_test.go +++ b/prometheus/cache/cache_test.go @@ -220,7 +220,7 @@ func BenchmarkCachedTGatherer_Update(b *testing.B) { b.Error("update:", err) } - if len(c.metricFamilyByName) != 1e3 || len(c.metrics) != 1e6 { + if len(c.metricFamilyByName) != 1e3 || len(c.metricFamilyByName["realistic_longer_name_123"].metricsByHash) != 1e3 { // Ensure we did not generate duplicates. panic("generated data set gave wrong numbers") } diff --git a/prometheus/internal/metric.go b/prometheus/internal/metric.go index 089ab5b97..6515c1148 100644 --- a/prometheus/internal/metric.go +++ b/prometheus/internal/metric.go @@ -35,18 +35,18 @@ func (s LabelPairSorter) Less(i, j int) bool { return s[i].GetName() < s[j].GetName() } -// metricSorter is a sortable slice of *dto.Metric. -type metricSorter []*dto.Metric +// MetricSorter is a sortable slice of *dto.Metric. +type MetricSorter []*dto.Metric -func (s metricSorter) Len() int { +func (s MetricSorter) Len() int { return len(s) } -func (s metricSorter) Swap(i, j int) { +func (s MetricSorter) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s metricSorter) Less(i, j int) bool { +func (s MetricSorter) Less(i, j int) bool { if len(s[i].Label) != len(s[j].Label) { // This should not happen. The metrics are // inconsistent. However, we have to deal with the fact, as @@ -84,7 +84,7 @@ func (s metricSorter) Less(i, j int) bool { // the slice, with the contained Metrics sorted within each MetricFamily. func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily { for _, mf := range metricFamiliesByName { - sort.Sort(metricSorter(mf.Metric)) + sort.Sort(MetricSorter(mf.Metric)) } names := make([]string, 0, len(metricFamiliesByName)) for name, mf := range metricFamiliesByName { From 11bdfa31f4e735986f02c96236d953d83a38eec0 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 26 Jan 2022 01:23:50 +0100 Subject: [PATCH 06/12] rename and further optimization. Signed-off-by: Bartlomiej Plotka --- prometheus/cache/cache.go | 33 +++++++++++++++++---------------- prometheus/cache/cache_test.go | 2 +- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go index 385fc1f28..bd8e4741c 100644 --- a/prometheus/cache/cache.go +++ b/prometheus/cache/cache.go @@ -43,13 +43,13 @@ var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with // Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers. // NOTE(bwplotka): Experimental, API and behaviour can change. type CachedTGatherer struct { - metricFamilyByName map[string]*family - mMu sync.RWMutex + metricFamiliesByName map[string]*family + mMu sync.RWMutex } func NewCachedTGatherer() *CachedTGatherer { return &CachedTGatherer{ - metricFamilyByName: map[string]*family{}, + metricFamiliesByName: map[string]*family{}, } } @@ -69,6 +69,7 @@ type metric struct { // MetricFamilies pruned and the remaining MetricFamilies sorted by name within // the slice, with the contained Metrics sorted within each MetricFamily. func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.MetricFamily { + // TODO(bwplotka): We could optimize this further by bookkeeping this slice in place. for _, mf := range metricFamiliesByName { if cap(mf.Metric) < len(mf.metricsByHash) { mf.Metric = make([]*dto.Metric, 0, len(mf.metricsByHash)) @@ -80,9 +81,6 @@ func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.Met sort.Sort(internal.MetricSorter(mf.Metric)) } - for _, mf := range metricFamiliesByName { - sort.Sort(internal.MetricSorter(mf.Metric)) - } names := make([]string, 0, len(metricFamiliesByName)) for name, mf := range metricFamiliesByName { if len(mf.Metric) > 0 { @@ -102,8 +100,8 @@ func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err erro c.mMu.RLock() // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics among 1000 families - // this is efficient enough (~300µs and ~50 kB per op), no need to cache it for now. - return normalizeMetricFamilies(c.metricFamilyByName), c.mMu.RUnlock, nil + // this is efficient enough (~400ms and ~50 kB per op), no need to cache it for now. + return normalizeMetricFamilies(c.metricFamiliesByName), c.mMu.RUnlock, nil } type Key struct { @@ -174,7 +172,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) } // Update metric family. - mf, ok := c.metricFamilyByName[inserts[i].FQName] + mf, ok := c.metricFamiliesByName[inserts[i].FQName] if !ok { mf = &family{ MetricFamily: &dto.MetricFamily{}, @@ -186,7 +184,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) mf.Type = inserts[i].ValueType.ToDTO() mf.Help = &inserts[i].Help - c.metricFamilyByName[inserts[i].FQName] = mf + c.metricFamiliesByName[inserts[i].FQName] = mf // Update metric pointer. hSum := inserts[i].hash() @@ -250,7 +248,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) continue } - mf, ok := c.metricFamilyByName[del.FQName] + mf, ok := c.metricFamiliesByName[del.FQName] if !ok { continue } @@ -261,7 +259,7 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) } if len(mf.metricsByHash) == 1 { - delete(c.metricFamilyByName, del.FQName) + delete(c.metricFamiliesByName, del.FQName) continue } @@ -269,9 +267,10 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) } if reset { - for name, mf := range c.metricFamilyByName { + // Trading off-time instead of memory allocated for otherwise needed replacement map. + for name, mf := range c.metricFamiliesByName { if !mf.touched { - delete(c.metricFamilyByName, name) + delete(c.metricFamiliesByName, name) continue } for hash, m := range mf.metricsByHash { @@ -281,12 +280,14 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) } } if len(mf.metricsByHash) == 0 { - delete(c.metricFamilyByName, name) + delete(c.metricFamiliesByName, name) } } } - for _, mf := range c.metricFamilyByName { + // TODO(bwplotka): Potentially move this only for reset, but then code would assume + // you either only update or only reset update. For now we can live with small overhead. + for _, mf := range c.metricFamiliesByName { mf.touched = false for _, m := range mf.metricsByHash { m.touched = false diff --git a/prometheus/cache/cache_test.go b/prometheus/cache/cache_test.go index df04b8804..b0408377b 100644 --- a/prometheus/cache/cache_test.go +++ b/prometheus/cache/cache_test.go @@ -220,7 +220,7 @@ func BenchmarkCachedTGatherer_Update(b *testing.B) { b.Error("update:", err) } - if len(c.metricFamilyByName) != 1e3 || len(c.metricFamilyByName["realistic_longer_name_123"].metricsByHash) != 1e3 { + if len(c.metricFamiliesByName) != 1e3 || len(c.metricFamiliesByName["realistic_longer_name_123"].metricsByHash) != 1e3 { // Ensure we did not generate duplicates. panic("generated data set gave wrong numbers") } From 961f8ca5e65b8e7f890322fc13675ee5209a407d Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 26 Jan 2022 20:38:45 +0100 Subject: [PATCH 07/12] Hopefully final optimization. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit benchstat -delta-test=none v6.txt v9.txt name old time/op new time/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 13.1ms ± 0% 0.0ms ± 0% -99.81% CachedTGatherer_Update/Update_of_all_elements_with_reset-12 309ms ± 0% 282ms ± 0% -8.77% CachedTGatherer_Update/Gather-12 422ms ± 0% 0ms ± 0% -99.95% name old alloc/op new alloc/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 208B ± 0% 208B ± 0% 0.00% CachedTGatherer_Update/Update_of_all_elements_with_reset-12 2.47kB ± 0% 1.67kB ± 0% -32.56% CachedTGatherer_Update/Gather-12 52.8kB ± 0% 24.6kB ± 0% -53.34% name old allocs/op new allocs/op delta CachedTGatherer_Update/Update_of_one_element_without_reset-12 3.00 ± 0% 3.00 ± 0% 0.00% CachedTGatherer_Update/Update_of_all_elements_with_reset-12 0.00 0.00 0.00% CachedTGatherer_Update/Gather-12 1.00k ± 0% 0.00k ± 0% -99.60% Signed-off-by: Bartlomiej Plotka --- prometheus/cache/cache.go | 54 +++++++++++++++++++++------------------ 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go index bd8e4741c..ad2193559 100644 --- a/prometheus/cache/cache.go +++ b/prometheus/cache/cache.go @@ -45,10 +45,13 @@ var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with type CachedTGatherer struct { metricFamiliesByName map[string]*family mMu sync.RWMutex + + desiredTouchState bool } func NewCachedTGatherer() *CachedTGatherer { return &CachedTGatherer{ + desiredTouchState: true, metricFamiliesByName: map[string]*family{}, } } @@ -57,30 +60,19 @@ type family struct { *dto.MetricFamily metricsByHash map[uint64]*metric - touched bool + touchState bool + needsRebuild bool } type metric struct { *dto.Metric - touched bool + touchState bool } // normalizeMetricFamilies returns a MetricFamily slice with empty // MetricFamilies pruned and the remaining MetricFamilies sorted by name within // the slice, with the contained Metrics sorted within each MetricFamily. func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.MetricFamily { - // TODO(bwplotka): We could optimize this further by bookkeeping this slice in place. - for _, mf := range metricFamiliesByName { - if cap(mf.Metric) < len(mf.metricsByHash) { - mf.Metric = make([]*dto.Metric, 0, len(mf.metricsByHash)) - } - mf.Metric = mf.Metric[:0] - for _, m := range mf.metricsByHash { - mf.Metric = append(mf.Metric, m.Metric) - } - sort.Sort(internal.MetricSorter(mf.Metric)) - } - names := make([]string, 0, len(metricFamiliesByName)) for name, mf := range metricFamiliesByName { if len(mf.Metric) > 0 { @@ -98,9 +90,6 @@ func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.Met // Gather implements TransactionalGatherer interface. func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { c.mMu.RLock() - - // BenchmarkCachedTGatherer_Update shows, even for 1 million metrics among 1000 families - // this is efficient enough (~400ms and ~50 kB per op), no need to cache it for now. return normalizeMetricFamilies(c.metricFamiliesByName), c.mMu.RUnlock, nil } @@ -180,7 +169,10 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) } mf.Name = &inserts[i].FQName } - mf.touched = true + if reset { + mf.touchState = c.desiredTouchState + } + mf.Type = inserts[i].ValueType.ToDTO() mf.Help = &inserts[i].Help @@ -200,8 +192,11 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) }) } sort.Sort(internal.LabelPairSorter(m.Label)) + mf.needsRebuild = true + } + if reset { + m.touchState = c.desiredTouchState } - m.touched = true switch inserts[i].ValueType { case prometheus.CounterValue: @@ -263,18 +258,19 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) continue } + mf.needsRebuild = true delete(mf.metricsByHash, hSum) } if reset { // Trading off-time instead of memory allocated for otherwise needed replacement map. for name, mf := range c.metricFamiliesByName { - if !mf.touched { + if mf.touchState != c.desiredTouchState { delete(c.metricFamiliesByName, name) continue } for hash, m := range mf.metricsByHash { - if !m.touched { + if m.touchState != c.desiredTouchState { delete(mf.metricsByHash, hash) continue } @@ -283,15 +279,23 @@ func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) delete(c.metricFamiliesByName, name) } } + + // Avoid resetting state by flipping what we will expect in the next update. + c.desiredTouchState = !c.desiredTouchState } - // TODO(bwplotka): Potentially move this only for reset, but then code would assume - // you either only update or only reset update. For now we can live with small overhead. for _, mf := range c.metricFamiliesByName { - mf.touched = false + if !mf.needsRebuild { + continue + } + + mf.Metric = mf.Metric[:0] for _, m := range mf.metricsByHash { - m.touched = false + mf.Metric = append(mf.Metric, m.Metric) } + sort.Sort(internal.MetricSorter(mf.Metric)) + + mf.needsRebuild = false } return errs.MaybeUnwrap() From 8bbbe7a01322f4da2c4ecc5105635391072cc4b7 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Thu, 17 Feb 2022 16:51:35 +0100 Subject: [PATCH 08/12] Removed obsolete comment Signed-off-by: Bartlomiej Plotka --- prometheus/promhttp/http.go | 1 - 1 file changed, 1 deletion(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index b463a747f..6b37c0887 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -117,7 +117,6 @@ func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerO h := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) { if inFlightSem != nil { - // TODO(bwplotka): Implement single-flight which is essential for blocking TransactionalGatherer. select { case inFlightSem <- struct{}{}: // All good, carry on. defer func() { <-inFlightSem }() From 8caf328497763b77ead9bc2e0fb906722ea8ce3e Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 23 Feb 2022 11:06:38 +0100 Subject: [PATCH 09/12] Fixed tests. Signed-off-by: Bartlomiej Plotka --- prometheus/promhttp/http.go | 3 + prometheus/promhttp/http_test.go | 99 +++++++++++++++++--------------- 2 files changed, 57 insertions(+), 45 deletions(-) diff --git a/prometheus/promhttp/http.go b/prometheus/promhttp/http.go index 6b37c0887..a6e4f850c 100644 --- a/prometheus/promhttp/http.go +++ b/prometheus/promhttp/http.go @@ -87,6 +87,9 @@ func HandlerFor(reg prometheus.Gatherer, opts HandlerOpts) http.Handler { return HandlerForTransactional(prometheus.ToTransactionalGatherer(reg), opts) } +// HandlerForTransactional is like HandlerFor, but it uses transactional gather, which +// can safely change in-place returned *dto.MetricFamily before call to `Gather` and after +// call to `done` of that `Gather`. func HandlerForTransactional(reg prometheus.TransactionalGatherer, opts HandlerOpts) http.Handler { var ( inFlightSem chan struct{} diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 781ea8f10..3e3db27e3 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -16,14 +16,16 @@ package promhttp import ( "bytes" "errors" + "fmt" "log" "net/http" "net/http/httptest" - "strings" "testing" "time" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" ) type errorCollector struct{} @@ -56,8 +58,19 @@ func (b blockingCollector) Collect(ch chan<- prometheus.Metric) { <-b.Block } -func TestHandlerErrorHandling(t *testing.T) { +type mockTransactionGatherer struct { + g prometheus.Gatherer + gatherInvoked int + doneInvoked int +} + +func (g *mockTransactionGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + g.gatherInvoked++ + mfs, err := g.g.Gather() + return mfs, func() { g.doneInvoked++ }, err +} +func TestHandlerErrorHandling(t *testing.T) { // Create a registry that collects a MetricFamily with two elements, // another with one, and reports an error. Further down, we'll use the // same registry in the HandlerOpts. @@ -90,21 +103,26 @@ func TestHandlerErrorHandling(t *testing.T) { request, _ := http.NewRequest("GET", "/", nil) request.Header.Add("Accept", "test/plain") - errorHandler := HandlerFor(reg, HandlerOpts{ + mReg := &mockTransactionGatherer{g: reg} + errorHandler := HandlerForTransactional(mReg, HandlerOpts{ ErrorLog: logger, ErrorHandling: HTTPErrorOnError, Registry: reg, }) - continueHandler := HandlerFor(reg, HandlerOpts{ + continueHandler := HandlerForTransactional(mReg, HandlerOpts{ ErrorLog: logger, ErrorHandling: ContinueOnError, Registry: reg, }) - panicHandler := HandlerFor(reg, HandlerOpts{ + panicHandler := HandlerForTransactional(mReg, HandlerOpts{ ErrorLog: logger, ErrorHandling: PanicOnError, Registry: reg, }) + // Expect gatherer not touched. + require.Equal(t, 0, mReg.gatherInvoked) + require.Equal(t, 0, mReg.doneInvoked) + wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error ` wantErrorBody := `An error has occurred while serving metrics: @@ -140,26 +158,23 @@ the_count 0 ` errorHandler.ServeHTTP(writer, request) - if got, want := writer.Code, http.StatusInternalServerError; got != want { - t.Errorf("got HTTP status code %d, want %d", got, want) - } - if got := logBuf.String(); got != wantMsg { - t.Errorf("got log message:\n%s\nwant log message:\n%s\n", got, wantMsg) - } - if got := writer.Body.String(); got != wantErrorBody { - t.Errorf("got body:\n%s\nwant body:\n%s\n", got, wantErrorBody) - } + require.Equal(t, 1, mReg.gatherInvoked) + require.Equal(t, 1, mReg.doneInvoked) + + require.Equal(t, http.StatusInternalServerError, writer.Code) + require.Equal(t, wantMsg, logBuf.String()) + require.Equal(t, wantErrorBody, writer.Body.String()) + logBuf.Reset() writer.Body.Reset() writer.Code = http.StatusOK continueHandler.ServeHTTP(writer, request) - if got, want := writer.Code, http.StatusOK; got != want { - t.Errorf("got HTTP status code %d, want %d", got, want) - } - if got := logBuf.String(); got != wantMsg { - t.Errorf("got log message %q, want %q", got, wantMsg) - } + + require.Equal(t, 2, mReg.gatherInvoked) + require.Equal(t, 2, mReg.doneInvoked) + require.Equal(t, http.StatusOK, writer.Code) + require.Equal(t, wantMsg, logBuf.String()) if got := writer.Body.String(); got != wantOKBody1 && got != wantOKBody2 { t.Errorf("got body %q, want either %q or %q", got, wantOKBody1, wantOKBody2) } @@ -168,46 +183,40 @@ the_count 0 if err := recover(); err == nil { t.Error("expected panic from panicHandler") } + require.Equal(t, 3, mReg.gatherInvoked) + require.Equal(t, 3, mReg.doneInvoked) }() panicHandler.ServeHTTP(writer, request) } func TestInstrumentMetricHandler(t *testing.T) { reg := prometheus.NewRegistry() - handler := InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{})) + mReg := &mockTransactionGatherer{g: reg} + handler := InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{})) // Do it again to test idempotency. - InstrumentMetricHandler(reg, HandlerFor(reg, HandlerOpts{})) + InstrumentMetricHandler(reg, HandlerForTransactional(mReg, HandlerOpts{})) writer := httptest.NewRecorder() request, _ := http.NewRequest("GET", "/", nil) request.Header.Add("Accept", "test/plain") handler.ServeHTTP(writer, request) - if got, want := writer.Code, http.StatusOK; got != want { - t.Errorf("got HTTP status code %d, want %d", got, want) - } + require.Equal(t, 1, mReg.gatherInvoked) + require.Equal(t, 1, mReg.doneInvoked) - want := "promhttp_metric_handler_requests_in_flight 1\n" - if got := writer.Body.String(); !strings.Contains(got, want) { - t.Errorf("got body %q, does not contain %q", got, want) - } - want = "promhttp_metric_handler_requests_total{code=\"200\"} 0\n" - if got := writer.Body.String(); !strings.Contains(got, want) { - t.Errorf("got body %q, does not contain %q", got, want) - } + require.Equal(t, http.StatusOK, writer.Code) + require.Contains(t, writer.Body.String(), "promhttp_metric_handler_requests_in_flight 1\n") + require.Contains(t, writer.Body.String(), "promhttp_metric_handler_requests_total{code=\"200\"} 0\n") - writer.Body.Reset() - handler.ServeHTTP(writer, request) - if got, want := writer.Code, http.StatusOK; got != want { - t.Errorf("got HTTP status code %d, want %d", got, want) - } + for i := 0; i < 100; i++ { + writer.Body.Reset() + handler.ServeHTTP(writer, request) - want = "promhttp_metric_handler_requests_in_flight 1\n" - if got := writer.Body.String(); !strings.Contains(got, want) { - t.Errorf("got body %q, does not contain %q", got, want) - } - want = "promhttp_metric_handler_requests_total{code=\"200\"} 1\n" - if got := writer.Body.String(); !strings.Contains(got, want) { - t.Errorf("got body %q, does not contain %q", got, want) + require.Equal(t, i+2, mReg.gatherInvoked) + require.Equal(t, i+2, mReg.doneInvoked) + + require.Equal(t, http.StatusOK, writer.Code) + require.Contains(t, writer.Body.String(), "promhttp_metric_handler_requests_in_flight 1\n") + require.Contains(t, writer.Body.String(), fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1)) } } From 81d86fb14fa571d436bce4abdce000ab8a1cf2a9 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 23 Feb 2022 11:24:23 +0100 Subject: [PATCH 10/12] Removed cache. Signed-off-by: Bartlomiej Plotka --- prometheus/cache/cache.go | 302 --------------------------------- prometheus/cache/cache_test.go | 275 ------------------------------ 2 files changed, 577 deletions(-) delete mode 100644 prometheus/cache/cache.go delete mode 100644 prometheus/cache/cache_test.go diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go deleted file mode 100644 index ad2193559..000000000 --- a/prometheus/cache/cache.go +++ /dev/null @@ -1,302 +0,0 @@ -// Copyright 2022 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cache - -import ( - "errors" - "fmt" - "sort" - "sync" - "time" - - "github.com/cespare/xxhash/v2" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - - //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. - "github.com/golang/protobuf/proto" - "github.com/prometheus/client_golang/prometheus/internal" - dto "github.com/prometheus/client_model/go" -) - -var _ prometheus.TransactionalGatherer = &CachedTGatherer{} - -var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with xxhash. - -// CachedTGatherer is a transactional gatherer that allows maintaining a set of metrics which -// change less frequently than scrape time, yet label values and values change over time. -// -// If you happen to use NewDesc, NewConstMetric or MustNewConstMetric inside Collector.Collect routine, consider -// using CachedTGatherer instead. -// -// Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers. -// NOTE(bwplotka): Experimental, API and behaviour can change. -type CachedTGatherer struct { - metricFamiliesByName map[string]*family - mMu sync.RWMutex - - desiredTouchState bool -} - -func NewCachedTGatherer() *CachedTGatherer { - return &CachedTGatherer{ - desiredTouchState: true, - metricFamiliesByName: map[string]*family{}, - } -} - -type family struct { - *dto.MetricFamily - - metricsByHash map[uint64]*metric - touchState bool - needsRebuild bool -} - -type metric struct { - *dto.Metric - touchState bool -} - -// normalizeMetricFamilies returns a MetricFamily slice with empty -// MetricFamilies pruned and the remaining MetricFamilies sorted by name within -// the slice, with the contained Metrics sorted within each MetricFamily. -func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.MetricFamily { - names := make([]string, 0, len(metricFamiliesByName)) - for name, mf := range metricFamiliesByName { - if len(mf.Metric) > 0 { - names = append(names, name) - } - } - sort.Strings(names) - result := make([]*dto.MetricFamily, 0, len(names)) - for _, name := range names { - result = append(result, metricFamiliesByName[name].MetricFamily) - } - return result -} - -// Gather implements TransactionalGatherer interface. -func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { - c.mMu.RLock() - return normalizeMetricFamilies(c.metricFamiliesByName), c.mMu.RUnlock, nil -} - -type Key struct { - FQName string // __name__ - - // Label names can be unsorted, we will be sorting them later. The only implication is cachability if - // consumer provide non-deterministic order of those. - LabelNames []string - LabelValues []string -} - -func (k Key) isValid() error { - if k.FQName == "" { - return errors.New("FQName cannot be empty") - } - if len(k.LabelNames) != len(k.LabelValues) { - return errors.New("new metric: label name has different length than values") - } - - return nil -} - -// hash returns unique hash for this key. -func (k Key) hash() uint64 { - h := xxhash.New() - h.WriteString(k.FQName) - h.Write(separatorByteSlice) - - for i := range k.LabelNames { - h.WriteString(k.LabelNames[i]) - h.Write(separatorByteSlice) - h.WriteString(k.LabelValues[i]) - h.Write(separatorByteSlice) - } - return h.Sum64() -} - -// Insert represents record to set in cache. -type Insert struct { - Key - - Help string - ValueType prometheus.ValueType - Value float64 - - // Timestamp is optional. Pass nil for no explicit timestamp. - Timestamp *time.Time -} - -// Update goes through inserts and deletions and updates current cache in concurrency safe manner. -// If reset is set to true, all inserts and deletions are working on empty cache. In such case -// this implementation tries to reuse memory from existing cached item when possible. -// -// Update reuses insert struct memory, so after use, Insert slice and its elements cannot be reused -// outside of this method. -// TODO(bwplotka): Lack of copying can pose memory safety problems if insert variables are reused. Consider copying if value -// is different. Yet it gives significant allocation gains. -func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) error { - c.mMu.Lock() - defer c.mMu.Unlock() - - errs := prometheus.MultiError{} - for i := range inserts { - // TODO(bwplotka): Validate more about this insert? - if err := inserts[i].isValid(); err != nil { - errs.Append(err) - continue - } - - // Update metric family. - mf, ok := c.metricFamiliesByName[inserts[i].FQName] - if !ok { - mf = &family{ - MetricFamily: &dto.MetricFamily{}, - metricsByHash: map[uint64]*metric{}, - } - mf.Name = &inserts[i].FQName - } - if reset { - mf.touchState = c.desiredTouchState - } - - mf.Type = inserts[i].ValueType.ToDTO() - mf.Help = &inserts[i].Help - - c.metricFamiliesByName[inserts[i].FQName] = mf - - // Update metric pointer. - hSum := inserts[i].hash() - m, ok := mf.metricsByHash[hSum] - if !ok { - m = &metric{ - Metric: &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))}, - } - for j := range inserts[i].LabelNames { - m.Label = append(m.Label, &dto.LabelPair{ - Name: &inserts[i].LabelNames[j], - Value: &inserts[i].LabelValues[j], - }) - } - sort.Sort(internal.LabelPairSorter(m.Label)) - mf.needsRebuild = true - } - if reset { - m.touchState = c.desiredTouchState - } - - switch inserts[i].ValueType { - case prometheus.CounterValue: - v := m.Counter - if v == nil { - v = &dto.Counter{} - } - v.Value = &inserts[i].Value - m.Counter = v - m.Gauge = nil - m.Untyped = nil - case prometheus.GaugeValue: - v := m.Gauge - if v == nil { - v = &dto.Gauge{} - } - v.Value = &inserts[i].Value - m.Counter = nil - m.Gauge = v - m.Untyped = nil - case prometheus.UntypedValue: - v := m.Untyped - if v == nil { - v = &dto.Untyped{} - } - v.Value = &inserts[i].Value - m.Counter = nil - m.Gauge = nil - m.Untyped = v - default: - return fmt.Errorf("unsupported value type %v", inserts[i].ValueType) - } - - m.TimestampMs = nil - if inserts[i].Timestamp != nil { - m.TimestampMs = proto.Int64(inserts[i].Timestamp.Unix()*1000 + int64(inserts[i].Timestamp.Nanosecond()/1000000)) - } - mf.metricsByHash[hSum] = m - } - - for _, del := range deletions { - if err := del.isValid(); err != nil { - errs.Append(err) - continue - } - - mf, ok := c.metricFamiliesByName[del.FQName] - if !ok { - continue - } - - hSum := del.hash() - if _, ok := mf.metricsByHash[hSum]; !ok { - continue - } - - if len(mf.metricsByHash) == 1 { - delete(c.metricFamiliesByName, del.FQName) - continue - } - - mf.needsRebuild = true - delete(mf.metricsByHash, hSum) - } - - if reset { - // Trading off-time instead of memory allocated for otherwise needed replacement map. - for name, mf := range c.metricFamiliesByName { - if mf.touchState != c.desiredTouchState { - delete(c.metricFamiliesByName, name) - continue - } - for hash, m := range mf.metricsByHash { - if m.touchState != c.desiredTouchState { - delete(mf.metricsByHash, hash) - continue - } - } - if len(mf.metricsByHash) == 0 { - delete(c.metricFamiliesByName, name) - } - } - - // Avoid resetting state by flipping what we will expect in the next update. - c.desiredTouchState = !c.desiredTouchState - } - - for _, mf := range c.metricFamiliesByName { - if !mf.needsRebuild { - continue - } - - mf.Metric = mf.Metric[:0] - for _, m := range mf.metricsByHash { - mf.Metric = append(mf.Metric, m.Metric) - } - sort.Sort(internal.MetricSorter(mf.Metric)) - - mf.needsRebuild = false - } - - return errs.MaybeUnwrap() -} diff --git a/prometheus/cache/cache_test.go b/prometheus/cache/cache_test.go deleted file mode 100644 index b0408377b..000000000 --- a/prometheus/cache/cache_test.go +++ /dev/null @@ -1,275 +0,0 @@ -// Copyright 2022 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cache - -import ( - "fmt" - "strings" - "testing" - - "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" -) - -func TestCachedTGatherer(t *testing.T) { - c := NewCachedTGatherer() - mfs, done, err := c.Gather() - if err != nil { - t.Error("gather failed:", err) - } - done() - if got := mfsToString(mfs); got != "" { - t.Error("unexpected metric family", got) - } - - if err := c.Update(false, []Insert{ - { - Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, - Help: "help a", - ValueType: prometheus.GaugeValue, - Value: 1, - }, - { - Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, - Help: "help b", - ValueType: prometheus.GaugeValue, - Value: 1, - }, - { - Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}}, - Help: "help a2", - ValueType: prometheus.CounterValue, - Value: 2, - }, - { - Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, - Help: "help a2", - ValueType: prometheus.CounterValue, - Value: 2, - }, - }, []Key{ - {FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected. - }); err != nil { - t.Error("update:", err) - } - - mfs, done, err = c.Gather() - if err != nil { - t.Error("gather failed:", err) - } - done() - - const expected = "name:\"a\" help:\"help a2\" type:COUNTER metric: label: " + - "gauge: > metric: label: counter: > ,name:\"b\" help:\"help b\" " + - "type:GAUGE metric: label: gauge: > " - if got := mfsToString(mfs); got != expected { - t.Error("unexpected metric family, got", got) - } - - // Update with exactly same insertion should have the same effect. - if err := c.Update(false, []Insert{ - { - Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, - Help: "help a", - ValueType: prometheus.GaugeValue, - Value: 1, - }, - { - Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, - Help: "help b", - ValueType: prometheus.GaugeValue, - Value: 1, - }, - { - Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}}, - Help: "help a2", - ValueType: prometheus.CounterValue, - Value: 2, - }, - { - Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, - Help: "help a2", - ValueType: prometheus.CounterValue, - Value: 2, - }, - }, []Key{ - {FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected. - }); err != nil { - t.Error("update:", err) - } - - mfs, done, err = c.Gather() - if err != nil { - t.Error("gather failed:", err) - } - done() - - if got := mfsToString(mfs); got != expected { - t.Error("unexpected metric family, got", got) - } - - // Update one element. - if err := c.Update(false, []Insert{ - { - Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, - Help: "help a12321", - ValueType: prometheus.CounterValue, - Value: 9999, - }, - }, nil); err != nil { - t.Error("update:", err) - } - - mfs, done, err = c.Gather() - if err != nil { - t.Error("gather failed:", err) - } - done() - - if got := mfsToString(mfs); got != "name:\"a\" help:\"help a12321\" type:COUNTER metric: label:"+ - " counter: > metric: label: counter: > ,name:\"b\" help:\"help b\" "+ - "type:GAUGE metric: label: gauge: > " { - t.Error("unexpected metric family, got", got) - } - - // Rebuild cache and insert only 2 elements. - if err := c.Update(true, []Insert{ - { - Key: Key{FQName: "ax", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, - Help: "help ax", - ValueType: prometheus.GaugeValue, - Value: 1, - }, - { - Key: Key{FQName: "bx", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, - Help: "help bx", - ValueType: prometheus.GaugeValue, - Value: 1, - }, - }, nil); err != nil { - t.Error("update:", err) - } - - mfs, done, err = c.Gather() - if err != nil { - t.Error("gather failed:", err) - } - done() - - if got := mfsToString(mfs); got != "name:\"ax\" help:\"help ax\" type:GAUGE metric: label:"+ - " gauge: > ,name:\"bx\" help:\"help bx\" type:GAUGE metric: label: gauge: > " { - t.Error("unexpected metric family, got", got) - } - - if err := c.Update(true, nil, nil); err != nil { - t.Error("update:", err) - } - - mfs, done, err = c.Gather() - if err != nil { - t.Error("gather failed:", err) - } - done() - if got := mfsToString(mfs); got != "" { - t.Error("unexpected metric family", got) - } -} - -func mfsToString(mfs []*dto.MetricFamily) string { - ret := make([]string, 0, len(mfs)) - for _, m := range mfs { - ret = append(ret, m.String()) - } - return strings.Join(ret, ",") -} - -// export var=v1 && go test -count 5 -benchtime 100x -run '^$' -bench . -memprofile=${var}.mem.pprof -cpuprofile=${var}.cpu.pprof > ${var}.txt -func BenchmarkCachedTGatherer_Update(b *testing.B) { - c := NewCachedTGatherer() - - // Generate larger metric payload. - inserts := make([]Insert, 0, 1e6) - - // 1000 metrics in 1000 families. - for i := 0; i < 1e3; i++ { - for j := 0; j < 1e3; j++ { - inserts = append(inserts, Insert{ - Key: Key{ - FQName: fmt.Sprintf("realistic_longer_name_%d", i), - LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"}, - LabelValues: []string{"realistic_label_value1", "realistic_label_value2", fmt.Sprintf("realistic_label_value3_%d", j)}}, - Help: "help string is usually quite large, so let's make it a bit realistic.", - ValueType: prometheus.GaugeValue, - Value: float64(j), - }) - } - } - - if err := c.Update(false, inserts, nil); err != nil { - b.Error("update:", err) - } - - if len(c.metricFamiliesByName) != 1e3 || len(c.metricFamiliesByName["realistic_longer_name_123"].metricsByHash) != 1e3 { - // Ensure we did not generate duplicates. - panic("generated data set gave wrong numbers") - } - - b.Run("Update of one element without reset", func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - if err := c.Update(false, []Insert{ - { - Key: Key{ - FQName: "realistic_longer_name_334", - LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"}, - LabelValues: []string{"realistic_label_value1", "realistic_label_value2", "realistic_label_value3_2345"}}, - Help: "CUSTOM help string is usually quite large, so let's make it a bit realistic.", - ValueType: prometheus.CounterValue, - Value: 1929495, - }, - }, nil); err != nil { - b.Error("update:", err) - } - } - }) - - b.Run("Update of all elements with reset", func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - if err := c.Update(true, inserts, nil); err != nil { - b.Error("update:", err) - } - } - }) - - b.Run("Gather", func(b *testing.B) { - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - mfs, done, err := c.Gather() - done() - if err != nil { - b.Error("update:", err) - } - testMfs = mfs - } - }) -} - -var testMfs []*dto.MetricFamily From cba52a75bec49867ad66afff648442ad2fa14eea Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 23 Feb 2022 11:36:02 +0100 Subject: [PATCH 11/12] Fixed tests. Signed-off-by: Bartlomiej Plotka --- prometheus/promhttp/http_test.go | 100 +++++++++++++++++++++++-------- 1 file changed, 75 insertions(+), 25 deletions(-) diff --git a/prometheus/promhttp/http_test.go b/prometheus/promhttp/http_test.go index 3e3db27e3..53204c5fc 100644 --- a/prometheus/promhttp/http_test.go +++ b/prometheus/promhttp/http_test.go @@ -20,12 +20,12 @@ import ( "log" "net/http" "net/http/httptest" + "strings" "testing" "time" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" - "github.com/stretchr/testify/require" ) type errorCollector struct{} @@ -120,8 +120,12 @@ func TestHandlerErrorHandling(t *testing.T) { Registry: reg, }) // Expect gatherer not touched. - require.Equal(t, 0, mReg.gatherInvoked) - require.Equal(t, 0, mReg.doneInvoked) + if got := mReg.gatherInvoked; got != 0 { + t.Fatalf("unexpected number of gather invokes, want 0, got %d", got) + } + if got := mReg.doneInvoked; got != 0 { + t.Fatalf("unexpected number of done invokes, want 0, got %d", got) + } wantMsg := `error gathering metrics: error collecting metric Desc{fqName: "invalid_metric", help: "not helpful", constLabels: {}, variableLabels: []}: collect error ` @@ -158,12 +162,21 @@ the_count 0 ` errorHandler.ServeHTTP(writer, request) - require.Equal(t, 1, mReg.gatherInvoked) - require.Equal(t, 1, mReg.doneInvoked) - - require.Equal(t, http.StatusInternalServerError, writer.Code) - require.Equal(t, wantMsg, logBuf.String()) - require.Equal(t, wantErrorBody, writer.Body.String()) + if got := mReg.gatherInvoked; got != 1 { + t.Fatalf("unexpected number of gather invokes, want 1, got %d", got) + } + if got := mReg.doneInvoked; got != 1 { + t.Fatalf("unexpected number of done invokes, want 1, got %d", got) + } + if got, want := writer.Code, http.StatusInternalServerError; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + if got, want := logBuf.String(), wantMsg; got != want { + t.Errorf("got log buf %q, want %q", got, want) + } + if got, want := writer.Body.String(), wantErrorBody; got != want { + t.Errorf("got body %q, want %q", got, want) + } logBuf.Reset() writer.Body.Reset() @@ -171,10 +184,18 @@ the_count 0 continueHandler.ServeHTTP(writer, request) - require.Equal(t, 2, mReg.gatherInvoked) - require.Equal(t, 2, mReg.doneInvoked) - require.Equal(t, http.StatusOK, writer.Code) - require.Equal(t, wantMsg, logBuf.String()) + if got := mReg.gatherInvoked; got != 2 { + t.Fatalf("unexpected number of gather invokes, want 2, got %d", got) + } + if got := mReg.doneInvoked; got != 2 { + t.Fatalf("unexpected number of done invokes, want 2, got %d", got) + } + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } + if got, want := logBuf.String(), wantMsg; got != want { + t.Errorf("got log buf %q, want %q", got, want) + } if got := writer.Body.String(); got != wantOKBody1 && got != wantOKBody2 { t.Errorf("got body %q, want either %q or %q", got, wantOKBody1, wantOKBody2) } @@ -183,8 +204,12 @@ the_count 0 if err := recover(); err == nil { t.Error("expected panic from panicHandler") } - require.Equal(t, 3, mReg.gatherInvoked) - require.Equal(t, 3, mReg.doneInvoked) + if got := mReg.gatherInvoked; got != 3 { + t.Fatalf("unexpected number of gather invokes, want 3, got %d", got) + } + if got := mReg.doneInvoked; got != 3 { + t.Fatalf("unexpected number of done invokes, want 3, got %d", got) + } }() panicHandler.ServeHTTP(writer, request) } @@ -200,23 +225,48 @@ func TestInstrumentMetricHandler(t *testing.T) { request.Header.Add("Accept", "test/plain") handler.ServeHTTP(writer, request) - require.Equal(t, 1, mReg.gatherInvoked) - require.Equal(t, 1, mReg.doneInvoked) + if got := mReg.gatherInvoked; got != 1 { + t.Fatalf("unexpected number of gather invokes, want 1, got %d", got) + } + if got := mReg.doneInvoked; got != 1 { + t.Fatalf("unexpected number of done invokes, want 1, got %d", got) + } + + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } - require.Equal(t, http.StatusOK, writer.Code) - require.Contains(t, writer.Body.String(), "promhttp_metric_handler_requests_in_flight 1\n") - require.Contains(t, writer.Body.String(), "promhttp_metric_handler_requests_total{code=\"200\"} 0\n") + want := "promhttp_metric_handler_requests_in_flight 1\n" + if got := writer.Body.String(); !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q", got, want) + } + want = "promhttp_metric_handler_requests_total{code=\"200\"} 0\n" + if got := writer.Body.String(); !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q", got, want) + } for i := 0; i < 100; i++ { writer.Body.Reset() handler.ServeHTTP(writer, request) - require.Equal(t, i+2, mReg.gatherInvoked) - require.Equal(t, i+2, mReg.doneInvoked) + if got, want := mReg.gatherInvoked, i+2; got != want { + t.Fatalf("unexpected number of gather invokes, want %d, got %d", want, got) + } + if got, want := mReg.doneInvoked, i+2; got != want { + t.Fatalf("unexpected number of done invokes, want %d, got %d", want, got) + } + if got, want := writer.Code, http.StatusOK; got != want { + t.Errorf("got HTTP status code %d, want %d", got, want) + } - require.Equal(t, http.StatusOK, writer.Code) - require.Contains(t, writer.Body.String(), "promhttp_metric_handler_requests_in_flight 1\n") - require.Contains(t, writer.Body.String(), fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1)) + want := "promhttp_metric_handler_requests_in_flight 1\n" + if got := writer.Body.String(); !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q", got, want) + } + want = fmt.Sprintf("promhttp_metric_handler_requests_total{code=\"200\"} %d\n", i+1) + if got := writer.Body.String(); !strings.Contains(got, want) { + t.Errorf("got body %q, does not contain %q", got, want) + } } } From 29c767163ce95444af6a0b0080948bb56af6b763 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 23 Feb 2022 11:37:11 +0100 Subject: [PATCH 12/12] Re-add cache. Signed-off-by: Bartlomiej Plotka --- prometheus/cache/cache.go | 302 +++++++++++++++++++++++++++++++++ prometheus/cache/cache_test.go | 275 ++++++++++++++++++++++++++++++ 2 files changed, 577 insertions(+) create mode 100644 prometheus/cache/cache.go create mode 100644 prometheus/cache/cache_test.go diff --git a/prometheus/cache/cache.go b/prometheus/cache/cache.go new file mode 100644 index 000000000..ad2193559 --- /dev/null +++ b/prometheus/cache/cache.go @@ -0,0 +1,302 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "errors" + "fmt" + "sort" + "sync" + "time" + + "github.com/cespare/xxhash/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + + //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility. + "github.com/golang/protobuf/proto" + "github.com/prometheus/client_golang/prometheus/internal" + dto "github.com/prometheus/client_model/go" +) + +var _ prometheus.TransactionalGatherer = &CachedTGatherer{} + +var separatorByteSlice = []byte{model.SeparatorByte} // For convenient use with xxhash. + +// CachedTGatherer is a transactional gatherer that allows maintaining a set of metrics which +// change less frequently than scrape time, yet label values and values change over time. +// +// If you happen to use NewDesc, NewConstMetric or MustNewConstMetric inside Collector.Collect routine, consider +// using CachedTGatherer instead. +// +// Use CachedTGatherer with classic Registry using NewMultiTRegistry and ToTransactionalGatherer helpers. +// NOTE(bwplotka): Experimental, API and behaviour can change. +type CachedTGatherer struct { + metricFamiliesByName map[string]*family + mMu sync.RWMutex + + desiredTouchState bool +} + +func NewCachedTGatherer() *CachedTGatherer { + return &CachedTGatherer{ + desiredTouchState: true, + metricFamiliesByName: map[string]*family{}, + } +} + +type family struct { + *dto.MetricFamily + + metricsByHash map[uint64]*metric + touchState bool + needsRebuild bool +} + +type metric struct { + *dto.Metric + touchState bool +} + +// normalizeMetricFamilies returns a MetricFamily slice with empty +// MetricFamilies pruned and the remaining MetricFamilies sorted by name within +// the slice, with the contained Metrics sorted within each MetricFamily. +func normalizeMetricFamilies(metricFamiliesByName map[string]*family) []*dto.MetricFamily { + names := make([]string, 0, len(metricFamiliesByName)) + for name, mf := range metricFamiliesByName { + if len(mf.Metric) > 0 { + names = append(names, name) + } + } + sort.Strings(names) + result := make([]*dto.MetricFamily, 0, len(names)) + for _, name := range names { + result = append(result, metricFamiliesByName[name].MetricFamily) + } + return result +} + +// Gather implements TransactionalGatherer interface. +func (c *CachedTGatherer) Gather() (_ []*dto.MetricFamily, done func(), err error) { + c.mMu.RLock() + return normalizeMetricFamilies(c.metricFamiliesByName), c.mMu.RUnlock, nil +} + +type Key struct { + FQName string // __name__ + + // Label names can be unsorted, we will be sorting them later. The only implication is cachability if + // consumer provide non-deterministic order of those. + LabelNames []string + LabelValues []string +} + +func (k Key) isValid() error { + if k.FQName == "" { + return errors.New("FQName cannot be empty") + } + if len(k.LabelNames) != len(k.LabelValues) { + return errors.New("new metric: label name has different length than values") + } + + return nil +} + +// hash returns unique hash for this key. +func (k Key) hash() uint64 { + h := xxhash.New() + h.WriteString(k.FQName) + h.Write(separatorByteSlice) + + for i := range k.LabelNames { + h.WriteString(k.LabelNames[i]) + h.Write(separatorByteSlice) + h.WriteString(k.LabelValues[i]) + h.Write(separatorByteSlice) + } + return h.Sum64() +} + +// Insert represents record to set in cache. +type Insert struct { + Key + + Help string + ValueType prometheus.ValueType + Value float64 + + // Timestamp is optional. Pass nil for no explicit timestamp. + Timestamp *time.Time +} + +// Update goes through inserts and deletions and updates current cache in concurrency safe manner. +// If reset is set to true, all inserts and deletions are working on empty cache. In such case +// this implementation tries to reuse memory from existing cached item when possible. +// +// Update reuses insert struct memory, so after use, Insert slice and its elements cannot be reused +// outside of this method. +// TODO(bwplotka): Lack of copying can pose memory safety problems if insert variables are reused. Consider copying if value +// is different. Yet it gives significant allocation gains. +func (c *CachedTGatherer) Update(reset bool, inserts []Insert, deletions []Key) error { + c.mMu.Lock() + defer c.mMu.Unlock() + + errs := prometheus.MultiError{} + for i := range inserts { + // TODO(bwplotka): Validate more about this insert? + if err := inserts[i].isValid(); err != nil { + errs.Append(err) + continue + } + + // Update metric family. + mf, ok := c.metricFamiliesByName[inserts[i].FQName] + if !ok { + mf = &family{ + MetricFamily: &dto.MetricFamily{}, + metricsByHash: map[uint64]*metric{}, + } + mf.Name = &inserts[i].FQName + } + if reset { + mf.touchState = c.desiredTouchState + } + + mf.Type = inserts[i].ValueType.ToDTO() + mf.Help = &inserts[i].Help + + c.metricFamiliesByName[inserts[i].FQName] = mf + + // Update metric pointer. + hSum := inserts[i].hash() + m, ok := mf.metricsByHash[hSum] + if !ok { + m = &metric{ + Metric: &dto.Metric{Label: make([]*dto.LabelPair, 0, len(inserts[i].LabelNames))}, + } + for j := range inserts[i].LabelNames { + m.Label = append(m.Label, &dto.LabelPair{ + Name: &inserts[i].LabelNames[j], + Value: &inserts[i].LabelValues[j], + }) + } + sort.Sort(internal.LabelPairSorter(m.Label)) + mf.needsRebuild = true + } + if reset { + m.touchState = c.desiredTouchState + } + + switch inserts[i].ValueType { + case prometheus.CounterValue: + v := m.Counter + if v == nil { + v = &dto.Counter{} + } + v.Value = &inserts[i].Value + m.Counter = v + m.Gauge = nil + m.Untyped = nil + case prometheus.GaugeValue: + v := m.Gauge + if v == nil { + v = &dto.Gauge{} + } + v.Value = &inserts[i].Value + m.Counter = nil + m.Gauge = v + m.Untyped = nil + case prometheus.UntypedValue: + v := m.Untyped + if v == nil { + v = &dto.Untyped{} + } + v.Value = &inserts[i].Value + m.Counter = nil + m.Gauge = nil + m.Untyped = v + default: + return fmt.Errorf("unsupported value type %v", inserts[i].ValueType) + } + + m.TimestampMs = nil + if inserts[i].Timestamp != nil { + m.TimestampMs = proto.Int64(inserts[i].Timestamp.Unix()*1000 + int64(inserts[i].Timestamp.Nanosecond()/1000000)) + } + mf.metricsByHash[hSum] = m + } + + for _, del := range deletions { + if err := del.isValid(); err != nil { + errs.Append(err) + continue + } + + mf, ok := c.metricFamiliesByName[del.FQName] + if !ok { + continue + } + + hSum := del.hash() + if _, ok := mf.metricsByHash[hSum]; !ok { + continue + } + + if len(mf.metricsByHash) == 1 { + delete(c.metricFamiliesByName, del.FQName) + continue + } + + mf.needsRebuild = true + delete(mf.metricsByHash, hSum) + } + + if reset { + // Trading off-time instead of memory allocated for otherwise needed replacement map. + for name, mf := range c.metricFamiliesByName { + if mf.touchState != c.desiredTouchState { + delete(c.metricFamiliesByName, name) + continue + } + for hash, m := range mf.metricsByHash { + if m.touchState != c.desiredTouchState { + delete(mf.metricsByHash, hash) + continue + } + } + if len(mf.metricsByHash) == 0 { + delete(c.metricFamiliesByName, name) + } + } + + // Avoid resetting state by flipping what we will expect in the next update. + c.desiredTouchState = !c.desiredTouchState + } + + for _, mf := range c.metricFamiliesByName { + if !mf.needsRebuild { + continue + } + + mf.Metric = mf.Metric[:0] + for _, m := range mf.metricsByHash { + mf.Metric = append(mf.Metric, m.Metric) + } + sort.Sort(internal.MetricSorter(mf.Metric)) + + mf.needsRebuild = false + } + + return errs.MaybeUnwrap() +} diff --git a/prometheus/cache/cache_test.go b/prometheus/cache/cache_test.go new file mode 100644 index 000000000..b0408377b --- /dev/null +++ b/prometheus/cache/cache_test.go @@ -0,0 +1,275 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "fmt" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +func TestCachedTGatherer(t *testing.T) { + c := NewCachedTGatherer() + mfs, done, err := c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if got := mfsToString(mfs); got != "" { + t.Error("unexpected metric family", got) + } + + if err := c.Update(false, []Insert{ + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help a", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help b", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + }, []Key{ + {FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected. + }); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + const expected = "name:\"a\" help:\"help a2\" type:COUNTER metric: label: " + + "gauge: > metric: label: counter: > ,name:\"b\" help:\"help b\" " + + "type:GAUGE metric: label: gauge: > " + if got := mfsToString(mfs); got != expected { + t.Error("unexpected metric family, got", got) + } + + // Update with exactly same insertion should have the same effect. + if err := c.Update(false, []Insert{ + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help a", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "b", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help b", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc2"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, + Help: "help a2", + ValueType: prometheus.CounterValue, + Value: 2, + }, + }, []Key{ + {FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc3"}}, // Does not make much sense, but deletion works as expected. + }); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + if got := mfsToString(mfs); got != expected { + t.Error("unexpected metric family, got", got) + } + + // Update one element. + if err := c.Update(false, []Insert{ + { + Key: Key{FQName: "a", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help a12321", + ValueType: prometheus.CounterValue, + Value: 9999, + }, + }, nil); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + if got := mfsToString(mfs); got != "name:\"a\" help:\"help a12321\" type:COUNTER metric: label:"+ + " counter: > metric: label: counter: > ,name:\"b\" help:\"help b\" "+ + "type:GAUGE metric: label: gauge: > " { + t.Error("unexpected metric family, got", got) + } + + // Rebuild cache and insert only 2 elements. + if err := c.Update(true, []Insert{ + { + Key: Key{FQName: "ax", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help ax", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + { + Key: Key{FQName: "bx", LabelNames: []string{"b", "c"}, LabelValues: []string{"valb", "valc"}}, + Help: "help bx", + ValueType: prometheus.GaugeValue, + Value: 1, + }, + }, nil); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + + if got := mfsToString(mfs); got != "name:\"ax\" help:\"help ax\" type:GAUGE metric: label:"+ + " gauge: > ,name:\"bx\" help:\"help bx\" type:GAUGE metric: label: gauge: > " { + t.Error("unexpected metric family, got", got) + } + + if err := c.Update(true, nil, nil); err != nil { + t.Error("update:", err) + } + + mfs, done, err = c.Gather() + if err != nil { + t.Error("gather failed:", err) + } + done() + if got := mfsToString(mfs); got != "" { + t.Error("unexpected metric family", got) + } +} + +func mfsToString(mfs []*dto.MetricFamily) string { + ret := make([]string, 0, len(mfs)) + for _, m := range mfs { + ret = append(ret, m.String()) + } + return strings.Join(ret, ",") +} + +// export var=v1 && go test -count 5 -benchtime 100x -run '^$' -bench . -memprofile=${var}.mem.pprof -cpuprofile=${var}.cpu.pprof > ${var}.txt +func BenchmarkCachedTGatherer_Update(b *testing.B) { + c := NewCachedTGatherer() + + // Generate larger metric payload. + inserts := make([]Insert, 0, 1e6) + + // 1000 metrics in 1000 families. + for i := 0; i < 1e3; i++ { + for j := 0; j < 1e3; j++ { + inserts = append(inserts, Insert{ + Key: Key{ + FQName: fmt.Sprintf("realistic_longer_name_%d", i), + LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"}, + LabelValues: []string{"realistic_label_value1", "realistic_label_value2", fmt.Sprintf("realistic_label_value3_%d", j)}}, + Help: "help string is usually quite large, so let's make it a bit realistic.", + ValueType: prometheus.GaugeValue, + Value: float64(j), + }) + } + } + + if err := c.Update(false, inserts, nil); err != nil { + b.Error("update:", err) + } + + if len(c.metricFamiliesByName) != 1e3 || len(c.metricFamiliesByName["realistic_longer_name_123"].metricsByHash) != 1e3 { + // Ensure we did not generate duplicates. + panic("generated data set gave wrong numbers") + } + + b.Run("Update of one element without reset", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := c.Update(false, []Insert{ + { + Key: Key{ + FQName: "realistic_longer_name_334", + LabelNames: []string{"realistic_label_name1", "realistic_label_name2", "realistic_label_name3"}, + LabelValues: []string{"realistic_label_value1", "realistic_label_value2", "realistic_label_value3_2345"}}, + Help: "CUSTOM help string is usually quite large, so let's make it a bit realistic.", + ValueType: prometheus.CounterValue, + Value: 1929495, + }, + }, nil); err != nil { + b.Error("update:", err) + } + } + }) + + b.Run("Update of all elements with reset", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + if err := c.Update(true, inserts, nil); err != nil { + b.Error("update:", err) + } + } + }) + + b.Run("Gather", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + mfs, done, err := c.Gather() + done() + if err != nil { + b.Error("update:", err) + } + testMfs = mfs + } + }) +} + +var testMfs []*dto.MetricFamily