Skip to content

Commit 0c41288

Browse files
authored
Optimize the decoding of SampleStream (#5349)
* Implementing decodeSampleStream Signed-off-by: Alan Protasio <[email protected]> * lint Signed-off-by: Alan Protasio <[email protected]> * lint Signed-off-by: Alan Protasio <[email protected]> --------- Signed-off-by: Alan Protasio <[email protected]>
1 parent ced5b43 commit 0c41288

File tree

2 files changed

+91
-0
lines changed

2 files changed

+91
-0
lines changed

pkg/querier/tripperware/instantquery/instant_query_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import (
1111
"testing"
1212
"time"
1313

14+
"github.com/prometheus/common/model"
1415
"github.com/stretchr/testify/assert"
1516
"github.com/stretchr/testify/require"
1617
"github.com/weaveworks/common/httpgrpc"
1718
"github.com/weaveworks/common/user"
1819

20+
"github.com/cortexproject/cortex/pkg/cortexpb"
1921
"github.com/cortexproject/cortex/pkg/querier/tripperware"
2022
)
2123

@@ -444,3 +446,66 @@ func Test_sortPlanForQuery(t *testing.T) {
444446
})
445447
}
446448
}
449+
450+
func Benchmark_Decode(b *testing.B) {
451+
maxSamplesCount := 1000000
452+
samples := make([]tripperware.SampleStream, maxSamplesCount)
453+
454+
for i := 0; i < maxSamplesCount; i++ {
455+
samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample%v", i), Value: fmt.Sprintf("Value%v", i)})
456+
samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample2%v", i), Value: fmt.Sprintf("Value2%v", i)})
457+
samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample3%v", i), Value: fmt.Sprintf("Value3%v", i)})
458+
samples[i].Samples = append(samples[i].Samples, cortexpb.Sample{TimestampMs: int64(i), Value: float64(i)})
459+
}
460+
461+
for name, tc := range map[string]struct {
462+
sampleStream []tripperware.SampleStream
463+
}{
464+
"100 samples": {
465+
sampleStream: samples[:100],
466+
},
467+
"1000 samples": {
468+
sampleStream: samples[:1000],
469+
},
470+
"10000 samples": {
471+
sampleStream: samples[:10000],
472+
},
473+
"100000 samples": {
474+
sampleStream: samples[:100000],
475+
},
476+
"1000000 samples": {
477+
sampleStream: samples[:1000000],
478+
},
479+
} {
480+
b.Run(name, func(b *testing.B) {
481+
r := PrometheusInstantQueryResponse{
482+
Data: PrometheusInstantQueryData{
483+
ResultType: model.ValMatrix.String(),
484+
Result: PrometheusInstantQueryResult{
485+
Result: &PrometheusInstantQueryResult_Matrix{
486+
Matrix: &Matrix{
487+
SampleStreams: tc.sampleStream,
488+
},
489+
},
490+
},
491+
},
492+
}
493+
494+
body, err := json.Marshal(r)
495+
require.NoError(b, err)
496+
497+
b.ResetTimer()
498+
b.ReportAllocs()
499+
500+
for i := 0; i < b.N; i++ {
501+
response := &http.Response{
502+
StatusCode: 200,
503+
Body: io.NopCloser(bytes.NewBuffer(body)),
504+
}
505+
_, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil)
506+
require.NoError(b, err)
507+
}
508+
})
509+
}
510+
511+
}

pkg/querier/tripperware/query.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,31 @@ type Request interface {
8383
WithStats(stats string) Request
8484
}
8585

86+
func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
87+
lbls := labels.Labels{}
88+
samples := []cortexpb.Sample{}
89+
for field := iter.ReadObject(); field != ""; field = iter.ReadObject() {
90+
switch field {
91+
case "metric":
92+
iter.ReadVal(&lbls)
93+
case "values":
94+
for {
95+
if !iter.ReadArray() {
96+
break
97+
}
98+
s := cortexpb.Sample{}
99+
cortexpb.SampleJsoniterDecode(unsafe.Pointer(&s), iter)
100+
samples = append(samples, s)
101+
}
102+
}
103+
}
104+
105+
*(*SampleStream)(ptr) = SampleStream{
106+
Samples: samples,
107+
Labels: cortexpb.FromLabelsToLabelAdapters(lbls),
108+
}
109+
}
110+
86111
func encodeSampleStream(ptr unsafe.Pointer, stream *jsoniter.Stream) {
87112
ss := (*SampleStream)(ptr)
88113
stream.WriteObjectStart()
@@ -160,6 +185,7 @@ func init() {
160185
jsoniter.RegisterTypeEncoderFunc("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode, func(unsafe.Pointer) bool { return false })
161186
jsoniter.RegisterTypeDecoderFunc("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode)
162187
jsoniter.RegisterTypeEncoderFunc("tripperware.SampleStream", encodeSampleStream, func(unsafe.Pointer) bool { return false })
188+
jsoniter.RegisterTypeDecoderFunc("tripperware.SampleStream", decodeSampleStream)
163189
}
164190

165191
func EncodeTime(t int64) string {

0 commit comments

Comments
 (0)