diff --git a/fluent/fluent.go b/fluent/fluent.go index 43a70ea..a7bb3e7 100644 --- a/fluent/fluent.go +++ b/fluent/fluent.go @@ -98,7 +98,7 @@ type Fluent struct { cancelDialings context.CancelFunc pending chan *msgToSend pendingMutex sync.RWMutex - chanClosed bool + closed bool wg sync.WaitGroup muconn sync.RWMutex @@ -274,7 +274,11 @@ func (f *Fluent) postRawData(msg *msgToSend) error { if f.Config.Async { return f.appendBuffer(msg) } + // Synchronous write + if f.closed { + return fmt.Errorf("fluent#postRawData: Logger already closed") + } return f.writeWithRetry(context.Background(), msg) } @@ -350,11 +354,11 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (msg func (f *Fluent) Close() (err error) { if f.Config.Async { f.pendingMutex.Lock() - if f.chanClosed { + if f.closed { f.pendingMutex.Unlock() return nil } - f.chanClosed = true + f.closed = true f.pendingMutex.Unlock() if f.Config.ForceStopAsyncSend { @@ -364,7 +368,7 @@ func (f *Fluent) Close() (err error) { close(f.pending) // If ForceStopAsyncSend is false, all logs in the channel have to be sent - // before closing the connection. At this point chanClosed is true so no more + // before closing the connection. At this point closed is true so no more // logs are written to the channel and f.pending has been closed, so run() // goroutine will exit as soon as all logs in the channel are sent. if !f.Config.ForceStopAsyncSend { @@ -374,6 +378,7 @@ func (f *Fluent) Close() (err error) { f.muconn.Lock() f.close() + f.closed = true f.muconn.Unlock() // If ForceStopAsyncSend is true, we shall close the connection before waiting for @@ -383,7 +388,6 @@ func (f *Fluent) Close() (err error) { if f.Config.ForceStopAsyncSend { f.wg.Wait() } - return } @@ -391,7 +395,7 @@ func (f *Fluent) Close() (err error) { func (f *Fluent) appendBuffer(msg *msgToSend) error { f.pendingMutex.RLock() defer f.pendingMutex.RUnlock() - if f.chanClosed { + if f.closed { return fmt.Errorf("fluent#appendBuffer: Logger already closed") } select { diff --git a/fluent/fluent_test.go b/fluent/fluent_test.go index 3c3a2da..00a6086 100644 --- a/fluent/fluent_test.go +++ b/fluent/fluent_test.go @@ -732,6 +732,39 @@ func TestCloseWhileWaitingForAckResponse(t *testing.T) { }, "failed to close the logger") } +func TestSyncWriteAfterCloseFails(t *testing.T) { + d := newTestDialer() + + go func() { + f, err := newWithDialer(Config{Async: false}, d) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + err = f.PostWithTime("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + err = f.Close() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Now let's post some event after Fluent.Close(). + err = f.PostWithTime("tag_name", time.Unix(1482493050, 0), map[string]string{"foo": "buzz"}) + + // The event submission must fail, + assert.NotEqual(t, err, nil); + + // and also must keep Fluentd closed. + assert.NotEqual(t, f.closed, false); + }() + + conn := d.waitForNextDialing(true, false) + conn.waitForNextWrite(true, "") +} + func Benchmark_PostWithShortMessage(b *testing.B) { b.StopTimer() d := newTestDialer()