1
1
package fluent
2
2
3
3
import (
4
+ "context"
4
5
"encoding/json"
5
6
"errors"
6
7
"fmt"
7
8
"math"
9
+ "math/rand"
8
10
"net"
9
11
"os"
10
12
"reflect"
@@ -15,7 +17,6 @@ import (
15
17
"bytes"
16
18
"encoding/base64"
17
19
"encoding/binary"
18
- "math/rand"
19
20
20
21
"github.com/tinylib/msgp/msgp"
21
22
)
@@ -37,6 +38,13 @@ const (
37
38
defaultSubSecondPrecision = false
38
39
)
39
40
41
+ // randomGenerator is used by getUniqueId to generate ack hashes.
42
+ // Unfortunately, using a seeded random number generator isn't enough to ensure
43
+ // ack hashes are deterministically generated during tests, thus we need to
44
+ // change randomGenerator value to ensure the hashes are stable during tests
45
+ // such that we can expect how the logger behaves with RequestAck option enabled.
46
+ var randomGenerator = rand .Uint64
47
+
40
48
type Config struct {
41
49
FluentPort int `json:"fluent_port"`
42
50
FluentHost string `json:"fluent_host"`
@@ -85,15 +93,22 @@ type msgToSend struct {
85
93
type Fluent struct {
86
94
Config
87
95
88
- dialer dialer
89
- stopRunning chan bool
90
- pending chan * msgToSend
91
- wg sync.WaitGroup
96
+ dialer dialer
97
+ // stopRunning is used in async mode to signal to run() it should abort.
98
+ stopRunning chan struct {}
99
+ // cancelDialings is used by Close() to stop any in-progress dialing.
100
+ cancelDialings context.CancelFunc
101
+ pending chan * msgToSend
102
+ wg sync.WaitGroup
92
103
93
104
muconn sync.Mutex
94
105
conn net.Conn
95
106
}
96
107
108
// dialer is the minimal dialing capability the logger relies on. Taking a
// context lets an in-progress dial be aborted when that context is cancelled.
type dialer interface {
	// DialContext connects to address on the named network, honouring ctx.
	DialContext(ctx context.Context, network, address string) (net.Conn, error)
}
111
+
97
112
// New creates a new Logger.
98
113
func New (config Config ) (* Fluent , error ) {
99
114
if config .Timeout == 0 {
@@ -104,10 +119,6 @@ func New(config Config) (*Fluent, error) {
104
119
})
105
120
}
106
121
107
- type dialer interface {
108
- Dial (string , string ) (net.Conn , error )
109
- }
110
-
111
122
func newWithDialer (config Config , d dialer ) (f * Fluent , err error ) {
112
123
if config .FluentNetwork == "" {
113
124
config .FluentNetwork = defaultNetwork
@@ -142,20 +153,24 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
142
153
}
143
154
144
155
if config .Async {
156
+ ctx , cancel := context .WithCancel (context .Background ())
157
+
145
158
f = & Fluent {
146
- Config : config ,
147
- dialer : d ,
148
- pending : make (chan * msgToSend , config .BufferLimit ),
149
- stopRunning : make (chan bool , 1 ),
159
+ Config : config ,
160
+ dialer : d ,
161
+ stopRunning : make (chan struct {}),
162
+ cancelDialings : cancel ,
163
+ pending : make (chan * msgToSend , config .BufferLimit ),
150
164
}
165
+
151
166
f .wg .Add (1 )
152
- go f .run ()
167
+ go f .run (ctx )
153
168
} else {
154
169
f = & Fluent {
155
170
Config : config ,
156
171
dialer : d ,
157
172
}
158
- err = f .connect ()
173
+ err = f .connect (context . Background () )
159
174
}
160
175
return
161
176
}
@@ -252,7 +267,7 @@ func (f *Fluent) postRawData(msg *msgToSend) error {
252
267
return f .appendBuffer (msg )
253
268
}
254
269
// Synchronous write
255
- return f .write (msg )
270
+ return f .write (context . Background (), msg )
256
271
}
257
272
258
273
// For sending forward protocol adopted JSON
@@ -286,7 +301,7 @@ func getUniqueID(timeUnix int64) (string, error) {
286
301
enc .Close ()
287
302
return "" , err
288
303
}
289
- if err := binary .Write (enc , binary .LittleEndian , rand . Uint64 ()); err != nil {
304
+ if err := binary .Write (enc , binary .LittleEndian , randomGenerator ()); err != nil {
290
305
enc .Close ()
291
306
return "" , err
292
307
}
@@ -326,13 +341,18 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg
326
341
// Close shuts the logger down and closes the underlying connection.
//
// In async mode it closes the pending queue and waits for the run() goroutine
// to exit. When Config.ForceStopAsyncSend is set, run() is first told to stop
// and any in-progress dialing is cancelled, so queued messages are discarded
// instead of being flushed.
//
// Close must be called at most once in async mode: closing an already closed
// channel panics.
func (f *Fluent) Close() (err error) {
	if f.Config.Async {
		if f.Config.ForceStopAsyncSend {
			close(f.stopRunning)
			f.cancelDialings()
		}

		close(f.pending)
		f.wg.Wait()

		// Always release the context created for run(), even when the queue was
		// flushed gracefully. A CancelFunc may be called several times; calls
		// after the first one do nothing.
		f.cancelDialings()
	}

	f.muconn.Lock()
	defer f.muconn.Unlock()
	f.close()

	return
}
338
358
@@ -346,58 +366,91 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
346
366
return nil
347
367
}
348
368
349
- // close closes the connection.
350
- func (f * Fluent ) close (c net.Conn ) {
351
- f .muconn .Lock ()
352
- if f .conn != nil && f .conn == c {
369
+ // close closes the connection. Callers should take care of locking muconn first.
370
+ func (f * Fluent ) close () {
371
+ if f .conn != nil {
353
372
f .conn .Close ()
354
373
f .conn = nil
355
374
}
356
- f .muconn .Unlock ()
357
375
}
358
376
359
377
// connect establishes a new connection using the specified transport.
360
- func (f * Fluent ) connect () (err error ) {
378
+ func (f * Fluent ) connect (ctx context. Context ) (err error ) {
361
379
switch f .Config .FluentNetwork {
362
380
case "tcp" :
363
- f .conn , err = f .dialer .Dial (
381
+ f .conn , err = f .dialer .DialContext ( ctx ,
364
382
f .Config .FluentNetwork ,
365
383
f .Config .FluentHost + ":" + strconv .Itoa (f .Config .FluentPort ))
366
384
case "unix" :
367
- f .conn , err = f .dialer .Dial (
385
+ f .conn , err = f .dialer .DialContext ( ctx ,
368
386
f .Config .FluentNetwork ,
369
387
f .Config .FluentSocketPath )
370
388
default :
371
389
err = NewErrUnknownNetwork (f .Config .FluentNetwork )
372
390
}
391
+
373
392
return err
374
393
}
375
394
376
- func (f * Fluent ) run () {
377
- drainEvents := false
378
- var emitEventDrainMsg sync.Once
395
+ var errIsClosing = errors .New ("fluent logger is closing" )
396
+
397
+ func (f * Fluent ) connectOrRetry (ctx context.Context ) error {
398
+ // Use a Time channel instead of time.Sleep() to avoid blocking this
399
+ // goroutine during possibly way too much time (because of the exponential
400
+ // back-off retry).
401
+ waiter := time .After (time .Duration (0 ))
402
+ for i := 0 ; i < f .Config .MaxRetry ; i ++ {
403
+ select {
404
+ case <- waiter :
405
+ err := f .connect (ctx )
406
+ if err == nil {
407
+ return nil
408
+ }
409
+
410
+ if _ , ok := err .(* ErrUnknownNetwork ); ok {
411
+ return err
412
+ }
413
+ if err == context .Canceled {
414
+ return errIsClosing
415
+ }
416
+
417
+ waitTime := f .Config .RetryWait * e (defaultReconnectWaitIncreRate , float64 (i - 1 ))
418
+ if waitTime > f .Config .MaxRetryWait {
419
+ waitTime = f .Config .MaxRetryWait
420
+ }
421
+
422
+ waiter = time .After (time .Duration (waitTime ) * time .Millisecond )
423
+
424
+ fmt .Fprintf (os .Stderr , "[%s] An error happened during connect: %s. Retrying to connect in %dms." , time .Now ().Format (time .RFC3339 ), err , waitTime )
425
+ case <- ctx .Done ():
426
+ return errIsClosing
427
+ }
428
+ }
429
+
430
+ return fmt .Errorf ("could not connect to fluentd after %d retries" , f .Config .MaxRetry )
431
+ }
432
+
433
+ // run is the goroutine used to unqueue and write logs in async mode. That
434
+ // goroutine is meant to run during the whole life of the Fluent logger.
435
+ func (f * Fluent ) run (ctx context.Context ) {
379
436
for {
380
437
select {
381
438
case entry , ok := <- f .pending :
439
+ // f.stopRunning is closed before f.pending only when ForceStopAsyncSend
440
+ // is enabled. Otherwise, f.pending is closed when Close() is called.
382
441
if ! ok {
383
442
f .wg .Done ()
384
443
return
385
444
}
386
- if drainEvents {
387
- emitEventDrainMsg .Do (func () { fmt .Fprintf (os .Stderr , "[%s] Discarding queued events...\n " , time .Now ().Format (time .RFC3339 )) })
388
- continue
389
- }
390
- err := f .write (entry )
391
- if err != nil {
445
+
446
+ if err := f .write (ctx , entry ); err != nil && err != errIsClosing {
392
447
fmt .Fprintf (os .Stderr , "[%s] Unable to send logs to fluentd, reconnecting...\n " , time .Now ().Format (time .RFC3339 ))
393
448
}
394
- }
395
- select {
396
- case stopRunning , ok := <- f .stopRunning :
397
- if stopRunning || ! ok {
398
- drainEvents = true
399
- }
400
- default :
449
+ case <- f .stopRunning :
450
+ fmt .Fprintf (os .Stderr , "[%s] Discarding queued events...\n " , time .Now ().Format (time .RFC3339 ))
451
+
452
+ f .wg .Done ()
453
+ return
401
454
}
402
455
}
403
456
}
@@ -406,63 +459,59 @@ func e(x, y float64) int {
406
459
return int (math .Pow (x , y ))
407
460
}
408
461
409
- func (f * Fluent ) write (msg * msgToSend ) error {
410
- var c net.Conn
411
- for i := 0 ; i < f .Config .MaxRetry ; i ++ {
412
- c = f .conn
413
- // Connect if needed
414
- if c == nil {
415
- f .muconn .Lock ()
416
- if f .conn == nil {
417
- err := f .connect ()
418
- if err != nil {
419
- f .muconn .Unlock ()
420
-
421
- if _ , ok := err .(* ErrUnknownNetwork ); ok {
422
- // do not retry on unknown network error
423
- break
424
- }
425
- waitTime := f .Config .RetryWait * e (defaultReconnectWaitIncreRate , float64 (i - 1 ))
426
- if waitTime > f .Config .MaxRetryWait {
427
- waitTime = f .Config .MaxRetryWait
428
- }
429
- time .Sleep (time .Duration (waitTime ) * time .Millisecond )
430
- continue
431
- }
462
+ func (f * Fluent ) write (ctx context.Context , msg * msgToSend ) error {
463
+ writer := func () (bool , error ) {
464
+ // This function is used to ensure muconn is properly locked and unlocked
465
+ // between each retry. This gives the importunity to other goroutines to
466
+ // lock it (e.g. to close the connection).
467
+ f .muconn .Lock ()
468
+ defer f .muconn .Unlock ()
469
+
470
+ if f .conn == nil {
471
+ if err := f .connectOrRetry (ctx ); err != nil {
472
+ return false , err
432
473
}
433
- c = f .conn
434
- f .muconn .Unlock ()
435
474
}
436
475
437
- // We're connected, write msg
438
476
t := f .Config .WriteTimeout
439
477
if time .Duration (0 ) < t {
440
- c .SetWriteDeadline (time .Now ().Add (t ))
478
+ f . conn .SetWriteDeadline (time .Now ().Add (t ))
441
479
} else {
442
- c .SetWriteDeadline (time.Time {})
480
+ f . conn .SetWriteDeadline (time.Time {})
443
481
}
444
- _ , err := c .Write (msg .data )
482
+
483
+ _ , err := f .conn .Write (msg .data )
445
484
if err != nil {
446
- f .close (c )
447
- } else {
448
- // Acknowledgment check
449
- if msg .ack != "" {
450
- resp := & AckResp {}
451
- if f .Config .MarshalAsJSON {
452
- dec := json .NewDecoder (c )
453
- err = dec .Decode (resp )
454
- } else {
455
- r := msgp .NewReader (c )
456
- err = resp .DecodeMsg (r )
457
- }
458
- if err != nil || resp .Ack != msg .ack {
459
- f .close (c )
460
- continue
461
- }
485
+ f .close ()
486
+ return true , err
487
+ }
488
+
489
+ // Acknowledgment check
490
+ if msg .ack != "" {
491
+ resp := & AckResp {}
492
+ if f .Config .MarshalAsJSON {
493
+ dec := json .NewDecoder (f .conn )
494
+ err = dec .Decode (resp )
495
+ } else {
496
+ r := msgp .NewReader (f .conn )
497
+ err = resp .DecodeMsg (r )
462
498
}
499
+
500
+ if err != nil || resp .Ack != msg .ack {
501
+ fmt .Fprintf (os .Stderr , "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection..." , resp .Ack , msg .ack )
502
+ f .close ()
503
+ return true , err
504
+ }
505
+ }
506
+
507
+ return false , nil
508
+ }
509
+
510
+ for i := 0 ; i < f .Config .MaxRetry ; i ++ {
511
+ if retry , err := writer (); ! retry {
463
512
return err
464
513
}
465
514
}
466
515
467
- return fmt .Errorf ("fluent#write: failed to reconnect, max retry: %v " , f .Config .MaxRetry )
516
+ return fmt .Errorf ("fluent#write: failed to write after %d attempts " , f .Config .MaxRetry )
468
517
}
0 commit comments