Skip to content

Commit 2317ded

Browse files
Albin Kerouantonakerouanton
Albin Kerouanton
authored andcommitted
Properly stop logger during (re)connect failure
PR fluent#77 introduced a new parameter named ForceStopAsyncSend. It can be used to tell the logger to not try to send all the log messages in its buffer before closing. Without this parameter, the logger hangs out whenever it has logs to write and the target Fluentd server is down. But this new parameter is not enough: the connection is currently lazily initialized when the logger receive its first message. This blocks the select reading messages from the queue (until the connection is ready). Moreover, the connection dialing uses an exponential back-off retry. Because of that, the logger won't look for messages on `stopRunning` channel (the channel introduced by PR fluent#77), either because it's blocked by the Sleep used for the retry or because the connection dialing is waiting until dialing timeout (eg. TCP timeout). To fix these edge cases, the time.Sleep() used for back-off retry has been transformed into a time.After() and a new stopAsyncConnect channel has been introduced to stop it. Moreover, the dialer.Dial() call used to connect to Fluentd has been replaced with dialer.DialContext() and a cancellable context is now used to stop the call to that method. Finally, the Fluent.run() method has been adapted to wait for both new messages on f.pending and stop signal sent to f.stopRunning channel. Previously, both channels were selected independently, in a procedural fashion. This fix is motivated by the issue described in: moby/moby#40063. Signed-off-by: Albin Kerouanton <[email protected]>
1 parent 89c0329 commit 2317ded

File tree

2 files changed

+162
-77
lines changed

2 files changed

+162
-77
lines changed

fluent/fluent.go

Lines changed: 152 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fluent
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"fmt"
@@ -85,15 +86,24 @@ type msgToSend struct {
8586
type Fluent struct {
8687
Config
8788

88-
dialer dialer
89+
dialer dialer
90+
// stopRunning is used in async mode to signal to run() and connectOrRetryAsync()
91+
// they should abort.
8992
stopRunning chan bool
90-
pending chan *msgToSend
91-
wg sync.WaitGroup
93+
// stopAsyncConnect is used by connectOrRetryAsync() to signal to
94+
// connectOrRetry() it should abort.
95+
stopAsyncConnect chan bool
96+
pending chan *msgToSend
97+
wg sync.WaitGroup
9298

9399
muconn sync.Mutex
94100
conn net.Conn
95101
}
96102

103+
type dialer interface {
104+
DialContext(ctx context.Context, network, address string) (net.Conn, error)
105+
}
106+
97107
// New creates a new Logger.
98108
func New(config Config) (*Fluent, error) {
99109
if config.Timeout == 0 {
@@ -104,10 +114,6 @@ func New(config Config) (*Fluent, error) {
104114
})
105115
}
106116

107-
type dialer interface {
108-
Dial(string, string) (net.Conn, error)
109-
}
110-
111117
func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
112118
if config.FluentNetwork == "" {
113119
config.FluentNetwork = defaultNetwork
@@ -143,9 +149,11 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
143149

144150
if config.Async {
145151
f = &Fluent{
146-
Config: config,
147-
dialer: d,
148-
pending: make(chan *msgToSend, config.BufferLimit),
152+
Config: config,
153+
dialer: d,
154+
stopRunning: make(chan bool),
155+
stopAsyncConnect: make(chan bool),
156+
pending: make(chan *msgToSend, config.BufferLimit),
149157
}
150158
f.wg.Add(1)
151159
go f.run()
@@ -154,7 +162,7 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
154162
Config: config,
155163
dialer: d,
156164
}
157-
err = f.connect()
165+
err = f.connect(context.Background())
158166
}
159167
return
160168
}
@@ -331,7 +339,11 @@ func (f *Fluent) Close() (err error) {
331339
close(f.pending)
332340
f.wg.Wait()
333341
}
334-
f.close(f.conn)
342+
343+
f.muconn.Lock()
344+
f.close()
345+
f.muconn.Unlock()
346+
335347
return
336348
}
337349

@@ -346,35 +358,105 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
346358
}
347359

