From 61880ab35f09c865412435ddc521ebbf73ff4aca Mon Sep 17 00:00:00 2001 From: JamesJJ Date: Fri, 6 Dec 2019 22:38:32 +0800 Subject: [PATCH 1/4] Add `AsyncStop` config option to facilitate graceful stop in async mode on Close() Signed-off-by: JamesJJ --- README.md | 5 +++++ fluent/fluent.go | 22 ++++++++++++++++++++-- fluent/fluent_test.go | 1 + fluent/testdata/config.json | 1 + 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c32e255..603ace7 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,11 @@ Since the default is zero value, Write will not time out. Enable asynchronous I/O (connect and write) for sending events to Fluentd. The default is false. +### AsyncStop + +Enables discarding bufferred events when Close() is called in Async mode. If Fluentd is continuously unavailable, Close() will block forever otherwise. +The default is false. + ### RequestAck Sets whether to request acknowledgment from Fluentd to increase the reliability diff --git a/fluent/fluent.go b/fluent/fluent.go index 03d671a..3607beb 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -49,6 +49,7 @@ type Config struct { MaxRetryWait int `json:"max_retry_wait"` TagPrefix string `json:"tag_prefix"` Async bool `json:"async"` + AsyncStop bool `json:"async_stop"` // Deprecated: Use Async instead AsyncConnect bool `json:"async_connect"` MarshalAsJSON bool `json:"marshal_as_json"` @@ -83,8 +84,9 @@ type msgToSend struct { type Fluent struct { Config - pending chan *msgToSend - wg sync.WaitGroup + stopRunning chan bool + pending chan *msgToSend + wg sync.WaitGroup muconn sync.Mutex conn net.Conn @@ -305,6 +307,10 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg // Close closes the connection, waiting for pending logs to be sent func (f *Fluent) Close() (err error) { if f.Config.Async { + if f.Config.AsyncStop { + f.stopRunning <- true + close(f.stopRunning) + } close(f.pending) f.wg.Wait() } @@ -347,13 +353,25 @@ func (f *Fluent) connect() (err error) { } func (f *Fluent) run() { + drainEvents := false for { + select { + case stopRunning, ok := <-f.stopRunning: + if stopRunning || !ok { + drainEvents = true + } +default: + } select { case entry, ok := <-f.pending: if !ok { f.wg.Done() return } + if drainEvents { + fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, discarding...\n", time.Now().Format(time.RFC3339)) + continue + } err := f.write(entry) if err != nil { fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339)) diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index eed7f9a..b1e8337 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -260,6 +260,7 @@ func TestJsonConfig(t *testing.T) { MaxRetry: 3, TagPrefix: "fluent", Async: false, + AsyncStop: false, MarshalAsJSON: true, } diff --git a/fluent/testdata/config.json b/fluent/testdata/config.json index 5a6b501..7cee322 100644 --- a/fluent/testdata/config.json +++ b/fluent/testdata/config.json @@ -10,5 +10,6 @@ "max_retry":3, "tag_prefix":"fluent", "async": false, + "async_stop": false, "marshal_as_json": true } From 5b04f689e48c1160801a308655e4666c20604a86 Mon Sep 17 00:00:00 2001 From: JamesJJ Date: Tue, 18 Feb 2020 17:22:38 +0800 Subject: [PATCH 2/4] Rename AsyncStop to ForceStopAsyncSend Signed-off-by: JamesJJ --- README.md | 4 ++-- fluent/fluent.go | 30 +++++++++++++++--------------- fluent/fluent_test.go | 26 +++++++++++++------------- fluent/testdata/config.json | 2 +- 4 files changed, 31 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index 603ace7..ac02cea 100644 --- a/README.md +++ b/README.md @@ -69,9 +69,9 @@ Since the default is zero value, Write will not time out. Enable asynchronous I/O (connect and write) for sending events to Fluentd. The default is false. -### AsyncStop +### ForceStopAsyncSend -Enables discarding bufferred events when Close() is called in Async mode. If Fluentd is continuously unavailable, Close() will block forever otherwise. +When Async is enabled, immediately discard the event queue on close() and return (instead of trying MaxRetry times for each event in the queue before returning) The default is false. ### RequestAck diff --git a/fluent/fluent.go b/fluent/fluent.go index 3607beb..eda87d9 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -37,19 +37,19 @@ const ( ) type Config struct { - FluentPort int `json:"fluent_port"` - FluentHost string `json:"fluent_host"` - FluentNetwork string `json:"fluent_network"` - FluentSocketPath string `json:"fluent_socket_path"` - Timeout time.Duration `json:"timeout"` - WriteTimeout time.Duration `json:"write_timeout"` - BufferLimit int `json:"buffer_limit"` - RetryWait int `json:"retry_wait"` - MaxRetry int `json:"max_retry"` - MaxRetryWait int `json:"max_retry_wait"` - TagPrefix string `json:"tag_prefix"` - Async bool `json:"async"` - AsyncStop bool `json:"async_stop"` + FluentPort int `json:"fluent_port"` + FluentHost string `json:"fluent_host"` + FluentNetwork string `json:"fluent_network"` + FluentSocketPath string `json:"fluent_socket_path"` + Timeout time.Duration `json:"timeout"` + WriteTimeout time.Duration `json:"write_timeout"` + BufferLimit int `json:"buffer_limit"` + RetryWait int `json:"retry_wait"` + MaxRetry int `json:"max_retry"` + MaxRetryWait int `json:"max_retry_wait"` + TagPrefix string `json:"tag_prefix"` + Async bool `json:"async"` + ForceStopAsyncSend bool `json:"force_stop_async_send"` // Deprecated: Use Async instead AsyncConnect bool `json:"async_connect"` MarshalAsJSON bool `json:"marshal_as_json"` @@ -307,7 +307,7 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg // Close closes the connection, waiting for pending logs to be sent func (f *Fluent) Close() (err error) { if f.Config.Async { - if f.Config.AsyncStop { + if f.Config.ForceStopAsyncSend { f.stopRunning <- true close(f.stopRunning) } @@ -360,7 +360,7 @@ func (f *Fluent) run() { if stopRunning || !ok { drainEvents = true } -default: + default: } select { case entry, ok := <-f.pending: diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index b1e8337..02898b4 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -249,19 +249,19 @@ func TestJsonConfig(t *testing.T) { } var got Config expect := Config{ - FluentPort: 8888, - FluentHost: "localhost", - FluentNetwork: "tcp", - FluentSocketPath: "/var/tmp/fluent.sock", - Timeout: 3000, - WriteTimeout: 6000, - BufferLimit: 10, - RetryWait: 5, - MaxRetry: 3, - TagPrefix: "fluent", - Async: false, - AsyncStop: false, - MarshalAsJSON: true, + FluentPort: 8888, + FluentHost: "localhost", + FluentNetwork: "tcp", + FluentSocketPath: "/var/tmp/fluent.sock", + Timeout: 3000, + WriteTimeout: 6000, + BufferLimit: 10, + RetryWait: 5, + MaxRetry: 3, + TagPrefix: "fluent", + Async: false, + ForceStopAsyncSend: false, + MarshalAsJSON: true, } err = json.Unmarshal(b, &got) diff --git a/fluent/testdata/config.json b/fluent/testdata/config.json index 7cee322..ea9c895 100644 --- a/fluent/testdata/config.json +++ b/fluent/testdata/config.json @@ -10,6 +10,6 @@ "max_retry":3, "tag_prefix":"fluent", "async": false, - "async_stop": false, + "force_stop_async_send": false, "marshal_as_json": true } From aacea74066277afbf6a6cb045fc282a886cd3453 Mon Sep 17 00:00:00 2001 From: JamesJJ Date: Tue, 18 Feb 2020 21:54:24 +0800 Subject: [PATCH 3/4] Avoid excessive warning messages on StdErr Signed-off-by: JamesJJ --- fluent/fluent.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index eda87d9..f4b5f52 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -354,6 +354,7 @@ func (f *Fluent) connect() (err error) { func (f *Fluent) run() { drainEvents := false + var emitEventDrainMsg sync.Once for { select { case stopRunning, ok := <-f.stopRunning: @@ -369,7 +370,7 @@ func (f *Fluent) run() { return } if drainEvents { - fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, discarding...\n", time.Now().Format(time.RFC3339)) + emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) }) continue } err := f.write(entry) From 005bb40a0ff5b2ca7cb485378d6464951d78fe05 Mon Sep 17 00:00:00 2001 From: JamesJJ Date: Wed, 19 Feb 2020 11:32:46 +0800 Subject: [PATCH 4/4] Skip select if drainEvents Signed-off-by: JamesJJ --- fluent/fluent.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fluent/fluent.go b/fluent/fluent.go index f4b5f52..215bba3 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -356,13 +356,6 @@ func (f *Fluent) run() { drainEvents := false var emitEventDrainMsg sync.Once for { - select { - case stopRunning, ok := <-f.stopRunning: - if stopRunning || !ok { - drainEvents = true - } - default: - } select { case entry, ok := <-f.pending: if !ok { @@ -378,6 +371,13 @@ func (f *Fluent) run() { fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339)) } } + select { + case stopRunning, ok := <-f.stopRunning: + if stopRunning || !ok { + drainEvents = true + } + default: + } } }