From e026d23b85ab799f229729e43621d78ec34b01a2 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 16 May 2023 13:24:11 -0700 Subject: [PATCH 1/3] Implementing decodeSampleStream Signed-off-by: Alan Protasio --- .../instantquery/instant_query_test.go | 66 +++++++++++++++++++ pkg/querier/tripperware/query.go | 26 ++++++++ 2 files changed, 92 insertions(+) diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 9dbc80d405c..04bee04249f 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" @@ -444,3 +446,67 @@ func Test_sortPlanForQuery(t *testing.T) { }) } } + +func Benchmark_Decode(b *testing.B) { + maxSamplesCount := 1000000 + samples := make([]tripperware.SampleStream, maxSamplesCount) + + for i := 0; i < maxSamplesCount; i++ { + samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample%v", i), Value: fmt.Sprintf("Value%v", i)}) + samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample2%v", i), Value: fmt.Sprintf("Value2%v", i)}) + samples[i].Labels = append(samples[i].Labels, cortexpb.LabelAdapter{Name: fmt.Sprintf("Sample3%v", i), Value: fmt.Sprintf("Value3%v", i)}) + samples[i].Samples = append(samples[i].Samples, cortexpb.Sample{TimestampMs: int64(i), Value: float64(i)}) + } + + for name, tc := range map[string]struct { + sampleStream []tripperware.SampleStream + }{ + "100 samples": { + sampleStream: samples[:100], + }, + "1000 samples": { + sampleStream: samples[:1000], + }, + "10000 samples": { + sampleStream: samples[:10000], + }, + "100000 samples": { + sampleStream: samples[:100000], + }, + "1000000 samples": { + sampleStream: samples[:1000000], + }, + } { + b.Run(name, func(b *testing.B) { + r := PrometheusInstantQueryResponse{ + Data: PrometheusInstantQueryData{ + ResultType: model.ValMatrix.String(), + Result: PrometheusInstantQueryResult{ + Result: &PrometheusInstantQueryResult_Matrix{ + Matrix: &Matrix{ + SampleStreams: tc.sampleStream, + }, + }, + }, + }, + } + + body, err := json.Marshal(r) + require.NoError(b, err) + + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + response := &http.Response{ + StatusCode: 200, + Body: io.NopCloser(bytes.NewBuffer(body)), + } + _, err := InstantQueryCodec.DecodeResponse(context.Background(), response, nil) + require.NoError(b, err) + } + }) + } + +} \ No newline at end of file diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index c2d444de85a..d40c32c0dc1 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -83,6 +83,31 @@ type Request interface { WithStats(stats string) Request } +func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) { + lbls := labels.Labels{} + samples := []cortexpb.Sample{} + for field := iter.ReadObject(); field != ""; field = iter.ReadObject() { + switch field { + case "metric": + iter.ReadVal(&lbls) + case "values": + for { + if !iter.ReadArray() { + break + } + s := cortexpb.Sample{} + cortexpb.SampleJsoniterDecode(unsafe.Pointer(&s), iter) + samples = append(samples, s) + } + } + } + + *(*SampleStream)(ptr) = SampleStream{ + Samples: samples, + Labels: cortexpb.FromLabelsToLabelAdapters(lbls), + } +} + func encodeSampleStream(ptr unsafe.Pointer, stream *jsoniter.Stream) { ss := (*SampleStream)(ptr) stream.WriteObjectStart() @@ -160,6 +185,7 @@ func init() { jsoniter.RegisterTypeEncoderFunc("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterEncode, func(unsafe.Pointer) bool { return false }) jsoniter.RegisterTypeDecoderFunc("tripperware.PrometheusResponseQueryableSamplesStatsPerStep", PrometheusResponseQueryableSamplesStatsPerStepJsoniterDecode) jsoniter.RegisterTypeEncoderFunc("tripperware.SampleStream", encodeSampleStream, func(unsafe.Pointer) bool { return false }) + jsoniter.RegisterTypeDecoderFunc("tripperware.SampleStream", decodeSampleStream) } func EncodeTime(t int64) string { From ed6d9b003f5ab685d58c3d5a9c95974077d98a9d Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 16 May 2023 15:10:17 -0700 Subject: [PATCH 2/3] lint Signed-off-by: Alan Protasio --- pkg/querier/tripperware/instantquery/instant_query_test.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 04bee04249f..262e893795c 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -11,14 +11,14 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/querier/tripperware" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" - - "github.com/cortexproject/cortex/pkg/querier/tripperware" ) func TestRequest(t *testing.T) { @@ -494,7 +494,6 @@ func Benchmark_Decode(b *testing.B) { body, err := json.Marshal(r) require.NoError(b, err) - b.ResetTimer() b.ReportAllocs() @@ -509,4 +508,4 @@ func Benchmark_Decode(b *testing.B) { }) } -} \ No newline at end of file +} From 27179b02665b5a08ebc19eb3c61a0d76c3dfb695 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 16 May 2023 15:39:36 -0700 Subject: [PATCH 3/3] lint Signed-off-by: Alan Protasio --- pkg/querier/tripperware/instantquery/instant_query_test.go | 6 +++--- pkg/querier/tripperware/query.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index 262e893795c..e6eab8a2169 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -11,14 +11,14 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/querier/tripperware" - - "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/tripperware" ) func TestRequest(t *testing.T) { diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index d40c32c0dc1..42de413e52b 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -91,7 +91,7 @@ func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) { case "metric": iter.ReadVal(&lbls) case "values": - for { + for { if !iter.ReadArray() { break } @@ -104,7 +104,7 @@ func decodeSampleStream(ptr unsafe.Pointer, iter *jsoniter.Iterator) { *(*SampleStream)(ptr) = SampleStream{ Samples: samples, - Labels: cortexpb.FromLabelsToLabelAdapters(lbls), + Labels: cortexpb.FromLabelsToLabelAdapters(lbls), } }