Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie

keys = append(keys, key)
validatedTimeseries = append(validatedTimeseries, client.PreallocTimeseries{
TimeSeries: client.TimeSeries{
TimeSeries: &client.TimeSeries{
Labels: ts.Labels,
Samples: samples,
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func makeWriteRequest(samples int) *client.WriteRequest {
request := &client.WriteRequest{}
for i := 0; i < samples; i++ {
ts := client.PreallocTimeseries{
TimeSeries: client.TimeSeries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "bar", Value: "baz"},
Expand All @@ -407,7 +407,7 @@ func makeWriteRequestHA(samples int, replica, cluster string) *client.WriteReque
request := &client.WriteRequest{}
for i := 0; i < samples; i++ {
ts := client.PreallocTimeseries{
TimeSeries: client.TimeSeries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: "__name__", Value: "foo"},
{Name: "bar", Value: "baz"},
Expand Down Expand Up @@ -548,7 +548,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts
response := client.QueryResponse{}
for _, ts := range i.timeseries {
if match(ts.Labels, matchers) {
response.Timeseries = append(response.Timeseries, ts.TimeSeries)
response.Timeseries = append(response.Timeseries, *ts.TimeSeries)
}
}
return &response, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestMarshall(t *testing.T) {
req := WriteRequest{}
for i := 0; i < 10; i++ {
req.Timeseries = append(req.Timeseries, PreallocTimeseries{
TimeSeries{
&TimeSeries{
Labels: []LabelAdapter{
{"foo", strconv.Itoa(i)},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func ToWriteRequest(lbls []labels.Labels, samples []Sample, source WriteRequest_
}

for i, s := range samples {
ts := timeSeriesPool.Get().(TimeSeries)
ts := timeSeriesPool.Get().(*TimeSeries)
ts.Labels = append(ts.Labels, FromLabelsToLabelAdapters(lbls[i])...)
ts.Samples = append(ts.Samples, s)
req.Timeseries = append(req.Timeseries, PreallocTimeseries{TimeSeries: ts})
Expand Down
8 changes: 4 additions & 4 deletions pkg/ingester/client/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var (

timeSeriesPool = sync.Pool{
New: func() interface{} {
return TimeSeries{
return &TimeSeries{
Labels: make([]LabelAdapter, 0, expectedLabels),
Samples: make([]Sample, 0, expectedSamplesPerSeries),
}
Expand Down Expand Up @@ -56,12 +56,12 @@ func (p *PreallocWriteRequest) Unmarshal(dAtA []byte) error {

// PreallocTimeseries is a TimeSeries which preallocs slices on Unmarshal.
type PreallocTimeseries struct {
TimeSeries
*TimeSeries
}

// Unmarshal implements proto.Message.
func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
p.TimeSeries = timeSeriesPool.Get().(TimeSeries)
p.TimeSeries = timeSeriesPool.Get().(*TimeSeries)
return p.TimeSeries.Unmarshal(dAtA)
}

Expand Down Expand Up @@ -249,7 +249,7 @@ func ReuseSlice(slice []PreallocTimeseries) {
}

// ReuseTimeseries puts the timeseries back into a sync.Pool for reuse.
func ReuseTimeseries(ts TimeSeries) {
func ReuseTimeseries(ts *TimeSeries) {
ts.Labels = ts.Labels[:0]
ts.Samples = ts.Samples[:0]
timeSeriesPool.Put(ts)
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
return nil, err
}
}
client.ReuseSlice(req.Timeseries)

return &client.WriteResponse{}, lastPartialErr
}
Expand Down
32 changes: 14 additions & 18 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/chunkcompat"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -499,7 +498,7 @@ func benchmarkIngesterSeriesCreationLocking(b *testing.B, parallelism int) {
_, err := ing.Push(ctx, &client.WriteRequest{
Timeseries: []client.PreallocTimeseries{
{
TimeSeries: client.TimeSeries{
TimeSeries: &client.TimeSeries{
Labels: []client.LabelAdapter{
{Name: model.MetricNameLabel, Value: fmt.Sprintf("metric_%d", j)},
},
Expand Down Expand Up @@ -530,31 +529,28 @@ func BenchmarkIngesterPush(b *testing.B) {
)

// Construct a set of realistic-looking samples, all with slightly different label sets
labels := util.LabelsToMetric(chunk.BenchmarkLabels).Clone()
ts := make([]client.PreallocTimeseries, 0, series)
var allLabels []labels.Labels
var allSamples []client.Sample
for j := 0; j < series; j++ {
labels["cpu"] = model.LabelValue(fmt.Sprintf("cpu%02d", j))
ts = append(ts, client.PreallocTimeseries{
TimeSeries: client.TimeSeries{
Labels: client.FromMetricsToLabelAdapters(labels),
Samples: []client.Sample{
{TimestampMs: 0, Value: float64(j)},
},
},
})
labels := chunk.BenchmarkLabels.Copy()
for i := range labels {
if labels[i].Name == "cpu" {
labels[i].Value = fmt.Sprintf("cpu%02d", j)
}
}
allLabels = append(allLabels, labels)
allSamples = append(allSamples, client.Sample{TimestampMs: 0, Value: float64(j)})
}
ctx := user.InjectOrgID(context.Background(), "1")
b.ResetTimer()
for iter := 0; iter < b.N; iter++ {
_, ing := newTestStore(b, cfg, clientCfg, limits)
// Bump the timestamp on each of our test samples each time round the loop
for j := 0; j < samples; j++ {
for i := range ts {
ts[i].TimeSeries.Samples[0].TimestampMs = int64(i)
for i := range allSamples {
allSamples[i].TimestampMs = int64(j + 1)
}
_, err := ing.Push(ctx, &client.WriteRequest{
Timeseries: ts,
})
_, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, client.API))
require.NoError(b, err)
}
ing.Shutdown()
Expand Down