From 8ca273bdefb5c3ab1eddd1e22012b11bbb732348 Mon Sep 17 00:00:00 2001 From: tanghengjian <1040104807@qq.com> Date: Tue, 1 Nov 2022 16:57:24 +0800 Subject: [PATCH] Push reduce one hash operation of Labels. Signed-off-by: tanghengjian <1040104807@qq.com> --- CHANGELOG.md | 1 + pkg/ingester/active_series.go | 31 ++------------ pkg/ingester/active_series_test.go | 41 +++++++++++-------- pkg/ingester/ingester.go | 6 ++- .../prometheus/storage/interface.go | 2 +- .../prometheus/prometheus/tsdb/db.go | 4 +- .../prometheus/prometheus/tsdb/head_append.go | 8 ++-- 7 files changed, 40 insertions(+), 53 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 368a385fa4e..adcac571a79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ * [ENHANCEMENT] Enhance traces with hostname information. #4898 * [ENHANCEMENT] Improve the documentation around limits. #4905 * [ENHANCEMENT] Distributor: cache user overrides to reduce lock contention. #4904 +* [ENHANCEMENT] Push reduce one hash operation of Labels. #4945 * [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784 * [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787 * [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818 diff --git a/pkg/ingester/active_series.go b/pkg/ingester/active_series.go index ba3e2760d37..5285f279639 100644 --- a/pkg/ingester/active_series.go +++ b/pkg/ingester/active_series.go @@ -1,17 +1,12 @@ package ingester import ( - "hash" "math" "sync" "time" - "github.com/cespare/xxhash" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "go.uber.org/atomic" - - "github.com/cortexproject/cortex/pkg/util" ) const ( @@ -53,30 +48,10 @@ func NewActiveSeries() *ActiveSeries { } // Updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet. -func (c *ActiveSeries) UpdateSeries(series labels.Labels, now time.Time, labelsCopy func(labels.Labels) labels.Labels) { - fp := fingerprint(series) - stripeID := fp % numActiveSeriesStripes - - c.stripes[stripeID].updateSeriesTimestamp(now, series, fp, labelsCopy) -} - -var sep = []byte{model.SeparatorByte} - -var hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }} - -func fingerprint(series labels.Labels) uint64 { - sum := hashPool.Get().(hash.Hash64) - defer hashPool.Put(sum) - - sum.Reset() - for _, label := range series { - _, _ = sum.Write(util.YoloBuf(label.Name)) - _, _ = sum.Write(sep) - _, _ = sum.Write(util.YoloBuf(label.Value)) - _, _ = sum.Write(sep) - } +func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, labelsCopy func(labels.Labels) labels.Labels) { + stripeID := hash % numActiveSeriesStripes - return sum.Sum64() + c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, labelsCopy) } // Purge removes expired entries from the cache. This function should be called diff --git a/pkg/ingester/active_series_test.go b/pkg/ingester/active_series_test.go index ab3c63db15d..02a5477194c 100644 --- a/pkg/ingester/active_series_test.go +++ b/pkg/ingester/active_series_test.go @@ -8,6 +8,7 @@ import ( "sync" "testing" "time" + "unsafe" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" @@ -15,20 +16,25 @@ import ( func copyFn(l labels.Labels) labels.Labels { return l } +func fromLabelToLabels(ls []labels.Label) labels.Labels { + return *(*labels.Labels)(unsafe.Pointer(&ls)) +} + func TestActiveSeries_UpdateSeries(t *testing.T) { ls1 := []labels.Label{{Name: "a", Value: "1"}} ls2 := []labels.Label{{Name: "a", Value: "2"}} c := NewActiveSeries() assert.Equal(t, 0, c.Active()) - - c.UpdateSeries(ls1, time.Now(), copyFn) + labels1Hash := fromLabelToLabels(ls1).Hash() + labels2Hash := fromLabelToLabels(ls2).Hash() + c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn) assert.Equal(t, 1, c.Active()) - c.UpdateSeries(ls1, time.Now(), copyFn) + c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn) assert.Equal(t, 1, c.Active()) - c.UpdateSeries(ls2, time.Now(), copyFn) + c.UpdateSeries(ls2, labels2Hash, time.Now(), copyFn) assert.Equal(t, 2, c.Active()) } @@ -46,7 +52,7 @@ func TestActiveSeries_Purge(t *testing.T) { c := NewActiveSeries() for i := 0; i < len(series); i++ { - c.UpdateSeries(series[i], time.Unix(int64(i), 0), copyFn) + c.UpdateSeries(series[i], fromLabelToLabels(series[i]).Hash(), time.Unix(int64(i), 0), copyFn) } c.Purge(time.Unix(int64(ttl+1), 0)) @@ -62,24 +68,23 @@ func TestActiveSeries_PurgeOpt(t *testing.T) { metric := labels.NewBuilder(labels.FromStrings("__name__", "logs")) ls1 := metric.Set("_", "ypfajYg2lsv").Labels(nil) ls2 := metric.Set("_", "KiqbryhzUpn").Labels(nil) - c := NewActiveSeries() now := time.Now() - c.UpdateSeries(ls1, now.Add(-2*time.Minute), copyFn) - c.UpdateSeries(ls2, now, copyFn) + c.UpdateSeries(ls1, ls1.Hash(), now.Add(-2*time.Minute), copyFn) + c.UpdateSeries(ls2, ls2.Hash(), now, copyFn) c.Purge(now) assert.Equal(t, 1, c.Active()) - c.UpdateSeries(ls1, now.Add(-1*time.Minute), copyFn) - c.UpdateSeries(ls2, now, copyFn) + c.UpdateSeries(ls1, ls1.Hash(), now.Add(-1*time.Minute), copyFn) + c.UpdateSeries(ls2, ls2.Hash(), now, copyFn) c.Purge(now) assert.Equal(t, 1, c.Active()) // This will *not* update the series, since there is already newer timestamp. - c.UpdateSeries(ls2, now.Add(-1*time.Minute), copyFn) + c.UpdateSeries(ls2, ls2.Hash(), now.Add(-1*time.Minute), copyFn) c.Purge(now) assert.Equal(t, 1, c.Active()) @@ -105,7 +110,7 @@ func benchmarkActiveSeriesConcurrencySingleSeries(b *testing.B, goroutines int) wg := &sync.WaitGroup{} start := make(chan struct{}) max := int(math.Ceil(float64(b.N) / float64(goroutines))) - + labelhash := series.Hash() for i := 0; i < goroutines; i++ { wg.Add(1) go func() { @@ -116,7 +121,7 @@ func benchmarkActiveSeriesConcurrencySingleSeries(b *testing.B, goroutines int) for ix := 0; ix < max; ix++ { now = now.Add(time.Duration(ix) * time.Millisecond) - c.UpdateSeries(series, now, copyFn) + c.UpdateSeries(series, labelhash, now, copyFn) } }() } @@ -137,15 +142,17 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) { name := nameBuf.String() series := make([]labels.Labels, b.N) + labelhash := make([]uint64, b.N) for s := 0; s < b.N; s++ { series[s] = labels.Labels{{Name: name, Value: name + strconv.Itoa(s)}} + labelhash[s] = series[s].Hash() } now := time.Now().UnixNano() b.ResetTimer() for ix := 0; ix < b.N; ix++ { - c.UpdateSeries(series[ix], time.Unix(0, now+int64(ix)), copyFn) + c.UpdateSeries(series[ix], labelhash[ix], time.Unix(0, now+int64(ix)), copyFn) } } @@ -165,8 +172,10 @@ func benchmarkPurge(b *testing.B, twice bool) { c := NewActiveSeries() series := [numSeries]labels.Labels{} + labelhash := [numSeries]uint64{} for s := 0; s < numSeries; s++ { series[s] = labels.Labels{{Name: "a", Value: strconv.Itoa(s)}} + labelhash[s] = series[s].Hash() } for i := 0; i < b.N; i++ { @@ -175,9 +184,9 @@ func benchmarkPurge(b *testing.B, twice bool) { // Prepare series for ix, s := range series { if ix < numExpiresSeries { - c.UpdateSeries(s, now.Add(-time.Minute), copyFn) + c.UpdateSeries(s, labelhash[ix], now.Add(-time.Minute), copyFn) } else { - c.UpdateSeries(s, now, copyFn) + c.UpdateSeries(s, labelhash[ix], now, copyFn) } } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 91ad41f0e70..2f45f560380 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -963,7 +963,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte // has sorted labels once hit the ingester). // Look up a reference for this series. - ref, copiedLabels := app.GetRef(cortexpb.FromLabelAdaptersToLabels(ts.Labels)) + tsLabels := cortexpb.FromLabelAdaptersToLabels(ts.Labels) + tsLabelsHash := tsLabels.Hash() + ref, copiedLabels := app.GetRef(tsLabels, tsLabelsHash) // To find out if any sample was added to this series, we keep old value. oldSucceededSamplesCount := succeededSamplesCount @@ -1034,7 +1036,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte } if i.cfg.ActiveSeriesMetricsEnabled && succeededSamplesCount > oldSucceededSamplesCount { - db.activeSeries.UpdateSeries(cortexpb.FromLabelAdaptersToLabels(ts.Labels), startAppend, func(l labels.Labels) labels.Labels { + db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, func(l labels.Labels) labels.Labels { // we must already have copied the labels if succeededSamplesCount has been incremented. return copiedLabels }) diff --git a/vendor/github.com/prometheus/prometheus/storage/interface.go b/vendor/github.com/prometheus/prometheus/storage/interface.go index d73ec72203a..a0a5f291951 100644 --- a/vendor/github.com/prometheus/prometheus/storage/interface.go +++ b/vendor/github.com/prometheus/prometheus/storage/interface.go @@ -237,7 +237,7 @@ type GetRef interface { // Returns reference number that can be used to pass to Appender.Append(), // and a set of labels that will not cause another copy when passed to Appender.Append(). // 0 means the appender does not have a reference to this series. - GetRef(lset labels.Labels) (SeriesRef, labels.Labels) + GetRef(lset labels.Labels, hash uint64) (SeriesRef, labels.Labels) } // ExemplarAppender provides an interface for adding samples to exemplar storage, which diff --git a/vendor/github.com/prometheus/prometheus/tsdb/db.go b/vendor/github.com/prometheus/prometheus/tsdb/db.go index 854918c369b..7d5c59d73ce 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/db.go @@ -983,9 +983,9 @@ type dbAppender struct { var _ storage.GetRef = dbAppender{} -func (a dbAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labels.Labels) { +func (a dbAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) { if g, ok := a.Appender.(storage.GetRef); ok { - return g.GetRef(lset) + return g.GetRef(lset, hash) } return 0, nil } diff --git a/vendor/github.com/prometheus/prometheus/tsdb/head_append.go b/vendor/github.com/prometheus/prometheus/tsdb/head_append.go index f843aa1ec69..e67bf21e434 100644 --- a/vendor/github.com/prometheus/prometheus/tsdb/head_append.go +++ b/vendor/github.com/prometheus/prometheus/tsdb/head_append.go @@ -86,9 +86,9 @@ func (h *Head) initTime(t int64) { h.maxTime.CompareAndSwap(math.MinInt64, t) } -func (a *initAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labels.Labels) { +func (a *initAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) { if g, ok := a.app.(storage.GetRef); ok { - return g.GetRef(lset) + return g.GetRef(lset, hash) } return 0, nil } @@ -455,8 +455,8 @@ func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels, var _ storage.GetRef = &headAppender{} -func (a *headAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labels.Labels) { - s := a.head.series.getByHash(lset.Hash(), lset) +func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) { + s := a.head.series.getByHash(hash, lset) if s == nil { return 0, nil }