From 08f9bf7c2a78b10b452cf69723a946d0322cb514 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 29 Dec 2022 23:15:52 -0800 Subject: [PATCH 1/2] Create Span for the codec MergeResponse method Signed-off-by: Alan Protasio --- pkg/querier/tripperware/instantquery/instant_query.go | 5 ++++- pkg/querier/tripperware/instantquery/instant_query_test.go | 2 +- pkg/querier/tripperware/query.go | 2 +- pkg/querier/tripperware/queryrange/query_range.go | 4 +++- pkg/querier/tripperware/queryrange/query_range_test.go | 2 +- pkg/querier/tripperware/queryrange/results_cache.go | 6 +++--- pkg/querier/tripperware/queryrange/split_by_interval.go | 2 +- .../tripperware/queryrange/split_by_interval_test.go | 2 +- pkg/querier/tripperware/shard_by.go | 2 +- 9 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index f925f41cc19..2761709c58a 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -243,7 +243,10 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res return &resp, nil } -func (instantQueryCodec) MergeResponse(responses ...tripperware.Response) (tripperware.Response, error) { +func (instantQueryCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) { + sp, _ := opentracing.StartSpanFromContext(ctx, "PrometheusInstantQueryResponse.MergeResponse") + defer sp.Finish() + if len(responses) == 0 { return NewEmptyPrometheusInstantQueryResponse(), nil } else if len(responses) == 1 { diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index a0013a5ab0a..efdf968fea9 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -308,7 +308,7 @@ func TestMergeResponse(t *testing.T) { require.NoError(t, err) resps = append(resps, dr) } - resp, err := InstantQueryCodec.MergeResponse(resps...) + resp, err := InstantQueryCodec.MergeResponse(context.Background(), resps...) assert.Equal(t, err, tc.expectedErr) if err != nil { return diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 4f90fb5cce7..8bff06ccb6e 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -40,7 +40,7 @@ type Codec interface { // Merger is used by middlewares making multiple requests to merge back all responses into a single one. type Merger interface { // MergeResponse merges responses from multiple requests into a single Response - MergeResponse(...Response) (Response, error) + MergeResponse(context.Context, ...Response) (Response, error) } // Response represents a query range response. diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index c3ad1839c19..441e19038f0 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -127,7 +127,9 @@ func NewEmptyPrometheusResponse() *PrometheusResponse { } } -func (c prometheusCodec) MergeResponse(responses ...tripperware.Response) (tripperware.Response, error) { +func (c prometheusCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) { + sp, _ := opentracing.StartSpanFromContext(ctx, "PrometheusInstantQueryResponse.MergeResponse") + defer sp.Finish() if len(responses) == 0 { return NewEmptyPrometheusResponse(), nil } diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index cfad6e8f999..bd61485d064 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -652,7 +652,7 @@ func TestMergeAPIResponses(t *testing.T) { }, }} { t.Run(tc.name, func(t *testing.T) { - output, err := PrometheusCodec.MergeResponse(tc.input...) + output, err := PrometheusCodec.MergeResponse(context.Background(), tc.input...) require.NoError(t, err) require.Equal(t, tc.expected, output) }) diff --git a/pkg/querier/tripperware/queryrange/results_cache.go b/pkg/querier/tripperware/queryrange/results_cache.go index c16d63dc892..5c0fc543759 100644 --- a/pkg/querier/tripperware/queryrange/results_cache.go +++ b/pkg/querier/tripperware/queryrange/results_cache.go @@ -407,7 +407,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte return nil, nil, err } if len(requests) == 0 { - response, err := s.merger.MergeResponse(responses...) + response, err := s.merger.MergeResponse(context.Background(), responses...) // No downstream requests so no need to write back to the cache. return response, nil, err } @@ -469,7 +469,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte if err != nil { return nil, nil, err } - merged, err := s.merger.MergeResponse(accumulator.Response, currentRes) + merged, err := s.merger.MergeResponse(ctx, accumulator.Response, currentRes) if err != nil { return nil, nil, err } @@ -481,7 +481,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte return nil, nil, err } - response, err := s.merger.MergeResponse(responses...) + response, err := s.merger.MergeResponse(ctx, responses...) return response, mergedExtents, err } diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index 6bf87bf13d0..4ad6e8f47f8 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -61,7 +61,7 @@ func (s splitByInterval) Do(ctx context.Context, r tripperware.Request) (tripper resps = append(resps, reqResp.Response) } - response, err := s.merger.MergeResponse(resps...) + response, err := s.merger.MergeResponse(ctx, resps...) if err != nil { return nil, err } diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 3451570ce42..22e9565bce1 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -266,7 +266,7 @@ func TestSplitQuery(t *testing.T) { } func TestSplitByDay(t *testing.T) { - mergedResponse, err := PrometheusCodec.MergeResponse(parsedResponse, parsedResponse) + mergedResponse, err := PrometheusCodec.MergeResponse(context.Background(), parsedResponse, parsedResponse) require.NoError(t, err) mergedHTTPResponse, err := PrometheusCodec.EncodeResponse(context.Background(), mergedResponse) diff --git a/pkg/querier/tripperware/shard_by.go b/pkg/querier/tripperware/shard_by.go index 8bb218af321..13d6645612e 100644 --- a/pkg/querier/tripperware/shard_by.go +++ b/pkg/querier/tripperware/shard_by.go @@ -78,7 +78,7 @@ func (s shardBy) Do(ctx context.Context, r Request) (Response, error) { resps = append(resps, reqResp.Response) } - return s.merger.MergeResponse(resps...) + return s.merger.MergeResponse(ctx, resps...) } func (s shardBy) shardQuery(l log.Logger, numShards int, r Request, analysis querysharding.QueryAnalysis) []Request { From f1868960e37bae0872a57f783871b7d546e1f984 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 29 Dec 2022 23:24:18 -0800 Subject: [PATCH 2/2] Changelog Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 + pkg/querier/tripperware/instantquery/instant_query.go | 1 + pkg/querier/tripperware/queryrange/query_range.go | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05432ef42c1..903118c1979 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * [FEATURE] Ingester: Enable snapshotting of In-memory TSDB on disk during shutdown via `-blocks-storage.tsdb.memory-snapshot-on-shutdown`. #5011 * [FEATURE] Query Frontend/Scheduler: Add a new counter metric `cortex_request_queue_requests_total` for total requests going to queue. #5030 * [FEATURE] Build ARM docker images. #5041 +* [FEATURE] Query-frontend/Querier: Create spans to measure time to merge promql responses. #5041 * [BUGFIX] Updated `golang.org/x/net` dependency to fix CVE-2022-27664. #5008 * [BUGFIX] Fix panic when otel and xray tracing is enabled. #5044 * [BUGFIX] Fixed no compact block got grouped in shuffle sharding grouper. #5055 diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 2761709c58a..0af7d72627b 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -245,6 +245,7 @@ func (instantQueryCodec) EncodeResponse(ctx context.Context, res tripperware.Res func (instantQueryCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "PrometheusInstantQueryResponse.MergeResponse") + sp.SetTag("response_count", len(responses)) defer sp.Finish() if len(responses) == 0 { diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 441e19038f0..cb94e8b1196 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -128,7 +128,8 @@ func NewEmptyPrometheusResponse() *PrometheusResponse { } func (c prometheusCodec) MergeResponse(ctx context.Context, responses ...tripperware.Response) (tripperware.Response, error) { - sp, _ := opentracing.StartSpanFromContext(ctx, "PrometheusInstantQueryResponse.MergeResponse") + sp, _ := opentracing.StartSpanFromContext(ctx, "QueryRangeResponse.MergeResponse") + sp.SetTag("response_count", len(responses)) defer sp.Finish() if len(responses) == 0 { return NewEmptyPrometheusResponse(), nil