From 508b9b8478ec38abcd20e3dc10c1f0141ab38031 Mon Sep 17 00:00:00 2001 From: Erlan Zholdubai uulu Date: Tue, 18 Feb 2025 13:54:37 -0800 Subject: [PATCH] retrieve peakSamples and processedSamples query stats from results_cache as well Signed-off-by: Erlan Zholdubai uulu --- CHANGELOG.md | 1 + integration/query_frontend_test.go | 93 +++++++++++++++++++ pkg/frontend/transport/handler.go | 1 + pkg/frontend/transport/handler_test.go | 16 ++-- .../tripperware/queryrange/results_cache.go | 14 ++- .../queryrange/results_cache_test.go | 45 +++++++-- 6 files changed, 152 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42bc4b5d52c..35487bfec8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ * [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576 * [BUGFIX] Compactor: Cleaner would delete bucket index when there is no block in bucket store. #6577 * [BUGFIX] Querier: Fix marshal native histogram with empty bucket when protobuf codec is enabled. #6595 +* [BUGFIX] Query Frontend: Fix samples scanned and peak samples query stats when query hits results cache. #6591 ## 1.19.0 in progress diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index c62b520f820..5243e4cab33 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -859,3 +859,96 @@ func TestQueryFrontendQueryRejection(t *testing.T) { require.Contains(t, string(body), tripperware.QueryRejectErrorMessage) } + +func TestQueryFrontendStatsFromResultsCacheShouldBeSame(t *testing.T) { + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + memcached := e2ecache.NewMemcached() + consul := e2edb.NewConsul() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-querier.cache-results": "true", + "-querier.split-queries-by-interval": "24h", + "-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range + "-querier.per-step-stats-enabled": strconv.FormatBool(true), + "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-frontend.query-stats-enabled": strconv.FormatBool(true), + "-frontend.cache-queryable-samples-stats": strconv.FormatBool(true), + }) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Start the query-scheduler + queryScheduler := e2ecortex.NewQueryScheduler("query-scheduler", flags, "") + require.NoError(t, s.StartAndWaitReady(queryScheduler)) + flags["-frontend.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint() + flags["-querier.scheduler-address"] = queryScheduler.NetworkGRPCEndpoint() + + // Start the query-frontend. + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") + require.NoError(t, s.Start(queryFrontend)) + + // Start all other services. 
+ ingester := e2ecortex.NewIngesterWithConfigFile("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "") + distributor := e2ecortex.NewDistributorWithConfigFile("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "") + + querier := e2ecortex.NewQuerierWithConfigFile("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", flags, "") + + require.NoError(t, s.StartAndWaitReady(querier, ingester, distributor)) + require.NoError(t, s.WaitReady(queryFrontend)) + + // Check if we're discovering memcache or not. + require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(1), "cortex_memcache_client_servers")) + require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Greater(0), "cortex_dns_lookups_total")) + + // Wait until both the distributor and querier have updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push some series to Cortex. + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + seriesTimestamp := time.Now().Add(-10 * time.Minute) + series2Timestamp := seriesTimestamp.Add(1 * time.Minute) + series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"}) + series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Query back the series. + c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // First request that will hit the datasource. + resp, _, err := c.QueryRangeRaw(`{job="test"}`, seriesTimestamp.Add(-1*time.Minute), series2Timestamp.Add(1*time.Minute), 30*time.Second, map[string]string{}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + values, err := queryFrontend.SumMetrics([]string{"cortex_query_samples_scanned_total"}) + require.NoError(t, err) + numSamplesScannedTotal := e2e.SumValues(values) + + // We send the same query to hit the results cache. + resp, _, err = c.QueryRangeRaw(`{job="test"}`, seriesTimestamp.Add(-1*time.Minute), series2Timestamp.Add(1*time.Minute), 30*time.Second, map[string]string{}) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + values, err = queryFrontend.SumMetrics([]string{"cortex_query_samples_scanned_total"}) + require.NoError(t, err) + numSamplesScannedTotal2 := e2e.SumValues(values) + + // we expect same amount of samples_scanned added to the metric despite the second query hit the cache. + require.Equal(t, numSamplesScannedTotal2, numSamplesScannedTotal*2) +} diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 4150d5cc073..156c3ffdf68 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -410,6 +410,7 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query "split_queries", splitQueries, "status_code", statusCode, "response_size", contentLength, + "samples_scanned", numScannedSamples, }, stats.LoadExtraFields()...) 
if numStoreGatewayTouchedPostings > 0 { diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 10a844ee8da..b4ad35c266d 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -434,11 +434,11 @@ func TestReportQueryStatsFormat(t *testing.T) { tests := map[string]testCase{ "should not include query and header details if empty": { - expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000`, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0`, }, "should include query length and string at the end": { queryString: url.Values(map[string][]string{"query": {"up"}}), - expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 param_query=up`, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 param_query=up`, }, "should include query stats": { queryStats: &querier_stats.QueryStats{ @@ -454,15 +454,15 @@ func TestReportQueryStatsFormat(t *testing.T) { SplitQueries: 10, }, }, - expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 query_storage_wall_time_seconds=6000`, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 query_storage_wall_time_seconds=6000`, }, "should include user agent": { header: http.Header{"User-Agent": []string{"Grafana"}}, - expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 user_agent=Grafana`, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET 
path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 user_agent=Grafana`, }, "should include response error": { responseErr: errors.New("foo_err"), - expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 error=foo_err`, + expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 error=foo_err`, }, "should include query priority": { queryString: url.Values(map[string][]string{"query": {"up"}}), @@ -470,7 +470,7 @@ func TestReportQueryStatsFormat(t *testing.T) { Priority: 99, PriorityAssigned: true, }, - expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 query_length=2 priority=99 param_query=up`, }, "should include data fetch min and max time": { queryString: url.Values(map[string][]string{"query": {"up"}}), @@ -478,7 +478,7 @@ func TestReportQueryStatsFormat(t *testing.T) { DataSelectMaxTime: 1704153600000, DataSelectMinTime: 1704067200000, }, - expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 response_series_count=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 split_queries=0 status_code=200 response_size=1000 samples_scanned=0 data_select_max_time=1704153600 data_select_min_time=1704067200 query_length=2 param_query=up`, }, "should include query stats with store gateway stats": { queryStats: &querier_stats.QueryStats{ @@ -496,7 +496,7 @@ func TestReportQueryStatsFormat(t *testing.T) { StoreGatewayTouchedPostingBytes: 200, }, }, - expectedLog: `level=info msg="query stats" component=query-frontend 
method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 response_series_count=100 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10 status_code=200 response_size=1000 samples_scanned=0 store_gateway_touched_postings_count=20 store_gateway_touched_posting_bytes=200 query_storage_wall_time_seconds=6000`, }, } diff --git a/pkg/querier/tripperware/queryrange/results_cache.go b/pkg/querier/tripperware/queryrange/results_cache.go index a971ff261cb..1f7ab241137 100644 --- a/pkg/querier/tripperware/queryrange/results_cache.go +++ b/pkg/querier/tripperware/queryrange/results_cache.go @@ -450,7 +450,7 @@ func (s resultsCache) handleHit(ctx context.Context, r tripperware.Request, exte level.Debug(util_log.WithContext(ctx, log)).Log("msg", "handle hit", "start", r.GetStart(), "spanID", jaegerSpanID(ctx)) - requests, responses, err := s.partition(r, extents) + requests, responses, err := s.partition(ctx, r, extents) if err != nil { return nil, nil, err } @@ -647,7 +647,7 @@ func convertFromTripperwarePrometheusResponse(resp tripperware.Response) tripper // partition calculates the required requests to satisfy req given the cached data. // extents must be in order by start time. -func (s resultsCache) partition(req tripperware.Request, extents []tripperware.Extent) ([]tripperware.Request, []tripperware.Response, error) { +func (s resultsCache) partition(ctx context.Context, req tripperware.Request, extents []tripperware.Extent) ([]tripperware.Request, []tripperware.Response, error) { var requests []tripperware.Request var cachedResponses []tripperware.Response start := req.GetStart() @@ -678,7 +678,14 @@ func (s resultsCache) partition(req tripperware.Request, extents []tripperware.E return nil, nil, err } // extract the overlap from the cached extent. 
- cachedResponses = append(cachedResponses, s.extractor.Extract(start, req.GetEnd(), res)) + promRes := s.extractor.Extract(start, req.GetEnd(), res).(*tripperware.PrometheusResponse) + cachedResponses = append(cachedResponses, promRes) + + if queryStats := querier_stats.FromContext(ctx); queryStats != nil && promRes.Data.Stats != nil { + queryStats.AddScannedSamples(uint64(promRes.Data.Stats.Samples.TotalQueryableSamples)) + queryStats.SetPeakSamples(max(queryStats.LoadPeakSamples(), uint64(promRes.Data.Stats.Samples.PeakSamples))) + } + start = extent.End } @@ -807,6 +814,7 @@ func extractStats(start, end int64, stats *tripperware.PrometheusResponseStats) if start <= s.TimestampMs && s.TimestampMs <= end { result.Samples.TotalQueryableSamplesPerStep = append(result.Samples.TotalQueryableSamplesPerStep, s) result.Samples.TotalQueryableSamples += s.Value + result.Samples.PeakSamples = max(result.Samples.PeakSamples, s.Value) } } return result diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index 46422768b87..b029e85443c 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -19,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/partialdata" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -173,6 +174,7 @@ func mkAPIResponseWithStats(start, end, step int64, withStats bool, oldFormat bo }) stats.Samples.TotalQueryableSamples += i + stats.Samples.PeakSamples = max(stats.Samples.PeakSamples, i) } } @@ -597,11 +599,13 @@ func TestShouldCache(t *testing.T) { func TestPartition(t *testing.T) { t.Parallel() for _, tc := range []struct { - name string - input tripperware.Request - prevCachedResponse []tripperware.Extent - expectedRequests []tripperware.Request - expectedCachedResponse []tripperware.Response + name string + input tripperware.Request + prevCachedResponse []tripperware.Extent + expectedRequests []tripperware.Request + expectedCachedResponse []tripperware.Response + expectedScannedSamplesFromCachedResponse uint64 + expectedPeakSamplesFromCachedResponse uint64 }{ { name: "Test a complete hit.", @@ -822,6 +826,8 @@ func TestPartition(t *testing.T) { expectedCachedResponse: []tripperware.Response{ mkAPIResponseWithStats(0, 100, 10, true, false), }, + expectedPeakSamplesFromCachedResponse: getPeakSamples(0, 100, 10), + expectedScannedSamplesFromCachedResponse: getScannedSamples(0, 100, 10), }, { @@ -836,6 +842,8 @@ func TestPartition(t *testing.T) { expectedCachedResponse: []tripperware.Response{ mkAPIResponseWithStats(0, 100, 10, true, false), }, + expectedPeakSamplesFromCachedResponse: getPeakSamples(0, 100, 10), + expectedScannedSamplesFromCachedResponse: getScannedSamples(0, 100, 10), }, { @@ -886,6 +894,8 @@ func TestPartition(t *testing.T) { expectedCachedResponse: []tripperware.Response{ mkAPIResponseWithStats(50, 100, 10, true, false), }, + expectedPeakSamplesFromCachedResponse: getPeakSamples(50, 100, 10), + expectedScannedSamplesFromCachedResponse: getScannedSamples(50, 100, 10), }, { name: "[stats] Test multiple partial hits.", @@ -907,6 +917,8 @@ func TestPartition(t *testing.T) { mkAPIResponseWithStats(100, 120, 10, true, false), 
mkAPIResponseWithStats(160, 200, 10, true, false), }, + expectedPeakSamplesFromCachedResponse: max(getPeakSamples(100, 120, 10), getPeakSamples(160, 200, 10)), + expectedScannedSamplesFromCachedResponse: getScannedSamples(100, 120, 10) + getScannedSamples(160, 200, 10), }, { name: "[stats] Partial hits with tiny gap.", @@ -927,7 +939,8 @@ func TestPartition(t *testing.T) { expectedCachedResponse: []tripperware.Response{ mkAPIResponseWithStats(100, 120, 10, true, false), }, - }, + expectedPeakSamplesFromCachedResponse: getPeakSamples(100, 120, 10), + expectedScannedSamplesFromCachedResponse: getScannedSamples(100, 120, 10)}, { name: "[stats] Extent is outside the range and the request has a single step (same start and end).", input: &tripperware.PrometheusRequest{ @@ -957,6 +970,8 @@ func TestPartition(t *testing.T) { expectedCachedResponse: []tripperware.Response{ mkAPIResponseWithStats(100, 105, 10, true, false), }, + expectedPeakSamplesFromCachedResponse: getPeakSamples(100, 105, 10), + expectedScannedSamplesFromCachedResponse: getScannedSamples(100, 105, 10), }, { name: "[stats] Test when hit has a large step and only a single sample extent with old format.", @@ -971,6 +986,8 @@ func TestPartition(t *testing.T) { expectedCachedResponse: []tripperware.Response{ mkAPIResponseWithStats(100, 105, 10, true, false), }, + expectedPeakSamplesFromCachedResponse: getPeakSamples(100, 105, 10), + expectedScannedSamplesFromCachedResponse: getScannedSamples(100, 105, 10), }, } { tc := tc @@ -980,10 +997,13 @@ func TestPartition(t *testing.T) { extractor: PrometheusResponseExtractor{}, minCacheExtent: 10, } - reqs, resps, err := s.partition(tc.input, tc.prevCachedResponse) + stats, ctx := querier_stats.ContextWithEmptyStats(context.Background()) + reqs, resps, err := s.partition(ctx, tc.input, tc.prevCachedResponse) require.Nil(t, err) require.Equal(t, tc.expectedRequests, reqs) require.Equal(t, tc.expectedCachedResponse, resps) + require.Equal(t, tc.expectedScannedSamplesFromCachedResponse, stats.ScannedSamples) + require.Equal(t, tc.expectedPeakSamplesFromCachedResponse, stats.PeakSamples) }) } } @@ -1585,3 +1605,14 @@ func TestResultsCacheFillCompatibility(t *testing.T) { func toMs(t time.Duration) int64 { return int64(t / time.Millisecond) } + +func getScannedSamples(start, end, step uint64) uint64 { + lastTerm := start + ((end-start)/step)*step + n := (lastTerm-start)/step + 1 + + return (n * (2*start + (n-1)*step)) / 2 +} + +func getPeakSamples(start, end, step uint64) uint64 { + return start + ((end-start)/step)*step +}
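
Note on the expected-value helpers added at the end of results_cache_test.go: mkAPIResponseWithStats emits one per-step stats entry whose value equals its timestamp, so the scanned-samples total over [start, end] is the sum of the arithmetic sequence start, start+step, ..., and the peak is that sequence's last term; getScannedSamples evaluates the sum in closed form as n*(2*start+(n-1)*step)/2. The standalone Go sketch below is illustration only, not part of the patch; the helper names here are invented for the example, and it assumes (as the per-step accumulation in the diff suggests) that the helper iterates timestamps from start to end inclusive. It cross-checks the closed form against a brute-force loop.

package main

import "fmt"

// scannedSamplesClosedForm mirrors getScannedSamples from the test:
// the sum of the arithmetic sequence start, start+step, ..., lastTerm <= end,
// computed as n*(2a + (n-1)d)/2.
func scannedSamplesClosedForm(start, end, step uint64) uint64 {
	lastTerm := start + ((end-start)/step)*step
	n := (lastTerm-start)/step + 1
	return (n * (2*start + (n-1)*step)) / 2
}

// scannedSamplesBruteForce mimics how mkAPIResponseWithStats accumulates
// TotalQueryableSamples and PeakSamples: one per-step entry per timestamp,
// each worth its own timestamp value.
func scannedSamplesBruteForce(start, end, step uint64) (total, peak uint64) {
	for ts := start; ts <= end; ts += step {
		total += ts
		if ts > peak {
			peak = ts
		}
	}
	return total, peak
}

func main() {
	total, peak := scannedSamplesBruteForce(100, 120, 10)
	fmt.Println(total, scannedSamplesClosedForm(100, 120, 10)) // 330 330
	fmt.Println(peak)                                          // 120, matching getPeakSamples(100, 120, 10)
}

The same accounting is what the integration test relies on: on a cache hit, partition() now adds the cached extent's TotalQueryableSamples back into the per-query stats (and takes the max for peak samples), so the repeated query reports the same samples_scanned as the original one and cortex_query_samples_scanned_total is expected to exactly double.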