Skip to content

Commit 1ca8284

Browse files
committed
refactor
Signed-off-by: Ben Ye <[email protected]> fix e2e test Signed-off-by: Ben Ye <[email protected]>
1 parent bf33802 commit 1ca8284

File tree

9 files changed

+126
-134
lines changed

9 files changed

+126
-134
lines changed

integration/query_frontend_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@ import (
77
"crypto/x509"
88
"crypto/x509/pkix"
99
"fmt"
10-
"github.com/thanos-io/thanos/pkg/pool"
1110
"net/http"
1211
"os"
13-
"path"
1412
"path/filepath"
1513
"strconv"
1614
"sync"
@@ -23,6 +21,7 @@ import (
2321
"github.com/prometheus/prometheus/prompb"
2422
"github.com/stretchr/testify/assert"
2523
"github.com/stretchr/testify/require"
24+
"github.com/thanos-io/thanos/pkg/pool"
2625

2726
"github.com/cortexproject/cortex/integration/ca"
2827
"github.com/cortexproject/cortex/integration/e2e"
@@ -452,7 +451,7 @@ func TestQueryFrontendNoRetryChunkPool(t *testing.T) {
452451
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
453452
"-blocks-storage.tsdb.ship-interval": "1s",
454453
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
455-
"-blocks-storage.bucket-store.max_chunk_pool_bytes": "1",
454+
"-blocks-storage.bucket-store.max-chunk-pool-bytes": "1",
456455
})
457456

458457
// Start dependencies.
@@ -492,24 +491,26 @@ func TestQueryFrontendNoRetryChunkPool(t *testing.T) {
492491
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total"))
493492
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))
494493

494+
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
495+
require.NoError(t, s.Start(queryFrontend))
496+
495497
// Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check.
496498
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
497499
"-blocks-storage.bucket-store.sync-interval": "5s",
498500
}), "")
499-
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
500501
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
501502
"-blocks-storage.bucket-store.sync-interval": "5s",
502503
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
503504
}), "")
504-
require.NoError(t, s.StartAndWaitReady(queryFrontend, querier, storeGateway))
505+
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
505506

506507
// Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
507508
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
508509
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
509510
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount))
510511

511512
// Query back the series.
512-
c, err = e2ecortex.NewClient("", path.Join(queryFrontend.HTTPEndpoint(), "/prometheus"), "", "", "user-1")
513+
c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
513514
require.NoError(t, err)
514515

515516
// We expect request to hit chunk pool exhaustion.

pkg/frontend/config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i
5959
cfg.FrontendV2.Port = grpcListenPort
6060
}
6161

62-
fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg)
63-
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, retry), nil, fr, err
62+
fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry)
63+
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err
6464

6565
default:
6666
// No scheduler = use original frontend.
67-
fr, err := v1.New(cfg.FrontendV1, limits, log, reg)
67+
fr, err := v1.New(cfg.FrontendV1, limits, log, reg, retry)
6868
if err != nil {
6969
return nil, nil, nil, err
7070
}
71-
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, retry), fr, nil, nil
71+
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil
7272
}
7373
}

pkg/frontend/transport/retry.go

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
package transport
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
7-
"io"
8-
"net/http"
96
"strings"
107
"unsafe"
118

129
"github.com/prometheus/client_golang/prometheus"
1310
"github.com/prometheus/client_golang/prometheus/promauto"
1411
"github.com/thanos-io/thanos/pkg/pool"
12+
"github.com/weaveworks/common/httpgrpc"
1513

1614
"github.com/cortexproject/cortex/pkg/querier/tripperware"
1715
)
@@ -33,7 +31,7 @@ func NewRetry(maxRetries int, reg prometheus.Registerer) *Retry {
3331
}
3432
}
3533

36-
func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http.Response, error) {
34+
func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)) (*httpgrpc.HTTPResponse, error) {
3735
if r.maxRetries == 0 {
3836
// Retries are disabled. Try only once.
3937
return f()
@@ -43,7 +41,7 @@ func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http
4341
defer func() { r.retriesCount.Observe(float64(tries)) }()
4442

4543
var (
46-
resp *http.Response
44+
resp *httpgrpc.HTTPResponse
4745
err error
4846
)
4947
for ; tries < r.maxRetries; tries++ {
@@ -54,8 +52,11 @@ func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http
5452
resp, err = f()
5553
if err != nil && !errors.Is(err, context.Canceled) {
5654
continue // Retryable
57-
} else if resp != nil && resp.StatusCode/100 == 5 {
58-
body, err := tripperware.BodyBuffer(resp, nil)
55+
} else if resp != nil && resp.Code/100 == 5 {
56+
// This is not that efficient as we might decode the body multiple
57+
// times. But error response should be too large so we should be fine.
58+
// TODO: investigate ways to decode only once.
59+
body, err := tripperware.BodyBufferFromHTTPGRPCResponse(resp, nil)
5960
if err != nil {
6061
return nil, err
6162
}
@@ -64,22 +65,14 @@ func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http
6465
continue
6566
}
6667

67-
resp.Body = &buffer{buff: body, ReadCloser: io.NopCloser(bytes.NewReader(body))}
68-
resp.ContentLength = int64(len(body))
6968
return resp, nil
7069
}
7170
break
7271
}
7372
if err != nil {
7473
return nil, err
7574
}
76-
// We always want to return decoded response body if possible.
77-
body, err := tripperware.BodyBuffer(resp, nil)
78-
if err != nil {
79-
return nil, err
80-
}
81-
resp.Body = &buffer{buff: body, ReadCloser: io.NopCloser(bytes.NewReader(body))}
82-
resp.ContentLength = int64(len(body))
75+
8376
return resp, err
8477
}
8578

