@@ -25,14 +25,17 @@ const ignoreStreamId = 0
25
25
const (
26
26
connDisconnected = 0
27
27
connConnected = 1
28
- connClosed = 2
28
+ connShutdown = 2
29
+ connClosed = 3
29
30
)
30
31
31
32
const (
32
33
connTransportNone = ""
33
34
connTransportSsl = "ssl"
34
35
)
35
36
37
+ const shutdownEventKey = "box.shutdown"
38
+
36
39
type ConnEventKind int
37
40
type ConnLogKind int
38
41
@@ -45,6 +48,8 @@ const (
45
48
ReconnectFailed
46
49
// Either reconnect attempts exhausted, or explicit Close is called.
47
50
Closed
51
+ // Shutdown signals that shutdown callback is processing.
52
+ Shutdown
48
53
49
54
// LogReconnectFailed is logged when reconnect attempt failed.
50
55
LogReconnectFailed ConnLogKind = iota + 1
@@ -134,10 +139,19 @@ func (d defaultLogger) Report(event ConnLogKind, conn *Connection, v ...interfac
134
139
// always returns array of array (array of tuples for space related methods).
135
140
// For Eval* and Call* Tarantool always returns array, but does not forces
136
141
// array of arrays.
142
+ //
143
+ // If connected to Tarantool 2.10 or newer, connection supports server graceful
144
+ // shutdown. In this case, server will wait until all client requests will be
145
+ // finished and client disconnects before going down (server also may go down
146
+ // by timeout). Client reconnect will happen if connection options enable
147
+ // reconnect. Beware that graceful shutdown event initialization is asynchronous.
148
+ //
149
+ // More on graceful shutdown: https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
137
150
type Connection struct {
138
151
addr string
139
152
c net.Conn
140
153
mutex sync.Mutex
154
+ cond * sync.Cond
141
155
// Schema contains schema loaded on connection.
142
156
Schema * Schema
143
157
// requestId contains the last request ID for requests with nil context.
@@ -162,6 +176,11 @@ type Connection struct {
162
176
serverProtocolInfo ProtocolInfo
163
177
// watchMap is a map of key -> chan watchState.
164
178
watchMap sync.Map
179
+
180
+ // shutdownWatcher is the "box.shutdown" event watcher.
181
+ shutdownWatcher Watcher
182
+ // requestCnt is a counter of active requests.
183
+ requestCnt int64
165
184
}
166
185
167
186
var _ = Connector (& Connection {}) // Check compatibility with connector interface.
@@ -387,6 +406,8 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
387
406
conn .opts .Logger = defaultLogger {}
388
407
}
389
408
409
+ conn .cond = sync .NewCond (& conn .mutex )
410
+
390
411
if err = conn .createConnection (false ); err != nil {
391
412
ter , ok := err .(Error )
392
413
if conn .opts .Reconnect <= 0 {
@@ -612,10 +633,20 @@ func (conn *Connection) dial() (err error) {
612
633
conn .lockShards ()
613
634
conn .c = connection
614
635
atomic .StoreUint32 (& conn .state , connConnected )
636
+ conn .cond .Broadcast ()
615
637
conn .unlockShards ()
616
638
go conn .writer (w , connection )
617
639
go conn .reader (r , connection )
618
640
641
+ // Subscribe shutdown event to process graceful shutdown.
642
+ if conn .shutdownWatcher == nil && isFeatureInSlice (WatchersFeature , conn .serverProtocolInfo .Features ) {
643
+ watcher , werr := conn .newWatcherImpl (shutdownEventKey , shutdownEventCallback )
644
+ if werr != nil {
645
+ return werr
646
+ }
647
+ conn .shutdownWatcher = watcher
648
+ }
649
+
619
650
return
620
651
}
621
652
@@ -745,10 +776,17 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
745
776
if conn .state != connClosed {
746
777
close (conn .control )
747
778
atomic .StoreUint32 (& conn .state , connClosed )
779
+ conn .cond .Broadcast ()
780
+ // Free the resources.
781
+ if conn .shutdownWatcher != nil {
782
+ go conn .shutdownWatcher .Unregister ()
783
+ conn .shutdownWatcher = nil
784
+ }
748
785
conn .notify (Closed )
749
786
}
750
787
} else {
751
788
atomic .StoreUint32 (& conn .state , connDisconnected )
789
+ conn .cond .Broadcast ()
752
790
conn .notify (Disconnected )
753
791
}
754
792
if conn .c != nil {
@@ -767,9 +805,7 @@ func (conn *Connection) closeConnection(neterr error, forever bool) (err error)
767
805
return
768
806
}
769
807
770
- func (conn * Connection ) reconnect (neterr error , c net.Conn ) {
771
- conn .mutex .Lock ()
772
- defer conn .mutex .Unlock ()
808
+ func (conn * Connection ) reconnectImpl (neterr error , c net.Conn ) {
773
809
if conn .opts .Reconnect > 0 {
774
810
if c == conn .c {
775
811
conn .closeConnection (neterr , false )
@@ -782,6 +818,13 @@ func (conn *Connection) reconnect(neterr error, c net.Conn) {
782
818
}
783
819
}
784
820
821
+ func (conn * Connection ) reconnect (neterr error , c net.Conn ) {
822
+ conn .mutex .Lock ()
823
+ defer conn .mutex .Unlock ()
824
+ conn .reconnectImpl (neterr , c )
825
+ conn .cond .Broadcast ()
826
+ }
827
+
785
828
func (conn * Connection ) lockShards () {
786
829
for i := range conn .shard {
787
830
conn .shard [i ].rmut .Lock ()
@@ -1009,6 +1052,15 @@ func (conn *Connection) newFuture(ctx context.Context) (fut *Future) {
1009
1052
fut .done = nil
1010
1053
shard .rmut .Unlock ()
1011
1054
return
1055
+ case connShutdown :
1056
+ fut .err = ClientError {
1057
+ ErrConnectionShutdown ,
1058
+ "server shutdown in progress" ,
1059
+ }
1060
+ fut .ready = nil
1061
+ fut .done = nil
1062
+ shard .rmut .Unlock ()
1063
+ return
1012
1064
}
1013
1065
pos := (fut .requestId / conn .opts .Concurrency ) & (requestsMap - 1 )
1014
1066
if ctx != nil {
@@ -1060,11 +1112,25 @@ func (conn *Connection) contextWatchdog(fut *Future, ctx context.Context) {
1060
1112
}
1061
1113
}
1062
1114
1115
+ func (conn * Connection ) incrementRequestCnt () {
1116
+ atomic .AddInt64 (& conn .requestCnt , int64 (1 ))
1117
+ }
1118
+
1119
+ func (conn * Connection ) decrementRequestCnt () {
1120
+ if atomic .AddInt64 (& conn .requestCnt , int64 (- 1 )) == 0 {
1121
+ conn .cond .Broadcast ()
1122
+ }
1123
+ }
1124
+
1063
1125
func (conn * Connection ) send (req Request , streamId uint64 ) * Future {
1126
+ conn .incrementRequestCnt ()
1127
+
1064
1128
fut := conn .newFuture (req .Ctx ())
1065
1129
if fut .ready == nil {
1130
+ conn .decrementRequestCnt ()
1066
1131
return fut
1067
1132
}
1133
+
1068
1134
if req .Ctx () != nil {
1069
1135
select {
1070
1136
case <- req .Ctx ().Done ():
@@ -1075,6 +1141,7 @@ func (conn *Connection) send(req Request, streamId uint64) *Future {
1075
1141
go conn .contextWatchdog (fut , req .Ctx ())
1076
1142
}
1077
1143
conn .putFuture (fut , req , streamId )
1144
+
1078
1145
return fut
1079
1146
}
1080
1147
@@ -1141,6 +1208,7 @@ func (conn *Connection) markDone(fut *Future) {
1141
1208
if conn .rlimit != nil {
1142
1209
<- conn .rlimit
1143
1210
}
1211
+ conn .decrementRequestCnt ()
1144
1212
}
1145
1213
1146
1214
func (conn * Connection ) peekFuture (reqid uint32 ) (fut * Future ) {
@@ -1426,6 +1494,15 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
1426
1494
return st , nil
1427
1495
}
1428
1496
1497
+ func isFeatureInSlice (expected ProtocolFeature , actualSlice []ProtocolFeature ) bool {
1498
+ for _ , actual := range actualSlice {
1499
+ if expected == actual {
1500
+ return true
1501
+ }
1502
+ }
1503
+ return false
1504
+ }
1505
+
1429
1506
// NewWatcher creates a new Watcher object for the connection.
1430
1507
//
1431
1508
// You need to require WatchersFeature to use watchers, see examples for the
@@ -1464,20 +1541,16 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
1464
1541
// asynchronous. We do not expect any response from a Tarantool instance
1465
1542
// That's why we can't just check the Tarantool response for an unsupported
1466
1543
// request error.
1467
- watchersRequired := false
1468
- for _ , feature := range conn .opts .RequiredProtocolInfo .Features {
1469
- if feature == WatchersFeature {
1470
- watchersRequired = true
1471
- break
1472
- }
1473
- }
1474
-
1475
- if ! watchersRequired {
1544
+ if ! isFeatureInSlice (WatchersFeature , conn .opts .RequiredProtocolInfo .Features ) {
1476
1545
err := fmt .Errorf ("the feature %s must be required by connection " +
1477
1546
"options to create a watcher" , WatchersFeature )
1478
1547
return nil , err
1479
1548
}
1480
1549
1550
+ return conn .newWatcherImpl (key , callback )
1551
+ }
1552
+
1553
+ func (conn * Connection ) newWatcherImpl (key string , callback WatchCallback ) (Watcher , error ) {
1481
1554
st , err := subscribeWatchChannel (conn , key )
1482
1555
if err != nil {
1483
1556
return nil , err
@@ -1531,7 +1604,11 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
1531
1604
1532
1605
if state .cnt == 0 {
1533
1606
// The last one sends IPROTO_UNWATCH.
1534
- conn .Do (newUnwatchRequest (key )).Get ()
1607
+ if ! conn .ClosedNow () {
1608
+ // conn.ClosedNow() check is a workaround for calling
1609
+ // Unregister from connectionClose().
1610
+ conn .Do (newUnwatchRequest (key )).Get ()
1611
+ }
1535
1612
conn .watchMap .Delete (key )
1536
1613
close (state .unready )
1537
1614
}
@@ -1637,3 +1714,51 @@ func (conn *Connection) ClientProtocolInfo() ProtocolInfo {
1637
1714
info .Auth = conn .opts .Auth
1638
1715
return info
1639
1716
}
1717
+
1718
+ func shutdownEventCallback (event WatchEvent ) {
1719
+ // Receives "true" on server shutdown.
1720
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1721
+ // step 2.
1722
+ val , ok := event .Value .(bool )
1723
+ if ok && val {
1724
+ go event .Conn .shutdown ()
1725
+ }
1726
+ }
1727
+
1728
+ func (conn * Connection ) shutdown () {
1729
+ // Forbid state changes.
1730
+ conn .mutex .Lock ()
1731
+ defer conn .mutex .Unlock ()
1732
+
1733
+ if ! atomic .CompareAndSwapUint32 (& (conn .state ), connConnected , connShutdown ) {
1734
+ return
1735
+ }
1736
+ conn .cond .Broadcast ()
1737
+ conn .notify (Shutdown )
1738
+
1739
+ c := conn .c
1740
+ for {
1741
+ if (atomic .LoadUint32 (& conn .state ) != connShutdown ) || (c != conn .c ) {
1742
+ return
1743
+ }
1744
+ if atomic .LoadInt64 (& conn .requestCnt ) == 0 {
1745
+ break
1746
+ }
1747
+ // Use cond var on conn.mutex since request execution may
1748
+ // call reconnect(). It is ok if state changes as part of
1749
+ // reconnect since Tarantool server won't allow to reconnect
1750
+ // in the middle of shutting down.
1751
+ conn .cond .Wait ()
1752
+ }
1753
+
1754
+ // Start to reconnect based on common rules, same as in net.box.
1755
+ // Reconnect also closes the connection: server waits until all
1756
+ // subscribed connections are terminated.
1757
+ // See https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/graceful_shutdown/
1758
+ // step 3.
1759
+ conn .reconnectImpl (
1760
+ ClientError {
1761
+ ErrConnectionClosed ,
1762
+ "connection closed after server shutdown" ,
1763
+ }, conn .c )
1764
+ }
0 commit comments