Skip to content

Commit 8d15b2d

Browse files
authored
fix remote read snappy input due to empty request body (#7025)
* fix remote read snappy input regression due to empty request body Signed-off-by: yeya24 <[email protected]> * changelog Signed-off-by: yeya24 <[email protected]> --------- Signed-off-by: yeya24 <[email protected]>
1 parent e7b21cf commit 8d15b2d

File tree

4 files changed

+52
-26
lines changed

4 files changed

+52
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
* [BUGFIX] Compactor: Delete the prefix `blocks_meta` from the metadata fetcher metrics. #6832
9797
* [BUGFIX] Store Gateway: Avoid race condition by deduplicating entries in bucket stores user scan. #6863
9898
* [BUGFIX] Runtime-config: Change to check tenant limit validation when loading runtime config only for `all`, `distributor`, `querier`, and `ruler` targets. #6880
99+
* [BUGFIX] Frontend: Fix remote read snappy input due to request string logging when query stats enabled. #7025
99100

100101
## 1.19.0 2025-02-27
101102

integration/query_frontend_test.go

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
type queryFrontendTestConfig struct {
3939
testMissingMetricName bool
4040
querySchedulerEnabled bool
41-
queryStatsEnabled bool
4241
remoteReadEnabled bool
4342
testSubQueryStepSize bool
4443
setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)
@@ -61,7 +60,6 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) {
6160
func TestQueryFrontendWithBlocksStorageViaFlagsAndQueryStatsEnabled(t *testing.T) {
6261
runQueryFrontendTest(t, queryFrontendTestConfig{
6362
testMissingMetricName: false,
64-
queryStatsEnabled: true,
6563
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
6664
flags = BlocksStorageFlags()
6765

@@ -92,7 +90,6 @@ func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQuerySchedulerAndQueryStat
9290
runQueryFrontendTest(t, queryFrontendTestConfig{
9391
testMissingMetricName: false,
9492
querySchedulerEnabled: true,
95-
queryStatsEnabled: true,
9693
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
9794
flags = BlocksStorageFlags()
9895

@@ -168,7 +165,6 @@ func TestQueryFrontendWithVerticalSharding(t *testing.T) {
168165
runQueryFrontendTest(t, queryFrontendTestConfig{
169166
testMissingMetricName: false,
170167
querySchedulerEnabled: false,
171-
queryStatsEnabled: true,
172168
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
173169
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
174170

@@ -188,7 +184,6 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) {
188184
runQueryFrontendTest(t, queryFrontendTestConfig{
189185
testMissingMetricName: false,
190186
querySchedulerEnabled: true,
191-
queryStatsEnabled: true,
192187
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
193188
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
194189

@@ -208,7 +203,6 @@ func TestQueryFrontendProtobufCodec(t *testing.T) {
208203
runQueryFrontendTest(t, queryFrontendTestConfig{
209204
testMissingMetricName: false,
210205
querySchedulerEnabled: true,
211-
queryStatsEnabled: true,
212206
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
213207
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
214208

@@ -228,7 +222,6 @@ func TestQuerierToQueryFrontendCompression(t *testing.T) {
228222
runQueryFrontendTest(t, queryFrontendTestConfig{
229223
testMissingMetricName: false,
230224
querySchedulerEnabled: true,
231-
queryStatsEnabled: true,
232225
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
233226
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))
234227

@@ -294,7 +287,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
294287
"-querier.split-queries-by-interval": "24h",
295288
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
296289
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
297-
"-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled),
290+
"-frontend.query-stats-enabled": "true", // Always enable query stats to capture regressions
298291
})
299292

300293
// Start the query-scheduler if enabled.
@@ -382,7 +375,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
382375
}
383376

384377
// No need to repeat the test on Server-Timing header for each user.
385-
if userID == 0 && cfg.queryStatsEnabled {
378+
if userID == 0 {
386379
res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now(), map[string]string{})
387380
require.NoError(t, err)
388381
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0])
@@ -433,15 +426,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
433426

434427
wg.Wait()
435428

436-
extra := float64(2)
429+
extra := float64(3) // Always include query stats test
437430
if cfg.testMissingMetricName {
438431
extra++
439432
}
440433

441-
if cfg.queryStatsEnabled {
442-
extra++
443-
}
444-
445434
if cfg.remoteReadEnabled {
446435
extra++
447436
}
@@ -458,15 +447,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
458447
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount))
459448
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount))
460449

461-
// Ensure query stats metrics are tracked only when enabled.
462-
if cfg.queryStatsEnabled {
463-
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(
464-
e2e.Greater(0),
465-
[]string{"cortex_query_seconds_total"},
466-
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"))))
467-
} else {
468-
require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total"))
469-
}
450+
// Ensure query stats metrics are always tracked to capture regressions.
451+
require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(
452+
e2e.Greater(0),
453+
[]string{"cortex_query_seconds_total"},
454+
e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1"))))
470455

471456
// Ensure no service-specific metrics prefix is used by the wrong service.
472457
assertServiceMetricsPrefixes(t, Distributor, distributor)

pkg/frontend/transport/handler.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
285285
// We parse form here so that we can use buf as body, in order to
286286
// prevent https://github.com/cortexproject/cortex/issues/5201.
287287
// Exclude remote read here as we don't have to buffer its body.
288-
if !strings.Contains(r.URL.Path, "api/v1/read") {
288+
isRemoteRead := strings.Contains(r.URL.Path, "api/v1/read")
289+
if !isRemoteRead {
289290
if err := r.ParseForm(); err != nil {
290291
statusCode := http.StatusBadRequest
291292
if util.IsRequestBodyTooLarge(err) {
@@ -300,8 +301,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
300301
r.Body = io.NopCloser(&buf)
301302
}
302303

303-
// Log request
304-
if f.cfg.QueryStatsEnabled {
304+
// Log request if the request is not remote read.
305+
// We need to parse remote read proto to be properly log it so skip it.
306+
if f.cfg.QueryStatsEnabled && !isRemoteRead {
305307
queryString = f.parseRequestQueryString(r, buf)
306308
f.logQueryRequest(r, queryString, source)
307309
}

pkg/frontend/transport/handler_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,3 +841,41 @@ func TestHandlerMetricsCleanup(t *testing.T) {
841841
"cortex_query_samples_scanned_total", "cortex_query_peak_samples", "cortex_query_fetched_chunks_bytes_total",
842842
"cortex_query_fetched_data_bytes_total", "cortex_rejected_queries_total", "cortex_slow_queries_total"))
843843
}
844+
845+
func TestHandler_RemoteReadRequest_DoesNotParseQueryString(t *testing.T) {
846+
// Create a mock round tripper that captures the request
847+
var capturedRequest *http.Request
848+
roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) {
849+
capturedRequest = req
850+
return &http.Response{
851+
StatusCode: http.StatusOK,
852+
Body: io.NopCloser(strings.NewReader("{}")),
853+
}, nil
854+
})
855+
856+
// Use a larger MaxBodySize to avoid the "request body too large" error
857+
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true, MaxBodySize: 10 * 1024 * 1024}, tenantfederation.Config{}, roundTripper, log.NewNopLogger(), nil)
858+
handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler)
859+
860+
// Create a remote read request with a body that would be corrupted by parseRequestQueryString
861+
originalBody := "snappy-compressed-data"
862+
req := httptest.NewRequest("POST", "http://fake/api/v1/read", strings.NewReader(originalBody))
863+
req.Header.Set("X-Scope-OrgId", "user-1")
864+
req.Header.Set("Content-Type", "application/x-protobuf")
865+
req.Header.Set("Content-Encoding", "snappy")
866+
867+
resp := httptest.NewRecorder()
868+
handlerWithAuth.ServeHTTP(resp, req)
869+
870+
// Verify the request was successful
871+
require.Equal(t, http.StatusOK, resp.Code)
872+
873+
// Verify that the original request body was preserved and not corrupted
874+
require.NotNil(t, capturedRequest)
875+
bodyBytes, err := io.ReadAll(capturedRequest.Body)
876+
require.NoError(t, err)
877+
require.Equal(t, originalBody, string(bodyBytes))
878+
879+
// Verify that the request body is still readable (not replaced with empty buffer)
880+
require.NotEmpty(t, string(bodyBytes))
881+
}

0 commit comments

Comments
 (0)