From f35f374c84991f7501da80ff550209f8f5118a9a Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 1 Sep 2019 13:51:54 +0000 Subject: [PATCH 1/3] Make BenchmarkIngesterPush() go via ToWriteRequest() Signed-off-by: Bryan Boreham --- pkg/ingester/ingester_test.go | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 52f2ea1fe1a..a3e813ebe76 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -21,7 +21,6 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/httpgrpc" @@ -530,18 +529,17 @@ func BenchmarkIngesterPush(b *testing.B) { ) // Construct a set of realistic-looking samples, all with slightly different label sets - labels := util.LabelsToMetric(chunk.BenchmarkLabels).Clone() - ts := make([]client.PreallocTimeseries, 0, series) + var allLabels []labels.Labels + var allSamples []client.Sample for j := 0; j < series; j++ { - labels["cpu"] = model.LabelValue(fmt.Sprintf("cpu%02d", j)) - ts = append(ts, client.PreallocTimeseries{ - TimeSeries: client.TimeSeries{ - Labels: client.FromMetricsToLabelAdapters(labels), - Samples: []client.Sample{ - {TimestampMs: 0, Value: float64(j)}, - }, - }, - }) + labels := chunk.BenchmarkLabels.Copy() + for i := range labels { + if labels[i].Name == "cpu" { + labels[i].Value = fmt.Sprintf("cpu%02d", j) + } + } + allLabels = append(allLabels, labels) + allSamples = append(allSamples, client.Sample{TimestampMs: 0, Value: float64(j)}) } ctx := user.InjectOrgID(context.Background(), "1") b.ResetTimer() @@ -549,12 +547,10 @@ func BenchmarkIngesterPush(b *testing.B) { _, ing := newTestStore(b, cfg, clientCfg, limits) // Bump the timestamp on each of our test samples each time round the loop for j := 0; j < samples; j++ { - for i := range ts { - ts[i].TimeSeries.Samples[0].TimestampMs = int64(i) + for i := range allSamples { + allSamples[i].TimestampMs = int64(j + 1) } - _, err := ing.Push(ctx, &client.WriteRequest{ - Timeseries: ts, - }) + _, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, client.API)) require.NoError(b, err) } ing.Shutdown() From a429b7fd7d6c948ffd3489d277f2604567d447a7 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 1 Sep 2019 13:50:43 +0000 Subject: [PATCH 2/3] Return request data to the pool in Ingester.Push() Signed-off-by: Bryan Boreham --- pkg/ingester/ingester.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 0531a58b558..a342587d6b6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -279,6 +279,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. return nil, err } } + client.ReuseSlice(req.Timeseries) return &client.WriteResponse{}, lastPartialErr } From fab5ced5e3c071a930966781b6eb786d8d34cc83 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sun, 1 Sep 2019 14:31:44 +0000 Subject: [PATCH 3/3] Store *TimeSeries in the pool, to avoid boxing Casting a struct to {}interface has an overhead, because the runtime needs something more like a pointer so it creates a small descriptor object. Changing the data structure to use a pointer avoids this overhead. Signed-off-by: Bryan Boreham --- pkg/distributor/distributor.go | 2 +- pkg/distributor/distributor_test.go | 6 +++--- pkg/ingester/client/client_test.go | 2 +- pkg/ingester/client/compat.go | 2 +- pkg/ingester/client/timeseries.go | 8 ++++---- pkg/ingester/ingester_test.go | 2 +- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 4e405a03954..4dc292186eb 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -348,7 +348,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie keys = append(keys, key) validatedTimeseries = append(validatedTimeseries, client.PreallocTimeseries{ - TimeSeries: client.TimeSeries{ + TimeSeries: &client.TimeSeries{ Labels: ts.Labels, Samples: samples, }, diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 32218354668..d2ef007d539 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -384,7 +384,7 @@ func makeWriteRequest(samples int) *client.WriteRequest { request := &client.WriteRequest{} for i := 0; i < samples; i++ { ts := client.PreallocTimeseries{ - TimeSeries: client.TimeSeries{ + TimeSeries: &client.TimeSeries{ Labels: []client.LabelAdapter{ {Name: model.MetricNameLabel, Value: "foo"}, {Name: "bar", Value: "baz"}, @@ -407,7 +407,7 @@ func makeWriteRequestHA(samples int, replica, cluster string) *client.WriteReque request := &client.WriteRequest{} for i := 0; i < samples; i++ { ts := client.PreallocTimeseries{ - TimeSeries: client.TimeSeries{ + TimeSeries: &client.TimeSeries{ Labels: []client.LabelAdapter{ {Name: "__name__", Value: "foo"}, {Name: "bar", Value: "baz"}, @@ -548,7 +548,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts response := client.QueryResponse{} for _, ts := range i.timeseries { if match(ts.Labels, matchers) { - response.Timeseries = append(response.Timeseries, ts.TimeSeries) + response.Timeseries = append(response.Timeseries, *ts.TimeSeries) } } return &response, nil diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index d5ffe626144..a272afc71ff 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -17,7 +17,7 @@ func TestMarshall(t *testing.T) { req := WriteRequest{} for i := 0; i < 10; i++ { req.Timeseries = append(req.Timeseries, PreallocTimeseries{ - TimeSeries{ + &TimeSeries{ Labels: []LabelAdapter{ {"foo", strconv.Itoa(i)}, }, diff --git a/pkg/ingester/client/compat.go b/pkg/ingester/client/compat.go index 36d0e86d70c..4174896e421 100644 --- a/pkg/ingester/client/compat.go +++ b/pkg/ingester/client/compat.go @@ -27,7 +27,7 @@ func ToWriteRequest(lbls []labels.Labels, samples []Sample, source WriteRequest_ } for i, s := range samples { - ts := timeSeriesPool.Get().(TimeSeries) + ts := timeSeriesPool.Get().(*TimeSeries) ts.Labels = append(ts.Labels, FromLabelsToLabelAdapters(lbls[i])...) ts.Samples = append(ts.Samples, s) req.Timeseries = append(req.Timeseries, PreallocTimeseries{TimeSeries: ts}) diff --git a/pkg/ingester/client/timeseries.go b/pkg/ingester/client/timeseries.go index 8919ba99f3a..204382952b0 100644 --- a/pkg/ingester/client/timeseries.go +++ b/pkg/ingester/client/timeseries.go @@ -24,7 +24,7 @@ var ( timeSeriesPool = sync.Pool{ New: func() interface{} { - return TimeSeries{ + return &TimeSeries{ Labels: make([]LabelAdapter, 0, expectedLabels), Samples: make([]Sample, 0, expectedSamplesPerSeries), } @@ -56,12 +56,12 @@ func (p *PreallocWriteRequest) Unmarshal(dAtA []byte) error { // PreallocTimeseries is a TimeSeries which preallocs slices on Unmarshal. type PreallocTimeseries struct { - TimeSeries + *TimeSeries } // Unmarshal implements proto.Message. func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { - p.TimeSeries = timeSeriesPool.Get().(TimeSeries) + p.TimeSeries = timeSeriesPool.Get().(*TimeSeries) return p.TimeSeries.Unmarshal(dAtA) } @@ -249,7 +249,7 @@ func ReuseSlice(slice []PreallocTimeseries) { } // ReuseTimeseries puts the timeseries back into a sync.Pool for reuse. -func ReuseTimeseries(ts TimeSeries) { +func ReuseTimeseries(ts *TimeSeries) { ts.Labels = ts.Labels[:0] ts.Samples = ts.Samples[:0] timeSeriesPool.Put(ts) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index a3e813ebe76..b837c3a39c5 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -498,7 +498,7 @@ func benchmarkIngesterSeriesCreationLocking(b *testing.B, parallelism int) { _, err := ing.Push(ctx, &client.WriteRequest{ Timeseries: []client.PreallocTimeseries{ { - TimeSeries: client.TimeSeries{ + TimeSeries: &client.TimeSeries{ Labels: []client.LabelAdapter{ {Name: model.MetricNameLabel, Value: fmt.Sprintf("metric_%d", j)}, },