Skip to content

Commit 5fc2fc6

Browse files
authored
Converted frontend worker to a service. (#2246)
* Converted frontend worker to a service. The frontend worker is now started only after all Querier module dependencies are started. Previously, the worker was started much earlier, long before the Querier was ready to accept queries.
1 parent cbcd45f commit 5fc2fc6

File tree

5 files changed

+39
-55
lines changed

5 files changed

+39
-55
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## master / unreleased
44

55
* [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226
6+
* [CHANGE] Frontend worker in querier now starts after all Querier module dependencies are started. This fixes an issue where the frontend worker started to send queries to the querier before it was ready to serve them (mostly visible when using experimental blocks storage). #2246
67
* [FEATURE] Flusher target to flush the WAL.
78
* `-flusher.wal-dir` for the WAL directory to recover from.
89
* `-flusher.concurrent-flushes` for number of concurrent flushes.

pkg/cortex/cortex.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ type Cortex struct {
182182
ingester *ingester.Ingester
183183
flusher *flusher.Flusher
184184
store chunk.Store
185-
worker frontend.Worker
186185
frontend *frontend.Frontend
187186
tableManager *chunk.TableManager
188187
cache cache.Cache

pkg/cortex/modules.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -202,23 +202,14 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) {
202202
subrouter.Path("/chunks").Handler(t.httpAuthMiddleware.Wrap(querier.ChunksHandler(queryable)))
203203
subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(t.distributor.UserStatsHandler)))
204204

205-
// Start the query frontend worker once the query engine and the store
206-
// have been successfully initialized.
207-
t.worker, err = frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
205+
// Query frontend worker will only be started after all its dependencies are started, not here.
206+
// Worker may also be nil, if not configured, which is OK.
207+
worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
208208
if err != nil {
209209
return
210210
}
211211

212-
// TODO: If queryable returned from querier.New was a service, it could actually wait for storeQueryable
213-
// (if it also implemented Service) to finish starting... and return error if it's not in Running state.
214-
// This requires extra work, which is out of scope for this proof-of-concept...
215-
// BUT this extra functionality is ONE OF THE REASONS to introduce entire "Services" concept into Cortex.
216-
// For now, only return service that stops the worker, and Querier will be used even before storeQueryable has finished starting.
217-
218-
return services.NewIdleService(nil, func(_ error) error {
219-
t.worker.Stop()
220-
return nil
221-
}), nil
212+
return worker, nil
222213
}
223214

224215
// Latest Prometheus requires r.RemoteAddr to be set to addr:port, otherwise it rejects the request.

pkg/querier/frontend/frontend_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"google.golang.org/grpc"
2828

2929
"github.com/cortexproject/cortex/pkg/util/flagext"
30+
"github.com/cortexproject/cortex/pkg/util/services"
3031
)
3132

3233
const (
@@ -197,7 +198,9 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {
197198

198199
worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger)
199200
require.NoError(t, err)
200-
defer worker.Stop()
201+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))
201202

202203
test(httpListen.Addr().String())
204+
205+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), worker))
203206
}

pkg/querier/frontend/worker.go

Lines changed: 30 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/go-kit/kit/log"
1212
"github.com/go-kit/kit/log/level"
13+
"github.com/pkg/errors"
1314
"github.com/weaveworks/common/httpgrpc"
1415
"github.com/weaveworks/common/httpgrpc/server"
1516
"github.com/weaveworks/common/middleware"
@@ -18,6 +19,7 @@ import (
1819

1920
"github.com/cortexproject/cortex/pkg/util"
2021
"github.com/cortexproject/cortex/pkg/util/grpcclient"
22+
"github.com/cortexproject/cortex/pkg/util/services"
2123
)
2224

2325
var (
@@ -46,31 +48,21 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
4648
}
4749

