diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c810f9ddf4..299fc57b227 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -60,6 +60,7 @@ * [BUGFIX] Fix S3 BucketWithRetries upload empty content issue #5217 * [BUGFIX] Query Frontend: Disable `absent`, `absent_over_time` and `scalar` for vertical sharding. #5221 * [BUGFIX] Catch context error in the s3 bucket client. #5240 +* [BUGFIX] Fix query frontend remote read empty body. #5257 ## 1.14.0 2022-12-02 diff --git a/VERSION b/VERSION index 460eb7b0ea3..98db740001c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.15.0-rc.1 +1.15.0-rc.2 diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index feb643c37bb..adfc99faf4c 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -19,8 +19,11 @@ import ( promapi "github.com/prometheus/client_golang/api" promv1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/rulefmt" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" yaml "gopkg.in/yaml.v3" "github.com/cortexproject/cortex/pkg/ruler" @@ -153,6 +156,72 @@ func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) { return c.query(addr) } +// RemoteRead runs a remote read query. +func (c *Client) RemoteRead(matchers []*labels.Matcher, start, end time.Time, step time.Duration) (*prompb.ReadResponse, error) { + startMs := start.UnixMilli() + endMs := end.UnixMilli() + stepMs := step.Milliseconds() + + q, err := remote.ToQuery(startMs, endMs, matchers, &storage.SelectHints{ + Step: stepMs, + Start: startMs, + End: endMs, + }) + if err != nil { + return nil, err + } + + req := &prompb.ReadRequest{ + Queries: []*prompb.Query{q}, + AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, + } + + data, err := proto.Marshal(req) + if err != nil { + return nil, err + } + compressed := snappy.Encode(nil, data) + + // Call the remote read API endpoint with a timeout. + httpReqCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + httpReq, err := http.NewRequestWithContext(httpReqCtx, "POST", "http://"+c.querierAddress+"/prometheus/api/v1/read", bytes.NewReader(compressed)) + if err != nil { + return nil, err + } + httpReq.Header.Set("X-Scope-OrgID", "user-1") + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Add("Accept-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("User-Agent", "Prometheus/1.8.2") + httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") + + httpResp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, err + } + if httpResp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code %d", httpResp.StatusCode) + } + + compressed, err = io.ReadAll(httpResp.Body) + if err != nil { + return nil, err + } + + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + + var resp prompb.ReadResponse + if err = proto.Unmarshal(uncompressed, &resp); err != nil { + return nil, err + } + return &resp, nil +} + func (c *Client) query(addr string) (*http.Response, []byte, error) { ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index a0a0a702529..6df5d101015 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -31,6 +31,7 @@ type queryFrontendTestConfig struct { testMissingMetricName bool querySchedulerEnabled bool queryStatsEnabled bool + remoteReadEnabled bool setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) } @@ -194,6 +195,19 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) { }) } +func TestQueryFrontendRemoteRead(t *testing.T) { + runQueryFrontendTest(t, queryFrontendTestConfig{ + remoteReadEnabled: true, + setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) + + minio := e2edb.NewMinio(9000, BlocksStorageFlags()["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + return cortexConfigFile, flags + }, + }) +} + func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { const numUsers = 10 const numQueriesPerUser = 10 @@ -307,6 +321,18 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0]) } + // No need to repeat the test on remote read for each user. + if userID == 0 && cfg.remoteReadEnabled { + start := now.Add(-1 * time.Hour) + end := now.Add(1 * time.Hour) + res, err := c.RemoteRead([]*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "series_1")}, start, end, time.Second) + require.NoError(t, err) + require.True(t, len(res.Results) > 0) + require.True(t, len(res.Results[0].Timeseries) > 0) + require.True(t, len(res.Results[0].Timeseries[0].Samples) > 0) + require.True(t, len(res.Results[0].Timeseries[0].Labels) > 0) + } + // In this test we do ensure that the /series start/end time is ignored and Cortex // always returns series in ingesters memory. No need to repeat it for each user. if userID == 0 { @@ -342,6 +368,10 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { extra++ } + if cfg.remoteReadEnabled { + extra++ + } + require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total")) // The number of received request is greater than the query requests because include diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index a5174477b00..fdf8ae27c03 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -139,11 +139,14 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = io.NopCloser(io.TeeReader(r.Body, &buf)) // We parse form here so that we can use buf as body, in order to // prevent https://github.com/cortexproject/cortex/issues/5201. - if err := r.ParseForm(); err != nil { - writeError(w, err) - return + // Exclude remote read here as we don't have to buffer its body. + if !strings.Contains(r.URL.Path, "api/v1/read") { + if err := r.ParseForm(); err != nil { + writeError(w, err) + return + } + r.Body = io.NopCloser(&buf) } - r.Body = io.NopCloser(&buf) startTime := time.Now() resp, err := f.roundTripper.RoundTrip(r)