Skip to content

Commit 47a9a76

Browse files
author
Albin Kerouanton
committed
Try to asynchronously connect to Fluentd before writing
PR fluent#77 introduced a new parameter named ForceStopAsyncSend. It can be used to tell the logger not to try to send all the log messages in its buffer before closing. Without this parameter, the logger hangs whenever it has logs to write and the target Fluentd server is down. But this new parameter is not enough: the connection is currently lazily initialized when the logger receives its first message. This blocks the select that reads messages from the queue (until the connection is ready). Moreover, the connection dialing uses an exponential back-off retry. Because of that, the logger won't look for messages on the `stopRunning` channel (the channel introduced by PR fluent#77), either because it's blocked by the Sleep used for the retry or because the connection dialing is waiting for the TCP timeout. To fix these edge cases, the connection isn't initialized lazily anymore. However, it's still initialized asynchronously and with the exponential back-off retry. The Fluent.run() method has been adapted to wait either for the connection to become ready or for a stop signal on the stopRunning channel before it starts dequeuing logs.
1 parent 28f6a3e commit 47a9a76

File tree

1 file changed

+85
-32
lines changed

1 file changed

+85
-32
lines changed

fluent/fluent.go

Lines changed: 85 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fluent
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"fmt"
@@ -15,8 +16,9 @@ import (
1516
"bytes"
1617
"encoding/base64"
1718
"encoding/binary"
18-
"github.com/tinylib/msgp/msgp"
1919
"math/rand"
20+
21+
"github.com/tinylib/msgp/msgp"
2022
)
2123

2224
const (
@@ -84,6 +86,7 @@ type msgToSend struct {
8486
type Fluent struct {
8587
Config
8688

89+
ready chan bool
8790
stopRunning chan bool
8891
pending chan *msgToSend
8992
wg sync.WaitGroup
@@ -130,14 +133,16 @@ func New(config Config) (f *Fluent, err error) {
130133
}
131134
if config.Async {
132135
f = &Fluent{
133-
Config: config,
134-
pending: make(chan *msgToSend, config.BufferLimit),
136+
Config: config,
137+
ready: make(chan bool),
138+
stopRunning: make(chan bool),
139+
pending: make(chan *msgToSend, config.BufferLimit),
135140
}
136141
f.wg.Add(1)
137142
go f.run()
138143
} else {
139144
f = &Fluent{Config: config}
140-
err = f.connect()
145+
err = f.connect(context.Background())
141146
}
142147
return
143148
}
@@ -339,22 +344,95 @@ func (f *Fluent) close(c net.Conn) {
339344
}
340345

341346
// connect establishes a new connection using the specified transport.
342-
func (f *Fluent) connect() (err error) {
347+
func (f *Fluent) connect(ctx context.Context) (err error) {
348+
f.muconn.Lock()
349+
defer f.muconn.Unlock()
343350

351+
dialer := net.Dialer{Timeout: f.Config.Timeout}
344352
switch f.Config.FluentNetwork {
345353
case "tcp":
346-
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
354+
f.conn, err = dialer.DialContext(ctx,
355+
f.Config.FluentNetwork,
356+
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
347357
case "unix":
348-
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
358+
f.conn, err = dialer.DialContext(ctx,
359+
f.Config.FluentNetwork,
360+
f.Config.FluentSocketPath)
349361
default:
350362
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
351363
}
364+
352365
return err
353366
}
354367

368+
func (f *Fluent) connectAsync(ctx context.Context, stopAsyncConnect <-chan bool) {
369+
f.wg.Add(1)
370+
defer f.wg.Done()
371+
372+
waiter := time.After(time.Duration(0))
373+
for i := 0; i < f.Config.MaxRetry; i++ {
374+
select {
375+
case <-waiter:
376+
err := f.connect(ctx)
377+
if err == nil {
378+
f.ready <- true
379+
break
380+
}
381+
382+
if _, ok := err.(*ErrUnknownNetwork); ok {
383+
// No need to retry on unknown network error. Thus false is passed
384+
// to ready channel to let the other end drain the message queue.
385+
f.ready <- false
386+
break
387+
}
388+
389+
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
390+
if waitTime > f.Config.MaxRetryWait {
391+
waitTime = f.Config.MaxRetryWait
392+
}
393+
394+
waiter = time.After(time.Duration(waitTime) * time.Millisecond)
395+
case <-stopAsyncConnect:
396+
break
397+
}
398+
}
399+
}
400+
355401
func (f *Fluent) run() {
356402
drainEvents := false
357403
var emitEventDrainMsg sync.Once
404+
405+
// First we need to wait for the connection to become ready. We cannot
406+
// initialize the connection lazily (eg. when the first message is
407+
// received) because it'd be done in the first for-select iteration on
408+
// f.pending and this would block the select, never giving the case on
409+
// f.stopRunning a chance to tell this goroutine to stop.
410+
ctx, cancelDialing := context.WithCancel(context.Background())
411+
stopAsyncConnect := make(chan bool, 1)
412+
go f.connectAsync(ctx, stopAsyncConnect)
413+
for i := 0; i < f.Config.MaxRetry; i++ {
414+
select {
415+
case stopRunning, ok := <-f.stopRunning:
416+
if stopRunning || !ok {
417+
drainEvents = true
418+
}
419+
// Stop any connection dialing and then tell connectAsync to stop
420+
// trying to dial the connection. This has to be done in this
421+
// specific order to make sure connectAsync() is not blocking on the
422+
// connection dialing.
423+
cancelDialing()
424+
close(stopAsyncConnect)
425+
break
426+
case ready, ok := <-f.ready:
427+
if !ready || !ok {
428+
drainEvents = true
429+
}
430+
break
431+
}
432+
}
433+
434+
// At this point we can go ahead: the connection is either ready to use or
435+
// drainEvents is true and thus all logs should be discarded.
358436
for {
359437
select {
360438
case entry, ok := <-f.pending:
@@ -389,31 +467,6 @@ func (f *Fluent) write(msg *msgToSend) error {
389467
var c net.Conn
390468
for i := 0; i < f.Config.MaxRetry; i++ {
391469
c = f.conn
392-
// Connect if needed
393-
if c == nil {
394-
f.muconn.Lock()
395-
if f.conn == nil {
396-
err := f.connect()
397-
if err != nil {
398-
f.muconn.Unlock()
399-
400-
if _, ok := err.(*ErrUnknownNetwork); ok {
401-
// do not retry on unknown network error
402-
break
403-
}
404-
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
405-
if waitTime > f.Config.MaxRetryWait {
406-
waitTime = f.Config.MaxRetryWait
407-
}
408-
time.Sleep(time.Duration(waitTime) * time.Millisecond)
409-
continue
410-
}
411-
}
412-
c = f.conn
413-
f.muconn.Unlock()
414-
}
415-
416-
// We're connected, write msg
417470
t := f.Config.WriteTimeout
418471
if time.Duration(0) < t {
419472
c.SetWriteDeadline(time.Now().Add(t))

0 commit comments

Comments
 (0)