348360
// close closes the connection.
349-
func (f *Fluent) close(c net.Conn) {
350-
f.muconn.Lock()
351-
if f.conn != nil && f.conn == c {
361+
func (f *Fluent) close() {
362+
if f.conn != nil {
352363
f.conn.Close()
353364
f.conn = nil
354365
}
355-
f.muconn.Unlock()
356366
}
357367

358368
// connect establishes a new connection using the specified transport.
359-
func (f *Fluent) connect() (err error) {
369+
func (f *Fluent) connect(ctx context.Context) (err error) {
360370
switch f.Config.FluentNetwork {
361371
case "tcp":
362-
f.conn, err = f.dialer.Dial(
372+
f.conn, err = f.dialer.DialContext(ctx,
363373
f.Config.FluentNetwork,
364374
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
365375
case "unix":
366-
f.conn, err = f.dialer.Dial(
376+
f.conn, err = f.dialer.DialContext(ctx,
367377
f.Config.FluentNetwork,
368378
f.Config.FluentSocketPath)
369379
default:
370380
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
371381
}
382+
372383
return err
373384
}
374385

386+
var errIsClosing = errors.New("fluent logger is closing, aborting async connect")
387+
388+
func (f *Fluent) connectOrRetry(ctx context.Context) error {
389+
// Use a Time channel instead of time.Sleep() to avoid blocking this
390+
// goroutine during eventually way too much time (because of the exponential
391+
// back-off retry).
392+
waiter := time.After(time.Duration(0))
393+
for i := 0; i < f.Config.MaxRetry; i++ {
394+
select {
395+
case <-waiter:
396+
err := f.connect(ctx)
397+
if err == nil {
398+
return nil
399+
}
400+
401+
if _, ok := err.(*ErrUnknownNetwork); ok {
402+
// No need to retry on unknown network error. Thus false is passed
403+
// to ready channel to let the other end drain the message queue.
404+
return err
405+
}
406+
407+
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
408+
if waitTime > f.Config.MaxRetryWait {
409+
waitTime = f.Config.MaxRetryWait
410+
}
411+
412+
waiter = time.After(time.Duration(waitTime) * time.Millisecond)
413+
case <-f.stopAsyncConnect:
414+
return errIsClosing
415+
}
416+
}
417+
418+
return fmt.Errorf("could not connect to fluentd after %d retries", f.Config.MaxRetry)
419+
}
420+
421+
// connectOrRetryAsync returns an error when it fails to connect to fluentd or
422+
// when Close() has been called.
423+
func (f *Fluent) connectOrRetryAsync(ctx context.Context) error {
424+
ctx, cancelDialing := context.WithCancel(ctx)
425+
errCh := make(chan error)
426+
427+
f.wg.Add(1)
428+
go func(ctx context.Context, errCh chan<- error) {
429+
defer f.wg.Done()
430+
errCh <- f.connectOrRetry(ctx)
431+
}(ctx, errCh)
432+
433+
for {
434+
select {
435+
case _, ok := <-f.stopRunning:
436+
// If f.stopRunning is closed before we got something on errCh,
437+
// we need to wait a bit more.
438+
if !ok {
439+
break
440+
}
441+
442+
// Stop any connection dialing and then tell connectOrRetry to stop
443+
// trying to dial the connection. This has to be done in this
444+
// specifc order to make sure connectOrRetry() is not blocking on
445+
// the connection dialing.
446+
cancelDialing()
447+
f.stopAsyncConnect <- true
448+
case err := <-errCh:
449+
return err
450+
}
451+
}
452+
}
453+
454+
// run is the goroutine used to unqueue and write logs in async mode. That
455+
// goroutine is meant to run during the whole life of the Fluent logger.
375456
func (f *Fluent) run() {
376457
drainEvents := false
377458
var emitEventDrainMsg sync.Once
459+
378460
for {
379461
select {
380462
case entry, ok := <-f.pending:
@@ -387,16 +469,16 @@ func (f *Fluent) run() {
387469
continue
388470
}
389471
err := f.write(entry)
390-
if err != nil {
472+
if err == errIsClosing {
473+
drainEvents = true
474+
} else if err != nil {
475+
// TODO: log failing message?
391476
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
392477
}
393-
}
394-
select {
395478
case stopRunning, ok := <-f.stopRunning:
396479
if stopRunning || !ok {
397480
drainEvents = true
398481
}
399-
default:
400482
}
401483
}
402484
}
@@ -406,62 +488,64 @@ func e(x, y float64) int {
406488
}
407489

408490
func (f *Fluent) write(msg *msgToSend) error {
409-
var c net.Conn
410-
for i := 0; i < f.Config.MaxRetry; i++ {
411-
c = f.conn
412-
// Connect if needed
413-
if c == nil {
414-
f.muconn.Lock()
415-
if f.conn == nil {
416-
err := f.connect()
417-
if err != nil {
418-
f.muconn.Unlock()
419-
420-
if _, ok := err.(*ErrUnknownNetwork); ok {
421-
// do not retry on unknown network error
422-
break
423-
}
424-
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
425-
if waitTime > f.Config.MaxRetryWait {
426-
waitTime = f.Config.MaxRetryWait
427-
}
428-
time.Sleep(time.Duration(waitTime) * time.Millisecond)
429-
continue
430-
}
491+
// This function is used to ensure muconn is properly locked and unlocked
492+
// between each retry. This gives the importunity to other goroutines to
493+
// lock it (e.g. to close the connection).
494+
writer := func() (bool, error) {
495+
f.muconn.Lock()
496+
defer f.muconn.Unlock()
497+
498+
if f.conn == nil {
499+
var err error
500+
if f.Config.Async {
501+
err = f.connectOrRetryAsync(context.Background())
502+
} else {
503+
err = f.connectOrRetry(context.Background())
504+
}
505+
506+
if err != nil {
507+
return false, err
431508
}
432-
c = f.conn
433-
f.muconn.Unlock()
434509
}
435510

436-
// We're connected, write msg
437511
t := f.Config.WriteTimeout
438512
if time.Duration(0) < t {
439-
c.SetWriteDeadline(time.Now().Add(t))
513+
f.conn.SetWriteDeadline(time.Now().Add(t))
440514
} else {
441-
c.SetWriteDeadline(time.Time{})
515+
f.conn.SetWriteDeadline(time.Time{})
442516
}
443-
_, err := c.Write(msg.data)
517+
518+
_, err := f.conn.Write(msg.data)
444519
if err != nil {
445-
f.close(c)
446-
} else {
447-
// Acknowledgment check
448-
if msg.ack != "" {
449-
resp := &AckResp{}
450-
if f.Config.MarshalAsJSON {
451-
dec := json.NewDecoder(c)
452-
err = dec.Decode(resp)
453-
} else {
454-
r := msgp.NewReader(c)
455-
err = resp.DecodeMsg(r)
456-
}
457-
if err != nil || resp.Ack != msg.ack {
458-
f.close(c)
459-
continue
460-
}
520+
f.close()
521+
return true, err
522+
}
523+
524+
// Acknowledgment check
525+
if msg.ack != "" {
526+
resp := &AckResp{}
527+
if f.Config.MarshalAsJSON {
528+
dec := json.NewDecoder(f.conn)
529+
err = dec.Decode(resp)
530+
} else {
531+
r := msgp.NewReader(f.conn)
532+
err = resp.DecodeMsg(r)
461533
}
534+
535+
if err != nil || resp.Ack != msg.ack {
536+
f.close()
537+
return true, err
538+
}
539+
}
540+
541+
return false, nil
542+
}
543+
544+
for i := 0; i < f.Config.MaxRetry; i++ {
545+
if retry, err := writer(); !retry {
462546
return err
463547
}
464548
}
465549

466-
return fmt.Errorf("fluent#write: failed to reconnect, max retry: %v", f.Config.MaxRetry)
550+
return fmt.Errorf("fluent#write: failed to write after %d attempts", f.Config.MaxRetry)
467551
}

fluent/fluent_test.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package fluent
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
67
"errors"
78
"io/ioutil"
@@ -114,16 +115,20 @@ type testDialer struct {
114115
dialCh chan *Conn
115116
}
116117

117-
// Dial is the stubbed method called by the logger to establish the connection to
118+
// DialContext is the stubbed method called by the logger to establish the connection to
118119
// Fluentd. It is paired with waitForNextDialing().
119-
func (d *testDialer) Dial(string, string) (net.Conn, error) {
120+
func (d *testDialer) DialContext(ctx context.Context, _, _ string) (net.Conn, error) {
120121
// It waits for a *Conn to be pushed into dialCh using waitForNextDialing(). When the
121122
// *Conn is nil, the Dial is deemed to fail.
122-
conn := <-d.dialCh
123-
if conn == nil {
123+
select {
124+
case conn := <-d.dialCh:
125+
if conn == nil {
126+
return nil, errors.New("failed to dial")
127+
}
128+
return conn, nil
129+
case <-ctx.Done():
124130
return nil, errors.New("failed to dial")
125131
}
126-
return conn, nil
127132
}
128133

129134
// waitForNextDialing is the method used by test cases below to indicate whether the next
@@ -520,8 +525,6 @@ func timeout(t *testing.T, duration time.Duration, fn func(), reason string) {
520525
}
521526

522527
func TestCloseOnFailingAsyncConnect(t *testing.T) {
523-
t.Skip("Broken tests")
524-
525528
testcases := map[string]Config{
526529
"with ForceStopAsyncSend and with RequestAck": {
527530
Async: true,
@@ -573,8 +576,6 @@ func ackRespMsgp(t *testing.T, ack string) string {
573576
}
574577

575578
func TestCloseOnFailingAsyncReconnect(t *testing.T) {
576-
t.Skip("Broken tests")
577-
578579
testcases := map[string]Config{
579580
"with RequestAck": {
580581
Async: true,

0 commit comments

Comments
 (0)