From 795379f7e7a984626e851dc2d6f2736ecf425274 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 13 Sep 2023 23:49:18 -0700 Subject: [PATCH 1/3] stop retrying chunk pool exhaustion at query frontend, retry at querier level Signed-off-by: Ben Ye --- pkg/frontend/config.go | 8 +- pkg/frontend/transport/retry.go | 58 +++++++-- pkg/frontend/transport/retry_test.go | 21 ++++ pkg/frontend/transport/roundtripper.go | 20 +-- pkg/frontend/v1/frontend.go | 45 ++++--- pkg/frontend/v2/frontend.go | 117 +++++++++--------- pkg/querier/blocks_store_queryable.go | 81 ++++++++---- pkg/querier/blocks_store_queryable_test.go | 30 +++++ .../tripperware/instantquery/instant_query.go | 12 +- pkg/querier/tripperware/query.go | 19 +++ .../tripperware/queryrange/query_range.go | 18 +-- 11 files changed, 288 insertions(+), 141 deletions(-) diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index 8ef8fa36030..863dbf4f91b 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -59,15 +59,15 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i cfg.FrontendV2.Port = grpcListenPort } - fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry) - return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err + fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg) + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, retry), nil, fr, err default: // No scheduler = use original frontend. - fr, err := v1.New(cfg.FrontendV1, limits, log, reg, retry) + fr, err := v1.New(cfg.FrontendV1, limits, log, reg) if err != nil { return nil, nil, nil, err } - return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, retry), fr, nil, nil } } diff --git a/pkg/frontend/transport/retry.go b/pkg/frontend/transport/retry.go index bf010745acc..7c8a3f5ffa8 100644 --- a/pkg/frontend/transport/retry.go +++ b/pkg/frontend/transport/retry.go @@ -1,11 +1,19 @@ package transport import ( + "bytes" "context" + "errors" + "io" + "net/http" + "strings" + "unsafe" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/weaveworks/common/httpgrpc" + "github.com/thanos-io/thanos/pkg/pool" + + "github.com/cortexproject/cortex/pkg/querier/tripperware" ) type Retry struct { @@ -25,7 +33,7 @@ func NewRetry(maxRetries int, reg prometheus.Registerer) *Retry { } } -func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)) (*httpgrpc.HTTPResponse, error) { +func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http.Response, error) { if r.maxRetries == 0 { // Retries are disabled. Try only once. 
return f() @@ -35,7 +43,7 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error) defer func() { r.retriesCount.Observe(float64(tries)) }() var ( - resp *httpgrpc.HTTPResponse + resp *http.Response err error ) for ; tries < r.maxRetries; tries++ { @@ -44,13 +52,47 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error) } resp, err = f() - if err != nil && err != context.Canceled { - continue // Retryable - } else if resp != nil && resp.Code/100 == 5 { + if err != nil && !errors.Is(err, context.Canceled) { continue // Retryable - } else { - break + } else if resp != nil && resp.StatusCode/100 == 5 { + body, err := tripperware.BodyBuffer(resp, nil) + if err != nil { + return nil, err + } + + if tries < r.maxRetries-1 && isBodyRetryable(yoloString(body)) { + continue + } + + resp.Body = &buffer{buff: body, ReadCloser: io.NopCloser(bytes.NewReader(body))} + resp.ContentLength = int64(len(body)) + return resp, nil } + break + } + if err != nil { + return nil, err + } + // We always want to return decoded response body if possible. + body, err := tripperware.BodyBuffer(resp, nil) + if err != nil { + return nil, err } + resp.Body = &buffer{buff: body, ReadCloser: io.NopCloser(bytes.NewReader(body))} + resp.ContentLength = int64(len(body)) return resp, err } + +func isBodyRetryable(body string) bool { + // If pool exhausted, retry at query frontend might make things worse. + // Rely on retries at querier level only. + if strings.Contains(body, pool.ErrPoolExhausted.Error()) { + return false + } + + return true +} + +func yoloString(b []byte) string { + return *((*string)(unsafe.Pointer(&b))) +} diff --git a/pkg/frontend/transport/retry_test.go b/pkg/frontend/transport/retry_test.go index a79c083640e..36b8ce7462f 100644 --- a/pkg/frontend/transport/retry_test.go +++ b/pkg/frontend/transport/retry_test.go @@ -29,3 +29,24 @@ func TestRetry(t *testing.T) { require.NoError(t, err) require.Equal(t, int32(200), res.Code) } + +func TestNoRetryOnChunkPoolExhaustion(t *testing.T) { + tries := atomic.NewInt64(3) + r := NewRetry(3, nil) + ctx := context.Background() + res, err := r.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { + try := tries.Dec() + if try > 1 { + return &httpgrpc.HTTPResponse{ + Code: 500, + }, nil + } + return &httpgrpc.HTTPResponse{ + Code: 200, + }, nil + + }) + + require.NoError(t, err) + require.Equal(t, int32(500), res.Code) +} diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go index 583fc22d04a..c5bbaabb7d6 100644 --- a/pkg/frontend/transport/roundtripper.go +++ b/pkg/frontend/transport/roundtripper.go @@ -15,13 +15,14 @@ type GrpcRoundTripper interface { RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) } -func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper { - return &grpcRoundTripperAdapter{roundTripper: r} +func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper, retry *Retry) http.RoundTripper { + return &grpcRoundTripperAdapter{roundTripper: r, retry: retry} } // This adapter wraps GrpcRoundTripper and converted it into http.RoundTripper type grpcRoundTripperAdapter struct { roundTripper GrpcRoundTripper + retry *Retry } type buffer struct { @@ -39,11 +40,16 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er return nil, err } - resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req) - if err != nil { - return nil, err - } + return a.retry.Do(r.Context(), func() 
(*http.Response, error) { + resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req) + if err != nil { + return nil, err + } + return httpGRPCRespToHTTPResp(resp), nil + }) +} +func httpGRPCRespToHTTPResp(resp *httpgrpc.HTTPResponse) *http.Response { httpResp := &http.Response{ StatusCode: int(resp.Code), Body: &buffer{buff: resp.Body, ReadCloser: io.NopCloser(bytes.NewReader(resp.Body))}, @@ -53,5 +59,5 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er for _, h := range resp.Headers { httpResp.Header[h.Key] = h.Values } - return httpResp, nil + return httpResp } diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index ac5074dd1c8..b055c5d2809 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -94,12 +94,11 @@ type request struct { } // New creates a new frontend. Frontend implements service, and must be started and stopped. -func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { +func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) { f := &Frontend{ cfg: cfg, log: log, limits: limits, - retry: retry, queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_query_frontend_queue_length", Help: "Number of queries in the queue.", @@ -176,33 +175,31 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) } } - return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { - request := request{ - request: req, - originalCtx: ctx, + request := request{ + request: req, + originalCtx: ctx, - // Buffer of 1 to ensure response can be written by the server side - // of the Process stream, even if this goroutine goes away due to - // client context cancellation. - err: make(chan error, 1), - response: make(chan *httpgrpc.HTTPResponse, 1), - } + // Buffer of 1 to ensure response can be written by the server side + // of the Process stream, even if this goroutine goes away due to + // client context cancellation. + err: make(chan error, 1), + response: make(chan *httpgrpc.HTTPResponse, 1), + } - if err := f.queueRequest(ctx, &request); err != nil { - return nil, err - } + if err := f.queueRequest(ctx, &request); err != nil { + return nil, err + } - select { - case <-ctx.Done(): - return nil, ctx.Err() + select { + case <-ctx.Done(): + return nil, ctx.Err() - case resp := <-request.response: - return resp, nil + case resp := <-request.response: + return resp, nil - case err := <-request.err: - return nil, err - } - }) + case err := <-request.err: + return nil, err + } } // Process allows backends to pull requests from the frontend. diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index dea15faeaf5..abf6de760a6 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -112,7 +112,7 @@ type enqueueResult struct { } // NewFrontend creates a new frontend. 
-func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { +func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer) (*Frontend, error) { requestsCh := make(chan *frontendRequest) schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), requestsCh, log) @@ -125,7 +125,6 @@ func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *t log: log, requestsCh: requestsCh, schedulerWorkers: schedulerWorkers, - retry: retry, requests: newRequestsInProgress(), } // Randomize to avoid getting responses from queries sent before restart, which could lead to mixing results @@ -188,80 +187,78 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) ctx, cancel := context.WithCancel(ctx) defer cancel() - return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { - freq := &frontendRequest{ - queryID: f.lastQueryID.Inc(), - request: req, - userID: userID, - statsEnabled: stats.IsEnabled(ctx), + freq := &frontendRequest{ + queryID: f.lastQueryID.Inc(), + request: req, + userID: userID, + statsEnabled: stats.IsEnabled(ctx), - cancel: cancel, + cancel: cancel, - // Buffer of 1 to ensure response or error can be written to the channel - // even if this goroutine goes away due to client context cancellation. - enqueue: make(chan enqueueResult, 1), - response: make(chan *frontendv2pb.QueryResultRequest, 1), + // Buffer of 1 to ensure response or error can be written to the channel + // even if this goroutine goes away due to client context cancellation. + enqueue: make(chan enqueueResult, 1), + response: make(chan *frontendv2pb.QueryResultRequest, 1), - retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1, - } + retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1, + } - f.requests.put(freq) - defer f.requests.delete(freq.queryID) + f.requests.put(freq) + defer f.requests.delete(freq.queryID) - retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers. + retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers. - enqueueAgain: - select { - case <-ctx.Done(): - return nil, ctx.Err() +enqueueAgain: + select { + case <-ctx.Done(): + return nil, ctx.Err() - case f.requestsCh <- freq: - // Enqueued, let's wait for response. - } + case f.requestsCh <- freq: + // Enqueued, let's wait for response. + } - var cancelCh chan<- uint64 + var cancelCh chan<- uint64 - select { - case <-ctx.Done(): - return nil, ctx.Err() - - case enqRes := <-freq.enqueue: - if enqRes.status == waitForResponse { - cancelCh = enqRes.cancelCh - break // go wait for response. - } else if enqRes.status == failed { - retries-- - if retries > 0 { - goto enqueueAgain - } - } + select { + case <-ctx.Done(): + return nil, ctx.Err() - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request") + case enqRes := <-freq.enqueue: + if enqRes.status == waitForResponse { + cancelCh = enqRes.cancelCh + break // go wait for response. + } else if enqRes.status == failed { + retries-- + if retries > 0 { + goto enqueueAgain + } } - select { - case <-ctx.Done(): - if cancelCh != nil { - select { - case cancelCh <- freq.queryID: - // cancellation sent. - default: - // failed to cancel, log it. 
- level.Warn(util_log.WithContext(ctx, f.log)).Log("msg", "failed to enqueue cancellation signal", "query_id", freq.queryID) - f.cancelFailedQueries.Inc() - } - } - return nil, ctx.Err() + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request") + } - case resp := <-freq.response: - if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { - stats := stats.FromContext(ctx) - stats.Merge(resp.Stats) // Safe if stats is nil. + select { + case <-ctx.Done(): + if cancelCh != nil { + select { + case cancelCh <- freq.queryID: + // cancellation sent. + default: + // failed to cancel, log it. + level.Warn(util_log.WithContext(ctx, f.log)).Log("msg", "failed to enqueue cancellation signal", "query_id", freq.queryID) + f.cancelFailedQueries.Inc() } + } + return nil, ctx.Err() - return resp.HttpResponse, nil + case resp := <-freq.response: + if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { + stats := stats.FromContext(ctx) + stats.Merge(resp.Stats) // Safe if stats is nil. } - }) + + return resp.HttpResponse, nil + } } func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) { diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 5729a82a176..078e8f7edb7 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -23,6 +23,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" @@ -46,6 +47,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/math" + "github.com/cortexproject/cortex/pkg/util/multierror" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" @@ -341,10 +343,10 @@ func (q *blocksStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, convertedMatchers = convertMatchersToLabelMatcher(matchers) ) - queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { - nameSets, warnings, queriedBlocks, err := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT, convertedMatchers) + queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) { + nameSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT, convertedMatchers) if err != nil { - return nil, err + return nil, err, retryableError } resMtx.Lock() @@ -352,7 +354,7 @@ func (q *blocksStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, resWarnings = append(resWarnings, warnings...) resMtx.Unlock() - return queriedBlocks, nil + return queriedBlocks, nil, retryableError } err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc) @@ -376,10 +378,10 @@ func (q *blocksStoreQuerier) LabelValues(name string, matchers ...*labels.Matche resultMtx sync.Mutex ) - queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { - valueSets, warnings, queriedBlocks, err := q.fetchLabelValuesFromStore(spanCtx, name, clients, minT, maxT, matchers...) 
+ queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) { + valueSets, warnings, queriedBlocks, err, retryableError := q.fetchLabelValuesFromStore(spanCtx, name, clients, minT, maxT, matchers...) if err != nil { - return nil, err + return nil, err, retryableError } resultMtx.Lock() @@ -387,7 +389,7 @@ func (q *blocksStoreQuerier) LabelValues(name string, matchers ...*labels.Matche resWarnings = append(resWarnings, warnings...) resultMtx.Unlock() - return queriedBlocks, nil + return queriedBlocks, nil, retryableError } err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc) @@ -421,11 +423,10 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* resultMtx sync.Mutex ) - queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { - seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, maxChunksLimit, leftChunksLimit) + queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error) { + seriesSets, queriedBlocks, warnings, numChunks, err, retryableError := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, maxChunksLimit, leftChunksLimit) if err != nil { - - return nil, err + return nil, err, retryableError } resultMtx.Lock() @@ -440,7 +441,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* } resultMtx.Unlock() - return queriedBlocks, nil + return queriedBlocks, nil, retryableError } err := q.queryWithConsistencyCheck(spanCtx, spanLog, minT, maxT, queryFunc) @@ -458,7 +459,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* } func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64, - queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error)) error { + queryFunc func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error, error)) error { // If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until // now - queryStoreAfter, because the most recent time range is covered by ingesters. This // optimization is particularly important for the blocks storage because can be used to skip @@ -501,6 +502,9 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg resQueriedBlocks = []ulid.ULID(nil) attemptedBlocksZones = make(map[ulid.ULID]map[string]int, len(remainingBlocks)) + + queriedBlocks []ulid.ULID + retryableError error ) for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ { @@ -521,7 +525,7 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg // Fetch series from stores. If an error occur we do not retry because retries // are only meant to cover missing blocks. - queriedBlocks, err := queryFunc(clients, minT, maxT) + queriedBlocks, err, retryableError = queryFunc(clients, minT, maxT) if err != nil { return err } @@ -553,6 +557,12 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg remainingBlocks = missingBlocks } + // After we exhausted retries, if retryable error is not nil return the retryable error. + // It can be helpful to know whether we need to retry more or not. + if retryableError != nil { + return retryableError + } + // We've not been able to query all expected blocks after all retries. 
level.Warn(util_log.WithContext(ctx, logger)).Log("msg", "failed consistency check", "err", err) return fmt.Errorf("consistency check failed because some blocks were not queried: %s", strings.Join(convertULIDsToString(remainingBlocks), " ")) @@ -567,7 +577,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( matchers []*labels.Matcher, maxChunksLimit int, leftChunksLimit int, -) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error) { +) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error, error) { var ( reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) g, gCtx = errgroup.WithContext(reqCtx) @@ -579,11 +589,13 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( spanLog = spanlogger.FromContext(ctx) queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) reqStats = stats.FromContext(ctx) + merrMtx = sync.Mutex{} + merr = multierror.MultiError{} ) matchers, shardingInfo, err := querysharding.ExtractShardingInfo(matchers) if err != nil { - return nil, nil, nil, 0, err + return nil, nil, nil, 0, err, merr.Err() } convertedMatchers := convertMatchersToLabelMatcher(matchers) @@ -613,6 +625,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if err != nil { if isRetryableError(err) { level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress())) + merrMtx.Lock() + merr.Add(err) + merrMtx.Unlock() return nil } return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) @@ -636,6 +651,9 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if isRetryableError(err) { level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to receive series from %s due to retryable error", c.RemoteAddress())) + merrMtx.Lock() + merr.Add(err) + merrMtx.Unlock() return nil } @@ -769,10 +787,10 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( // Wait until all client requests complete. if err := g.Wait(); err != nil { - return nil, nil, nil, 0, err + return nil, nil, nil, 0, err, merr.Err() } - return seriesSets, queriedBlocks, warnings, int(numChunks.Load()), nil + return seriesSets, queriedBlocks, warnings, int(numChunks.Load()), nil, merr.Err() } func (q *blocksStoreQuerier) fetchLabelNamesFromStore( @@ -781,7 +799,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( minT int64, maxT int64, matchers []storepb.LabelMatcher, -) ([][]string, storage.Warnings, []ulid.ULID, error) { +) ([][]string, storage.Warnings, []ulid.ULID, error, error) { var ( reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) g, gCtx = errgroup.WithContext(reqCtx) @@ -790,6 +808,8 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( warnings = storage.Warnings(nil) queriedBlocks = []ulid.ULID(nil) spanLog = spanlogger.FromContext(ctx) + merrMtx = sync.Mutex{} + merr = multierror.MultiError{} ) // Concurrently fetch series from all clients. @@ -808,6 +828,9 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( if err != nil { if isRetryableError(err) { level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label names from %s due to retryable error", c.RemoteAddress())) + merrMtx.Lock() + merr.Add(err) + merrMtx.Unlock() return nil } @@ -864,10 +887,10 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( // Wait until all client requests complete. 
if err := g.Wait(); err != nil { - return nil, nil, nil, err + return nil, nil, nil, err, merr.Err() } - return nameSets, warnings, queriedBlocks, nil + return nameSets, warnings, queriedBlocks, nil, merr.Err() } func (q *blocksStoreQuerier) fetchLabelValuesFromStore( @@ -877,7 +900,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( minT int64, maxT int64, matchers ...*labels.Matcher, -) ([][]string, storage.Warnings, []ulid.ULID, error) { +) ([][]string, storage.Warnings, []ulid.ULID, error, error) { var ( reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID) g, gCtx = errgroup.WithContext(reqCtx) @@ -886,6 +909,8 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( warnings = storage.Warnings(nil) queriedBlocks = []ulid.ULID(nil) spanLog = spanlogger.FromContext(ctx) + merrMtx = sync.Mutex{} + merr = multierror.MultiError{} ) // Concurrently fetch series from all clients. @@ -904,6 +929,9 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( if err != nil { if isRetryableError(err) { level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label values from %s due to retryable error", c.RemoteAddress())) + merrMtx.Lock() + merr.Add(err) + merrMtx.Unlock() return nil } @@ -963,10 +991,10 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( // Wait until all client requests complete. if err := g.Wait(); err != nil { - return nil, nil, nil, err + return nil, nil, nil, err, merr.Err() } - return valueSets, warnings, queriedBlocks, nil + return valueSets, warnings, queriedBlocks, nil, merr.Err() } func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) { @@ -1122,6 +1150,9 @@ func isRetryableError(err error) bool { // https://github.com/grpc/grpc-go/blob/03172006f5d168fc646d87928d85cb9c4a480291/clientconn.go#L67 case codes.Canceled: return strings.Contains(err.Error(), "grpc: the client connection is closing") + case codes.Unknown: + // Catch chunks pool exhaustion error only. 
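+		// Treating it as retryable lets the querier retry the fetch from other
+		// store-gateway replicas, while the query frontend stops retrying this case
+		// (see isBodyRetryable in pkg/frontend/transport/retry.go).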
+ return strings.Contains(err.Error(), pool.ErrPoolExhausted.Error()) default: return false } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index a01d4f28937..23114f26634 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/promql-engine/engine" "github.com/thanos-io/promql-engine/logicalplan" + "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -668,6 +669,35 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { }, }, }, + "multiple store-gateways has the block, but one of them fails to return due to chunk pool exhaustion": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{ + remoteAddr: "1.1.1.1", + mockedSeriesErr: status.Error(codes.Unknown, pool.ErrPoolExhausted.Error()), + }: {block1}, + }, + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2), + mockHintsResponse(block1), + }}: {block1}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel, series1Label), + values: []valueResult{ + {t: minT, v: 2}, + }, + }, + }, + }, "all store-gateways return PermissionDenied": { finderResult: bucketindex.Blocks{ {ID: block1}, diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index a7350e65d5a..b37b096b94b 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -162,17 +162,17 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return nil, err } - buf, err := tripperware.BodyBuffer(r, log) - if err != nil { - log.Error(err) - return nil, err + buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead)) + if _, err := buf.ReadFrom(r.Body); err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } + if r.StatusCode/100 != 2 { - return nil, httpgrpc.Errorf(r.StatusCode, string(buf)) + return nil, httpgrpc.Errorf(r.StatusCode, string(buf.Bytes())) } var resp PrometheusInstantQueryResponse - if err := json.Unmarshal(buf, &resp); err != nil { + if err := json.Unmarshal(buf.Bytes(), &resp); err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index 42de413e52b..f893d20b66f 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -231,6 +231,25 @@ func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) { return buf.Bytes(), nil } +func BodyBufferFromHTTPGRPCResponse(res *httpgrpc.HTTPResponse, logger log.Logger) ([]byte, error) { + // if the response is gziped, lets unzip it here + headers := http.Header{} + for _, h := range res.Headers { + headers[h.Key] = h.Values + } + if strings.EqualFold(headers.Get("Content-Encoding"), "gzip") { + gReader, err := gzip.NewReader(bytes.NewBuffer(res.Body)) + if 
err != nil { + return nil, err + } + defer runutil.CloseWithLogOnErr(logger, gReader, "close gzip reader") + + return io.ReadAll(gReader) + } + + return res.Body, nil +} + func StatsMerge(stats map[int64]*PrometheusResponseQueryableSamplesStatsPerStep) *PrometheusResponseStats { keys := make([]int64, 0, len(stats)) for key := range stats { diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 18b0be88882..efad5682913 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -271,18 +271,22 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ t return nil, err } - buf, err := tripperware.BodyBuffer(r, log) - if err != nil { - log.Error(err) - return nil, err + buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead)) + if _, err := buf.ReadFrom(r.Body); err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + } + + if r.StatusCode/100 != 2 { + return nil, httpgrpc.Errorf(r.StatusCode, string(buf.Bytes())) } if r.StatusCode/100 != 2 { - return nil, httpgrpc.Errorf(r.StatusCode, string(buf)) + return nil, httpgrpc.Errorf(r.StatusCode, string(buf.Bytes())) } - log.LogFields(otlog.Int("bytes", len(buf))) + body := buf.Bytes() + log.LogFields(otlog.Int("bytes", len(body))) var resp PrometheusResponse - if err := json.Unmarshal(buf, &resp); err != nil { + if err := json.Unmarshal(body, &resp); err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } From bf338027aa149a6b0a840d09b07d4b98e8ff5b29 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 20 Sep 2023 09:52:00 -0700 Subject: [PATCH 2/3] update integration test Signed-off-by: Ben Ye --- integration/query_frontend_test.go | 84 ++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 67d6b5f8c5c..28b8ea6ded5 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -7,8 +7,10 @@ import ( "crypto/x509" "crypto/x509/pkix" "fmt" + "github.com/thanos-io/thanos/pkg/pool" "net/http" "os" + "path" "path/filepath" "strconv" "sync" @@ -436,3 +438,85 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend) assertServiceMetricsPrefixes(t, QueryScheduler, queryScheduler) } + +func TestQueryFrontendNoRetryChunkPool(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-blocks-storage.bucket-store.max_chunk_pool_bytes": "1", + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components for the write path. 
+ distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + + // Wait until the distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push some series to Cortex. + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + seriesTimestamp := time.Now() + series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2) + series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"}) + series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 1st series, while the 2ns series is in the head. + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) + + // Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check. + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "5s", + }), "") + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "5s", + "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), + }), "") + require.NoError(t, s.StartAndWaitReady(queryFrontend, querier, storeGateway)) + + // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) + require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount)) + + // Query back the series. + c, err = e2ecortex.NewClient("", path.Join(queryFrontend.HTTPEndpoint(), "/prometheus"), "", "", "user-1") + require.NoError(t, err) + + // We expect request to hit chunk pool exhaustion. + resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp) + require.NoError(t, err) + require.Equal(t, http.StatusInternalServerError, resp.StatusCode) + require.Contains(t, string(body), pool.ErrPoolExhausted.Error()) + // We shouldn't be able to see any retries. 
+ require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_query_frontend_retries"}, e2e.WaitMissingMetrics)) +} From 85603c1a94f495d240f51a1fdd908922d48d8d6f Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 20 Sep 2023 23:59:44 -0700 Subject: [PATCH 3/3] refactor Signed-off-by: Ben Ye fix e2e test Signed-off-by: Ben Ye --- integration/query_frontend_test.go | 13 +- pkg/frontend/config.go | 8 +- pkg/frontend/transport/retry.go | 31 ++--- pkg/frontend/transport/retry_test.go | 2 + pkg/frontend/transport/roundtripper.go | 20 ++- pkg/frontend/v1/frontend.go | 45 +++---- pkg/frontend/v2/frontend.go | 117 +++++++++--------- .../tripperware/instantquery/instant_query.go | 12 +- .../tripperware/queryrange/query_range.go | 18 ++- 9 files changed, 127 insertions(+), 139 deletions(-) diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 28b8ea6ded5..02054d117ef 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -7,10 +7,8 @@ import ( "crypto/x509" "crypto/x509/pkix" "fmt" - "github.com/thanos-io/thanos/pkg/pool" "net/http" "os" - "path" "path/filepath" "strconv" "sync" @@ -23,6 +21,7 @@ import ( "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/pool" "github.com/cortexproject/cortex/integration/ca" "github.com/cortexproject/cortex/integration/e2e" @@ -452,7 +451,7 @@ func TestQueryFrontendNoRetryChunkPool(t *testing.T) { "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), "-blocks-storage.tsdb.ship-interval": "1s", "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), - "-blocks-storage.bucket-store.max_chunk_pool_bytes": "1", + "-blocks-storage.bucket-store.max-chunk-pool-bytes": "1", }) // Start dependencies. @@ -492,16 +491,18 @@ func TestQueryFrontendNoRetryChunkPool(t *testing.T) { require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total")) require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) + queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") + require.NoError(t, s.Start(queryFrontend)) + // Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check. 
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ "-blocks-storage.bucket-store.sync-interval": "5s", }), "") - queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "") querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ "-blocks-storage.bucket-store.sync-interval": "5s", "-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(), }), "") - require.NoError(t, s.StartAndWaitReady(queryFrontend, querier, storeGateway)) + require.NoError(t, s.StartAndWaitReady(querier, storeGateway)) // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) @@ -509,7 +510,7 @@ func TestQueryFrontendNoRetryChunkPool(t *testing.T) { require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount)) // Query back the series. - c, err = e2ecortex.NewClient("", path.Join(queryFrontend.HTTPEndpoint(), "/prometheus"), "", "", "user-1") + c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1") require.NoError(t, err) // We expect request to hit chunk pool exhaustion. diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index 863dbf4f91b..8ef8fa36030 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -59,15 +59,15 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i cfg.FrontendV2.Port = grpcListenPort } - fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg) - return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, retry), nil, fr, err + fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry) + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err default: // No scheduler = use original frontend. - fr, err := v1.New(cfg.FrontendV1, limits, log, reg) + fr, err := v1.New(cfg.FrontendV1, limits, log, reg, retry) if err != nil { return nil, nil, nil, err } - return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr, retry), fr, nil, nil + return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil } } diff --git a/pkg/frontend/transport/retry.go b/pkg/frontend/transport/retry.go index 7c8a3f5ffa8..bf1b4faa1ce 100644 --- a/pkg/frontend/transport/retry.go +++ b/pkg/frontend/transport/retry.go @@ -1,17 +1,15 @@ package transport import ( - "bytes" "context" "errors" - "io" - "net/http" "strings" "unsafe" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/thanos-io/thanos/pkg/pool" + "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/querier/tripperware" ) @@ -33,7 +31,7 @@ func NewRetry(maxRetries int, reg prometheus.Registerer) *Retry { } } -func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http.Response, error) { +func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)) (*httpgrpc.HTTPResponse, error) { if r.maxRetries == 0 { // Retries are disabled. Try only once. 
return f() @@ -43,7 +41,7 @@ func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http defer func() { r.retriesCount.Observe(float64(tries)) }() var ( - resp *http.Response + resp *httpgrpc.HTTPResponse err error ) for ; tries < r.maxRetries; tries++ { @@ -54,8 +52,11 @@ func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http resp, err = f() if err != nil && !errors.Is(err, context.Canceled) { continue // Retryable - } else if resp != nil && resp.StatusCode/100 == 5 { - body, err := tripperware.BodyBuffer(resp, nil) + } else if resp != nil && resp.Code/100 == 5 { + // This is not that efficient as we might decode the body multiple + // times. But error response should be too large so we should be fine. + // TODO: investigate ways to decode only once. + body, err := tripperware.BodyBufferFromHTTPGRPCResponse(resp, nil) if err != nil { return nil, err } @@ -64,8 +65,6 @@ func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http continue } - resp.Body = &buffer{buff: body, ReadCloser: io.NopCloser(bytes.NewReader(body))} - resp.ContentLength = int64(len(body)) return resp, nil } break @@ -73,24 +72,14 @@ func (r *Retry) Do(ctx context.Context, f func() (*http.Response, error)) (*http if err != nil { return nil, err } - // We always want to return decoded response body if possible. - body, err := tripperware.BodyBuffer(resp, nil) - if err != nil { - return nil, err - } - resp.Body = &buffer{buff: body, ReadCloser: io.NopCloser(bytes.NewReader(body))} - resp.ContentLength = int64(len(body)) + return resp, err } func isBodyRetryable(body string) bool { // If pool exhausted, retry at query frontend might make things worse. // Rely on retries at querier level only. - if strings.Contains(body, pool.ErrPoolExhausted.Error()) { - return false - } - - return true + return !strings.Contains(body, pool.ErrPoolExhausted.Error()) } func yoloString(b []byte) string { diff --git a/pkg/frontend/transport/retry_test.go b/pkg/frontend/transport/retry_test.go index 36b8ce7462f..3b8ead1a891 100644 --- a/pkg/frontend/transport/retry_test.go +++ b/pkg/frontend/transport/retry_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/pool" "github.com/weaveworks/common/httpgrpc" "go.uber.org/atomic" ) @@ -39,6 +40,7 @@ func TestNoRetryOnChunkPoolExhaustion(t *testing.T) { if try > 1 { return &httpgrpc.HTTPResponse{ Code: 500, + Body: []byte(pool.ErrPoolExhausted.Error()), }, nil } return &httpgrpc.HTTPResponse{ diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go index c5bbaabb7d6..583fc22d04a 100644 --- a/pkg/frontend/transport/roundtripper.go +++ b/pkg/frontend/transport/roundtripper.go @@ -15,14 +15,13 @@ type GrpcRoundTripper interface { RoundTripGRPC(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) } -func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper, retry *Retry) http.RoundTripper { - return &grpcRoundTripperAdapter{roundTripper: r, retry: retry} +func AdaptGrpcRoundTripperToHTTPRoundTripper(r GrpcRoundTripper) http.RoundTripper { + return &grpcRoundTripperAdapter{roundTripper: r} } // This adapter wraps GrpcRoundTripper and converted it into http.RoundTripper type grpcRoundTripperAdapter struct { roundTripper GrpcRoundTripper - retry *Retry } type buffer struct { @@ -40,16 +39,11 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er return nil, err } - return 
a.retry.Do(r.Context(), func() (*http.Response, error) { - resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req) - if err != nil { - return nil, err - } - return httpGRPCRespToHTTPResp(resp), nil - }) -} + resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req) + if err != nil { + return nil, err + } -func httpGRPCRespToHTTPResp(resp *httpgrpc.HTTPResponse) *http.Response { httpResp := &http.Response{ StatusCode: int(resp.Code), Body: &buffer{buff: resp.Body, ReadCloser: io.NopCloser(bytes.NewReader(resp.Body))}, @@ -59,5 +53,5 @@ func httpGRPCRespToHTTPResp(resp *httpgrpc.HTTPResponse) *http.Response { for _, h := range resp.Headers { httpResp.Header[h.Key] = h.Values } - return httpResp + return httpResp, nil } diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index b055c5d2809..ac5074dd1c8 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -94,11 +94,12 @@ type request struct { } // New creates a new frontend. Frontend implements service, and must be started and stopped. -func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) { +func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { f := &Frontend{ cfg: cfg, log: log, limits: limits, + retry: retry, queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_query_frontend_queue_length", Help: "Number of queries in the queue.", @@ -175,31 +176,33 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) } } - request := request{ - request: req, - originalCtx: ctx, + return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { + request := request{ + request: req, + originalCtx: ctx, - // Buffer of 1 to ensure response can be written by the server side - // of the Process stream, even if this goroutine goes away due to - // client context cancellation. - err: make(chan error, 1), - response: make(chan *httpgrpc.HTTPResponse, 1), - } + // Buffer of 1 to ensure response can be written by the server side + // of the Process stream, even if this goroutine goes away due to + // client context cancellation. + err: make(chan error, 1), + response: make(chan *httpgrpc.HTTPResponse, 1), + } - if err := f.queueRequest(ctx, &request); err != nil { - return nil, err - } + if err := f.queueRequest(ctx, &request); err != nil { + return nil, err + } - select { - case <-ctx.Done(): - return nil, ctx.Err() + select { + case <-ctx.Done(): + return nil, ctx.Err() - case resp := <-request.response: - return resp, nil + case resp := <-request.response: + return resp, nil - case err := <-request.err: - return nil, err - } + case err := <-request.err: + return nil, err + } + }) } // Process allows backends to pull requests from the frontend. diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index abf6de760a6..dea15faeaf5 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -112,7 +112,7 @@ type enqueueResult struct { } // NewFrontend creates a new frontend. 
-func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer) (*Frontend, error) { +func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { requestsCh := make(chan *frontendRequest) schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), requestsCh, log) @@ -125,6 +125,7 @@ func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer) (*Fronte log: log, requestsCh: requestsCh, schedulerWorkers: schedulerWorkers, + retry: retry, requests: newRequestsInProgress(), } // Randomize to avoid getting responses from queries sent before restart, which could lead to mixing results @@ -187,78 +188,80 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) ctx, cancel := context.WithCancel(ctx) defer cancel() - freq := &frontendRequest{ - queryID: f.lastQueryID.Inc(), - request: req, - userID: userID, - statsEnabled: stats.IsEnabled(ctx), + return f.retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) { + freq := &frontendRequest{ + queryID: f.lastQueryID.Inc(), + request: req, + userID: userID, + statsEnabled: stats.IsEnabled(ctx), - cancel: cancel, + cancel: cancel, - // Buffer of 1 to ensure response or error can be written to the channel - // even if this goroutine goes away due to client context cancellation. - enqueue: make(chan enqueueResult, 1), - response: make(chan *frontendv2pb.QueryResultRequest, 1), + // Buffer of 1 to ensure response or error can be written to the channel + // even if this goroutine goes away due to client context cancellation. + enqueue: make(chan enqueueResult, 1), + response: make(chan *frontendv2pb.QueryResultRequest, 1), - retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1, - } - - f.requests.put(freq) - defer f.requests.delete(freq.queryID) + retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1, + } - retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers. + f.requests.put(freq) + defer f.requests.delete(freq.queryID) -enqueueAgain: - select { - case <-ctx.Done(): - return nil, ctx.Err() + retries := f.cfg.WorkerConcurrency + 1 // To make sure we hit at least two different schedulers. - case f.requestsCh <- freq: - // Enqueued, let's wait for response. - } + enqueueAgain: + select { + case <-ctx.Done(): + return nil, ctx.Err() - var cancelCh chan<- uint64 + case f.requestsCh <- freq: + // Enqueued, let's wait for response. + } - select { - case <-ctx.Done(): - return nil, ctx.Err() + var cancelCh chan<- uint64 - case enqRes := <-freq.enqueue: - if enqRes.status == waitForResponse { - cancelCh = enqRes.cancelCh - break // go wait for response. - } else if enqRes.status == failed { - retries-- - if retries > 0 { - goto enqueueAgain + select { + case <-ctx.Done(): + return nil, ctx.Err() + + case enqRes := <-freq.enqueue: + if enqRes.status == waitForResponse { + cancelCh = enqRes.cancelCh + break // go wait for response. + } else if enqRes.status == failed { + retries-- + if retries > 0 { + goto enqueueAgain + } } + + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request") } - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "failed to enqueue request") - } + select { + case <-ctx.Done(): + if cancelCh != nil { + select { + case cancelCh <- freq.queryID: + // cancellation sent. 
+ default: + // failed to cancel, log it. + level.Warn(util_log.WithContext(ctx, f.log)).Log("msg", "failed to enqueue cancellation signal", "query_id", freq.queryID) + f.cancelFailedQueries.Inc() + } + } + return nil, ctx.Err() - select { - case <-ctx.Done(): - if cancelCh != nil { - select { - case cancelCh <- freq.queryID: - // cancellation sent. - default: - // failed to cancel, log it. - level.Warn(util_log.WithContext(ctx, f.log)).Log("msg", "failed to enqueue cancellation signal", "query_id", freq.queryID) - f.cancelFailedQueries.Inc() + case resp := <-freq.response: + if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { + stats := stats.FromContext(ctx) + stats.Merge(resp.Stats) // Safe if stats is nil. } - } - return nil, ctx.Err() - case resp := <-freq.response: - if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) { - stats := stats.FromContext(ctx) - stats.Merge(resp.Stats) // Safe if stats is nil. + return resp.HttpResponse, nil } - - return resp.HttpResponse, nil - } + }) } func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) { diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index b37b096b94b..a7350e65d5a 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -162,17 +162,17 @@ func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ return nil, err } - buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead)) - if _, err := buf.ReadFrom(r.Body); err != nil { - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + buf, err := tripperware.BodyBuffer(r, log) + if err != nil { + log.Error(err) + return nil, err } - if r.StatusCode/100 != 2 { - return nil, httpgrpc.Errorf(r.StatusCode, string(buf.Bytes())) + return nil, httpgrpc.Errorf(r.StatusCode, string(buf)) } var resp PrometheusInstantQueryResponse - if err := json.Unmarshal(buf.Bytes(), &resp); err != nil { + if err := json.Unmarshal(buf, &resp); err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index efad5682913..18b0be88882 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -271,22 +271,18 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ t return nil, err } - buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead)) - if _, err := buf.ReadFrom(r.Body); err != nil { - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) - } - - if r.StatusCode/100 != 2 { - return nil, httpgrpc.Errorf(r.StatusCode, string(buf.Bytes())) + buf, err := tripperware.BodyBuffer(r, log) + if err != nil { + log.Error(err) + return nil, err } if r.StatusCode/100 != 2 { - return nil, httpgrpc.Errorf(r.StatusCode, string(buf.Bytes())) + return nil, httpgrpc.Errorf(r.StatusCode, string(buf)) } - body := buf.Bytes() - log.LogFields(otlog.Int("bytes", len(body))) + log.LogFields(otlog.Int("bytes", len(buf))) var resp PrometheusResponse - if err := json.Unmarshal(body, &resp); err != nil { + if err := json.Unmarshal(buf, &resp); err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error 
decoding response: %v", err) }
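
A minimal usage sketch of the transport.Retry helper as it stands after this series, assuming a caller that already holds a context ctx, an *httpgrpc.HTTPRequest req, a prometheus.Registerer reg, and a GrpcRoundTripper roundTripper (these names are illustrative, not part of the patch):

	retry := transport.NewRetry(5, reg) // maxRetries == 0 disables retrying and Do runs f exactly once
	resp, err := retry.Do(ctx, func() (*httpgrpc.HTTPResponse, error) {
		// One downstream attempt. A 5xx response whose body mentions chunk-pool
		// exhaustion is returned as-is instead of being retried here; that case
		// is left to the querier's own store-gateway retries.
		return roundTripper.RoundTripGRPC(ctx, req)
	})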