Skip to content

merge Prometheus metrics for removed user to avoid having to recompute metrics from scratch everytime #4813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 190 additions & 5 deletions pkg/util/metrics_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"errors"
"fmt"
"math"
"strings"
"sync"

"github.com/gogo/protobuf/proto"

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
Expand Down Expand Up @@ -562,13 +565,14 @@ type UserRegistry struct {
// UserRegistries holds Prometheus registries for multiple users, guaranteeing
// multi-thread safety and stable ordering.
type UserRegistries struct {
regsMu sync.Mutex
regs []UserRegistry
regsMu sync.Mutex
regs []UserRegistry
removedMetrics MetricFamilyMap
}

// NewUserRegistries makes new UserRegistries.
func NewUserRegistries() *UserRegistries {
return &UserRegistries{}
return &UserRegistries{removedMetrics: make(MetricFamilyMap)}
}

// AddUserRegistry adds an user registry. If user already has a registry,
Expand Down Expand Up @@ -647,15 +651,21 @@ func (r *UserRegistries) softRemoveUserRegistry(ur *UserRegistry) bool {
return false
}

ur.lastGather, err = NewMetricFamilyMap(last)
gatheredMetrics, err := NewMetricFamilyMap(last)
if err != nil {
level.Warn(util_log.Logger).Log("msg", "failed to gather metrics from registry", "user", ur.user, "err", err)
return false
}

aggregatedMetrics, err := MergeMetricFamilies([]MetricFamilyMap{gatheredMetrics, r.removedMetrics})
if err != nil {
level.Warn(util_log.Logger).Log("msg", "failed to merge metrics", "user", ur.user, "err", err)
return false
}
r.removedMetrics = aggregatedMetrics
ur.user = ""
ur.reg = nil
return true
return false
}

// Registries returns a copy of the user registries list.
Expand Down Expand Up @@ -704,6 +714,11 @@ func (r *UserRegistries) BuildMetricFamiliesPerUser() MetricFamiliesPerUser {
continue
}
}
data = append(data, struct {
user string
metrics MetricFamilyMap
}{
user: "", metrics: r.removedMetrics})
return data
}

Expand Down Expand Up @@ -805,3 +820,173 @@ type CollectorVec interface {
prometheus.Collector
Delete(labels prometheus.Labels) bool
}

type MergedMetricFamily struct {
metricFamily *dto.MetricFamily
metricMap MetricMap
}

func (m *MergedMetricFamily) CreateMetricFamily() *dto.MetricFamily {
newMetricFamily := proto.Clone(m.metricFamily).(*dto.MetricFamily)
var metrics []*dto.Metric

for _, metric := range m.metricMap.metrics {
for _, m := range metric {
metrics = append(metrics, &m.metric)
}
}

newMetricFamily.Metric = metrics
return newMetricFamily
}

// MergeMetricFamilies - Capable of merging summaries, histograms, and counters
func MergeMetricFamilies(metricFamilies []MetricFamilyMap) (MetricFamilyMap, error) {
mergedMap := make(map[string]*MergedMetricFamily)

for _, mf := range metricFamilies {
for metricName, metricFamily := range mf {
mergeFunc, err := getMergeFunc(metricFamily.GetType())
if err != nil {
return nil, err
}

if _, found := mergedMap[metricName]; !found {
mergedMap[metricName] = &MergedMetricFamily{metricFamily: proto.Clone(metricFamily).(*dto.MetricFamily), metricMap: NewMetricMap()}
}

for _, metric := range metricFamily.Metric {
(mergedMap[metricName].metricMap).AddOrSetMetric(*metric, mergeFunc)
}
}
}

metricFamilyMap := make(MetricFamilyMap)
for metricName, mergedMetricFamily := range mergedMap {
metricFamilyMap[metricName] = mergedMetricFamily.CreateMetricFamily()
}

return metricFamilyMap, nil
}

