Skip to content

Commit 401c01b

Browse files
author
Albin Kerouanton
committed
Try to asynchronously connect to Fluentd before writing
PR fluent#77 introduced a new parameter named ForceStopAsyncSend. It can be used to tell the logger not to try to send all the log messages in its buffer before closing. Without this parameter, the logger hangs whenever it has logs to write and the target Fluentd server is down. But this new parameter is not enough: the connection is currently lazily initialized when the logger receives its first message. This blocks the select reading messages from the queue (until the connection is ready). Moreover, the connection dialing uses an exponential back-off retry. Because of that, the logger won't look for messages on the `stopRunning` channel (the channel introduced by PR fluent#77), either because it's blocked by the Sleep used for the retry or because the connection dialing is waiting for the TCP timeout. To fix these edge cases, the connection isn't initialized lazily anymore. However, it's still initialized asynchronously and with the exponential back-off retry. The Fluent.run() method has been adapted to wait for either the connection to become ready or a stop signal to be received on the stopRunning channel before starting to unqueue logs. This fix is motivated by the issue described in: moby/moby#40063. Signed-off-by: Albin Kerouanton <[email protected]>
1 parent 28f6a3e commit 401c01b

File tree

2 files changed

+112
-69
lines changed

2 files changed

+112
-69
lines changed

fluent/fluent.go

Lines changed: 99 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package fluent
22