4850
// Worker is the counter-part to the frontend, actually processing requests.
49-
type Worker interface {
50-
Stop()
51-
}
52-
5351
type worker struct {
5452
cfg WorkerConfig
5553
log log.Logger
5654
server *server.Server
5755

58-
ctx context.Context
59-
cancel context.CancelFunc
6056
watcher naming.Watcher //nolint:staticcheck //Skipping for now. If you still see this more than likely issue https://github.com/cortexproject/cortex/issues/2015 has not yet been addressed.
6157
wg sync.WaitGroup
6258
}
6359

64-
type noopWorker struct {
65-
}
66-
67-
func (noopWorker) Stop() {}
68-
69-
// NewWorker creates a new Worker.
70-
func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker, error) {
60+
// NewWorker creates a new worker and returns a service that is wrapping it.
61+
// If no address is specified, it returns nil service (and no error).
62+
func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (services.Service, error) {
7163
if cfg.Address == "" {
7264
level.Info(log).Log("msg", "no address specified, not starting worker")
73-
return noopWorker{}, nil
65+
return nil, nil
7466
}
7567

7668
resolver, err := naming.NewDNSResolverWithFreq(cfg.DNSLookupDuration)
@@ -83,52 +75,50 @@ func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker,
8375
return nil, err
8476
}
8577

86-
ctx, cancel := context.WithCancel(context.Background())
87-
8878
w := &worker{
89-
cfg: cfg,
90-
log: log,
91-
server: server,
92-
93-
ctx: ctx,
94-
cancel: cancel,
79+
cfg: cfg,
80+
log: log,
81+
server: server,
9582
watcher: watcher,
9683
}
97-
w.wg.Add(1)
98-
go w.watchDNSLoop()
99-
return w, nil
84+
return services.NewBasicService(nil, w.watchDNSLoop, w.stopping), nil
10085
}
10186

102-
// Stop the worker.
103-
func (w *worker) Stop() {
104-
w.watcher.Close()
105-
w.cancel()
87+
func (w *worker) stopping(_ error) error {
88+
// wait until all per-address workers are done. This is only called after watchDNSLoop exits.
10689
w.wg.Wait()
90+
return nil
10791
}
10892

10993
// watchDNSLoop watches for changes in DNS and starts or stops workers.
110-
func (w *worker) watchDNSLoop() {
111-
defer w.wg.Done()
94+
func (w *worker) watchDNSLoop(servCtx context.Context) error {
95+
go func() {
96+
// Close the watcher, when this service is asked to stop.
97+
// Closing the watcher makes watchDNSLoop exit, since it only iterates on watcher updates, and has no other
98+
// way to stop. We cannot close the watcher in `stopping` method, because it is only called *after*
99+
// watchDNSLoop exits.
100+
<-servCtx.Done()
101+
w.watcher.Close()
102+
}()
112103

113104
cancels := map[string]context.CancelFunc{}
114-
defer func() {
115-
for _, cancel := range cancels {
116-
cancel()
117-
}
118-
}()
119105

120106
for {
121107
updates, err := w.watcher.Next()
122108
if err != nil {
123-
level.Error(w.log).Log("msg", "error from DNS watcher", "err", err)
124-
return
109+
// watcher.Next returns error when Close is called, but we call Close when our context is done.
110+
// we don't want to report error in that case.
111+
if servCtx.Err() != nil {
112+
return nil
113+
}
114+
return errors.Wrapf(err, "error from DNS watcher")
125115
}
126116

127117
for _, update := range updates {
128118
switch update.Op {
129119
case naming.Add:
130120
level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr)
131-
ctx, cancel := context.WithCancel(w.ctx)
121+
ctx, cancel := context.WithCancel(servCtx)
132122
cancels[update.Addr] = cancel
133123
w.runMany(ctx, update.Addr)
134124

@@ -139,7 +129,7 @@ func (w *worker) watchDNSLoop() {
139129
}
140130

141131
default:
142-
panic("unknown op")
132+
return fmt.Errorf("unknown op: %v", update.Op)
143133
}
144134
}
145135
}

0 commit comments

Comments
 (0)