func getMergeFunc(metricType dto.MetricType) (func(existing *dto.Metric, new *dto.Metric), error) {
switch metricType {
case dto.MetricType_SUMMARY:
return mergeSummary, nil
case dto.MetricType_COUNTER:
return mergeCounter, nil
case dto.MetricType_HISTOGRAM:
return mergeHistogram, nil
default:
return nil, fmt.Errorf("unknown metric type: %v", metricType)
}
}

func mergeCounter(mf1, mf2 *dto.Metric) {
newValue := *mf1.Counter.Value + *mf2.Counter.Value
mf1.Counter.Value = &newValue
}

func mergeHistogram(mf1, mf2 *dto.Metric) {
bucketMap := map[float64]uint64{}

for _, bucket := range append(mf1.Histogram.GetBucket(), mf2.Histogram.GetBucket()...) {
bucketMap[bucket.GetUpperBound()] += bucket.GetCumulativeCount()
}

var newBucket []*dto.Bucket
for upperBound, cumulativeCount := range bucketMap {
ubValue := upperBound
ccValue := cumulativeCount
newBucket = append(newBucket, &dto.Bucket{UpperBound: &ubValue, CumulativeCount: &ccValue})
}

newSampleCount := *mf1.Histogram.SampleCount + *mf2.Histogram.SampleCount
newSampleSum := *mf1.Histogram.SampleSum + *mf2.Histogram.SampleSum
mf1.Histogram.Bucket = newBucket
mf1.Histogram.SampleCount = &newSampleCount
mf1.Histogram.SampleSum = &newSampleSum
}

func mergeSummary(mf1 *dto.Metric, mf2 *dto.Metric) {
newSampleCount := *mf1.Summary.SampleCount + *mf2.Summary.SampleCount
newSampleSum := *mf1.Summary.SampleSum + *mf2.Summary.SampleSum

// we are not merging the Quantiles themselves because there's no operation that makes sense
mf1.Summary.Quantile = []*dto.Quantile{}
mf1.Summary.SampleCount = &newSampleCount
mf1.Summary.SampleSum = &newSampleSum
}

type MetricMap struct {
metrics map[string][]*Metric
lock sync.Mutex
}

type Metric struct {
metric dto.Metric
lock sync.Mutex
}

func NewMetricMap() MetricMap {
return MetricMap{
metrics: make(map[string][]*Metric),
}
}

// AddOrSetMetric - given a metric, see if there's another metric with the same labels. If not, add metric to list
// If yes, call mergeFn to merge the two metrics in-place, and updating existing metric
func (m *MetricMap) AddOrSetMetric(metric dto.Metric, mergeFn func(existing *dto.Metric, new *dto.Metric)) {
var metricLabels []string
for _, labelPair := range metric.GetLabel() {
metricLabels = append(metricLabels, fmt.Sprintf("%s=%s", labelPair.GetName(), labelPair.GetValue()))
}

metricKey := strings.Join(metricLabels, ",")

m.lock.Lock()
defer m.lock.Unlock()

if metrics, found := m.metrics[metricKey]; found {
// we might have hash collision, so let's iterate through the list to make sure the item is actually what we want
for _, existingMetric := range metrics {
same := m.compareLabels(existingMetric.metric.GetLabel(), metric.GetLabel())
if same {
existingMetric.lock.Lock()
mergeFn(&existingMetric.metric, &metric)
existingMetric.lock.Unlock()
return
}
}
// only get there if we don't have the same metric, so let's append it
m.metrics[metricKey] = append(m.metrics[metricKey], &Metric{metric: metric})
return
}

// no such key, so let's add it
m.metrics[metricKey] = []*Metric{{metric: metric}}
}

