diff --git a/README.md b/README.md index c32e255..ac02cea 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,11 @@ Since the default is zero value, Write will not time out. Enable asynchronous I/O (connect and write) for sending events to Fluentd. The default is false. +### ForceStopAsyncSend + +When Async is enabled, immediately discard the event queue on close() and return (instead of trying MaxRetry times for each event in the queue before returning) +The default is false. + ### RequestAck Sets whether to request acknowledgment from Fluentd to increase the reliability diff --git a/fluent/fluent.go b/fluent/fluent.go index 03d671a..215bba3 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -37,18 +37,19 @@ const ( ) type Config struct { - FluentPort int `json:"fluent_port"` - FluentHost string `json:"fluent_host"` - FluentNetwork string `json:"fluent_network"` - FluentSocketPath string `json:"fluent_socket_path"` - Timeout time.Duration `json:"timeout"` - WriteTimeout time.Duration `json:"write_timeout"` - BufferLimit int `json:"buffer_limit"` - RetryWait int `json:"retry_wait"` - MaxRetry int `json:"max_retry"` - MaxRetryWait int `json:"max_retry_wait"` - TagPrefix string `json:"tag_prefix"` - Async bool `json:"async"` + FluentPort int `json:"fluent_port"` + FluentHost string `json:"fluent_host"` + FluentNetwork string `json:"fluent_network"` + FluentSocketPath string `json:"fluent_socket_path"` + Timeout time.Duration `json:"timeout"` + WriteTimeout time.Duration `json:"write_timeout"` + BufferLimit int `json:"buffer_limit"` + RetryWait int `json:"retry_wait"` + MaxRetry int `json:"max_retry"` + MaxRetryWait int `json:"max_retry_wait"` + TagPrefix string `json:"tag_prefix"` + Async bool `json:"async"` + ForceStopAsyncSend bool `json:"force_stop_async_send"` // Deprecated: Use Async instead AsyncConnect bool `json:"async_connect"` MarshalAsJSON bool `json:"marshal_as_json"` @@ -83,8 +84,9 @@ type msgToSend struct { type Fluent struct { Config - pending chan *msgToSend - wg sync.WaitGroup + stopRunning chan bool + pending chan *msgToSend + wg sync.WaitGroup muconn sync.Mutex conn net.Conn @@ -305,6 +307,10 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg // Close closes the connection, waiting for pending logs to be sent func (f *Fluent) Close() (err error) { if f.Config.Async { + if f.Config.ForceStopAsyncSend { + f.stopRunning <- true + close(f.stopRunning) + } close(f.pending) f.wg.Wait() } @@ -347,6 +353,8 @@ func (f *Fluent) connect() (err error) { } func (f *Fluent) run() { + drainEvents := false + var emitEventDrainMsg sync.Once for { select { case entry, ok := <-f.pending: @@ -354,11 +362,22 @@ func (f *Fluent) run() { f.wg.Done() return } + if drainEvents { + emitEventDrainMsg.Do(func() { fmt.Fprintf(os.Stderr, "[%s] Discarding queued events...\n", time.Now().Format(time.RFC3339)) }) + continue + } err := f.write(entry) if err != nil { fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339)) } } + select { + case stopRunning, ok := <-f.stopRunning: + if stopRunning || !ok { + drainEvents = true + } + default: + } } } diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index eed7f9a..02898b4 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -249,18 +249,19 @@ func TestJsonConfig(t *testing.T) { } var got Config expect := Config{ - FluentPort: 8888, - FluentHost: "localhost", - FluentNetwork: "tcp", - FluentSocketPath: "/var/tmp/fluent.sock", - Timeout: 3000, - WriteTimeout: 6000, - BufferLimit: 10, - RetryWait: 5, - MaxRetry: 3, - TagPrefix: "fluent", - Async: false, - MarshalAsJSON: true, + FluentPort: 8888, + FluentHost: "localhost", + FluentNetwork: "tcp", + FluentSocketPath: "/var/tmp/fluent.sock", + Timeout: 3000, + WriteTimeout: 6000, + BufferLimit: 10, + RetryWait: 5, + MaxRetry: 3, + TagPrefix: "fluent", + Async: false, + ForceStopAsyncSend: false, + MarshalAsJSON: true, } err = json.Unmarshal(b, &got) diff --git a/fluent/testdata/config.json b/fluent/testdata/config.json index 5a6b501..ea9c895 100644 --- a/fluent/testdata/config.json +++ b/fluent/testdata/config.json @@ -10,5 +10,6 @@ "max_retry":3, "tag_prefix":"fluent", "async": false, + "force_stop_async_send": false, "marshal_as_json": true }