Skip to content

Commit dabbb7f

Browse files
committed
Push reduce one hash operation of Labels.
Signed-off-by: tanghengjian <[email protected]>
1 parent 8565860 commit dabbb7f

File tree

7 files changed

+37
-52
lines changed

7 files changed

+37
-52
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
* [ENHANCEMENT] Enhance traces with hostname information. #4898
5555
* [ENHANCEMENT] Improve the documentation around limits. #4905
5656
* [ENHANCEMENT] Distributor: cache user overrides to reduce lock contention. #4904
57+
* [ENHANCEMENT] Push reduce one hash operation of Labels. #4945
5758
* [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784
5859
* [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787
5960
* [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818

pkg/ingester/active_series.go

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
package ingester
22

33
import (
4-
"hash"
54
"math"
65
"sync"
76
"time"
87

9-
"github.com/cespare/xxhash"
10-
"github.com/prometheus/common/model"
118
"github.com/prometheus/prometheus/model/labels"
129
"go.uber.org/atomic"
13-
14-
"github.com/cortexproject/cortex/pkg/util"
1510
)
1611

1712
const (
@@ -53,30 +48,10 @@ func NewActiveSeries() *ActiveSeries {
5348
}
5449

5550
// Updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet.
56-
func (c *ActiveSeries) UpdateSeries(series labels.Labels, now time.Time, labelsCopy func(labels.Labels) labels.Labels) {
57-
fp := fingerprint(series)
58-
stripeID := fp % numActiveSeriesStripes
59-
60-
c.stripes[stripeID].updateSeriesTimestamp(now, series, fp, labelsCopy)
61-
}
62-
63-
var sep = []byte{model.SeparatorByte}
64-
65-
var hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }}
66-
67-
func fingerprint(series labels.Labels) uint64 {
68-
sum := hashPool.Get().(hash.Hash64)
69-
defer hashPool.Put(sum)
70-
71-
sum.Reset()
72-
for _, label := range series {
73-
_, _ = sum.Write(util.YoloBuf(label.Name))
74-
_, _ = sum.Write(sep)
75-
_, _ = sum.Write(util.YoloBuf(label.Value))
76-
_, _ = sum.Write(sep)
77-
}
51+
func (c *ActiveSeries) UpdateSeries(series labels.Labels, hash uint64, now time.Time, labelsCopy func(labels.Labels) labels.Labels) {
52+
stripeID := hash % numActiveSeriesStripes
7853

79-
return sum.Sum64()
54+
c.stripes[stripeID].updateSeriesTimestamp(now, series, hash, labelsCopy)
8055
}
8156

8257
// Purge removes expired entries from the cache. This function should be called

pkg/ingester/active_series_test.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,33 @@ import (
88
"sync"
99
"testing"
1010
"time"
11+
"unsafe"
1112

1213
"github.com/prometheus/prometheus/model/labels"
1314
"github.com/stretchr/testify/assert"
1415
)
1516

1617
func copyFn(l labels.Labels) labels.Labels { return l }
1718

19+
func FromLabelAdaptersToLabels(ls []labels.Label) labels.Labels {
20+
return *(*labels.Labels)(unsafe.Pointer(&ls))
21+
}
22+
1823
func TestActiveSeries_UpdateSeries(t *testing.T) {
1924
ls1 := []labels.Label{{Name: "a", Value: "1"}}
2025
ls2 := []labels.Label{{Name: "a", Value: "2"}}
2126

2227
c := NewActiveSeries()
2328
assert.Equal(t, 0, c.Active())
24-
25-
c.UpdateSeries(ls1, time.Now(), copyFn)
29+
labels1Hash := FromLabelAdaptersToLabels(ls1).Hash()
30+
labels2Hash := FromLabelAdaptersToLabels(ls2).Hash()
31+
c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn)
2632
assert.Equal(t, 1, c.Active())
2733

28-
c.UpdateSeries(ls1, time.Now(), copyFn)
34+
c.UpdateSeries(ls1, labels1Hash, time.Now(), copyFn)
2935
assert.Equal(t, 1, c.Active())
3036

31-
c.UpdateSeries(ls2, time.Now(), copyFn)
37+
c.UpdateSeries(ls2, labels2Hash, time.Now(), copyFn)
3238
assert.Equal(t, 2, c.Active())
3339
}
3440

@@ -46,7 +52,7 @@ func TestActiveSeries_Purge(t *testing.T) {
4652
c := NewActiveSeries()
4753

4854
for i := 0; i < len(series); i++ {
49-
c.UpdateSeries(series[i], time.Unix(int64(i), 0), copyFn)
55+
c.UpdateSeries(series[i], FromLabelAdaptersToLabels(series[i]).Hash(), time.Unix(int64(i), 0), copyFn)
5056
}
5157

5258
c.Purge(time.Unix(int64(ttl+1), 0))
@@ -62,24 +68,25 @@ func TestActiveSeries_PurgeOpt(t *testing.T) {
6268
metric := labels.NewBuilder(labels.FromStrings("__name__", "logs"))
6369
ls1 := metric.Set("_", "ypfajYg2lsv").Labels(nil)
6470
ls2 := metric.Set("_", "KiqbryhzUpn").Labels(nil)
65-
71+
labels1Hash := FromLabelAdaptersToLabels(ls1).Hash()
72+
labels2Hash := FromLabelAdaptersToLabels(ls2).Hash()
6673
c := NewActiveSeries()
6774

6875
now := time.Now()
69-
c.UpdateSeries(ls1, now.Add(-2*time.Minute), copyFn)
70-
c.UpdateSeries(ls2, now, copyFn)
76+
c.UpdateSeries(ls1, labels1Hash, now.Add(-2*time.Minute), copyFn)
77+
c.UpdateSeries(ls2, labels2Hash, now, copyFn)
7178
c.Purge(now)
7279

7380
assert.Equal(t, 1, c.Active())
7481

75-
c.UpdateSeries(ls1, now.Add(-1*time.Minute), copyFn)
76-
c.UpdateSeries(ls2, now, copyFn)
82+
c.UpdateSeries(ls1, labels1Hash, now.Add(-1*time.Minute), copyFn)
83+
c.UpdateSeries(ls2, labels2Hash, now, copyFn)
7784
c.Purge(now)
7885

7986
assert.Equal(t, 1, c.Active())
8087

8188
// This will *not* update the series, since there is already newer timestamp.
82-
c.UpdateSeries(ls2, now.Add(-1*time.Minute), copyFn)
89+
c.UpdateSeries(ls2, labels2Hash, now.Add(-1*time.Minute), copyFn)
8390
c.Purge(now)
8491

8592
assert.Equal(t, 1, c.Active())
@@ -116,7 +123,7 @@ func benchmarkActiveSeriesConcurrencySingleSeries(b *testing.B, goroutines int)
116123

117124
for ix := 0; ix < max; ix++ {
118125
now = now.Add(time.Duration(ix) * time.Millisecond)
119-
c.UpdateSeries(series, now, copyFn)
126+
c.UpdateSeries(series, FromLabelAdaptersToLabels(series).Hash(), now, copyFn)
120127
}
121128
}()
122129
}
@@ -145,7 +152,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) {
145152

146153
b.ResetTimer()
147154
for ix := 0; ix < b.N; ix++ {
148-
c.UpdateSeries(series[ix], time.Unix(0, now+int64(ix)), copyFn)
155+
c.UpdateSeries(series[ix], FromLabelAdaptersToLabels(series[ix]).Hash(), time.Unix(0, now+int64(ix)), copyFn)
149156
}
150157
}
151158

@@ -175,9 +182,9 @@ func benchmarkPurge(b *testing.B, twice bool) {
175182
// Prepare series
176183
for ix, s := range series {
177184
if ix < numExpiresSeries {
178-
c.UpdateSeries(s, now.Add(-time.Minute), copyFn)
185+
c.UpdateSeries(s, FromLabelAdaptersToLabels(s).Hash(), now.Add(-time.Minute), copyFn)
179186
} else {
180-
c.UpdateSeries(s, now, copyFn)
187+
c.UpdateSeries(s, FromLabelAdaptersToLabels(s).Hash(), now, copyFn)
181188
}
182189
}
183190

pkg/ingester/ingester.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -963,7 +963,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
963963
// has sorted labels once hit the ingester).
964964

965965
// Look up a reference for this series.
966-
ref, copiedLabels := app.GetRef(cortexpb.FromLabelAdaptersToLabels(ts.Labels))
966+
tsLabels := cortexpb.FromLabelAdaptersToLabels(ts.Labels)
967+
tsLabelsHash := tsLabels.Hash()
968+
ref, copiedLabels := app.GetRef(tsLabels, tsLabelsHash)
967969

968970
// To find out if any sample was added to this series, we keep old value.
969971
oldSucceededSamplesCount := succeededSamplesCount
@@ -1034,7 +1036,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
10341036
}
10351037

10361038
if i.cfg.ActiveSeriesMetricsEnabled && succeededSamplesCount > oldSucceededSamplesCount {
1037-
db.activeSeries.UpdateSeries(cortexpb.FromLabelAdaptersToLabels(ts.Labels), startAppend, func(l labels.Labels) labels.Labels {
1039+
db.activeSeries.UpdateSeries(tsLabels, tsLabelsHash, startAppend, func(l labels.Labels) labels.Labels {
10381040
// we must already have copied the labels if succeededSamplesCount has been incremented.
10391041
return copiedLabels
10401042
})

vendor/github.com/prometheus/prometheus/storage/interface.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/prometheus/prometheus/tsdb/db.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/prometheus/prometheus/tsdb/head_append.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)