Skip to content

Commit b40aeb4

Browse files
committed
api: add events subscription support
A user can create watcher by the Connection.NewWatcher() call: watcher = conn.NewWatcker("key", func(event WatchEvent) { // The callback code. }) After that, the watcher callback is invoked for the first time. In this case, the callback is triggered whether or not the key has already been broadcast. All subsequent invocations are triggered with box.broadcast() called on the remote host. If a watcher is subscribed for a key that has not been broadcast yet, the callback is triggered only once, after the registration of the watcher. If the key is updated while the watcher callback is running, the callback will be invoked again with the latest value as soon as it returns. Multiple watchers can be created for one key. If you don’t need the watcher anymore, you can unregister it using the Unregister method: watcher.Unregister() The api is similar to net.box implementation [1]. It also adds a BroadcastRequest to make it easier to send broadcast messages. 1. https://www.tarantool.io/ru/doc/latest/reference/reference_lua/net_box/#conn-watch Closes #119
1 parent 48cf0c7 commit b40aeb4

9 files changed

+750
-9
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1010

1111
### Added
1212

13+
- Event subscription support (#119)
14+
1315
### Changed
1416

1517
### Fixed

connection.go

+247-7
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ const (
5353
// LogUnexpectedResultId is logged when response with unknown id was received.
5454
// Most probably it is due to request timeout.
5555
LogUnexpectedResultId
56+
// LogReadWatchEventFailed is logged when failed to read a watch event.
57+
LogReadWatchEventFailed
5658
)
5759

5860
// ConnEvent is sent throw Notify channel specified in Opts.
@@ -62,6 +64,12 @@ type ConnEvent struct {
6264
When time.Time
6365
}
6466

67+
// A raw watch event.
68+
type connWatchEvent struct {
69+
key string
70+
value interface{}
71+
}
72+
6573
var epoch = time.Now()
6674

6775
// Logger is logger type expected to be passed in options.
@@ -83,6 +91,9 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
8391
case LogUnexpectedResultId:
8492
resp := v[0].(*Response)
8593
log.Printf("tarantool: connection %s got unexpected resultId (%d) in response", conn.addr, resp.RequestId)
94+
case LogReadWatchEventFailed:
95+
err := v[0].(error)
96+
log.Printf("tarantool: unable to parse watch event: %s\n", err)
8697
default:
8798
args := append([]interface{}{"tarantool: unexpected event ", event, conn}, v...)
8899
log.Print(args...)
@@ -146,6 +157,9 @@ type Connection struct {
146157
lenbuf [PacketLengthBytes]byte
147158

148159
lastStreamId uint64
160+
161+
eventMap sync.Map
162+
eventCh chan connWatchEvent
149163
}
150164

151165
var _ = Connector(&Connection{}) // Check compatibility with connector interface.
@@ -321,6 +335,7 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
321335
control: make(chan struct{}),
322336
opts: opts,
323337
dec: newDecoder(&smallBuf{}),
338+
eventCh: make(chan connWatchEvent, 1024),
324339
}
325340
maxprocs := uint32(runtime.GOMAXPROCS(-1))
326341
if conn.opts.Concurrency == 0 || conn.opts.Concurrency > maxprocs*128 {
@@ -355,6 +370,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
355370
conn.opts.Logger = defaultLogger{}
356371
}
357372

373+
go conn.eventer(conn.eventCh)
374+
358375
if err = conn.createConnection(false); err != nil {
359376
ter, ok := err.(Error)
360377
if conn.opts.Reconnect <= 0 {
@@ -520,7 +537,20 @@ func (conn *Connection) dial() (err error) {
520537
}
521538
}
522539

523-
// Only if connected and authenticated.
540+
// Watchers
541+
conn.eventMap.Range(func(key, value interface{}) bool {
542+
req := newWatchRequest(key.(string))
543+
if err = conn.writeRequest(w, req); err != nil {
544+
return false
545+
}
546+
return true
547+
})
548+
549+
if err != nil {
550+
return fmt.Errorf("unable to register watch: %w", err)
551+
}
552+
553+
// Only if connected and fully initialized.
524554
conn.lockShards()
525555
conn.c = connection
526556
atomic.StoreUint32(&conn.state, connConnected)
@@ -581,23 +611,33 @@ func pack(h *smallWBuf, enc *encoder, reqid uint32,
581611
return
582612
}
583613

584-
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
614+
func (conn *Connection) writeRequest(w *bufio.Writer, req Request) (err error) {
585615
var packet smallWBuf
586-
req := newAuthRequest(conn.opts.User, string(scramble))
587616
err = pack(&packet, newEncoder(&packet), 0, req, ignoreStreamId, conn.Schema)
588617

589618
if err != nil {
590-
return errors.New("auth: pack error " + err.Error())
619+
return fmt.Errorf("pack error %w", err)
591620
}
592621
if err := write(w, packet.b); err != nil {
593-
return errors.New("auth: write error " + err.Error())
622+
return fmt.Errorf("write error %w", err)
594623
}
595624
if err = w.Flush(); err != nil {
596-
return errors.New("auth: flush error " + err.Error())
625+
return fmt.Errorf("flush error %w", err)
597626
}
598627
return
599628
}
600629

630+
func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err error) {
631+
req := newAuthRequest(conn.opts.User, string(scramble))
632+
633+
err = conn.writeRequest(w, req)
634+
if err != nil {
635+
return fmt.Errorf("auth: %w", err)
636+
}
637+
638+
return nil
639+
}
640+
601641
func (conn *Connection) readAuthResponse(r io.Reader) (err error) {
602642
respBytes, err := conn.read(r)
603643
if err != nil {
@@ -655,6 +695,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
655695
defer conn.unlockShards()
656696
if forever {
657697
if conn.state != connClosed {
698+
close(conn.eventCh)
658699
close(conn.control)
659700
atomic.StoreUint32(&conn.state, connClosed)
660701
conn.notify(Closed)
@@ -774,6 +815,44 @@ func (conn *Connection) writer(w *bufio.Writer, c net.Conn) {
774815
}
775816
}
776817

818+
func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
819+
keyExist := false
820+
event := connWatchEvent{}
821+
d := newDecoder(reader)
822+
823+
if l, err := d.DecodeMapLen(); err == nil {
824+
for ; l > 0; l-- {
825+
if cd, err := d.DecodeInt(); err == nil {
826+
switch cd {
827+
case KeyEvent:
828+
if event.key, err = d.DecodeString(); err != nil {
829+
return event, err
830+
}
831+
keyExist = true
832+
case KeyEventData:
833+
if event.value, err = d.DecodeInterface(); err != nil {
834+
return event, err
835+
}
836+
default:
837+
if err = d.Skip(); err != nil {
838+
return event, err
839+
}
840+
}
841+
} else {
842+
return event, err
843+
}
844+
}
845+
} else {
846+
return event, err
847+
}
848+
849+
if !keyExist {
850+
return event, errors.New("watch event does not have a key")
851+
}
852+
853+
return event, nil
854+
}
855+
777856
func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
778857
for atomic.LoadUint32(&conn.state) != connClosed {
779858
respBytes, err := conn.read(r)
@@ -789,7 +868,14 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
789868
}
790869

791870
var fut *Future = nil
792-
if resp.Code == PushCode {
871+
if resp.Code == EventCode {
872+
if event, err := readWatchEvent(&resp.buf); err == nil {
873+
conn.eventCh <- event
874+
} else {
875+
conn.opts.Logger.Report(LogReadWatchEventFailed, conn, err)
876+
}
877+
continue
878+
} else if resp.Code == PushCode {
793879
if fut = conn.peekFuture(resp.RequestId); fut != nil {
794880
fut.AppendPush(resp)
795881
}
@@ -799,12 +885,36 @@ func (conn *Connection) reader(r *bufio.Reader, c net.Conn) {
799885
conn.markDone(fut)
800886
}
801887
}
888+
802889
if fut == nil {
803890
conn.opts.Logger.Report(LogUnexpectedResultId, conn, resp)
804891
}
805892
}
806893
}
807894

895+
func (conn *Connection) eventer(ch chan connWatchEvent) {
896+
for {
897+
event, ok := <-ch
898+
if !ok {
899+
break
900+
}
901+
902+
if value, ok := conn.eventMap.Load(event.key); ok {
903+
data := value.(*watchDataShared)
904+
data.condMutex.Lock()
905+
data.value = event.value
906+
data.version += 1
907+
data.condMutex.Unlock()
908+
909+
data.cond.Broadcast()
910+
911+
if atomic.LoadUint32(&conn.state) == connConnected {
912+
conn.Do(newWatchRequest(event.key))
913+
}
914+
}
915+
}
916+
}
917+
808918
func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
809919
fut = NewFuture()
810920
if conn.rlimit != nil && conn.opts.RLimitAction == RLimitDrop {
@@ -960,6 +1070,18 @@ func (conn *Connection) putFuture(fut *Future, req Request, streamId uint64) {
9601070
return
9611071
}
9621072
shard.bufmut.Unlock()
1073+
1074+
if req.Async() {
1075+
if fut = conn.fetchFuture(reqid); fut != nil {
1076+
resp := &Response{
1077+
RequestId: reqid,
1078+
Code: OkCode,
1079+
}
1080+
fut.SetResponse(resp)
1081+
conn.markDone(fut)
1082+
}
1083+
}
1084+
9631085
if firstWritten {
9641086
conn.dirtyShard <- shardn
9651087
}
@@ -1163,3 +1285,121 @@ func (conn *Connection) NewStream() (*Stream, error) {
11631285
Conn: conn,
11641286
}, nil
11651287
}
1288+
1289+
type watchDataShared struct {
1290+
value interface{}
1291+
version uint
1292+
cond *sync.Cond
1293+
condMutex sync.RWMutex
1294+
1295+
watchCnt int32
1296+
watchMutex sync.Mutex
1297+
}
1298+
1299+
type connWatcher struct {
1300+
shared *watchDataShared
1301+
done chan struct{}
1302+
}
1303+
1304+
// Unregister unregisters the connection watcher.
1305+
//
1306+
// Keep in mind that garbage collection of a watcher handle doesn’t lead to the
1307+
// watcher’s destruction. In this case, the watcher remains registered. You
1308+
// need to call Unregister() directly.
1309+
func (w *connWatcher) Unregister() {
1310+
if w.shared != nil {
1311+
close(w.done)
1312+
w.shared.cond.Broadcast()
1313+
w.shared = nil
1314+
w.done = nil
1315+
}
1316+
}
1317+
1318+
// NewWatcher creates a new Watcher object for the connection.
1319+
//
1320+
// After watcher creation, the watcher callback is invoked for the first time.
1321+
// In this case, the callback is triggered whether or not the key has already
1322+
// been broadcast. All subsequent invocations are triggered with
1323+
// box.broadcast() called on the remote host. If a watcher is subscribed for a
1324+
// key that has not been broadcast yet, the callback is triggered only once,
1325+
// after the registration of the watcher.
1326+
//
1327+
// The watcher callbacks are always invoked in a separate goroutine. A watcher
1328+
// callback is never executed in parallel with itself, but they can be executed
1329+
// in parallel to other watchers.
1330+
//
1331+
// If the key is updated while the watcher callback is running, the callback
1332+
// will be invoked again with the latest value as soon as it returns.
1333+
//
1334+
// Watchers survive reconnection. All registered watchers are automatically
1335+
// resubscribed when the connection is reestablished.
1336+
//
1337+
// See:
1338+
// https://www.tarantool.io/en/doc/latest/reference/reference_lua/box_events/#box-watchers
1339+
//
1340+
// Since 1.10.0
1341+
func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher, error) {
1342+
var shared *watchDataShared
1343+
1344+
if val, ok := conn.eventMap.Load(key); !ok {
1345+
shared = &watchDataShared{
1346+
value: nil,
1347+
version: 0,
1348+
watchCnt: 0,
1349+
}
1350+
shared.cond = sync.NewCond(shared.condMutex.RLocker())
1351+
1352+
if val, ok := conn.eventMap.LoadOrStore(key, shared); ok {
1353+
shared = val.(*watchDataShared)
1354+
}
1355+
} else {
1356+
shared = val.(*watchDataShared)
1357+
}
1358+
1359+
shared.watchMutex.Lock()
1360+
if shared.watchCnt == 0 {
1361+
if _, err := conn.Do(newWatchRequest(key)).Get(); err != nil {
1362+
return nil, err
1363+
}
1364+
}
1365+
atomic.AddInt32(&shared.watchCnt, 1)
1366+
shared.watchMutex.Unlock()
1367+
1368+
version := uint(0)
1369+
done := make(chan struct{})
1370+
go func() {
1371+
for {
1372+
shared.cond.L.Lock()
1373+
for {
1374+
select {
1375+
case <-done:
1376+
shared.cond.L.Unlock()
1377+
if atomic.AddInt32(&shared.watchCnt, -1) == 0 {
1378+
conn.Do(newUnwatchRequest(key))
1379+
}
1380+
return
1381+
default:
1382+
}
1383+
if version != shared.version {
1384+
break
1385+
}
1386+
shared.cond.Wait()
1387+
}
1388+
1389+
value := shared.value
1390+
version = shared.version
1391+
shared.cond.L.Unlock()
1392+
1393+
callback(WatchEvent{
1394+
Conn: conn,
1395+
Key: key,
1396+
Value: value,
1397+
})
1398+
}
1399+
}()
1400+
1401+
return &connWatcher{
1402+
shared: shared,
1403+
done: done,
1404+
}, nil
1405+
}

0 commit comments

Comments
 (0)