func (m *MetricMap) compareLabels(labels1, labels2 []*dto.LabelPair) bool {
if len(labels1) != len(labels2) {
return false
}

// create a map of labels for lookup
labelMap := make(map[string]string)
for _, labelPair := range labels1 {
labelMap[labelPair.GetName()] = labelPair.GetValue()
}

for _, labelPair := range labels2 {
if value, found := labelMap[labelPair.GetName()]; found {
if value != labelPair.GetValue() {
return false
}
} else {
return false
}
}

return true
}
103 changes: 103 additions & 0 deletions pkg/util/metrics_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,85 @@ func TestUserRegistries_RemoveUserRegistry_SoftRemoval(t *testing.T) {
summary_user_sum{user="5"} 25
summary_user_count{user="5"} 5
`)))

tm.regs.RemoveUserRegistry(strconv.Itoa(4), false)
require.NoError(t, testutil.GatherAndCompare(mainRegistry, bytes.NewBufferString(`
# HELP counter help
# TYPE counter counter
# No change in counter
counter 75

# HELP counter_labels help
# TYPE counter_labels counter
# No change in counter per label.
counter_labels{label_one="a"} 75

# HELP counter_user help
# TYPE counter_user counter
# User 3 is now missing.
counter_user{user="1"} 5
counter_user{user="2"} 10
counter_user{user="5"} 25

# HELP gauge help
# TYPE gauge gauge
# Drop in the gauge (value 3, counted 5 times)
gauge 40

# HELP gauge_labels help
# TYPE gauge_labels gauge
# Drop in the gauge (value 3, counted 5 times)
gauge_labels{label_one="a"} 40

# HELP gauge_user help
# TYPE gauge_user gauge
# User 3 is now missing.
gauge_user{user="1"} 5
gauge_user{user="2"} 10
gauge_user{user="5"} 25

# HELP histogram help
# TYPE histogram histogram
# No change in the histogram
histogram_bucket{le="1"} 5
histogram_bucket{le="3"} 15
histogram_bucket{le="5"} 25
histogram_bucket{le="+Inf"} 25
histogram_sum 75
histogram_count 25

# HELP histogram_labels help
# TYPE histogram_labels histogram
# No change in the histogram per label
histogram_labels_bucket{label_one="a",le="1"} 5
histogram_labels_bucket{label_one="a",le="3"} 15
histogram_labels_bucket{label_one="a",le="5"} 25
histogram_labels_bucket{label_one="a",le="+Inf"} 25
histogram_labels_sum{label_one="a"} 75
histogram_labels_count{label_one="a"} 25

# HELP summary help
# TYPE summary summary
# No change in the summary
summary_sum 75
summary_count 25

# HELP summary_labels help
# TYPE summary_labels summary
# No change in the summary per label
summary_labels_sum{label_one="a"} 75
summary_labels_count{label_one="a"} 25

# HELP summary_user help
# TYPE summary_user summary
# Summary for user 3 is now missing.
summary_user_sum{user="1"} 5
summary_user_count{user="1"} 5
summary_user_sum{user="2"} 10
summary_user_count{user="2"} 5
summary_user_sum{user="5"} 25
summary_user_count{user="5"} 5
`)))
}
func TestUserRegistries_RemoveUserRegistry_HardRemoval(t *testing.T) {
tm := setupTestMetrics()
Expand Down Expand Up @@ -1144,6 +1223,30 @@ func TestGetLabels(t *testing.T) {
})
}

func TestMergeMetricFamilies(t *testing.T) {
tm := setupTestMetrics()

var list []MetricFamilyMap
for _, registry := range tm.regs.regs {
mfs, err := registry.reg.Gather()
var filteredMf []*dto.MetricFamily
for _, metric := range mfs {
if metric.GetType() == dto.MetricType_GAUGE {
continue
}
filteredMf = append(filteredMf, metric)
}
require.NoError(t, err)
mfm, err := NewMetricFamilyMap(filteredMf)
require.NoError(t, err)
list = append(list, mfm)
}

_, err := MergeMetricFamilies(list)

require.NoError(t, err)
}

func verifyLabels(t *testing.T, m prometheus.Collector, filter map[string]string, expectedLabels []labels.Labels) {
result, err := GetLabels(m, filter)
require.NoError(t, err)
Expand Down