From 50fb3c5bb69de42165921c0ad818d9ab26e483f2 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 10 Apr 2020 15:38:03 -0400 Subject: [PATCH 01/44] Added param Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index d399697a52a..e7dcf9c87b4 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -33,6 +33,7 @@ var ( type WorkerConfig struct { Address string `yaml:"frontend_address"` Parallelism int `yaml:"parallelism"` + TotalParallelism int `yaml:"total_parallelism"` DNSLookupDuration time.Duration `yaml:"dns_lookup_duration"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` @@ -41,7 +42,8 @@ type WorkerConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service, in host:port format.") - f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process.") + f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query frontend.") + f.IntVar(&cfg.TotalParallelism, "querier.worker-total-parallelism", 20, "Number of simultaneous queries to process.") f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) @@ -156,6 +158,7 @@ func (w *worker) runOne(ctx context.Context, client FrontendClient) { backoff := util.NewBackoff(ctx, backoffConfig) for backoff.Ongoing() { + c, err := client.Process(ctx) if err != nil { level.Error(w.log).Log("msg", "error contacting frontend", "err", err) @@ -175,6 +178,7 @@ func (w *worker) runOne(ctx context.Context, client FrontendClient) { // process 
loops processing requests on an established stream. func (w *worker) process(c Frontend_ProcessClient) error { + // Build a child context so we can cancel querie when the stream is closed. ctx, cancel := context.WithCancel(c.Context()) defer cancel() From 1efc6d4dcbd05cbd2520e145852d3ca7684b55ce Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 10:13:44 -0400 Subject: [PATCH 02/44] First pass new structure Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 141 ++++++++++++++++++++++++--------- 1 file changed, 102 insertions(+), 39 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index e7dcf9c87b4..4c24b33f347 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -43,7 +43,6 @@ type WorkerConfig struct { func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service, in host:port format.") f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query frontend.") - f.IntVar(&cfg.TotalParallelism, "querier.worker-total-parallelism", 20, "Number of simultaneous queries to process.") f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) @@ -103,7 +102,7 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { w.watcher.Close() }() - cancels := map[string]context.CancelFunc{} + mgrs := map[string]*frontendManager{} for { updates, err := w.watcher.Next() @@ -119,15 +118,23 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { for _, update := range updates { switch update.Op { case naming.Add: + // jpe : do the cancel contexts matter? 
+ level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr) - ctx, cancel := context.WithCancel(servCtx) - cancels[update.Addr] = cancel - w.runMany(ctx, update.Addr) + client, err := w.connect(update.Addr) + if err != nil { + level.Error(w.log).Log("msg", "error connecting", "addr", update.Addr, "err", err) // jpe : dangerous + } + + mgr := NewFrontendManager(servCtx, w.log, w.server, client, w.cfg.Parallelism, w.cfg.GRPCClientConfig.MaxRecvMsgSize) + mgrs[update.Addr] = mgr case naming.Delete: + // jpe : does this actually gracefully shutdown? + level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) - if cancel, ok := cancels[update.Addr]; ok { - cancel() + if mgr, ok := mgrs[update.Addr]; ok { + mgr.stop() } default: @@ -137,37 +144,98 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { } } -// runMany starts N runOne loops for a given address. -func (w *worker) runMany(ctx context.Context, address string) { - client, err := w.connect(address) +func (w *worker) connect(address string) (FrontendClient, error) { + opts := []grpc.DialOption{grpc.WithInsecure()} + opts = append(opts, w.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{middleware.ClientUserHeaderInterceptor}, nil)...) + conn, err := grpc.Dial(address, opts...) 
if err != nil { - level.Error(w.log).Log("msg", "error connecting", "addr", address, "err", err) - return + return nil, err } + return NewFrontendClient(conn), nil +} + +type frontendManager struct { + client FrontendClient + gracefulQuit []chan struct{} - w.wg.Add(w.cfg.Parallelism) - for i := 0; i < w.cfg.Parallelism; i++ { - go w.runOne(ctx, client) + server *server.Server + log log.Logger + ctx context.Context + maxSendMsgSize int + + wg sync.WaitGroup + mtx sync.Mutex +} + +func NewFrontendManager(ctx context.Context, log log.Logger, server *server.Server, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { + f := &frontendManager{ + client: client, + ctx: ctx, + log: log, + server: server, + maxSendMsgSize: maxSendMsgSize, } + + f.concurrentRequests(initialConcurrentRequests) + + return f } +func (f *frontendManager) stop() { + f.concurrentRequests(0) + f.wg.Wait() +} + +func (f *frontendManager) concurrentRequests(n int) error { + f.mtx.Lock() + defer f.mtx.Unlock() + + // adjust clients slice as necessary + for len(f.gracefulQuit) != n { + if len(f.gracefulQuit) < n { + quit := make(chan struct{}) + f.gracefulQuit = append(f.gracefulQuit, quit) + + f.runOne(quit) + + continue + } + + if len(f.gracefulQuit) > n { + // remove from slice and shutdown + var quit chan struct{} + quit, f.gracefulQuit = f.gracefulQuit[0], f.gracefulQuit[1:] + close(quit) + } + } + + return nil +} + +// jpe +// pass grpc client config? +// is f.wg.Add(1) safe? +// pass graceful quit + // runOne loops, trying to establish a stream to the frontend to begin // request processing. 
-func (w *worker) runOne(ctx context.Context, client FrontendClient) { - defer w.wg.Done() +func (f *frontendManager) runOne(quit <-chan struct{}) { + f.wg.Add(1) + defer f.wg.Done() - backoff := util.NewBackoff(ctx, backoffConfig) + backoff := util.NewBackoff(f.ctx, backoffConfig) for backoff.Ongoing() { - c, err := client.Process(ctx) + // break context chain here + c, err := f.client.Process(f.ctx) if err != nil { - level.Error(w.log).Log("msg", "error contacting frontend", "err", err) + level.Error(f.log).Log("msg", "error contacting frontend", "err", err) backoff.Wait() continue } - if err := w.process(c); err != nil { - level.Error(w.log).Log("msg", "error processing requests", "err", err) + if err := f.process(quit, c); err != nil { + level.Error(f.log).Log("msg", "error processing requests", "err", err) backoff.Wait() continue } @@ -177,13 +245,18 @@ func (w *worker) runOne(ctx context.Context, client FrontendClient) { } // process loops processing requests on an established stream. -func (w *worker) process(c Frontend_ProcessClient) error { - +func (f *frontendManager) process(quit <-chan struct{}, c Frontend_ProcessClient) error { // Build a child context so we can cancel querie when the stream is closed. ctx, cancel := context.WithCancel(c.Context()) defer cancel() for { + select { + case <-quit: + return nil // jpe: won't really work with runOne + default: + } + request, err := c.Recv() if err != nil { return err @@ -195,7 +268,7 @@ func (w *worker) process(c Frontend_ProcessClient) error { // here, as we're running in lock step with the server - each Recv is // paired with a Send. go func() { - response, err := w.server.Handle(ctx, request.HttpRequest) + response, err := f.server.Handle(ctx, request.HttpRequest) if err != nil { var ok bool response, ok = httpgrpc.HTTPResponseFromError(err) @@ -208,30 +281,20 @@ func (w *worker) process(c Frontend_ProcessClient) error { } // Ensure responses that are too big are not retried. 
- if len(response.Body) >= w.cfg.GRPCClientConfig.MaxSendMsgSize { - errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), w.cfg.GRPCClientConfig.MaxSendMsgSize) + if len(response.Body) >= f.maxSendMsgSize { + errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), f.maxSendMsgSize) response = &httpgrpc.HTTPResponse{ Code: http.StatusRequestEntityTooLarge, Body: []byte(errMsg), } - level.Error(w.log).Log("msg", "error processing query", "err", errMsg) + level.Error(f.log).Log("msg", "error processing query", "err", errMsg) } if err := c.Send(&ProcessResponse{ HttpResponse: response, }); err != nil { - level.Error(w.log).Log("msg", "error processing requests", "err", err) + level.Error(f.log).Log("msg", "error processing requests", "err", err) } }() } } - -func (w *worker) connect(address string) (FrontendClient, error) { - opts := []grpc.DialOption{grpc.WithInsecure()} - opts = append(opts, w.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{middleware.ClientUserHeaderInterceptor}, nil)...) - conn, err := grpc.Dial(address, opts...) 
- if err != nil { - return nil, err - } - return NewFrontendClient(conn), nil -} From 3cda9ed22b6d42dac5df1c9a8db4045c85650ea3 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 10:22:28 -0400 Subject: [PATCH 03/44] Split out manager into its own code Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 147 ---------------- .../frontend/worker_frontend_manager.go | 161 ++++++++++++++++++ 2 files changed, 161 insertions(+), 147 deletions(-) create mode 100644 pkg/querier/frontend/worker_frontend_manager.go diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 4c24b33f347..51000385b03 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -4,14 +4,12 @@ import ( "context" "flag" "fmt" - "net/http" "sync" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" "google.golang.org/grpc" @@ -153,148 +151,3 @@ func (w *worker) connect(address string) (FrontendClient, error) { } return NewFrontendClient(conn), nil } - -type frontendManager struct { - client FrontendClient - gracefulQuit []chan struct{} - - server *server.Server - log log.Logger - ctx context.Context - maxSendMsgSize int - - wg sync.WaitGroup - mtx sync.Mutex -} - -func NewFrontendManager(ctx context.Context, log log.Logger, server *server.Server, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { - f := &frontendManager{ - client: client, - ctx: ctx, - log: log, - server: server, - maxSendMsgSize: maxSendMsgSize, - } - - f.concurrentRequests(initialConcurrentRequests) - - return f -} - -func (f *frontendManager) stop() { - f.concurrentRequests(0) - f.wg.Wait() -} - -func (f *frontendManager) concurrentRequests(n int) error { - f.mtx.Lock() - defer f.mtx.Unlock() - - // adjust clients slice as necessary - for 
len(f.gracefulQuit) != n { - if len(f.gracefulQuit) < n { - quit := make(chan struct{}) - f.gracefulQuit = append(f.gracefulQuit, quit) - - f.runOne(quit) - - continue - } - - if len(f.gracefulQuit) > n { - // remove from slice and shutdown - var quit chan struct{} - quit, f.gracefulQuit = f.gracefulQuit[0], f.gracefulQuit[1:] - close(quit) - } - } - - return nil -} - -// jpe -// pass grpc client config? -// is f.wg.Add(1) safe? -// pass graceful quit - -// runOne loops, trying to establish a stream to the frontend to begin -// request processing. -func (f *frontendManager) runOne(quit <-chan struct{}) { - f.wg.Add(1) - defer f.wg.Done() - - backoff := util.NewBackoff(f.ctx, backoffConfig) - for backoff.Ongoing() { - - // break context chain here - c, err := f.client.Process(f.ctx) - if err != nil { - level.Error(f.log).Log("msg", "error contacting frontend", "err", err) - backoff.Wait() - continue - } - - if err := f.process(quit, c); err != nil { - level.Error(f.log).Log("msg", "error processing requests", "err", err) - backoff.Wait() - continue - } - - backoff.Reset() - } -} - -// process loops processing requests on an established stream. -func (f *frontendManager) process(quit <-chan struct{}, c Frontend_ProcessClient) error { - // Build a child context so we can cancel querie when the stream is closed. - ctx, cancel := context.WithCancel(c.Context()) - defer cancel() - - for { - select { - case <-quit: - return nil // jpe: won't really work with runOne - default: - } - - request, err := c.Recv() - if err != nil { - return err - } - - // Handle the request on a "background" goroutine, so we go back to - // blocking on c.Recv(). This allows us to detect the stream closing - // and cancel the query. We don't actally handle queries in parallel - // here, as we're running in lock step with the server - each Recv is - // paired with a Send. 
- go func() { - response, err := f.server.Handle(ctx, request.HttpRequest) - if err != nil { - var ok bool - response, ok = httpgrpc.HTTPResponseFromError(err) - if !ok { - response = &httpgrpc.HTTPResponse{ - Code: http.StatusInternalServerError, - Body: []byte(err.Error()), - } - } - } - - // Ensure responses that are too big are not retried. - if len(response.Body) >= f.maxSendMsgSize { - errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), f.maxSendMsgSize) - response = &httpgrpc.HTTPResponse{ - Code: http.StatusRequestEntityTooLarge, - Body: []byte(errMsg), - } - level.Error(f.log).Log("msg", "error processing query", "err", errMsg) - } - - if err := c.Send(&ProcessResponse{ - HttpResponse: response, - }); err != nil { - level.Error(f.log).Log("msg", "error processing requests", "err", err) - } - }() - } -} diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go new file mode 100644 index 00000000000..cdf5d6189bf --- /dev/null +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -0,0 +1,161 @@ +package frontend + +import ( + "context" + "fmt" + "net/http" + "sync" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" + + "github.com/cortexproject/cortex/pkg/util" +) + +type frontendManager struct { + client FrontendClient + gracefulQuit []chan struct{} + + server *server.Server + log log.Logger + ctx context.Context + maxSendMsgSize int + + wg sync.WaitGroup + mtx sync.Mutex +} + + +func NewFrontendManager(ctx context.Context, log log.Logger, server *server.Server, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { + f := &frontendManager{ + client: client, + ctx: ctx, + log: log, + server: server, + maxSendMsgSize: maxSendMsgSize, + } + + f.concurrentRequests(initialConcurrentRequests) + + return f +} + +func (f 
*frontendManager) stop() { + f.concurrentRequests(0) + f.wg.Wait() +} + +func (f *frontendManager) concurrentRequests(n int) error { + f.mtx.Lock() + defer f.mtx.Unlock() + + // adjust clients slice as necessary + for len(f.gracefulQuit) != n { + if len(f.gracefulQuit) < n { + quit := make(chan struct{}) + f.gracefulQuit = append(f.gracefulQuit, quit) + + f.runOne(quit) + + continue + } + + if len(f.gracefulQuit) > n { + // remove from slice and shutdown + var quit chan struct{} + quit, f.gracefulQuit = f.gracefulQuit[0], f.gracefulQuit[1:] + close(quit) + } + } + + return nil +} + +// jpe +// pass grpc client config? +// is f.wg.Add(1) safe? +// pass graceful quit + +// runOne loops, trying to establish a stream to the frontend to begin +// request processing. +func (f *frontendManager) runOne(quit <-chan struct{}) { + f.wg.Add(1) + defer f.wg.Done() + + backoff := util.NewBackoff(f.ctx, backoffConfig) + for backoff.Ongoing() { + + // break context chain here + c, err := f.client.Process(f.ctx) + if err != nil { + level.Error(f.log).Log("msg", "error contacting frontend", "err", err) + backoff.Wait() + continue + } + + if err := f.process(quit, c); err != nil { + level.Error(f.log).Log("msg", "error processing requests", "err", err) + backoff.Wait() + continue + } + + backoff.Reset() + } +} + +// process loops processing requests on an established stream. +func (f *frontendManager) process(quit <-chan struct{}, c Frontend_ProcessClient) error { + // Build a child context so we can cancel querie when the stream is closed. + ctx, cancel := context.WithCancel(c.Context()) + defer cancel() + + for { + select { + case <-quit: + return nil // jpe: won't really work with runOne + default: + } + + request, err := c.Recv() + if err != nil { + return err + } + + // Handle the request on a "background" goroutine, so we go back to + // blocking on c.Recv(). This allows us to detect the stream closing + // and cancel the query. 
We don't actally handle queries in parallel + // here, as we're running in lock step with the server - each Recv is + // paired with a Send. + go func() { + response, err := f.server.Handle(ctx, request.HttpRequest) + if err != nil { + var ok bool + response, ok = httpgrpc.HTTPResponseFromError(err) + if !ok { + response = &httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Body: []byte(err.Error()), + } + } + } + + // Ensure responses that are too big are not retried. + if len(response.Body) >= f.maxSendMsgSize { + errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), f.maxSendMsgSize) + response = &httpgrpc.HTTPResponse{ + Code: http.StatusRequestEntityTooLarge, + Body: []byte(errMsg), + } + level.Error(f.log).Log("msg", "error processing query", "err", errMsg) + } + + if err := c.Send(&ProcessResponse{ + HttpResponse: response, + }); err != nil { + level.Error(f.log).Log("msg", "error processing requests", "err", err) + } + }() + } +} From 662e36420b6d789d687c0743f409d15b74d9454b Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 10:47:30 -0400 Subject: [PATCH 04/44] created interface to help with testing Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_frontend_manager.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index cdf5d6189bf..c36743e4fe6 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -9,16 +9,19 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/httpgrpc/server" "github.com/cortexproject/cortex/pkg/util" ) +type upstream interface { + Handle(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) +} + type frontendManager struct { client FrontendClient gracefulQuit []chan 
struct{} - server *server.Server + server upstream log log.Logger ctx context.Context maxSendMsgSize int @@ -27,8 +30,7 @@ type frontendManager struct { mtx sync.Mutex } - -func NewFrontendManager(ctx context.Context, log log.Logger, server *server.Server, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { +func NewFrontendManager(ctx context.Context, log log.Logger, server upstream, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { f := &frontendManager{ client: client, ctx: ctx, @@ -74,7 +76,6 @@ func (f *frontendManager) concurrentRequests(n int) error { } // jpe -// pass grpc client config? // is f.wg.Add(1) safe? // pass graceful quit From bae5fdfbfbe207278d3d69a560e14dfc8c7926b0 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 11:20:41 -0400 Subject: [PATCH 05/44] Cleaned up notes. Addressed concurrency issues Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 30 ++++++++----------- .../frontend/worker_frontend_manager.go | 22 ++++++-------- 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 51000385b03..845834a8f16 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "sync" "time" "github.com/go-kit/kit/log" @@ -52,8 +51,8 @@ type worker struct { log log.Logger server *server.Server - watcher naming.Watcher //nolint:staticcheck //Skipping for now. If you still see this more than likely issue https://github.com/cortexproject/cortex/issues/2015 has not yet been addressed. - wg sync.WaitGroup + watcher naming.Watcher //nolint:staticcheck //Skipping for now. If you still see this more than likely issue https://github.com/cortexproject/cortex/issues/2015 has not yet been addressed. 
+ managers map[string]*frontendManager } // NewWorker creates a new worker and returns a service that is wrapping it. @@ -75,17 +74,20 @@ func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (service } w := &worker{ - cfg: cfg, - log: log, - server: server, - watcher: watcher, + cfg: cfg, + log: log, + server: server, + watcher: watcher, + managers: map[string]*frontendManager{}, } return services.NewBasicService(nil, w.watchDNSLoop, w.stopping), nil } func (w *worker) stopping(_ error) error { // wait until all per-address workers are done. This is only called after watchDNSLoop exits. - w.wg.Wait() + for _, mgr := range w.managers { + mgr.stop() + } return nil } @@ -100,8 +102,6 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { w.watcher.Close() }() - mgrs := map[string]*frontendManager{} - for { updates, err := w.watcher.Next() if err != nil { @@ -116,22 +116,18 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { for _, update := range updates { switch update.Op { case naming.Add: - // jpe : do the cancel contexts matter? - level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr) client, err := w.connect(update.Addr) if err != nil { - level.Error(w.log).Log("msg", "error connecting", "addr", update.Addr, "err", err) // jpe : dangerous + level.Error(w.log).Log("msg", "error connecting", "addr", update.Addr, "err", err) } mgr := NewFrontendManager(servCtx, w.log, w.server, client, w.cfg.Parallelism, w.cfg.GRPCClientConfig.MaxRecvMsgSize) - mgrs[update.Addr] = mgr + w.managers[update.Addr] = mgr case naming.Delete: - // jpe : does this actually gracefully shutdown? 
- level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) - if mgr, ok := mgrs[update.Addr]; ok { + if mgr, ok := w.managers[update.Addr]; ok { mgr.stop() } diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index c36743e4fe6..4027962cd77 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -23,19 +23,22 @@ type frontendManager struct { server upstream log log.Logger - ctx context.Context maxSendMsgSize int - wg sync.WaitGroup - mtx sync.Mutex + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup } func NewFrontendManager(ctx context.Context, log log.Logger, server upstream, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { + ctx, cancel := context.WithCancel((ctx)) + f := &frontendManager{ client: client, - ctx: ctx, log: log, server: server, + ctx: ctx, + cancel: cancel, maxSendMsgSize: maxSendMsgSize, } @@ -45,14 +48,12 @@ func NewFrontendManager(ctx context.Context, log log.Logger, server upstream, cl } func (f *frontendManager) stop() { + f.cancel() f.concurrentRequests(0) f.wg.Wait() } func (f *frontendManager) concurrentRequests(n int) error { - f.mtx.Lock() - defer f.mtx.Unlock() - // adjust clients slice as necessary for len(f.gracefulQuit) != n { if len(f.gracefulQuit) < n { @@ -75,10 +76,6 @@ func (f *frontendManager) concurrentRequests(n int) error { return nil } -// jpe -// is f.wg.Add(1) safe? -// pass graceful quit - // runOne loops, trying to establish a stream to the frontend to begin // request processing. 
func (f *frontendManager) runOne(quit <-chan struct{}) { @@ -88,7 +85,6 @@ func (f *frontendManager) runOne(quit <-chan struct{}) { backoff := util.NewBackoff(f.ctx, backoffConfig) for backoff.Ongoing() { - // break context chain here c, err := f.client.Process(f.ctx) if err != nil { level.Error(f.log).Log("msg", "error contacting frontend", "err", err) @@ -115,7 +111,7 @@ func (f *frontendManager) process(quit <-chan struct{}, c Frontend_ProcessClient for { select { case <-quit: - return nil // jpe: won't really work with runOne + return fmt.Errorf("graceful quit received") default: } From cecf9b257ab38f81c0f5b176185ce7bea9bd41d4 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 11:44:46 -0400 Subject: [PATCH 06/44] Added parallelism reset Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 41 ++++++++++++++++++- .../frontend/worker_frontend_manager.go | 6 +-- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 845834a8f16..9c6a724c023 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "math/rand" "time" "github.com/go-kit/kit/log" @@ -40,6 +41,7 @@ type WorkerConfig struct { func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service, in host:port format.") f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query frontend.") + f.IntVar(&cfg.TotalParallelism, "querier.worker-total-parallelism", 0, "Number of simultaneous queries to process across all query frontends. 
Overrides querier.worker-parallelism.") f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) @@ -122,8 +124,7 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { level.Error(w.log).Log("msg", "error connecting", "addr", update.Addr, "err", err) } - mgr := NewFrontendManager(servCtx, w.log, w.server, client, w.cfg.Parallelism, w.cfg.GRPCClientConfig.MaxRecvMsgSize) - w.managers[update.Addr] = mgr + w.managers[update.Addr] = NewFrontendManager(servCtx, w.log, w.server, client, 0, w.cfg.GRPCClientConfig.MaxRecvMsgSize) case naming.Delete: level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) @@ -135,6 +136,8 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { return fmt.Errorf("unknown op: %v", update.Op) } } + + w.resetParallelism() } } @@ -147,3 +150,37 @@ func (w *worker) connect(address string) (FrontendClient, error) { } return NewFrontendClient(conn), nil } + +func (w *worker) resetParallelism() { + + // if total parallelism is unset, this is easy + if w.cfg.TotalParallelism == 0 { + for _, mgr := range w.managers { + mgr.concurrentRequests(w.cfg.TotalParallelism) + } + } + + // otherwise we have to do some work. 
assign + addresses := make([]string, 0, len(w.managers)) + for addr := range w.managers { + addresses = append(addresses, addr) + } + rand.Shuffle(len(addresses), func(i, j int) { addresses[i], addresses[j] = addresses[j], addresses[i] }) + + for i, addr := range addresses { + concurrentRequests := w.cfg.TotalParallelism / len(w.managers) + + if i <= w.cfg.TotalParallelism%len(w.managers) { + concurrentRequests++ + } + + if concurrentRequests == 0 { + concurrentRequests = 1 + } + + mgr, ok := w.managers[addr] + if ok { + mgr.concurrentRequests(concurrentRequests) + } + } +} diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 4027962cd77..b2436848d9c 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -31,7 +31,7 @@ type frontendManager struct { } func NewFrontendManager(ctx context.Context, log log.Logger, server upstream, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { - ctx, cancel := context.WithCancel((ctx)) + ctx, cancel := context.WithCancel(ctx) f := &frontendManager{ client: client, @@ -53,7 +53,7 @@ func (f *frontendManager) stop() { f.wg.Wait() } -func (f *frontendManager) concurrentRequests(n int) error { +func (f *frontendManager) concurrentRequests(n int) { // adjust clients slice as necessary for len(f.gracefulQuit) != n { if len(f.gracefulQuit) < n { @@ -72,8 +72,6 @@ func (f *frontendManager) concurrentRequests(n int) error { close(quit) } } - - return nil } // runOne loops, trying to establish a stream to the frontend to begin From 55b0d213bc054348cd61d7c2717925d569f98795 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 12:47:03 -0400 Subject: [PATCH 07/44] Added total parallelism support Signed-off-by: Joe Elliott --- .../frontend/worker_frontend_manager.go | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git 
a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index b2436848d9c..b69f89b9fb1 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -18,26 +18,27 @@ type upstream interface { } type frontendManager struct { - client FrontendClient - gracefulQuit []chan struct{} + client FrontendClient + server upstream - server upstream log log.Logger maxSendMsgSize int - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + gracefulQuit []chan struct{} + serverCtx context.Context + cancel context.CancelFunc + wg sync.WaitGroup } -func NewFrontendManager(ctx context.Context, log log.Logger, server upstream, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { - ctx, cancel := context.WithCancel(ctx) +// NewFrontendManager creates a frontend manager with the given params +func NewFrontendManager(serverCtx context.Context, log log.Logger, server upstream, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { + serverCtx, cancel := context.WithCancel(serverCtx) f := &frontendManager{ client: client, log: log, server: server, - ctx: ctx, + serverCtx: serverCtx, cancel: cancel, maxSendMsgSize: maxSendMsgSize, } @@ -80,10 +81,10 @@ func (f *frontendManager) runOne(quit <-chan struct{}) { f.wg.Add(1) defer f.wg.Done() - backoff := util.NewBackoff(f.ctx, backoffConfig) + backoff := util.NewBackoff(f.serverCtx, backoffConfig) for backoff.Ongoing() { - c, err := f.client.Process(f.ctx) + c, err := f.client.Process(f.serverCtx) if err != nil { level.Error(f.log).Log("msg", "error contacting frontend", "err", err) backoff.Wait() From c0373c5e93fd6c9115585bdde9b9caa2bc29fc1b Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 12:52:34 -0400 Subject: [PATCH 08/44] Mirrored all tests for total parallelism Signed-off-by: Joe Elliott --- pkg/querier/frontend/frontend_test.go | 
13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go index 954ec828edd..cdeddcc59b7 100644 --- a/pkg/querier/frontend/frontend_test.go +++ b/pkg/querier/frontend/frontend_test.go @@ -55,7 +55,8 @@ func TestFrontend(t *testing.T) { assert.Equal(t, "Hello World", string(body)) } - testFrontend(t, handler, test) + testFrontend(t, handler, test, 0) + testFrontend(t, handler, test, 1) } func TestFrontendPropagateTrace(t *testing.T) { @@ -104,7 +105,8 @@ func TestFrontendPropagateTrace(t *testing.T) { // Query should do one calls. assert.Equal(t, traceID, <-observedTraceID) } - testFrontend(t, handler, test) + testFrontend(t, handler, test, 0) + testFrontend(t, handler, test, 1) } // TestFrontendCancel ensures that when client requests are cancelled, @@ -135,7 +137,9 @@ func TestFrontendCancel(t *testing.T) { time.Sleep(100 * time.Millisecond) assert.Equal(t, int32(1), atomic.LoadInt32(&tries)) } - testFrontend(t, handler, test) + testFrontend(t, handler, test, 0) + tries = 0 + testFrontend(t, handler, test, 1) } func TestFrontendCancelStatusCode(t *testing.T) { @@ -156,7 +160,7 @@ func TestFrontendCancelStatusCode(t *testing.T) { } } -func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { +func testFrontend(t *testing.T, handler http.Handler, test func(addr string), totalParallelism int) { logger := log.NewNopLogger() var ( @@ -165,6 +169,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { ) flagext.DefaultValues(&config, &workerConfig) workerConfig.Parallelism = 1 + workerConfig.TotalParallelism = totalParallelism // localhost:0 prevents firewall warnings on Mac OS X. 
grpcListen, err := net.Listen("tcp", "localhost:0") From b7f18583d5d02788f30db66f7d90736184032483 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 12:58:27 -0400 Subject: [PATCH 09/44] Added clarity Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 8 -------- pkg/querier/frontend/worker_frontend_manager.go | 12 ++++++++++++ 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 9c6a724c023..03bfebaa9a2 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -15,18 +15,10 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/naming" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/cortexproject/cortex/pkg/util/services" ) -var ( - backoffConfig = util.BackoffConfig{ - MinBackoff: 50 * time.Millisecond, - MaxBackoff: 1 * time.Second, - } -) - // WorkerConfig is config for a worker. type WorkerConfig struct { Address string `yaml:"frontend_address"` diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index b69f89b9fb1..4ff4587b23e 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "sync" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -13,6 +14,13 @@ import ( "github.com/cortexproject/cortex/pkg/util" ) +var ( + backoffConfig = util.BackoffConfig{ + MinBackoff: 50 * time.Millisecond, + MaxBackoff: 1 * time.Second, + } +) + type upstream interface { Handle(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) } @@ -77,6 +85,10 @@ func (f *frontendManager) concurrentRequests(n int) { // runOne loops, trying to establish a stream to the frontend to begin // request processing. +// Ways that this can be cancelled +// servCtx is cancelled => Cortex is shutting down. 
+// c.Recv() errors => transient network issue, client timeout +// close quit channel => frontendManager is politely asking to shutdown a processor func (f *frontendManager) runOne(quit <-chan struct{}) { f.wg.Add(1) defer f.wg.Done() From 818bb94ffb28ea840af63cb20ec75925ad973b5e Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 13:13:10 -0400 Subject: [PATCH 10/44] Removed upstream interface Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_frontend_manager.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 4ff4587b23e..27aaa1bd652 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" "github.com/cortexproject/cortex/pkg/util" ) @@ -21,13 +22,9 @@ var ( } ) -type upstream interface { - Handle(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) -} - type frontendManager struct { client FrontendClient - server upstream + server *server.Server log log.Logger maxSendMsgSize int @@ -39,7 +36,7 @@ type frontendManager struct { } // NewFrontendManager creates a frontend manager with the given params -func NewFrontendManager(serverCtx context.Context, log log.Logger, server upstream, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { +func NewFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { serverCtx, cancel := context.WithCancel(serverCtx) f := &frontendManager{ From 6c5e68b327798fddd161fa83c46584d825f907f2 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 13:22:42 -0400 Subject: 
[PATCH 11/44] Added first worker test Signed-off-by: Joe Elliott --- .../frontend/worker_frontend_manager_test.go | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 pkg/querier/frontend/worker_frontend_manager_test.go diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go new file mode 100644 index 00000000000..91f0d6f6c8e --- /dev/null +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -0,0 +1,34 @@ +package frontend + +import ( + "context" + "net/http" + "sync/atomic" + "testing" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/stretchr/testify/assert" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" + grpc "google.golang.org/grpc" +) + +type mockFrontendClient struct { +} + +func (m *mockFrontendClient) Process(ctx context.Context, opts ...grpc.CallOption) (Frontend_ProcessClient, error) { + return nil, nil +} + +func TestConstructionAndStop(t *testing.T) { + var calls int32 + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte("Hello World")) + atomic.AddInt32(&calls, 1) + assert.NoError(t, err) + }) + + mgr := NewFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) + mgr.stop() + + assert.Equal(t, int32(0), calls) +} From dcf2e458aa76bd720821a51ff23e83415647c8f4 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 14:20:33 -0400 Subject: [PATCH 12/44] Added concurrency test Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 2 +- .../frontend/worker_frontend_manager.go | 31 +++++++------ .../frontend/worker_frontend_manager_test.go | 46 ++++++++++++++++++- 3 files changed, 63 insertions(+), 16 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 03bfebaa9a2..2e0591fb8bd 100644 --- a/pkg/querier/frontend/worker.go +++ 
b/pkg/querier/frontend/worker.go @@ -116,7 +116,7 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { level.Error(w.log).Log("msg", "error connecting", "addr", update.Addr, "err", err) } - w.managers[update.Addr] = NewFrontendManager(servCtx, w.log, w.server, client, 0, w.cfg.GRPCClientConfig.MaxRecvMsgSize) + w.managers[update.Addr] = newFrontendManager(servCtx, w.log, w.server, client, 0, w.cfg.GRPCClientConfig.MaxRecvMsgSize) case naming.Delete: level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 27aaa1bd652..e2081717f80 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" + "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/util" ) @@ -29,23 +30,24 @@ type frontendManager struct { log log.Logger maxSendMsgSize int - gracefulQuit []chan struct{} - serverCtx context.Context - cancel context.CancelFunc - wg sync.WaitGroup + gracefulQuit []chan struct{} + serverCtx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + currentProcessors *atomic.Int32 } -// NewFrontendManager creates a frontend manager with the given params -func NewFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { +func newFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { serverCtx, cancel := context.WithCancel(serverCtx) f := &frontendManager{ - client: client, - log: log, - server: server, - serverCtx: serverCtx, - cancel: cancel, - maxSendMsgSize: maxSendMsgSize, + client: 
client, + log: log, + server: server, + serverCtx: serverCtx, + cancel: cancel, + maxSendMsgSize: maxSendMsgSize, + currentProcessors: atomic.NewInt32(0), } f.concurrentRequests(initialConcurrentRequests) @@ -66,7 +68,7 @@ func (f *frontendManager) concurrentRequests(n int) { quit := make(chan struct{}) f.gracefulQuit = append(f.gracefulQuit, quit) - f.runOne(quit) + go f.runOne(quit) continue } @@ -116,6 +118,9 @@ func (f *frontendManager) process(quit <-chan struct{}, c Frontend_ProcessClient ctx, cancel := context.WithCancel(c.Context()) defer cancel() + f.currentProcessors.Inc() + defer f.currentProcessors.Dec() + for { select { case <-quit: diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index 91f0d6f6c8e..54113e069da 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -3,11 +3,14 @@ package frontend import ( "context" "net/http" + "sync" "sync/atomic" "testing" + "time" "github.com/cortexproject/cortex/pkg/util" "github.com/stretchr/testify/assert" + "github.com/weaveworks/common/httpgrpc" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" grpc "google.golang.org/grpc" ) @@ -16,7 +19,29 @@ type mockFrontendClient struct { } func (m *mockFrontendClient) Process(ctx context.Context, opts ...grpc.CallOption) (Frontend_ProcessClient, error) { - return nil, nil + return &mockFrontendProcessClient{}, nil +} + +type mockFrontendProcessClient struct { + grpc.ClientStream + + wg sync.WaitGroup +} + +func (m *mockFrontendProcessClient) Send(*ProcessResponse) error { + m.wg.Done() + return nil +} +func (m *mockFrontendProcessClient) Recv() (*ProcessRequest, error) { + m.wg.Wait() + m.wg.Add(1) + + return &ProcessRequest{ + HttpRequest: &httpgrpc.HTTPRequest{}, + }, nil +} +func (m *mockFrontendProcessClient) Context() context.Context { + return context.Background() } func TestConstructionAndStop(t *testing.T) { 
@@ -27,8 +52,25 @@ func TestConstructionAndStop(t *testing.T) { assert.NoError(t, err) }) - mgr := NewFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) + mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) mgr.stop() assert.Equal(t, int32(0), calls) } + +func TestSingleConcurrency(t *testing.T) { + concurrency := 1 + + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte("Hello World")) + assert.NoError(t, err) + }) + + mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) + mgr.concurrentRequests(concurrency) + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, int32(concurrency), mgr.currentProcessors.Load()) + mgr.stop() + assert.Equal(t, int32(0), mgr.currentProcessors.Load()) +} From 2e75c3f1ed65d82434b4fa2106a3a99bc4ac6b0f Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 14:24:54 -0400 Subject: [PATCH 13/44] TestTable for Concurrency Signed-off-by: Joe Elliott --- .../frontend/worker_frontend_manager_test.go | 52 ++++++++++--------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index 54113e069da..ee704f053b6 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -2,9 +2,9 @@ package frontend import ( "context" + "fmt" "net/http" "sync" - "sync/atomic" "testing" "time" @@ -44,33 +44,37 @@ func (m *mockFrontendProcessClient) Context() context.Context { return context.Background() } -func TestConstructionAndStop(t *testing.T) { - var calls int32 +func TestConcurrency(t *testing.T) { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, err := 
w.Write([]byte("Hello World")) - atomic.AddInt32(&calls, 1) assert.NoError(t, err) }) - mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) - mgr.stop() + tests := []struct { + concurrency int + }{ + { + concurrency: 0, + }, + { + concurrency: 1, + }, + { + concurrency: 5, + }, + { + concurrency: 30, + }, + } + for _, tt := range tests { + t.Run(fmt.Sprintf("Testing concurrency %d", tt.concurrency), func(t *testing.T) { + mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) + mgr.concurrentRequests(tt.concurrency) + time.Sleep(100 * time.Millisecond) - assert.Equal(t, int32(0), calls) -} - -func TestSingleConcurrency(t *testing.T) { - concurrency := 1 - - handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - _, err := w.Write([]byte("Hello World")) - assert.NoError(t, err) - }) - - mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) - mgr.concurrentRequests(concurrency) - time.Sleep(100 * time.Millisecond) - - assert.Equal(t, int32(concurrency), mgr.currentProcessors.Load()) - mgr.stop() - assert.Equal(t, int32(0), mgr.currentProcessors.Load()) + assert.Equal(t, int32(tt.concurrency), mgr.currentProcessors.Load()) + mgr.stop() + assert.Equal(t, int32(0), mgr.currentProcessors.Load()) + }) + } } From 9c064cf2fc07b7b00e99f73061f42b96a448bf56 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 14:37:49 -0400 Subject: [PATCH 14/44] Added series of test cases Signed-off-by: Joe Elliott --- .../frontend/worker_frontend_manager.go | 8 +++-- .../frontend/worker_frontend_manager_test.go | 29 +++++++++++++------ 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 
e2081717f80..751725348fd 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -56,12 +56,16 @@ func newFrontendManager(serverCtx context.Context, log log.Logger, server *serve } func (f *frontendManager) stop() { - f.cancel() - f.concurrentRequests(0) + f.cancel() // force stop + f.concurrentRequests(0) // graceful quit f.wg.Wait() } func (f *frontendManager) concurrentRequests(n int) { + if n < 0 { + n = 0 + } + // adjust clients slice as necessary for len(f.gracefulQuit) != n { if len(f.gracefulQuit) < n { diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index ee704f053b6..605f3c5c49f 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -51,28 +51,39 @@ func TestConcurrency(t *testing.T) { }) tests := []struct { - concurrency int + concurrency []int }{ { - concurrency: 0, + concurrency: []int{0}, }, { - concurrency: 1, + concurrency: []int{1}, }, { - concurrency: 5, + concurrency: []int{5}, }, { - concurrency: 30, + concurrency: []int{5, 3, 7}, + }, + { + concurrency: []int{-1}, }, } for _, tt := range tests { - t.Run(fmt.Sprintf("Testing concurrency %d", tt.concurrency), func(t *testing.T) { + t.Run(fmt.Sprintf("Testing concurrency %v", tt.concurrency), func(t *testing.T) { mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) - mgr.concurrentRequests(tt.concurrency) - time.Sleep(100 * time.Millisecond) - assert.Equal(t, int32(tt.concurrency), mgr.currentProcessors.Load()) + for _, c := range tt.concurrency { + mgr.concurrentRequests(c) + time.Sleep(50 * time.Millisecond) + + expected := int32(c) + if expected < 0 { + expected = 0 + } + assert.Equal(t, expected, mgr.currentProcessors.Load()) + } + mgr.stop() assert.Equal(t, int32(0), mgr.currentProcessors.Load()) }) From 
203ee306738aa800e571d7b2a76ce07236df9244 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 14:49:14 -0400 Subject: [PATCH 15/44] Added test for failed receive. Fixed graceful quit shutdown Signed-off-by: Joe Elliott --- .../frontend/worker_frontend_manager.go | 16 ++++++--- .../frontend/worker_frontend_manager_test.go | 33 +++++++++++++++++-- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 751725348fd..5281e48fc45 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -2,6 +2,7 @@ package frontend import ( "context" + "errors" "fmt" "net/http" "sync" @@ -21,6 +22,8 @@ var ( MinBackoff: 50 * time.Millisecond, MaxBackoff: 1 * time.Second, } + + errGracefulQuit = errors.New("processor quitting gracefully") ) type frontendManager struct { @@ -96,6 +99,9 @@ func (f *frontendManager) runOne(quit <-chan struct{}) { f.wg.Add(1) defer f.wg.Done() + f.currentProcessors.Inc() + defer f.currentProcessors.Dec() + backoff := util.NewBackoff(f.serverCtx, backoffConfig) for backoff.Ongoing() { @@ -107,6 +113,11 @@ func (f *frontendManager) runOne(quit <-chan struct{}) { } if err := f.process(quit, c); err != nil { + if err == errGracefulQuit { + level.Debug(f.log).Log("msg", "gracefully shutting down processor") + return + } + level.Error(f.log).Log("msg", "error processing requests", "err", err) backoff.Wait() continue @@ -122,13 +133,10 @@ func (f *frontendManager) process(quit <-chan struct{}, c Frontend_ProcessClient ctx, cancel := context.WithCancel(c.Context()) defer cancel() - f.currentProcessors.Inc() - defer f.currentProcessors.Dec() - for { select { case <-quit: - return fmt.Errorf("graceful quit received") + return errGracefulQuit default: } diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index 
605f3c5c49f..9607489d72b 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -2,6 +2,7 @@ package frontend import ( "context" + "errors" "fmt" "net/http" "sync" @@ -16,16 +17,20 @@ import ( ) type mockFrontendClient struct { + failRecv bool } func (m *mockFrontendClient) Process(ctx context.Context, opts ...grpc.CallOption) (Frontend_ProcessClient, error) { - return &mockFrontendProcessClient{}, nil + return &mockFrontendProcessClient{ + failRecv: m.failRecv, + }, nil } type mockFrontendProcessClient struct { grpc.ClientStream - wg sync.WaitGroup + failRecv bool + wg sync.WaitGroup } func (m *mockFrontendProcessClient) Send(*ProcessResponse) error { @@ -36,6 +41,10 @@ func (m *mockFrontendProcessClient) Recv() (*ProcessRequest, error) { m.wg.Wait() m.wg.Add(1) + if m.failRecv { + return nil, errors.New("wups") + } + return &ProcessRequest{ HttpRequest: &httpgrpc.HTTPRequest{}, }, nil @@ -89,3 +98,23 @@ func TestConcurrency(t *testing.T) { }) } } + +func TestRecvFailDoesntCancelProcess(t *testing.T) { + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte("Hello World")) + assert.NoError(t, err) + }) + + client := &mockFrontendClient{ + failRecv: true, + } + + mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), client, 0, 100000000) + + mgr.concurrentRequests(1) + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int32(1), mgr.currentProcessors.Load()) + + mgr.stop() + assert.Equal(t, int32(0), mgr.currentProcessors.Load()) +} From 67119bc7eaaf9e096c15056c6ffdfb7b9d642bcc Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 14:53:12 -0400 Subject: [PATCH 16/44] Added tests for number of calls made to the handler Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_frontend_manager_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git 
a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index 9607489d72b..e2c357ed435 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/weaveworks/common/httpgrpc" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" + "go.uber.org/atomic" grpc "google.golang.org/grpc" ) @@ -54,7 +55,9 @@ func (m *mockFrontendProcessClient) Context() context.Context { } func TestConcurrency(t *testing.T) { + calls := atomic.NewInt32(0) handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + calls.Inc() _, err := w.Write([]byte("Hello World")) assert.NoError(t, err) }) @@ -83,6 +86,7 @@ func TestConcurrency(t *testing.T) { mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) for _, c := range tt.concurrency { + calls.Store(0) mgr.concurrentRequests(c) time.Sleep(50 * time.Millisecond) @@ -91,16 +95,23 @@ func TestConcurrency(t *testing.T) { expected = 0 } assert.Equal(t, expected, mgr.currentProcessors.Load()) + + if expected > 0 { + assert.Greater(t, calls.Load(), int32(0)) + } } mgr.stop() assert.Equal(t, int32(0), mgr.currentProcessors.Load()) + }) } } func TestRecvFailDoesntCancelProcess(t *testing.T) { + calls := atomic.NewInt32(0) handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + calls.Inc() _, err := w.Write([]byte("Hello World")) assert.NoError(t, err) }) @@ -117,4 +128,5 @@ func TestRecvFailDoesntCancelProcess(t *testing.T) { mgr.stop() assert.Equal(t, int32(0), mgr.currentProcessors.Load()) + assert.Equal(t, int32(0), calls.Load()) } From c2cc86fcd7e2cd8d7e21fd2c3f709495763c004f Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 14:55:50 -0400 Subject: [PATCH 17/44] Added test for cancelling service context cancelling 
processes Signed-off-by: Joe Elliott --- .../frontend/worker_frontend_manager_test.go | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index e2c357ed435..dbd1a068696 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -103,7 +103,6 @@ func TestConcurrency(t *testing.T) { mgr.stop() assert.Equal(t, int32(0), mgr.currentProcessors.Load()) - }) } } @@ -130,3 +129,30 @@ func TestRecvFailDoesntCancelProcess(t *testing.T) { assert.Equal(t, int32(0), mgr.currentProcessors.Load()) assert.Equal(t, int32(0), calls.Load()) } + +func TestServeCancelStopsProcess(t *testing.T) { + calls := atomic.NewInt32(0) + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + calls.Inc() + _, err := w.Write([]byte("Hello World")) + assert.NoError(t, err) + }) + + client := &mockFrontendClient{ + failRecv: true, + } + + ctx, cancel := context.WithCancel(context.Background()) + mgr := newFrontendManager(ctx, util.Logger, httpgrpc_server.NewServer(handler), client, 0, 100000000) + + mgr.concurrentRequests(1) + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int32(1), mgr.currentProcessors.Load()) + + cancel() + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int32(0), mgr.currentProcessors.Load()) + + mgr.stop() + assert.Equal(t, int32(0), mgr.currentProcessors.Load()) +} From 9f2b58b9610930aff69eba22f411115d8f366b37 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 16:07:43 -0400 Subject: [PATCH 18/44] Added changelog entry and updated docs Signed-off-by: Joe Elliott --- CHANGELOG.md | 1 + docs/configuration/arguments.md | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98ad35808e9..0722c4fae30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * 
[ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392 * [ENHANCEMENT] Added `cortex_distributor_latest_seen_sample_timestamp_seconds` metric to see how far behind Prometheus servers are in sending data. #2371 * [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-.fifocache.size` CLI flag has been renamed to `-.fifocache.max-size-items` as well as its YAML config option `size` renamed to `max_size_items`. Added `-.fifocache.max-size-bytes` CLI flag and YAML config option `max_size_bytes` to specify memory limit of the cache. #2319 +* [ENHANCEMENT] Added `-querier.worker-total-parallelism`. Controls the total parallelism across all query frontends. Overrides `-querier.worker-parallelism`. #2456 * [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372 * [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400 * [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 8413bda90bc..aad8eb4bc4a 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -16,7 +16,7 @@ Duration arguments should be specified with a unit like `5s` or `3h`. Valid time - `-querier.max-concurrent` The maximum number of top-level PromQL queries that will execute at the same time, per querier process. - If using the query frontend, this should be set to at least (`querier.worker-parallelism` * number of query frontend replicas). Otherwise queries may queue in the queriers and not the frontend, which will affect QoS. + If using the query frontend, this should be set to at least (`querier.worker-parallelism` * number of query frontend replicas). 
Otherwise queries may queue in the queriers and not the frontend, which will affect QoS. Alternatively, if you are using `-querier.worker-parallelism` these two values should be equal. - `-querier.query-parallelism` @@ -42,9 +42,15 @@ The next three options only apply when the querier is used together with the Que - `-querier.worker-parallelism` - Number of simultaneous queries to process, per worker process. + Number of simultaneous queries to process, per query frontend. See note on `-querier.max-concurrent` +- `-querier.worker-total-parallelism` + + Number of simultaneous queries to process across all query frontends. Overrides `-querier.worker-parallelism`. + See note on `-querier.max-concurrent` + + ## Querier and Ruler The ingester query API was improved over time, but defaults to the old behaviour for backwards-compatibility. For best results both of these next two flags should be set to `true`: From 992a3e24ba54373b79316c7f6019162f15514b6c Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 16:12:15 -0400 Subject: [PATCH 19/44] lint Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_frontend_manager.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 5281e48fc45..9e3284a1e60 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -8,13 +8,12 @@ import ( "sync" "time" + "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" "go.uber.org/atomic" - - "github.com/cortexproject/cortex/pkg/util" ) var ( From 9e504c37fe80a29ba0bc96c2cc4d24184ba4dd92 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 13 Apr 2020 16:37:34 -0400 Subject: [PATCH 20/44] lint part deux Signed-off-by: Joe Elliott --- 
pkg/querier/frontend/worker_frontend_manager.go | 3 ++- pkg/querier/frontend/worker_frontend_manager_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 9e3284a1e60..5281e48fc45 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -8,12 +8,13 @@ import ( "sync" "time" - "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" "go.uber.org/atomic" + + "github.com/cortexproject/cortex/pkg/util" ) var ( diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index dbd1a068696..44c035e0e8c 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -9,12 +9,13 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/util" "github.com/stretchr/testify/assert" "github.com/weaveworks/common/httpgrpc" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "go.uber.org/atomic" grpc "google.golang.org/grpc" + + "github.com/cortexproject/cortex/pkg/util" ) type mockFrontendClient struct { From 9593d25447d092a1583fea2a6a16a959c6a33ea5 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 14 Apr 2020 08:25:01 -0400 Subject: [PATCH 21/44] make doc Signed-off-by: Joe Elliott --- docs/configuration/config-file-reference.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 1d4b3cfbf6a..9e9281ab0e2 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1801,10 +1801,15 @@ The `frontend_worker_config` configures the worker - running within 
the Cortex q # CLI flag: -querier.frontend-address [frontend_address: | default = ""] -# Number of simultaneous queries to process. +# Number of simultaneous queries to process per query frontend. # CLI flag: -querier.worker-parallelism [parallelism: | default = 10] +# Number of simultaneous queries to process across all query frontends. +# Overrides querier.worker-parallelism. +# CLI flag: -querier.worker-total-parallelism +[total_parallelism: | default = 0] + # How often to query DNS. # CLI flag: -querier.dns-lookup-period [dns_lookup_duration: | default = 10s] From 0fb4098d5c59af156b1c9bb26bc8648fa02bb701 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 14 Apr 2020 09:10:55 -0400 Subject: [PATCH 22/44] Added resetParallelism tests and fixed bug Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 2 +- pkg/querier/frontend/worker_test.go | 85 +++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 pkg/querier/frontend/worker_test.go diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 2e0591fb8bd..50a8955844b 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -162,7 +162,7 @@ func (w *worker) resetParallelism() { for i, addr := range addresses { concurrentRequests := w.cfg.TotalParallelism / len(w.managers) - if i <= w.cfg.TotalParallelism%len(w.managers) { + if i < w.cfg.TotalParallelism%len(w.managers) { concurrentRequests++ } diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go new file mode 100644 index 00000000000..4bd5f53eab6 --- /dev/null +++ b/pkg/querier/frontend/worker_test.go @@ -0,0 +1,85 @@ +package frontend + +import ( + "context" + "net/http" + "strconv" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/stretchr/testify/assert" + httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" +) + +func TestResetParallelism(t *testing.T) { + handler := 
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte("Hello World")) + assert.NoError(t, err) + }) + + tests := []struct { + parallelism int + totalParallelism int + numManagers int + expectedConcurrency int32 + }{ + { + parallelism: 0, + totalParallelism: 0, + numManagers: 2, + expectedConcurrency: 2, + }, + { + parallelism: 1, + totalParallelism: 0, + numManagers: 2, + expectedConcurrency: 2, + }, + { + parallelism: 1, + totalParallelism: 7, + numManagers: 4, + expectedConcurrency: 7, + }, + { + parallelism: 1, + totalParallelism: 3, + numManagers: 6, + expectedConcurrency: 6, + }, + { + parallelism: 1, + totalParallelism: 6, + numManagers: 2, + expectedConcurrency: 6, + }, + } + + for _, tt := range tests { + + cfg := WorkerConfig{ + Parallelism: tt.parallelism, + TotalParallelism: tt.totalParallelism, + } + + w := &worker{ + cfg: cfg, + log: util.Logger, + managers: map[string]*frontendManager{}, + } + + for i := 0; i < tt.numManagers; i++ { + w.managers[strconv.Itoa(i)] = newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) + } + + w.resetParallelism() + time.Sleep(100 * time.Millisecond) + + concurrency := int32(0) + for _, mgr := range w.managers { + concurrency += mgr.currentProcessors.Load() + } + assert.Equal(t, tt.expectedConcurrency, concurrency) + } +} From 248ec809685ff684ad98c73b6874dc8e6e54d04d Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 14 Apr 2020 09:20:28 -0400 Subject: [PATCH 23/44] Added stop test to resetParallelism Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 3 ++- pkg/querier/frontend/worker_frontend_manager.go | 2 +- pkg/querier/frontend/worker_test.go | 9 +++++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 50a8955844b..8e5e465b2c7 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go 
@@ -152,7 +152,8 @@ func (w *worker) resetParallelism() { } } - // otherwise we have to do some work. assign + // otherwise we have to do some work. randomize the order of our managers and set concurrency + // on each to match the requested total concurrency addresses := make([]string, 0, len(w.managers)) for addr := range w.managers { addresses = append(addresses, addr) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 5281e48fc45..3d4fee1bed7 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -93,7 +93,7 @@ func (f *frontendManager) concurrentRequests(n int) { // request processing. // Ways that this can be cancelled // servCtx is cancelled => Cortex is shutting down. -// c.Recv() errors => transient network issue, client timeout +// c.Recv() errors => transient network issue, a client of the query frontend times out // close quit channel => frontendManager is politely asking to shutdown a processor func (f *frontendManager) runOne(quit <-chan struct{}) { f.wg.Add(1) diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go index 4bd5f53eab6..99c27da6a22 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -81,5 +81,14 @@ func TestResetParallelism(t *testing.T) { concurrency += mgr.currentProcessors.Load() } assert.Equal(t, tt.expectedConcurrency, concurrency) + + err := w.stopping(nil) + assert.NoError(t, err) + + concurrency = int32(0) + for _, mgr := range w.managers { + concurrency += mgr.currentProcessors.Load() + } + assert.Equal(t, int32(0), concurrency) } } From ebd667f3acac9538729c64ca283d5582b62ab1d6 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 14 Apr 2020 09:31:45 -0400 Subject: [PATCH 24/44] Added test names Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_test.go | 73 ++++++++++++++++------------- 1 file changed, 40 insertions(+), 
33 deletions(-) diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go index 99c27da6a22..d2aaebca21f 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -19,76 +19,83 @@ func TestResetParallelism(t *testing.T) { }) tests := []struct { + name string parallelism int totalParallelism int numManagers int expectedConcurrency int32 }{ { + name: "Test create least one worker per manager", parallelism: 0, totalParallelism: 0, numManagers: 2, expectedConcurrency: 2, }, { - parallelism: 1, + name: "Test concurrency per query frontend configuration", + parallelism: 4, totalParallelism: 0, numManagers: 2, - expectedConcurrency: 2, + expectedConcurrency: 8, }, { + name: "Test Total Parallelism with a remainder", parallelism: 1, totalParallelism: 7, numManagers: 4, expectedConcurrency: 7, }, { + name: "Test Total Parallelism dividing evenly", parallelism: 1, - totalParallelism: 3, - numManagers: 6, + totalParallelism: 6, + numManagers: 2, expectedConcurrency: 6, }, { + name: "Test Total Parallelism at least one worker per manager", parallelism: 1, - totalParallelism: 6, - numManagers: 2, + totalParallelism: 3, + numManagers: 6, expectedConcurrency: 6, }, } for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := WorkerConfig{ + Parallelism: tt.parallelism, + TotalParallelism: tt.totalParallelism, + } - cfg := WorkerConfig{ - Parallelism: tt.parallelism, - TotalParallelism: tt.totalParallelism, - } - - w := &worker{ - cfg: cfg, - log: util.Logger, - managers: map[string]*frontendManager{}, - } + w := &worker{ + cfg: cfg, + log: util.Logger, + managers: map[string]*frontendManager{}, + } - for i := 0; i < tt.numManagers; i++ { - w.managers[strconv.Itoa(i)] = newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) - } + for i := 0; i < tt.numManagers; i++ { + w.managers[strconv.Itoa(i)] = 
newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) + } - w.resetParallelism() - time.Sleep(100 * time.Millisecond) + w.resetParallelism() + time.Sleep(100 * time.Millisecond) - concurrency := int32(0) - for _, mgr := range w.managers { - concurrency += mgr.currentProcessors.Load() - } - assert.Equal(t, tt.expectedConcurrency, concurrency) + concurrency := int32(0) + for _, mgr := range w.managers { + concurrency += mgr.currentProcessors.Load() + } + assert.Equal(t, tt.expectedConcurrency, concurrency) - err := w.stopping(nil) - assert.NoError(t, err) + err := w.stopping(nil) + assert.NoError(t, err) - concurrency = int32(0) - for _, mgr := range w.managers { - concurrency += mgr.currentProcessors.Load() - } - assert.Equal(t, int32(0), concurrency) + concurrency = int32(0) + for _, mgr := range w.managers { + concurrency += mgr.currentProcessors.Load() + } + assert.Equal(t, int32(0), concurrency) + }) } } From 670e157468aeb7e204d3ca41b8cb25756aef7269 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 14 Apr 2020 09:37:27 -0400 Subject: [PATCH 25/44] Cleaned up resetParallelism Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 8e5e465b2c7..bf08b731b66 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -144,16 +144,6 @@ func (w *worker) connect(address string) (FrontendClient, error) { } func (w *worker) resetParallelism() { - - // if total parallelism is unset, this is easy - if w.cfg.TotalParallelism == 0 { - for _, mgr := range w.managers { - mgr.concurrentRequests(w.cfg.TotalParallelism) - } - } - - // otherwise we have to do some work. 
randomize the order of our managers and set concurrency - // on each to match the requested total concurrency addresses := make([]string, 0, len(w.managers)) for addr := range w.managers { addresses = append(addresses, addr) @@ -161,10 +151,15 @@ func (w *worker) resetParallelism() { rand.Shuffle(len(addresses), func(i, j int) { addresses[i], addresses[j] = addresses[j], addresses[i] }) for i, addr := range addresses { - concurrentRequests := w.cfg.TotalParallelism / len(w.managers) + concurrentRequests := 0 + if w.cfg.TotalParallelism > 0 { + concurrentRequests = w.cfg.TotalParallelism / len(w.managers) - if i < w.cfg.TotalParallelism%len(w.managers) { - concurrentRequests++ + if i < w.cfg.TotalParallelism%len(w.managers) { + concurrentRequests++ + } + } else { + concurrentRequests = w.cfg.Parallelism } if concurrentRequests == 0 { From b77ba05f7be020ad5ed50468d889af9779566f98 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 14 Apr 2020 10:14:49 -0400 Subject: [PATCH 26/44] Added DNS Watch tests Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_test.go | 120 ++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go index d2aaebca21f..73a99080846 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -4,12 +4,14 @@ import ( "context" "net/http" "strconv" + "sync" "testing" "time" "github.com/cortexproject/cortex/pkg/util" "github.com/stretchr/testify/assert" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" + "google.golang.org/grpc/naming" ) func TestResetParallelism(t *testing.T) { @@ -99,3 +101,121 @@ func TestResetParallelism(t *testing.T) { }) } } + +type mockDNSWatcher struct { + updates []*naming.Update + wg sync.WaitGroup +} + +func (m *mockDNSWatcher) Next() ([]*naming.Update, error) { + m.wg.Add(1) + m.wg.Wait() + + var update *naming.Update + update, m.updates = m.updates[0], m.updates[1:] + + 
return []*naming.Update{update}, nil +} + +func (m *mockDNSWatcher) Close() { + +} + +func TestDNSWatcher(t *testing.T) { + tests := []struct { + name string + updates []*naming.Update + expectedFrontends [][]string + }{ + { + name: "Test add one", + updates: []*naming.Update{ + { + Op: naming.Add, + Addr: "blerg", + }, + }, + expectedFrontends: [][]string{ + { + "blerg", + }, + }, + }, + { + name: "Test add one and delete", + updates: []*naming.Update{ + { + Op: naming.Add, + Addr: "blerg", + }, + { + Op: naming.Delete, + Addr: "blerg", + }, + }, + expectedFrontends: [][]string{ + { + "blerg", + }, + {}, + }, + }, + { + name: "Test delete nonexistent", + updates: []*naming.Update{ + { + Op: naming.Delete, + Addr: "blerg", + }, + { + Op: naming.Add, + Addr: "blerg", + }, + }, + expectedFrontends: [][]string{ + {}, + { + "blerg", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := WorkerConfig{ + Parallelism: 0, + TotalParallelism: 0, + } + + watcher := &mockDNSWatcher{ + updates: tt.updates, + } + w := &worker{ + cfg: cfg, + log: util.Logger, + managers: map[string]*frontendManager{}, + watcher: watcher, + } + + ctx, cancel := context.WithCancel(context.Background()) + go w.watchDNSLoop(ctx) + time.Sleep(50 * time.Millisecond) + + for i := range tt.updates { + watcher.wg.Done() + + time.Sleep(50 * time.Millisecond) + + // confirm all expected frontends exist + for _, expected := range tt.expectedFrontends[i] { + _, ok := w.managers[expected] + + assert.Truef(t, ok, "Unable to find %s on iteration %d", expected, i) + } + } + + cancel() + }) + } +} From 19cbec9cf8da802cdec01d4055b76d3f3aa68665 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 14 Apr 2020 10:18:26 -0400 Subject: [PATCH 27/44] lint Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_test.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/querier/frontend/worker_test.go 
b/pkg/querier/frontend/worker_test.go index 73a99080846..c5260accd1a 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -8,10 +8,11 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/util" "github.com/stretchr/testify/assert" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "google.golang.org/grpc/naming" + + "github.com/cortexproject/cortex/pkg/util" ) func TestResetParallelism(t *testing.T) { @@ -103,18 +104,18 @@ func TestResetParallelism(t *testing.T) { } type mockDNSWatcher struct { - updates []*naming.Update + updates []*naming.Update //nolint:staticcheck wg sync.WaitGroup } -func (m *mockDNSWatcher) Next() ([]*naming.Update, error) { +func (m *mockDNSWatcher) Next() ([]*naming.Update, error) { //nolint:staticcheck m.wg.Add(1) m.wg.Wait() - var update *naming.Update + var update *naming.Update //nolint:staticcheck update, m.updates = m.updates[0], m.updates[1:] - return []*naming.Update{update}, nil + return []*naming.Update{update}, nil //nolint:staticcheck } func (m *mockDNSWatcher) Close() { @@ -124,12 +125,12 @@ func (m *mockDNSWatcher) Close() { func TestDNSWatcher(t *testing.T) { tests := []struct { name string - updates []*naming.Update + updates []*naming.Update //nolint:staticcheck expectedFrontends [][]string }{ { name: "Test add one", - updates: []*naming.Update{ + updates: []*naming.Update{ //nolint:staticcheck { Op: naming.Add, Addr: "blerg", @@ -143,7 +144,7 @@ func TestDNSWatcher(t *testing.T) { }, { name: "Test add one and delete", - updates: []*naming.Update{ + updates: []*naming.Update{ //nolint:staticcheck { Op: naming.Add, Addr: "blerg", @@ -162,7 +163,7 @@ func TestDNSWatcher(t *testing.T) { }, { name: "Test delete nonexistent", - updates: []*naming.Update{ + updates: []*naming.Update{ //nolint:staticcheck { Op: naming.Delete, Addr: "blerg", @@ -199,7 +200,7 @@ func TestDNSWatcher(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - go 
w.watchDNSLoop(ctx) + go w.watchDNSLoop(ctx) //nolint:errcheck time.Sleep(50 * time.Millisecond) for i := range tt.updates { From 819b207925d5e0e31cf3d5b8c98261b65ad86a57 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 14 Apr 2020 10:42:23 -0400 Subject: [PATCH 28/44] Removed racey dns watcher tests Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_test.go | 120 ---------------------------- 1 file changed, 120 deletions(-) diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go index c5260accd1a..7e7dce533dc 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -4,13 +4,11 @@ import ( "context" "net/http" "strconv" - "sync" "testing" "time" "github.com/stretchr/testify/assert" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" - "google.golang.org/grpc/naming" "github.com/cortexproject/cortex/pkg/util" ) @@ -102,121 +100,3 @@ func TestResetParallelism(t *testing.T) { }) } } - -type mockDNSWatcher struct { - updates []*naming.Update //nolint:staticcheck - wg sync.WaitGroup -} - -func (m *mockDNSWatcher) Next() ([]*naming.Update, error) { //nolint:staticcheck - m.wg.Add(1) - m.wg.Wait() - - var update *naming.Update //nolint:staticcheck - update, m.updates = m.updates[0], m.updates[1:] - - return []*naming.Update{update}, nil //nolint:staticcheck -} - -func (m *mockDNSWatcher) Close() { - -} - -func TestDNSWatcher(t *testing.T) { - tests := []struct { - name string - updates []*naming.Update //nolint:staticcheck - expectedFrontends [][]string - }{ - { - name: "Test add one", - updates: []*naming.Update{ //nolint:staticcheck - { - Op: naming.Add, - Addr: "blerg", - }, - }, - expectedFrontends: [][]string{ - { - "blerg", - }, - }, - }, - { - name: "Test add one and delete", - updates: []*naming.Update{ //nolint:staticcheck - { - Op: naming.Add, - Addr: "blerg", - }, - { - Op: naming.Delete, - Addr: "blerg", - }, - }, - expectedFrontends: [][]string{ - { - "blerg", - }, - 
{}, - }, - }, - { - name: "Test delete nonexistent", - updates: []*naming.Update{ //nolint:staticcheck - { - Op: naming.Delete, - Addr: "blerg", - }, - { - Op: naming.Add, - Addr: "blerg", - }, - }, - expectedFrontends: [][]string{ - {}, - { - "blerg", - }, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cfg := WorkerConfig{ - Parallelism: 0, - TotalParallelism: 0, - } - - watcher := &mockDNSWatcher{ - updates: tt.updates, - } - w := &worker{ - cfg: cfg, - log: util.Logger, - managers: map[string]*frontendManager{}, - watcher: watcher, - } - - ctx, cancel := context.WithCancel(context.Background()) - go w.watchDNSLoop(ctx) //nolint:errcheck - time.Sleep(50 * time.Millisecond) - - for i := range tt.updates { - watcher.wg.Done() - - time.Sleep(50 * time.Millisecond) - - // confirm all expected frontends exist - for _, expected := range tt.expectedFrontends[i] { - _, ok := w.managers[expected] - - assert.Truef(t, ok, "Unable to find %s on iteration %d", expected, i) - } - } - - cancel() - }) - } -} From 0fc3b444ecf1d3c6b5f241b70b038a61e78ec050 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 16 Apr 2020 10:50:07 -0400 Subject: [PATCH 29/44] Changed to worker-match-max-concurrent Signed-off-by: Joe Elliott --- CHANGELOG.md | 2 +- docs/configuration/arguments.md | 6 ++-- docs/configuration/config-file-reference.md | 6 ++-- pkg/cortex/modules.go | 2 +- pkg/querier/frontend/frontend_test.go | 25 ++++++++------ pkg/querier/frontend/worker.go | 37 +++++++++++---------- pkg/querier/frontend/worker_test.go | 27 +++++++++------ 7 files changed, 58 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e66a6386a1..d279752fbff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ * [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. 
#2392 * [ENHANCEMENT] Added `cortex_distributor_latest_seen_sample_timestamp_seconds` metric to see how far behind Prometheus servers are in sending data. #2371 * [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-.fifocache.size` CLI flag has been renamed to `-.fifocache.max-size-items` as well as its YAML config option `size` renamed to `max_size_items`. Added `-.fifocache.max-size-bytes` CLI flag and YAML config option `max_size_bytes` to specify memory limit of the cache. #2319 -* [ENHANCEMENT] Added `-querier.worker-total-parallelism`. Controls the total parallelism across all query frontends. Overrides `-querier.worker-parallelism`. #2456 +* [ENHANCEMENT] Added `-querier.worker-match-max-concurrent`. Force worker concurrency to match the `-querier.max-concurrent` option. Overrides `-querier.worker-parallelism`. #2456 * [ENHANCEMENT] Single Binary: Added query-frontend to the single binary. Single binary users will now benefit from various query-frontend features. Primarily: sharding, parallelization, load shedding, additional caching (if configured), and query retries. #2437 * [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting `-store.cache-lookups-older-than` and `-store.max-look-back-period`. #2454 * [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 7a9603815dd..2d78008bda5 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -16,7 +16,7 @@ Duration arguments should be specified with a unit like `5s` or `3h`. Valid time - `-querier.max-concurrent` The maximum number of top-level PromQL queries that will execute at the same time, per querier process. - If using the query frontend, this should be set to at least (`querier.worker-parallelism` * number of query frontend replicas). 
Otherwise queries may queue in the queriers and not the frontend, which will affect QoS. Alternatively, if you are using `-querier.worker-parallelism` these two values should be equal. + If using the query frontend, this should be set to at least (`-querier.worker-parallelism` * number of query frontend replicas). Otherwise queries may queue in the queriers and not the frontend, which will affect QoS. Alternatively, consider using `-querier.worker-match-max-concurrent` to force worker parallelism to match `-querier.max-concurrent`. - `-querier.query-parallelism` @@ -45,9 +45,9 @@ The next three options only apply when the querier is used together with the Que Number of simultaneous queries to process, per query frontend. See note on `-querier.max-concurrent` -- `-querier.worker-total-parallelism` +- `-querier.worker-match-max-concurrent` - Number of simultaneous queries to process across all query frontends. Overrides `-querier.worker-parallelism`. + Force worker concurrency to match the -querier.max-concurrent option. Overrides `-querier.worker-parallelism`. See note on `-querier.max-concurrent` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 5ae4528d551..4ad672e0c0b 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1805,10 +1805,10 @@ The `frontend_worker_config` configures the worker - running within the Cortex q # CLI flag: -querier.worker-parallelism [parallelism: <int> | default = 10] -# Number of simultaneous queries to process across all query frontends. +# Force worker concurrency to match the -querier.max-concurrent option. # Overrides querier.worker-parallelism. -# CLI flag: -querier.worker-total-parallelism -[total_parallelism: <int> | default = 0] +# CLI flag: -querier.worker-match-max-concurrent +[match_max_concurrency: <boolean> | default = false] # How often to query DNS.
# CLI flag: -querier.dns-lookup-period diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 2a0d8603583..178e3d0196b 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -210,7 +210,7 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) { // Query frontend worker will only be started after all its dependencies are started, not here. // Worker may also be nil, if not configured, which is OK. - worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(handler), util.Logger) + worker, err := frontend.NewWorker(cfg.Worker, cfg.Querier, httpgrpc_server.NewServer(handler), util.Logger) if err != nil { return } diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go index cdeddcc59b7..19dd3c2655a 100644 --- a/pkg/querier/frontend/frontend_test.go +++ b/pkg/querier/frontend/frontend_test.go @@ -26,6 +26,7 @@ import ( "github.com/weaveworks/common/user" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -55,8 +56,8 @@ func TestFrontend(t *testing.T) { assert.Equal(t, "Hello World", string(body)) } - testFrontend(t, handler, test, 0) - testFrontend(t, handler, test, 1) + testFrontend(t, handler, test, false) + testFrontend(t, handler, test, true) } func TestFrontendPropagateTrace(t *testing.T) { @@ -105,8 +106,8 @@ func TestFrontendPropagateTrace(t *testing.T) { // Query should do one calls. 
assert.Equal(t, traceID, <-observedTraceID) } - testFrontend(t, handler, test, 0) - testFrontend(t, handler, test, 1) + testFrontend(t, handler, test, false) + testFrontend(t, handler, test, true) } // TestFrontendCancel ensures that when client requests are cancelled, @@ -137,9 +138,9 @@ func TestFrontendCancel(t *testing.T) { time.Sleep(100 * time.Millisecond) assert.Equal(t, int32(1), atomic.LoadInt32(&tries)) } - testFrontend(t, handler, test, 0) + testFrontend(t, handler, test, false) tries = 0 - testFrontend(t, handler, test, 1) + testFrontend(t, handler, test, true) } func TestFrontendCancelStatusCode(t *testing.T) { @@ -160,16 +161,18 @@ func TestFrontendCancelStatusCode(t *testing.T) { } } -func testFrontend(t *testing.T, handler http.Handler, test func(addr string), totalParallelism int) { +func testFrontend(t *testing.T, handler http.Handler, test func(addr string), matchMaxConcurrency bool) { logger := log.NewNopLogger() var ( - config Config - workerConfig WorkerConfig + config Config + workerConfig WorkerConfig + querierConfig querier.Config ) flagext.DefaultValues(&config, &workerConfig) workerConfig.Parallelism = 1 - workerConfig.TotalParallelism = totalParallelism + workerConfig.MatchMaxConcurrency = matchMaxConcurrency + querierConfig.MaxConcurrent = 1 // localhost:0 prevents firewall warnings on Mac OS X. 
grpcListen, err := net.Listen("tcp", "localhost:0") @@ -201,7 +204,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string), to go httpServer.Serve(httpListen) //nolint:errcheck go grpcServer.Serve(grpcListen) //nolint:errcheck - worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger) + worker, err := NewWorker(workerConfig, querierConfig, httpgrpc_server.NewServer(handler), logger) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker)) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index bf08b731b66..e3079a607f4 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -15,16 +15,17 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/naming" + "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/cortexproject/cortex/pkg/util/services" ) // WorkerConfig is config for a worker. type WorkerConfig struct { - Address string `yaml:"frontend_address"` - Parallelism int `yaml:"parallelism"` - TotalParallelism int `yaml:"total_parallelism"` - DNSLookupDuration time.Duration `yaml:"dns_lookup_duration"` + Address string `yaml:"frontend_address"` + Parallelism int `yaml:"parallelism"` + MatchMaxConcurrency bool `yaml:"match_max_concurrency"` + DNSLookupDuration time.Duration `yaml:"dns_lookup_duration"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` } @@ -33,7 +34,7 @@ type WorkerConfig struct { func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Address, "querier.frontend-address", "", "Address of query frontend service, in host:port format.") f.IntVar(&cfg.Parallelism, "querier.worker-parallelism", 10, "Number of simultaneous queries to process per query frontend.") - f.IntVar(&cfg.TotalParallelism, "querier.worker-total-parallelism", 0, "Number of simultaneous queries to process across all query frontends. 
Overrides querier.worker-parallelism.") + f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", false, "Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.") f.DurationVar(&cfg.DNSLookupDuration, "querier.dns-lookup-period", 10*time.Second, "How often to query DNS.") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f) @@ -41,9 +42,10 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) { // Worker is the counter-part to the frontend, actually processing requests. type worker struct { - cfg WorkerConfig - log log.Logger - server *server.Server + cfg WorkerConfig + querierCfg querier.Config + log log.Logger + server *server.Server watcher naming.Watcher //nolint:staticcheck //Skipping for now. If you still see this more than likely issue https://github.com/cortexproject/cortex/issues/2015 has not yet been addressed. managers map[string]*frontendManager @@ -51,7 +53,7 @@ type worker struct { // NewWorker creates a new worker and returns a service that is wrapping it. // If no address is specified, it returns nil service (and no error). 
-func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (services.Service, error) { +func NewWorker(cfg WorkerConfig, querierCfg querier.Config, server *server.Server, log log.Logger) (services.Service, error) { if cfg.Address == "" { level.Info(log).Log("msg", "no address specified, not starting worker") return nil, nil @@ -68,11 +70,12 @@ func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (service } w := &worker{ - cfg: cfg, - log: log, - server: server, - watcher: watcher, - managers: map[string]*frontendManager{}, + cfg: cfg, + querierCfg: querierCfg, + log: log, + server: server, + watcher: watcher, + managers: map[string]*frontendManager{}, } return services.NewBasicService(nil, w.watchDNSLoop, w.stopping), nil } @@ -152,10 +155,10 @@ func (w *worker) resetParallelism() { for i, addr := range addresses { concurrentRequests := 0 - if w.cfg.TotalParallelism > 0 { - concurrentRequests = w.cfg.TotalParallelism / len(w.managers) + if w.cfg.MatchMaxConcurrency { + concurrentRequests = w.querierCfg.MaxConcurrent / len(w.managers) - if i < w.cfg.TotalParallelism%len(w.managers) { + if i < w.querierCfg.MaxConcurrent%len(w.managers) { concurrentRequests++ } } else { diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go index 7e7dce533dc..511656c7f7f 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" + "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util" ) @@ -22,42 +23,42 @@ func TestResetParallelism(t *testing.T) { tests := []struct { name string parallelism int - totalParallelism int + maxConcurrent int numManagers int expectedConcurrency int32 }{ { name: "Test create least one worker per manager", parallelism: 0, - totalParallelism: 0, + maxConcurrent: 0, numManagers: 2, expectedConcurrency: 2, }, 
{ name: "Test concurrency per query frontend configuration", parallelism: 4, - totalParallelism: 0, + maxConcurrent: 0, numManagers: 2, expectedConcurrency: 8, }, { name: "Test Total Parallelism with a remainder", parallelism: 1, - totalParallelism: 7, + maxConcurrent: 7, numManagers: 4, expectedConcurrency: 7, }, { name: "Test Total Parallelism dividing evenly", parallelism: 1, - totalParallelism: 6, + maxConcurrent: 6, numManagers: 2, expectedConcurrency: 6, }, { name: "Test Total Parallelism at least one worker per manager", parallelism: 1, - totalParallelism: 3, + maxConcurrent: 3, numManagers: 6, expectedConcurrency: 6, }, @@ -66,14 +67,18 @@ func TestResetParallelism(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cfg := WorkerConfig{ - Parallelism: tt.parallelism, - TotalParallelism: tt.totalParallelism, + Parallelism: tt.parallelism, + MatchMaxConcurrency: tt.maxConcurrent > 0, + } + querierCfg := querier.Config{ + MaxConcurrent: tt.maxConcurrent, } w := &worker{ - cfg: cfg, - log: util.Logger, - managers: map[string]*frontendManager{}, + cfg: cfg, + querierCfg: querierCfg, + log: util.Logger, + managers: map[string]*frontendManager{}, } for i := 0; i < tt.numManagers; i++ { From 47297232693f71e34a5773a092164f0cd39cfb34 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 16 Apr 2020 11:00:04 -0400 Subject: [PATCH 30/44] Removed unnecessary param Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 2 +- pkg/querier/frontend/worker_frontend_manager.go | 4 +--- pkg/querier/frontend/worker_frontend_manager_test.go | 6 +++--- pkg/querier/frontend/worker_test.go | 2 +- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index e3079a607f4..6e19ae07dc0 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -119,7 +119,7 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { level.Error(w.log).Log("msg", "error 
connecting", "addr", update.Addr, "err", err) } - w.managers[update.Addr] = newFrontendManager(servCtx, w.log, w.server, client, 0, w.cfg.GRPCClientConfig.MaxRecvMsgSize) + w.managers[update.Addr] = newFrontendManager(servCtx, w.log, w.server, client, w.cfg.GRPCClientConfig.MaxRecvMsgSize) case naming.Delete: level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 3d4fee1bed7..ba7f166d8ef 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -40,7 +40,7 @@ type frontendManager struct { currentProcessors *atomic.Int32 } -func newFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, initialConcurrentRequests int, maxSendMsgSize int) *frontendManager { +func newFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, maxSendMsgSize int) *frontendManager { serverCtx, cancel := context.WithCancel(serverCtx) f := &frontendManager{ @@ -53,8 +53,6 @@ func newFrontendManager(serverCtx context.Context, log log.Logger, server *serve currentProcessors: atomic.NewInt32(0), } - f.concurrentRequests(initialConcurrentRequests) - return f } diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index 44c035e0e8c..7f7821ff67e 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -84,7 +84,7 @@ func TestConcurrency(t *testing.T) { } for _, tt := range tests { t.Run(fmt.Sprintf("Testing concurrency %v", tt.concurrency), func(t *testing.T) { - mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) + mgr := newFrontendManager(context.Background(), util.Logger, 
httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 100000000) for _, c := range tt.concurrency { calls.Store(0) @@ -120,7 +120,7 @@ func TestRecvFailDoesntCancelProcess(t *testing.T) { failRecv: true, } - mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), client, 0, 100000000) + mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), client, 100000000) mgr.concurrentRequests(1) time.Sleep(50 * time.Millisecond) @@ -144,7 +144,7 @@ func TestServeCancelStopsProcess(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - mgr := newFrontendManager(ctx, util.Logger, httpgrpc_server.NewServer(handler), client, 0, 100000000) + mgr := newFrontendManager(ctx, util.Logger, httpgrpc_server.NewServer(handler), client, 100000000) mgr.concurrentRequests(1) time.Sleep(50 * time.Millisecond) diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go index 511656c7f7f..82d11551589 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -82,7 +82,7 @@ func TestResetParallelism(t *testing.T) { } for i := 0; i < tt.numManagers; i++ { - w.managers[strconv.Itoa(i)] = newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 0, 100000000) + w.managers[strconv.Itoa(i)] = newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 100000000) } w.resetParallelism() From 692b62395086194b857c405eca652f721c7af3f7 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 16 Apr 2020 11:28:46 -0400 Subject: [PATCH 31/44] Swapped to client config instead of bare param Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 2 +- .../frontend/worker_frontend_manager.go | 19 ++++++++++--------- .../frontend/worker_frontend_manager_test.go | 7 ++++--- pkg/querier/frontend/worker_test.go | 3 ++- 4 files changed, 
17 insertions(+), 14 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 6e19ae07dc0..c15bb3063ed 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -119,7 +119,7 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { level.Error(w.log).Log("msg", "error connecting", "addr", update.Addr, "err", err) } - w.managers[update.Addr] = newFrontendManager(servCtx, w.log, w.server, client, w.cfg.GRPCClientConfig.MaxRecvMsgSize) + w.managers[update.Addr] = newFrontendManager(servCtx, w.log, w.server, client, w.cfg.GRPCClientConfig) case naming.Delete: level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index ba7f166d8ef..f6e0f9f5419 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -15,6 +15,7 @@ import ( "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/grpcclient" ) var ( @@ -27,11 +28,11 @@ var ( ) type frontendManager struct { - client FrontendClient - server *server.Server + server *server.Server + client FrontendClient + clientCfg grpcclient.Config - log log.Logger - maxSendMsgSize int + log log.Logger gracefulQuit []chan struct{} serverCtx context.Context @@ -40,16 +41,16 @@ type frontendManager struct { currentProcessors *atomic.Int32 } -func newFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, maxSendMsgSize int) *frontendManager { +func newFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, clientCfg grpcclient.Config) *frontendManager { serverCtx, cancel := context.WithCancel(serverCtx) f := &frontendManager{ - client: client, log: log, + client: client, + clientCfg: clientCfg, server: server, serverCtx: serverCtx, 
cancel: cancel, - maxSendMsgSize: maxSendMsgSize, currentProcessors: atomic.NewInt32(0), } @@ -162,8 +163,8 @@ func (f *frontendManager) process(quit <-chan struct{}, c Frontend_ProcessClient } // Ensure responses that are too big are not retried. - if len(response.Body) >= f.maxSendMsgSize { - errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), f.maxSendMsgSize) + if len(response.Body) >= f.clientCfg.MaxSendMsgSize { + errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), f.clientCfg.MaxSendMsgSize) response = &httpgrpc.HTTPResponse{ Code: http.StatusRequestEntityTooLarge, Body: []byte(errMsg), diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index 7f7821ff67e..80a37ba2dc4 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -16,6 +16,7 @@ import ( grpc "google.golang.org/grpc" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/grpcclient" ) type mockFrontendClient struct { @@ -84,7 +85,7 @@ func TestConcurrency(t *testing.T) { } for _, tt := range tests { t.Run(fmt.Sprintf("Testing concurrency %v", tt.concurrency), func(t *testing.T) { - mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 100000000) + mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, grpcclient.Config{}) for _, c := range tt.concurrency { calls.Store(0) @@ -120,7 +121,7 @@ func TestRecvFailDoesntCancelProcess(t *testing.T) { failRecv: true, } - mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), client, 100000000) + mgr := newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), client, grpcclient.Config{}) mgr.concurrentRequests(1) 
time.Sleep(50 * time.Millisecond) @@ -144,7 +145,7 @@ func TestServeCancelStopsProcess(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - mgr := newFrontendManager(ctx, util.Logger, httpgrpc_server.NewServer(handler), client, 100000000) + mgr := newFrontendManager(ctx, util.Logger, httpgrpc_server.NewServer(handler), client, grpcclient.Config{}) mgr.concurrentRequests(1) time.Sleep(50 * time.Millisecond) diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go index 82d11551589..9bc41dfd2c2 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -12,6 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/grpcclient" ) func TestResetParallelism(t *testing.T) { @@ -82,7 +83,7 @@ func TestResetParallelism(t *testing.T) { } for i := 0; i < tt.numManagers; i++ { - w.managers[strconv.Itoa(i)] = newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, 100000000) + w.managers[strconv.Itoa(i)] = newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, grpcclient.Config{}) } w.resetParallelism() From 75425bd4870e64af8a0ec123171994e4da7af35b Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 16 Apr 2020 11:39:17 -0400 Subject: [PATCH 32/44] Use DialContext() Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index c15bb3063ed..622d44dae5e 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -114,7 +114,7 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { switch update.Op { case naming.Add: level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr) - client, err := 
w.connect(update.Addr) + client, err := w.connect(servCtx, update.Addr) if err != nil { level.Error(w.log).Log("msg", "error connecting", "addr", update.Addr, "err", err) } @@ -136,10 +136,14 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { } } -func (w *worker) connect(address string) (FrontendClient, error) { - opts := []grpc.DialOption{grpc.WithInsecure()} +func (w *worker) connect(ctx context.Context, address string) (FrontendClient, error) { + opts := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBlock(), + } opts = append(opts, w.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{middleware.ClientUserHeaderInterceptor}, nil)...) - conn, err := grpc.Dial(address, opts...) + + conn, err := grpc.DialContext(ctx, address, opts...) if err != nil { return nil, err } From b18c376d671997a1719f9ed4ce02562f70266c83 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 16 Apr 2020 12:00:03 -0400 Subject: [PATCH 33/44] Added comments/logs around concurrency distribution Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 622d44dae5e..afdc89cf32b 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -157,25 +157,37 @@ func (w *worker) resetParallelism() { } rand.Shuffle(len(addresses), func(i, j int) { addresses[i], addresses[j] = addresses[j], addresses[i] }) + totalConcurrency := 0 for i, addr := range addresses { concurrentRequests := 0 if w.cfg.MatchMaxConcurrency { concurrentRequests = w.querierCfg.MaxConcurrent / len(w.managers) + // if max concurrency does not evenly divide into our frontends we will choose some + // to receive an extra connection. 
addresses were randomized above so this will be a + // random selection of frontends if i < w.querierCfg.MaxConcurrent%len(w.managers) { + level.Warn(w.log).Log("msg", "max concurrency is not evenly dividable across query frontends. adding an extra connection", "addr", addr) concurrentRequests++ } } else { concurrentRequests = w.cfg.Parallelism } + // max concurrency is less than the total number of query frontends. to prevent accidentally + // starving a frontend we are just going to always connect once to every frontend + // this is dangerous b/c we may start exceeding promql max concurrency if concurrentRequests == 0 { concurrentRequests = 1 } - mgr, ok := w.managers[addr] - if ok { + totalConcurrency += concurrentRequests + if mgr, ok := w.managers[addr]; ok { mgr.concurrentRequests(concurrentRequests) } } + + if totalConcurrency > w.querierCfg.MaxConcurrent { + level.Warn(w.log).Log("msg", "total worker concurrency is greater than promql max concurrency. queries may be queued in the querier which reduces QOS") + } } From 94dbddd7668f25dce95691c4205cb6051b12ac30 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 16 Apr 2020 13:53:10 -0400 Subject: [PATCH 34/44] Swapped to force cancel via context Signed-off-by: Joe Elliott --- .../frontend/worker_frontend_manager.go | 61 +++++++------------ .../frontend/worker_frontend_manager_test.go | 8 ++- 2 files changed, 30 insertions(+), 39 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index f6e0f9f5419..5c557058f29 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -2,7 +2,6 @@ package frontend import ( "context" - "errors" "fmt" "net/http" "sync" @@ -23,8 +22,6 @@ var ( MinBackoff: 50 * time.Millisecond, MaxBackoff: 1 * time.Second, } - - errGracefulQuit = errors.New("processor quitting gracefully") ) type frontendManager struct { @@ -34,23 +31,23 @@ type frontendManager 
struct { log log.Logger - gracefulQuit []chan struct{} - serverCtx context.Context - cancel context.CancelFunc + workerCancels []context.CancelFunc + managerCtx context.Context + managerCancel context.CancelFunc wg sync.WaitGroup currentProcessors *atomic.Int32 } func newFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, clientCfg grpcclient.Config) *frontendManager { - serverCtx, cancel := context.WithCancel(serverCtx) + managerCtx, cancel := context.WithCancel(serverCtx) f := &frontendManager{ log: log, client: client, clientCfg: clientCfg, server: server, - serverCtx: serverCtx, - cancel: cancel, + managerCtx: managerCtx, + managerCancel: cancel, currentProcessors: atomic.NewInt32(0), } @@ -58,8 +55,8 @@ func newFrontendManager(serverCtx context.Context, log log.Logger, server *serve } func (f *frontendManager) stop() { - f.cancel() // force stop - f.concurrentRequests(0) // graceful quit + f.managerCancel() + f.concurrentRequests(0) f.wg.Wait() } @@ -69,21 +66,20 @@ func (f *frontendManager) concurrentRequests(n int) { } // adjust clients slice as necessary - for len(f.gracefulQuit) != n { - if len(f.gracefulQuit) < n { - quit := make(chan struct{}) - f.gracefulQuit = append(f.gracefulQuit, quit) - - go f.runOne(quit) + for len(f.workerCancels) != n { + if len(f.workerCancels) < n { + ctx, cancel := context.WithCancel(f.managerCtx) + f.workerCancels = append(f.workerCancels, cancel) + go f.runOne(ctx) continue } - if len(f.gracefulQuit) > n { + if len(f.workerCancels) > n { // remove from slice and shutdown - var quit chan struct{} - quit, f.gracefulQuit = f.gracefulQuit[0], f.gracefulQuit[1:] - close(quit) + var cancel context.CancelFunc + cancel, f.workerCancels = f.workerCancels[0], f.workerCancels[1:] + cancel() } } } @@ -94,29 +90,24 @@ func (f *frontendManager) concurrentRequests(n int) { // servCtx is cancelled => Cortex is shutting down. 
// c.Recv() errors => transient network issue, a client of the query frontend times out // close quit channel => frontendManager is politely asking to shutdown a processor -func (f *frontendManager) runOne(quit <-chan struct{}) { +func (f *frontendManager) runOne(ctx context.Context) { f.wg.Add(1) defer f.wg.Done() f.currentProcessors.Inc() defer f.currentProcessors.Dec() - backoff := util.NewBackoff(f.serverCtx, backoffConfig) + backoff := util.NewBackoff(ctx, backoffConfig) for backoff.Ongoing() { - c, err := f.client.Process(f.serverCtx) + c, err := f.client.Process(ctx) if err != nil { level.Error(f.log).Log("msg", "error contacting frontend", "err", err) backoff.Wait() continue } - if err := f.process(quit, c); err != nil { - if err == errGracefulQuit { - level.Debug(f.log).Log("msg", "gracefully shutting down processor") - return - } - + if err := f.process(ctx, c); err != nil { level.Error(f.log).Log("msg", "error processing requests", "err", err) backoff.Wait() continue @@ -127,18 +118,12 @@ func (f *frontendManager) runOne(quit <-chan struct{}) { } // process loops processing requests on an established stream. -func (f *frontendManager) process(quit <-chan struct{}, c Frontend_ProcessClient) error { - // Build a child context so we can cancel querie when the stream is closed. +func (f *frontendManager) process(ctx context.Context, c Frontend_ProcessClient) error { + // Build a child context so we can cancel a query when the stream is closed. 
ctx, cancel := context.WithCancel(c.Context()) defer cancel() for { - select { - case <-quit: - return errGracefulQuit - default: - } - request, err := c.Recv() if err != nil { return err diff --git a/pkg/querier/frontend/worker_frontend_manager_test.go b/pkg/querier/frontend/worker_frontend_manager_test.go index 80a37ba2dc4..90131363502 100644 --- a/pkg/querier/frontend/worker_frontend_manager_test.go +++ b/pkg/querier/frontend/worker_frontend_manager_test.go @@ -25,6 +25,7 @@ type mockFrontendClient struct { func (m *mockFrontendClient) Process(ctx context.Context, opts ...grpc.CallOption) (Frontend_ProcessClient, error) { return &mockFrontendProcessClient{ + ctx: ctx, failRecv: m.failRecv, }, nil } @@ -32,6 +33,7 @@ func (m *mockFrontendClient) Process(ctx context.Context, opts ...grpc.CallOptio type mockFrontendProcessClient struct { grpc.ClientStream + ctx context.Context failRecv bool wg sync.WaitGroup } @@ -44,6 +46,10 @@ func (m *mockFrontendProcessClient) Recv() (*ProcessRequest, error) { m.wg.Wait() m.wg.Add(1) + if m.ctx.Err() != nil { + return nil, m.ctx.Err() + } + if m.failRecv { return nil, errors.New("wups") } @@ -145,7 +151,7 @@ func TestServeCancelStopsProcess(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) - mgr := newFrontendManager(ctx, util.Logger, httpgrpc_server.NewServer(handler), client, grpcclient.Config{}) + mgr := newFrontendManager(ctx, util.Logger, httpgrpc_server.NewServer(handler), client, grpcclient.Config{MaxSendMsgSize: 100000}) mgr.concurrentRequests(1) time.Sleep(50 * time.Millisecond) From be78bab86d8951cebefcad8e1e08f45c0d274afc Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 16 Apr 2020 14:35:48 -0400 Subject: [PATCH 35/44] Removed outdated comment Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_frontend_manager.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 
5c557058f29..75a9aeefe96 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -86,10 +86,6 @@ func (f *frontendManager) concurrentRequests(n int) { // runOne loops, trying to establish a stream to the frontend to begin // request processing. -// Ways that this can be cancelled -// servCtx is cancelled => Cortex is shutting down. -// c.Recv() errors => transient network issue, a client of the query frontend times out -// close quit channel => frontendManager is politely asking to shutdown a processor func (f *frontendManager) runOne(ctx context.Context) { f.wg.Add(1) defer f.wg.Done() From 99104878b9cc99c2c4aa0210a1d09af301e4e566 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 17 Apr 2020 11:51:31 -0400 Subject: [PATCH 36/44] Added match_max_concurrent to example single binary configs Signed-off-by: Joe Elliott --- docs/configuration/config-file-reference.md | 2 +- .../configuration/single-process-config-blocks-gossip-1.yaml | 3 +++ .../configuration/single-process-config-blocks-gossip-2.yaml | 3 +++ docs/configuration/single-process-config-blocks.yaml | 3 +++ docs/configuration/single-process-config.md | 5 +++++ docs/configuration/single-process-config.yaml | 3 +++ pkg/querier/frontend/worker.go | 2 +- 7 files changed, 19 insertions(+), 2 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 4ad672e0c0b..0c4f745231b 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1808,7 +1808,7 @@ The `frontend_worker_config` configures the worker - running within the Cortex q # Force worker concurrency to match the -querier.max-concurrent option. # Overrides querier.worker-parallelism. # CLI flag: -querier.worker-match-max-concurrent -[match_max_concurrency: | default = false] +[match_max_concurrent: | default = false] # How often to query DNS. 
# CLI flag: -querier.dns-lookup-period diff --git a/docs/configuration/single-process-config-blocks-gossip-1.yaml b/docs/configuration/single-process-config-blocks-gossip-1.yaml index 1d75f91b4b0..6b4ae86b224 100644 --- a/docs/configuration/single-process-config-blocks-gossip-1.yaml +++ b/docs/configuration/single-process-config-blocks-gossip-1.yaml @@ -80,3 +80,6 @@ tsdb: backend: filesystem # s3, gcs, azure or filesystem are valid options filesystem: dir: /tmp/cortex/storage + +frontend_worker: + match_max_concurrent: true \ No newline at end of file diff --git a/docs/configuration/single-process-config-blocks-gossip-2.yaml b/docs/configuration/single-process-config-blocks-gossip-2.yaml index aa45fbb16a3..8c74f3f9d20 100644 --- a/docs/configuration/single-process-config-blocks-gossip-2.yaml +++ b/docs/configuration/single-process-config-blocks-gossip-2.yaml @@ -79,3 +79,6 @@ tsdb: backend: filesystem # s3, gcs, azure or filesystem are valid options filesystem: dir: /tmp/cortex/storage + +frontend_worker: + match_max_concurrent: true \ No newline at end of file diff --git a/docs/configuration/single-process-config-blocks.yaml b/docs/configuration/single-process-config-blocks.yaml index 05dcd5c7b25..033ec1618fa 100644 --- a/docs/configuration/single-process-config-blocks.yaml +++ b/docs/configuration/single-process-config-blocks.yaml @@ -82,3 +82,6 @@ compactor: sharding_ring: kvstore: store: inmemory + +frontend_worker: + match_max_concurrent: true \ No newline at end of file diff --git a/docs/configuration/single-process-config.md b/docs/configuration/single-process-config.md index 5ff7f281a45..291d10847b6 100644 --- a/docs/configuration/single-process-config.md +++ b/docs/configuration/single-process-config.md @@ -71,4 +71,9 @@ storage: filesystem: directory: /tmp/cortex/chunks + +# Configure the frontend worker in the querier to match worker count +# to max_concurrent on the queriers. 
+frontend_worker: + match_max_concurrent: true ``` diff --git a/docs/configuration/single-process-config.yaml b/docs/configuration/single-process-config.yaml index 03c5680690d..03377082fa7 100644 --- a/docs/configuration/single-process-config.yaml +++ b/docs/configuration/single-process-config.yaml @@ -75,3 +75,6 @@ storage: purger: enable: true object_store_type: filesystem + +frontend_worker: + match_max_concurrent: true \ No newline at end of file diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index afdc89cf32b..b35789012a8 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -24,7 +24,7 @@ import ( type WorkerConfig struct { Address string `yaml:"frontend_address"` Parallelism int `yaml:"parallelism"` - MatchMaxConcurrency bool `yaml:"match_max_concurrency"` + MatchMaxConcurrency bool `yaml:"match_max_concurrent"` DNSLookupDuration time.Duration `yaml:"dns_lookup_duration"` GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` From adda6a5587f9a458b68e39c9996d870aa8f3bcb8 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 17 Apr 2020 11:55:06 -0400 Subject: [PATCH 37/44] Improved comments Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index b35789012a8..dea5a4d933e 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -163,9 +163,9 @@ func (w *worker) resetParallelism() { if w.cfg.MatchMaxConcurrency { concurrentRequests = w.querierCfg.MaxConcurrent / len(w.managers) - // if max concurrency does not evenly divide into our frontends we will choose some - // to receive an extra connection. addresses were randomized above so this will be a - // random selection of frontends + // If max concurrency does not evenly divide into our frontends a subset will be chosen + // to receive an extra connection. 
Frontend addresses were shuffled above so this will be a + // random selection of frontends. if i < w.querierCfg.MaxConcurrent%len(w.managers) { level.Warn(w.log).Log("msg", "max concurrency is not evenly dividable across query frontends. adding an extra connection", "addr", addr) concurrentRequests++ @@ -174,9 +174,10 @@ func (w *worker) resetParallelism() { concurrentRequests = w.cfg.Parallelism } - // max concurrency is less than the total number of query frontends. to prevent accidentally - // starving a frontend we are just going to always connect once to every frontend - // this is dangerous b/c we may start exceeding promql max concurrency + // If concurrentRequests is 0 then w.querierCfg.MaxConcurrent is less than the total number of + // query frontends. In order to prevent accidentally starving a frontend we are just going to + // always connect once to every frontend. This is dangerous b/c we may start exceeding promql + // max concurrency. if concurrentRequests == 0 { concurrentRequests = 1 } From e868e4f74a475d0f16d5cd526254cd357f4e0406 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 17 Apr 2020 11:58:56 -0400 Subject: [PATCH 38/44] Added log for scenario where we can't find in a map Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index dea5a4d933e..e2ac38c5020 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -185,6 +185,8 @@ func (w *worker) resetParallelism() { totalConcurrency += concurrentRequests if mgr, ok := w.managers[addr]; ok { mgr.concurrentRequests(concurrentRequests) + } else { + level.Error(w.log).Log("msg", "address not found in managers map. 
this should not happen", "addr", addr) } } From ea97cb833b22044d7b04bbedc24f0fda46111ee6 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 17 Apr 2020 12:04:27 -0400 Subject: [PATCH 39/44] Removed unnecessary nesting Signed-off-by: Joe Elliott --- .../frontend/worker_frontend_manager.go | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 75a9aeefe96..680de844117 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -66,21 +66,18 @@ func (f *frontendManager) concurrentRequests(n int) { } // adjust clients slice as necessary - for len(f.workerCancels) != n { - if len(f.workerCancels) < n { - ctx, cancel := context.WithCancel(f.managerCtx) - f.workerCancels = append(f.workerCancels, cancel) + for len(f.workerCancels) < n { + ctx, cancel := context.WithCancel(f.managerCtx) + f.workerCancels = append(f.workerCancels, cancel) - go f.runOne(ctx) - continue - } + go f.runOne(ctx) + } - if len(f.workerCancels) > n { - // remove from slice and shutdown - var cancel context.CancelFunc - cancel, f.workerCancels = f.workerCancels[0], f.workerCancels[1:] - cancel() - } + for len(f.workerCancels) > n { + // remove from slice and shutdown + var cancel context.CancelFunc + cancel, f.workerCancels = f.workerCancels[0], f.workerCancels[1:] + cancel() } } From d209888607b5fa6f95480fbdc35e789630b25508 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 17 Apr 2020 12:27:20 -0400 Subject: [PATCH 40/44] Moved concurrency to its own method Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 57 ++++++++++++++++------------- pkg/querier/frontend/worker_test.go | 6 +-- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index e2ac38c5020..5c1b9fe6881 100644 --- a/pkg/querier/frontend/worker.go +++ 
b/pkg/querier/frontend/worker.go @@ -132,7 +132,7 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { } } - w.resetParallelism() + w.resetConcurrency() } } @@ -150,7 +150,7 @@ func (w *worker) connect(ctx context.Context, address string) (FrontendClient, e return NewFrontendClient(conn), nil } -func (w *worker) resetParallelism() { +func (w *worker) resetConcurrency() { addresses := make([]string, 0, len(w.managers)) for addr := range w.managers { addresses = append(addresses, addr) @@ -159,30 +159,9 @@ func (w *worker) resetParallelism() { totalConcurrency := 0 for i, addr := range addresses { - concurrentRequests := 0 - if w.cfg.MatchMaxConcurrency { - concurrentRequests = w.querierCfg.MaxConcurrent / len(w.managers) - - // If max concurrency does not evenly divide into our frontends a subset will be chosen - // to receive an extra connection. Frontend addresses were shuffled above so this will be a - // random selection of frontends. - if i < w.querierCfg.MaxConcurrent%len(w.managers) { - level.Warn(w.log).Log("msg", "max concurrency is not evenly dividable across query frontends. adding an extra connection", "addr", addr) - concurrentRequests++ - } - } else { - concurrentRequests = w.cfg.Parallelism - } - - // If concurrentRequests is 0 then w.querierCfg.MaxConcurrent is less than the total number of - // query frontends. In order to prevent accidentally starving a frontend we are just going to - // always connect once to every frontend. This is dangerous b/c we may start exceeding promql - // max concurrency. - if concurrentRequests == 0 { - concurrentRequests = 1 - } - + concurrentRequests := w.concurrency(i, addr) totalConcurrency += concurrentRequests + if mgr, ok := w.managers[addr]; ok { mgr.concurrentRequests(concurrentRequests) } else { @@ -194,3 +173,31 @@ func (w *worker) resetParallelism() { level.Warn(w.log).Log("msg", "total worker concurrency is greater than promql max concurrency. 
queries may be queued in the querier which reduces QOS") } } + +func (w *worker) concurrency(index int, addr string) int { + concurrentRequests := 0 + + if w.cfg.MatchMaxConcurrency { + concurrentRequests = w.querierCfg.MaxConcurrent / len(w.managers) + + // If max concurrency does not evenly divide into our frontends a subset will be chosen + // to receive an extra connection. Frontend addresses were shuffled above so this will be a + // random selection of frontends. + if index < w.querierCfg.MaxConcurrent%len(w.managers) { + level.Warn(w.log).Log("msg", "max concurrency is not evenly divisible across query frontends. adding an extra connection", "addr", addr) + concurrentRequests++ + } + } else { + concurrentRequests = w.cfg.Parallelism + } + + // If concurrentRequests is 0 then w.querierCfg.MaxConcurrent is less than the total number of + // query frontends. In order to prevent accidentally starving a frontend we are just going to + // always connect once to every frontend. This is dangerous b/c we may start exceeding promql + // max concurrency. 
+ if concurrentRequests == 0 { + concurrentRequests = 1 + } + + return concurrentRequests +} diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go index 9bc41dfd2c2..1459e070b76 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -15,7 +15,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/grpcclient" ) -func TestResetParallelism(t *testing.T) { +func TestResetConcurrency(t *testing.T) { handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, err := w.Write([]byte("Hello World")) assert.NoError(t, err) @@ -33,7 +33,7 @@ func TestResetParallelism(t *testing.T) { parallelism: 0, maxConcurrent: 0, numManagers: 2, - expectedConcurrency: 2, + expectedConcurrency: 0, }, { name: "Test concurrency per query frontend configuration", @@ -86,7 +86,7 @@ func TestResetParallelism(t *testing.T) { w.managers[strconv.Itoa(i)] = newFrontendManager(context.Background(), util.Logger, httpgrpc_server.NewServer(handler), &mockFrontendClient{}, grpcclient.Config{}) } - w.resetParallelism() + w.resetConcurrency() time.Sleep(100 * time.Millisecond) concurrency := int32(0) From ec7809ec28a2a4e1e92c0f968c4050f47eb3c040 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 17 Apr 2020 12:33:34 -0400 Subject: [PATCH 41/44] Removed managerCtx/Cancel Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_frontend_manager.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/pkg/querier/frontend/worker_frontend_manager.go b/pkg/querier/frontend/worker_frontend_manager.go index 680de844117..2af3ce883d3 100644 --- a/pkg/querier/frontend/worker_frontend_manager.go +++ b/pkg/querier/frontend/worker_frontend_manager.go @@ -32,22 +32,18 @@ type frontendManager struct { log log.Logger workerCancels []context.CancelFunc - managerCtx context.Context - managerCancel context.CancelFunc + serverCtx context.Context wg sync.WaitGroup currentProcessors *atomic.Int32 } func 
newFrontendManager(serverCtx context.Context, log log.Logger, server *server.Server, client FrontendClient, clientCfg grpcclient.Config) *frontendManager { - managerCtx, cancel := context.WithCancel(serverCtx) - f := &frontendManager{ log: log, client: client, clientCfg: clientCfg, server: server, - managerCtx: managerCtx, - managerCancel: cancel, + serverCtx: serverCtx, currentProcessors: atomic.NewInt32(0), } @@ -55,7 +51,6 @@ func newFrontendManager(serverCtx context.Context, log log.Logger, server *serve } func (f *frontendManager) stop() { - f.managerCancel() f.concurrentRequests(0) f.wg.Wait() } @@ -65,16 +60,14 @@ func (f *frontendManager) concurrentRequests(n int) { n = 0 } - // adjust clients slice as necessary for len(f.workerCancels) < n { - ctx, cancel := context.WithCancel(f.managerCtx) + ctx, cancel := context.WithCancel(f.serverCtx) f.workerCancels = append(f.workerCancels, cancel) go f.runOne(ctx) } for len(f.workerCancels) > n { - // remove from slice and shutdown var cancel context.CancelFunc cancel, f.workerCancels = f.workerCancels[0], f.workerCancels[1:] cancel() @@ -92,7 +85,6 @@ func (f *frontendManager) runOne(ctx context.Context) { backoff := util.NewBackoff(ctx, backoffConfig) for backoff.Ongoing() { - c, err := f.client.Process(ctx) if err != nil { level.Error(f.log).Log("msg", "error contacting frontend", "err", err) From 9eb6218cd7c6dbb12d0311054e11f8cbcd31d2f2 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 17 Apr 2020 12:35:19 -0400 Subject: [PATCH 42/44] Fixed expected value on test Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/frontend/worker_test.go b/pkg/querier/frontend/worker_test.go index 1459e070b76..ba3ee655a26 100644 --- a/pkg/querier/frontend/worker_test.go +++ b/pkg/querier/frontend/worker_test.go @@ -33,7 +33,7 @@ func TestResetConcurrency(t *testing.T) { parallelism: 0, maxConcurrent: 0, numManagers: 2, - 
expectedConcurrency: 0, + expectedConcurrency: 2, }, { name: "Test concurrency per query frontend configuration", From ac2fc0f2dc23a32f6bd9e9475ed4b76af6affd00 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Sun, 19 Apr 2020 08:15:42 -0400 Subject: [PATCH 43/44] Removed WithBlock() option Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 5c1b9fe6881..4246aa2b687 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -139,7 +139,6 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { func (w *worker) connect(ctx context.Context, address string) (FrontendClient, error) { opts := []grpc.DialOption{ grpc.WithInsecure(), - grpc.WithBlock(), } opts = append(opts, w.cfg.GRPCClientConfig.DialOption([]grpc.UnaryClientInterceptor{middleware.ClientUserHeaderInterceptor}, nil)...) From cd54802bda7ad35e470d49854d11253965841ba0 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Tue, 21 Apr 2020 20:55:30 -0400 Subject: [PATCH 44/44] Remove deleted address from manager map Signed-off-by: Joe Elliott --- pkg/querier/frontend/worker.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/querier/frontend/worker.go b/pkg/querier/frontend/worker.go index 4246aa2b687..0b095458293 100644 --- a/pkg/querier/frontend/worker.go +++ b/pkg/querier/frontend/worker.go @@ -125,6 +125,7 @@ func (w *worker) watchDNSLoop(servCtx context.Context) error { level.Debug(w.log).Log("msg", "removing connection", "addr", update.Addr) if mgr, ok := w.managers[update.Addr]; ok { mgr.stop() + delete(w.managers, update.Addr) } default: