bugfix: watcher events loss #289

Merged
merged 3 commits on May 17, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -23,6 +23,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Several non-critical data race issues (#218)
- ConnectionPool does not properly handle disconnection with Opts.Reconnect
set (#272)
- Watcher events loss with a small per-request timeout (#284)

## [1.10.0] - 2022-12-31

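For context, the scenario behind the new changelog entry looks roughly like this: a connection is created with a small per-request timeout and a watcher is registered on a key; before this fix, the internal re-watch request sent after each delivered event could be finished by that same timeout, so later events were lost. Below is a minimal sketch only, assuming the v1 API; the address, credentials, key, and timeout value are placeholders, not part of this change.

```go
package main

import (
	"fmt"
	"time"

	"github.com/tarantool/go-tarantool"
)

func main() {
	// A small per-request timeout applies to every request sent over the
	// connection, including the internal watch/unwatch requests that the
	// connector sends on behalf of a registered watcher.
	opts := tarantool.Opts{
		User:    "test",                // placeholder credentials
		Pass:    "test",
		Timeout: 50 * time.Millisecond, // small per-request timeout
		// Watchers must be declared as a required protocol feature.
		RequiredProtocolInfo: tarantool.ProtocolInfo{
			Features: []tarantool.ProtocolFeature{tarantool.WatchersFeature},
		},
	}

	conn, err := tarantool.Connect("127.0.0.1:3013", opts) // placeholder address
	if err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	defer conn.Close()

	// Before this fix, the re-watch request issued after each event could
	// be finished by the same small timeout, so later changes of the key
	// were never reported to the callback.
	watcher, err := conn.NewWatcher("foo", func(event tarantool.WatchEvent) {
		fmt.Printf("key %q changed: %v\n", event.Key, event.Value)
	})
	if err != nil {
		fmt.Println("watcher failed:", err)
		return
	}
	defer watcher.Unregister()

	time.Sleep(time.Second) // wait for events in this toy example
}
```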
22 changes: 15 additions & 7 deletions connection.go
@@ -1066,6 +1066,10 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
}
shard.bufmut.Unlock()

if firstWritten {
conn.dirtyShard <- shardn
}

if req.Async() {
if fut = conn.fetchFuture(reqid); fut != nil {
resp := &Response{
@@ -1076,10 +1080,6 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
conn.markDone(fut)
}
}

if firstWritten {
conn.dirtyShard <- shardn
}
}

func (conn *Connection) markDone(fut *Future) {
@@ -1456,10 +1456,13 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc
st <- state

if sendAck {
conn.Do(newWatchRequest(key)).Get()
// We expect a reconnect and re-subscribe if sending the
// watch request fails, so it looks ok not to check the
// result.
// result. But we need to make sure that the re-watch
// request will not be finished by a small per-request
// timeout.
req := newWatchRequest(key).Context(context.Background())
conn.Do(req).Get()
}
}

@@ -1477,7 +1480,12 @@ func (conn *Connection) newWatcherImpl(key string, callback WatchCallback) (Watc
if !conn.ClosedNow() {
// conn.ClosedNow() check is a workaround for calling
// Unregister from connectionClose().
conn.Do(newUnwatchRequest(key)).Get()
//
// We need to make sure that the unwatch request will
// not be finished by a small per-request timeout to
// avoid losing the request.
req := newUnwatchRequest(key).Context(context.Background())
conn.Do(req).Get()
}
conn.watchMap.Delete(key)
close(state.unready)
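For reference, the essence of the connection.go change is the Context(context.Background()) binding: as the new comments explain, a request that carries its own context is not finished by the connection's small per-request timeout. The same pattern can be used from application code for any single request that must not be cut short by Opts.Timeout. A minimal sketch, assuming the v1 request-object API; NewPingRequest is used purely for illustration:

```go
package example

import (
	"context"

	"github.com/tarantool/go-tarantool"
)

// pingWithoutRequestTimeout sends one request that is governed by its own
// context instead of the connection's per-request timeout. Here it is a
// ping; the fix applies the same pattern to the internal watch/unwatch
// requests.
func pingWithoutRequestTimeout(conn *tarantool.Connection) error {
	req := tarantool.NewPingRequest().Context(context.Background())
	_, err := conn.Do(req).Get()
	return err
}
```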
11 changes: 6 additions & 5 deletions tarantool_test.go
@@ -3835,7 +3835,7 @@ func TestConnection_NewWatcher_concurrent(t *testing.T) {
var wg sync.WaitGroup
wg.Add(testConcurrency)

var ret error
errors := make(chan error, testConcurrency)
for i := 0; i < testConcurrency; i++ {
go func(i int) {
defer wg.Done()
@@ -3846,21 +3846,22 @@ func TestConnection_NewWatcher_concurrent(t *testing.T) {
close(events)
})
if err != nil {
ret = err
errors <- err
} else {
select {
case <-events:
case <-time.After(time.Second):
ret = fmt.Errorf("Unable to get an event %d", i)
errors <- fmt.Errorf("Unable to get an event %d", i)
}
watcher.Unregister()
}
}(i)
}
wg.Wait()
close(errors)

if ret != nil {
t.Fatalf("An error found: %s", ret)
for err := range errors {
t.Errorf("An error found: %s", err)
}
}

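The test now gathers failures from the concurrent goroutines through a buffered channel instead of a shared ret variable, which was both a data race and able to keep at most one error. A minimal standalone sketch of the same pattern, with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 8
	errs := make(chan error, workers) // buffered to the worker count: sends never block

	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(i int) {
			defer wg.Done()
			if i%3 == 0 { // stand-in for a failing operation
				errs <- fmt.Errorf("worker %d failed", i)
			}
		}(i)
	}
	wg.Wait()
	close(errs) // safe: all senders have finished

	// Every reported failure is preserved, not just the last one written.
	for err := range errs {
		fmt.Println(err)
	}
}
```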