pkg/frontend/transport/retry_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66

77
"github.com/stretchr/testify/require"
8+
"github.com/thanos-io/thanos/pkg/pool"
89
"github.com/weaveworks/common/httpgrpc"
910
"go.uber.org/atomic"
1011
)
@@ -39,6 +40,7 @@ func TestNoRetryOnChunkPoolExhaustion(t *testing.T) {
3940
if try > 1 {
4041
return &httpgrpc.HTTPResponse{
4142
Code: 500,
43+
Body: []byte(pool.ErrPoolExhausted.Error()),
4244
}, nil
4345
}
4446
return &httpgrpc.HTTPResponse{

pkg/frontend/transport/roundtripper.go

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@ type GrpcRoundTripper interface {
1515
RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)
1616
}
1717

18-
func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper, retry *Retry) http.RoundTripper {
19-
return &grpcRoundTripperAdapter{roundTripper: r, retry: retry}
18+
func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper {
19+
return &grpcRoundTripperAdapter{roundTripper: r}
2020
}
2121

2222
// This adapter wraps GrpcRoundTripper and converted it into http.RoundTripper
2323
type grpcRoundTripperAdapter struct {
2424
roundTripper GrpcRoundTripper
25-
retry *Retry
2625
}
2726

2827
type buffer struct {
@@ -40,16 +39,11 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er
4039
return nil, err
4140
}
4241

43-
return a.retry.Do(r.Context(), func() (*http.Response, error) {
44-
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
45-
if err != nil {
46-
return nil, err
47-
}
48-
return httpGRPCRespToHTTPResp(resp), nil
49-
})
50-
}
42+
resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
43+
if err != nil {
44+
return nil, err
45+
}
5146

52-
func httpGRPCRespToHTTPResp(resp *httpgrpc.HTTPResponse) *http.Response {
5347
httpResp := &http.Response{
5448
StatusCode: int(resp.Code),
5549
Body: &buffer{buff: resp.Body, ReadCloser: io.NopCloser(bytes.NewReader(resp.Body))},
@@ -59,5 +53,5 @@ func httpGRPCRespToHTTPResp(resp *httpgrpc.HTTPResponse) *http.Response {
5953
for _, h := range resp.Headers {
6054
httpResp.Header[h.Key] = h.Values
6155
}
62-
return httpResp
56+
return httpResp, nil
6357
}

pkg/frontend/v1/frontend.go

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,12 @@ type request struct {
9494
}
9595

9696
// New creates a new frontend. Frontend implements service, and must be started and stopped.
97-
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
97+
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, retry *transport.Retry) (*Frontend, error) {
9898
f := &Frontend{
9999
cfg: cfg,
100100
log: log,
101101
limits: limits,
102+
retry: retry,
102103
queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{
103104
Name: "cortex_query_frontend_queue_length",
104105
Help: "Number of queries in the queue.",
@@ -175,31 +176,33 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
175176
}
176177
}
177178

178-
request := request{
179-
request: req,
180-
originalCtx: ctx,
179+
return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
180+
request := request{
181+
request: req,
182+
originalCtx: ctx,
181183

182-
// Buffer of 1 to ensure response can be written by the server side
183-
// of the Process stream, even if this goroutine goes away due to
184-
// client context cancellation.
185-
err: make(chan error, 1),
186-
response: make(chan *httpgrpc.HTTPResponse, 1),
187-
}
184+
// Buffer of 1 to ensure response can be written by the server side
185+
// of the Process stream, even if this goroutine goes away due to
186+
// client context cancellation.
187+
err: make(chan error, 1),
188+
response: make(chan *httpgrpc.HTTPResponse, 1),
189+
}
188190

189-
if err := f.queueRequest(ctx, &request); err != nil {
190-
return nil, err
191-
}
191+
if err := f.queueRequest(ctx, &request); err != nil {
192+
return nil, err
193+
}
192194

193-
select {
194-
case <-ctx.Done():
195-
return nil, ctx.Err()
195+
select {
196+
case <-ctx.Done():
197+
return nil, ctx.Err()
196198

197-
case resp := <-request.response:
198-
return resp, nil
199+
case resp := <-request.response:
200+
return resp, nil
199201

200-
case err := <-request.err:
201-
return nil, err
202-
}
202+
case err := <-request.err:
203+
return nil, err
204+
}
205+
})
203206
}
204207

205208
// Process allows backends to pull requests from the frontend.

0 commit comments

Comments
 (0)