Skip to content

Commit b367cd8

Browse files
authored
Merge pull request #1638 from cortexproject/ingester-timeseries-pool
* Return request data to the pool in Ingester.Push() Signed-off-by: Bryan Boreham <[email protected]>
2 parents 7447f75 + fab5ced commit b367cd8

File tree

7 files changed

+25
-28
lines changed

7 files changed

+25
-28
lines changed

pkg/distributor/distributor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
348348

349349
keys = append(keys, key)
350350
validatedTimeseries = append(validatedTimeseries, client.PreallocTimeseries{
351-
TimeSeries: client.TimeSeries{
351+
TimeSeries: &client.TimeSeries{
352352
Labels: ts.Labels,
353353
Samples: samples,
354354
},

pkg/distributor/distributor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ func makeWriteRequest(samples int) *client.WriteRequest {
384384
request := &client.WriteRequest{}
385385
for i := 0; i < samples; i++ {
386386
ts := client.PreallocTimeseries{
387-
TimeSeries: client.TimeSeries{
387+
TimeSeries: &client.TimeSeries{
388388
Labels: []client.LabelAdapter{
389389
{Name: model.MetricNameLabel, Value: "foo"},
390390
{Name: "bar", Value: "baz"},
@@ -407,7 +407,7 @@ func makeWriteRequestHA(samples int, replica, cluster string) *client.WriteReque
407407
request := &client.WriteRequest{}
408408
for i := 0; i < samples; i++ {
409409
ts := client.PreallocTimeseries{
410-
TimeSeries: client.TimeSeries{
410+
TimeSeries: &client.TimeSeries{
411411
Labels: []client.LabelAdapter{
412412
{Name: "__name__", Value: "foo"},
413413
{Name: "bar", Value: "baz"},
@@ -548,7 +548,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts
548548
response := client.QueryResponse{}
549549
for _, ts := range i.timeseries {
550550
if match(ts.Labels, matchers) {
551-
response.Timeseries = append(response.Timeseries, ts.TimeSeries)
551+
response.Timeseries = append(response.Timeseries, *ts.TimeSeries)
552552
}
553553
}
554554
return &response, nil

pkg/ingester/client/client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func TestMarshall(t *testing.T) {
1717
req := WriteRequest{}
1818
for i := 0; i < 10; i++ {
1919
req.Timeseries = append(req.Timeseries, PreallocTimeseries{
20-
TimeSeries{
20+
&TimeSeries{
2121
Labels: []LabelAdapter{
2222
{"foo", strconv.Itoa(i)},
2323
},

pkg/ingester/client/compat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func ToWriteRequest(lbls []labels.Labels, samples []Sample, source WriteRequest_
2727
}
2828

2929
for i, s := range samples {
30-
ts := timeSeriesPool.Get().(TimeSeries)
30+
ts := timeSeriesPool.Get().(*TimeSeries)
3131
ts.Labels = append(ts.Labels, FromLabelsToLabelAdapters(lbls[i])...)
3232
ts.Samples = append(ts.Samples, s)
3333
req.Timeseries = append(req.Timeseries, PreallocTimeseries{TimeSeries: ts})

pkg/ingester/client/timeseries.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ var (
2424

2525
timeSeriesPool = sync.Pool{
2626
New: func() interface{} {
27-
return TimeSeries{
27+
return &TimeSeries{
2828
Labels: make([]LabelAdapter, 0, expectedLabels),
2929
Samples: make([]Sample, 0, expectedSamplesPerSeries),
3030
}
@@ -56,12 +56,12 @@ func (p *PreallocWriteRequest) Unmarshal(dAtA []byte) error {
5656

5757
// PreallocTimeseries is a TimeSeries which preallocs slices on Unmarshal.
5858
type PreallocTimeseries struct {
59-
TimeSeries
59+
*TimeSeries
6060
}
6161

6262
// Unmarshal implements proto.Message.
6363
func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
64-
p.TimeSeries = timeSeriesPool.Get().(TimeSeries)
64+
p.TimeSeries = timeSeriesPool.Get().(*TimeSeries)
6565
return p.TimeSeries.Unmarshal(dAtA)
6666
}
6767

@@ -249,7 +249,7 @@ func ReuseSlice(slice []PreallocTimeseries) {
249249
}
250250

251251
// ReuseTimeseries puts the timeseries back into a sync.Pool for reuse.
252-
func ReuseTimeseries(ts TimeSeries) {
252+
func ReuseTimeseries(ts *TimeSeries) {
253253
ts.Labels = ts.Labels[:0]
254254
ts.Samples = ts.Samples[:0]
255255
timeSeriesPool.Put(ts)

pkg/ingester/ingester.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
278278
return nil, err
279279
}
280280
}
281+
client.ReuseSlice(req.Timeseries)
281282

282283
return &client.WriteResponse{}, lastPartialErr
283284
}

pkg/ingester/ingester_test.go

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121

2222
"github.com/cortexproject/cortex/pkg/chunk"
2323
"github.com/cortexproject/cortex/pkg/ingester/client"
24-
"github.com/cortexproject/cortex/pkg/util"
2524
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
2625
"github.com/cortexproject/cortex/pkg/util/validation"
2726
"github.com/weaveworks/common/httpgrpc"
@@ -499,7 +498,7 @@ func benchmarkIngesterSeriesCreationLocking(b *testing.B, parallelism int) {
499498
_, err := ing.Push(ctx, &client.WriteRequest{
500499
Timeseries: []client.PreallocTimeseries{
501500
{
502-
TimeSeries: client.TimeSeries{
501+
TimeSeries: &client.TimeSeries{
503502
Labels: []client.LabelAdapter{
504503
{Name: model.MetricNameLabel, Value: fmt.Sprintf("metric_%d", j)},
505504
},
@@ -530,31 +529,28 @@ func BenchmarkIngesterPush(b *testing.B) {
530529
)
531530

532531
// Construct a set of realistic-looking samples, all with slightly different label sets
533-
labels := util.LabelsToMetric(chunk.BenchmarkLabels).Clone()
534-
ts := make([]client.PreallocTimeseries, 0, series)
532+
var allLabels []labels.Labels
533+
var allSamples []client.Sample
535534
for j := 0; j < series; j++ {
536-
labels["cpu"] = model.LabelValue(fmt.Sprintf("cpu%02d", j))
537-
ts = append(ts, client.PreallocTimeseries{
538-
TimeSeries: client.TimeSeries{
539-
Labels: client.FromMetricsToLabelAdapters(labels),
540-
Samples: []client.Sample{
541-
{TimestampMs: 0, Value: float64(j)},
542-
},
543-
},
544-
})
535+
labels := chunk.BenchmarkLabels.Copy()
536+
for i := range labels {
537+
if labels[i].Name == "cpu" {
538+
labels[i].Value = fmt.Sprintf("cpu%02d", j)
539+
}
540+
}
541+
allLabels = append(allLabels, labels)
542+
allSamples = append(allSamples, client.Sample{TimestampMs: 0, Value: float64(j)})
545543
}
546544
ctx := user.InjectOrgID(context.Background(), "1")
547545
b.ResetTimer()
548546
for iter := 0; iter < b.N; iter++ {
549547
_, ing := newTestStore(b, cfg, clientCfg, limits)
550548
// Bump the timestamp on each of our test samples each time round the loop
551549
for j := 0; j < samples; j++ {
552-
for i := range ts {
553-
ts[i].TimeSeries.Samples[0].TimestampMs = int64(i)
550+
for i := range allSamples {
551+
allSamples[i].TimestampMs = int64(j + 1)
554552
}
555-
_, err := ing.Push(ctx, &client.WriteRequest{
556-
Timeseries: ts,
557-
})
553+
_, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, client.API))
558554
require.NoError(b, err)
559555
}
560556
ing.Shutdown()

0 commit comments

Comments
 (0)