From 8fa9901989f7365a665ab63ebccf177e39d1a851 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Wed, 10 May 2023 15:15:19 +0300 Subject: [PATCH 1/3] test: fix data race in NewWatcher test Goroutines could write to the shared `ret` variable without synchronization. The shared variable has been replaced with a channel. Part of #284 --- tarantool_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tarantool_test.go b/tarantool_test.go index 125642dcf..3c2181f2e 100644 --- a/tarantool_test.go +++ b/tarantool_test.go @@ -3835,7 +3835,7 @@ func TestConnection_NewWatcher_concurrent(t *testing.T) { var wg sync.WaitGroup wg.Add(testConcurrency) - var ret error + errors := make(chan error, testConcurrency) for i := 0; i < testConcurrency; i++ { go func(i int) { defer wg.Done() @@ -3846,21 +3846,22 @@ func TestConnection_NewWatcher_concurrent(t *testing.T) { close(events) }) if err != nil { - ret = err + errors <- err } else { select { case <-events: case <-time.After(time.Second): - ret = fmt.Errorf("Unable to get an event %d", i) + errors <- fmt.Errorf("Unable to get an event %d", i) } watcher.Unregister() } }(i) } wg.Wait() + close(errors) - if ret != nil { - t.Fatalf("An error found: %s", ret) + for err := range errors { + t.Errorf("An error found: %s", err) } } From 986a9a1b692cd1e124529db15f53d49dd2ce13c3 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Wed, 10 May 2023 17:28:49 +0300 Subject: [PATCH 2/3] internal: finish an async request after it queued The change helps to make the order of execution and sending of async requests the same. For example: conn.Do(AsyncRequest1).Get() conn.Do(AsyncRequest2).Get() It is now guaranteed that AsyncRequest2 will be sent to the network after AsyncRequest1. Before the patch the order was undefined.
Part of #284 --- connection.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/connection.go b/connection.go index 7da3f26d0..47505dfa7 100644 --- a/connection.go +++ b/connection.go @@ -1066,6 +1066,10 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { } shard.bufmut.Unlock() + if firstWritten { + conn.dirtyShard <- shardn + } + if req.Async() { if fut = conn.fetchFuture(reqid); fut != nil { resp := &Response{ @@ -1076,10 +1080,6 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) { conn.markDone(fut) } } - - if firstWritten { - conn.dirtyShard <- shardn - } } func (conn *Connection) markDone(fut *Future) { From a76dcd6393a9abd6763d783a72854992bbf6261a Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Wed, 10 May 2023 16:49:12 +0300 Subject: [PATCH 3/3] bugfix: watcher events loss Watchers may behave incorrectly if a request timeout is too small. A re-IPROTO_WATCH request may not be sent to the server. It could lead to loss of the event stream. It could also lead to a lost IPROTO_UNREGISTER request, but a user won't see the problem. Closes #284 --- CHANGELOG.md | 1 + connection.go | 14 +++++++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 24e7c5f65..80fc40f5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Several non-critical data race issues (#218) - ConnectionPool does not properly handle disconnection with Opts.Reconnect set (#272) +- Watcher events loss with a small per-request timeout (#284) ## [1.10.0] - 2022-12-31 diff --git a/connection.go b/connection.go index 47505dfa7..65d21365c 100644 --- a/connection.go +++ b/connection.go @@ -1456,10 +1456,13 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc st <- state if sendAck { - conn.Do(newWatchRequest(key)).Get() // We expect a reconnect and re-subscribe if it fails to // send the watch request. So it looks ok do not check a - // result. + // result. But we need to make sure that the re-watch + // request will not be finished by a small per-request + // timeout. + req := newWatchRequest(key).Context(context.Background()) + conn.Do(req).Get() } } @@ -1477,7 +1480,12 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc if !conn.ClosedNow() { // conn.ClosedNow() check is a workaround for calling // Unregister from connectionClose(). - conn.Do(newUnwatchRequest(key)).Get() + // + // We need to make sure that the unwatch request will + // not be finished by a small per-request timeout to + // avoid loss of the request. + req := newUnwatchRequest(key).Context(context.Background()) + conn.Do(req).Get() } conn.watchMap.Delete(key) close(state.unready)