33
import (
4+
"context"
45
"encoding/json"
56
"errors"
67
"fmt"
@@ -15,8 +16,9 @@ import (
1516
"bytes"
1617
"encoding/base64"
1718
"encoding/binary"
18-
"github.com/tinylib/msgp/msgp"
1919
"math/rand"
20+
21+
"github.com/tinylib/msgp/msgp"
2022
)
2123

2224
const (
@@ -84,6 +86,7 @@ type msgToSend struct {
8486
type Fluent struct {
8587
Config
8688

89+
ready chan bool
8790
stopRunning chan bool
8891
pending chan *msgToSend
8992
wg sync.WaitGroup
@@ -130,14 +133,16 @@ func New(config Config) (f *Fluent, err error) {
130133
}
131134
if config.Async {
132135
f = &Fluent{
133-
Config: config,
134-
pending: make(chan *msgToSend, config.BufferLimit),
136+
Config: config,
137+
ready: make(chan bool),
138+
stopRunning: make(chan bool),
139+
pending: make(chan *msgToSend, config.BufferLimit),
135140
}
136141
f.wg.Add(1)
137142
go f.run()
138143
} else {
139144
f = &Fluent{Config: config}
140-
err = f.connect()
145+
err = f.connect(context.Background())
141146
}
142147
return
143148
}
@@ -339,38 +344,112 @@ func (f *Fluent) close(c net.Conn) {
339344
}
340345

341346
// connect establishes a new connection using the specified transport.
342-
func (f *Fluent) connect() (err error) {
347+
func (f *Fluent) connect(ctx context.Context) (err error) {
348+
f.muconn.Lock()
349+
defer f.muconn.Unlock()
350+
351+
dialer := net.Dialer{Timeout: f.Config.Timeout}
343352

344353
switch f.Config.FluentNetwork {
345354
case "tcp":
346-
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort), f.Config.Timeout)
355+
f.conn, err = dialer.DialContext(ctx,
356+
f.Config.FluentNetwork,
357+
f.Config.FluentHost+":"+strconv.Itoa(f.Config.FluentPort))
347358
case "unix":
348-
f.conn, err = net.DialTimeout(f.Config.FluentNetwork, f.Config.FluentSocketPath, f.Config.Timeout)
359+
f.conn, err = dialer.DialContext(ctx,
360+
f.Config.FluentNetwork,
361+
f.Config.FluentSocketPath)
349362
default:
350363
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
351364
}
365+
352366
return err
353367
}
354368

355-
func (f *Fluent) run() {
356-
drainEvents := false
357-
var emitEventDrainMsg sync.Once
358-
for {
369+
func (f *Fluent) connectAsync(ctx context.Context, stopAsyncConnect <-chan bool) {
370+
defer f.wg.Done()
371+
372+
waiter := time.After(time.Duration(0))
373+
for i := 0; i < f.Config.MaxRetry; i++ {
359374
select {
360-
case entry, ok := <-f.pending:
361-
if !ok {
362-
f.wg.Done()
375+
case <-waiter:
376+
if f.conn != nil {
377+
f.ready <- true
378+
}
379+
380+
err := f.connect(ctx)
381+
if err == nil {
382+
f.ready <- true
363383
return
364384
}
365-
if drainEvents {
366-
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
367-
continue
385+
386+
if _, ok := err.(*ErrUnknownNetwork); ok {
387+
// No need to retry on unknown network error. Thus false is passed
388+
// to ready channel to let the other end drain the message queue.
389+
f.ready <- false
390+
return
368391
}
369-
err := f.write(entry)
370-
if err != nil {
371-
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
392+
393+
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
394+
if waitTime > f.Config.MaxRetryWait {
395+
waitTime = f.Config.MaxRetryWait
372396
}
397+
398+
waiter = time.After(time.Duration(waitTime) * time.Millisecond)
399+
case <-stopAsyncConnect:
400+
return
373401
}
402+
}
403+
}
404+
405+
func (f *Fluent) run() {
406+
drainEvents := false
407+
var emitEventDrainMsg sync.Once
408+
409+
// First we need to wait for the connection to become ready. We cannot
410+
// initialize the connection lazily (eg. when the first message is
411+
// received) because it'd be done in the first for-select iteration on
412+
// f.pending and this would block the select without giving the
413+
// select on f.stopRunning a chance to signal its end to this goroutine.
414+
ctx, cancelDialing := context.WithCancel(context.Background())
415+
stopAsyncConnect := make(chan bool)
416+
f.wg.Add(1)
417+
go f.connectAsync(ctx, stopAsyncConnect)
418+
419+
select {
420+
case <-f.stopRunning:
421+
drainEvents = true
422+
// Stop any connection dialing and then tell connectAsync to stop
423+
// trying to dial the connection. This has to be done in this
424+
// specific order to make sure connectAsync() is not blocking on the
425+
// connection dialing.
426+
cancelDialing()
427+
close(stopAsyncConnect)
428+
break
429+
case ready, ok := <-f.ready:
430+
if !ready || !ok {
431+
drainEvents = true
432+
}
433+
break
434+
}
435+
436+
// At this point we can go ahead: the connection is either ready to use or
437+
// drainEvents is true and thus all logs should be discarded.
438+
for {
439+
entry, ok := <-f.pending
440+
if !ok {
441+
f.wg.Done()
442+
return
443+
}
444+
if drainEvents {
445+
emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) })
446+
continue
447+
}
448+
err := f.write(entry)
449+
if err != nil {
450+
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))
451+
}
452+
374453
select {
375454
case stopRunning, ok := <-f.stopRunning:
376455
if stopRunning || !ok {
@@ -389,31 +468,6 @@ func (f *Fluent) write(msg *msgToSend) error {
389468
var c net.Conn
390469
for i := 0; i < f.Config.MaxRetry; i++ {
391470
c = f.conn
392-
// Connect if needed
393-
if c == nil {
394-
f.muconn.Lock()
395-
if f.conn == nil {
396-
err := f.connect()
397-
if err != nil {
398-
f.muconn.Unlock()
399-
400-
if _, ok := err.(*ErrUnknownNetwork); ok {
401-
// do not retry on unknown network error
402-
break
403-
}
404-
waitTime := f.Config.RetryWait * e(defaultReconnectWaitIncreRate, float64(i-1))
405-
if waitTime > f.Config.MaxRetryWait {
406-
waitTime = f.Config.MaxRetryWait
407-
}
408-
time.Sleep(time.Duration(waitTime) * time.Millisecond)
409-
continue
410-
}
411-
}
412-
c = f.conn
413-
f.muconn.Unlock()
414-
}
415-
416-
// We're connected, write msg
417471
t := f.Config.WriteTimeout
418472
if time.Duration(0) < t {
419473
c.SetWriteDeadline(time.Now().Add(t))

fluent/fluent_test.go

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,10 @@ func Test_New_itShouldUseConfigValuesFromMashalAsJSONArgument(t *testing.T) {
140140
}
141141

142142
func Test_send_WritePendingToConn(t *testing.T) {
143-
f, _ := New(Config{Async: true})
143+
f, _ := New(Config{
144+
Async: true,
145+
ForceStopAsyncSend: true,
146+
})
144147

145148
conn := &Conn{}
146149
f.conn = conn
@@ -274,31 +277,17 @@ func TestJsonConfig(t *testing.T) {
274277
}
275278
}
276279

277-
func TestAsyncConnect(t *testing.T) {
278-
type result struct {
279-
f *Fluent
280-
err error
280+
func TestAsyncConnectDoesNotPreventClose(t *testing.T) {
281+
config := Config{
282+
FluentPort: 6666,
283+
Async: true,
284+
ForceStopAsyncSend: true,
281285
}
282-
ch := make(chan result, 1)
283-
go func() {
284-
config := Config{
285-
FluentPort: 8888,
286-
Async: true,
287-
}
288-
f, err := New(config)
289-
ch <- result{f: f, err: err}
290-
}()
291-
292-
select {
293-
case res := <-ch:
294-
if res.err != nil {
295-
t.Errorf("fluent.New() failed with %#v", res.err)
296-
return
297-
}
298-
res.f.Close()
299-
case <-time.After(time.Millisecond * 500):
300-
t.Error("Async must not block")
286+
f, err := New(config)
287+
if err != nil {
288+
t.Errorf("Unexpected error: %v", err)
301289
}
290+
f.Close()
302291
}
303292

304293
func Test_PostWithTimeNotTimeOut(t *testing.T) {

0 commit comments

Comments
 (0)