diff --git a/pb/protocol.proto b/pb/protocol.proto index dfd1ccb..4ef15ed 100644 --- a/pb/protocol.proto +++ b/pb/protocol.proto @@ -1,10 +1,29 @@ -// Copyright 2017 Apcera Inc. All rights reserved. +// Copyright 2016-2018 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// compiled via `protoc --proto_path=pb --python_out=stan/pb pb/protocol.proto` +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Uses https://github.com/gogo/protobuf +// compiled via `protoc -I=. -I=$GOPATH/src --gogofaster_out=. protocol.proto` syntax = "proto3"; package pb; +// import "github.com/gogo/protobuf/gogoproto/gogo.proto"; + +// option (gogoproto.marshaler_all) = true; +// option (gogoproto.sizer_all) = true; +// option (gogoproto.unmarshaler_all) = true; +// option (gogoproto.goproto_getters_all) = false; + // How messages are delivered to the STAN cluster message PubMsg { string clientID = 1; // ClientID @@ -12,6 +31,7 @@ message PubMsg { string subject = 3; // subject string reply = 4; // optional reply bytes data = 5; // payload + bytes connID = 6; // Connection ID. For servers that know about this field, clientID can be omitted bytes sha256 = 10; // optional sha256 of data } @@ -45,6 +65,10 @@ message Ack { message ConnectRequest { string clientID = 1; // Client name/identifier. string heartbeatInbox = 2; // Inbox for server initiated heartbeats. + int32 protocol = 3; // Protocol the client is at. + bytes connID = 4; // Connection ID, a way to uniquely identify a connection (no connection should ever have the same) + int32 pingInterval = 5; // Interval at which client wishes to send PINGs (expressed in seconds). + int32 pingMaxOut = 6; // Maximum number of PINGs without a response after which the connection can be considered lost. } // Response to a client connect @@ -55,10 +79,24 @@ message ConnectResponse { string closeRequests = 4; // Subject for closing the stan connection string error = 5; // err string, empty/omitted if no error string subCloseRequests = 6; // Subject to use for subscription close requests + string pingRequests = 7; // Subject to use for PING requests + int32 pingInterval = 8; // Interval at which client should send PINGs (expressed in seconds). + int32 pingMaxOut = 9; // Maximum number of PINGs without a response after which the connection can be considered lost + int32 protocol = 10; // Protocol version the server is at string publicKey = 100; // Possibly used to sign acks, etc. } +// PING from client to server +message Ping { + bytes connID = 1; // Connection ID +} + +// PING response from the server +message PingResponse { + string error = 1; // Error string, empty/omitted if no error +} + // Enum for start position type. enum StartPosition { NewOnly = 0; @@ -104,4 +142,4 @@ message CloseRequest { // Response for CloseRequest message CloseResponse { string error = 1; // err string, empty/omitted if no error -} +} \ No newline at end of file diff --git a/script/test.sh b/script/test.sh index 58c8c76..e350b49 100755 --- a/script/test.sh +++ b/script/test.sh @@ -2,6 +2,6 @@ export PYTHONPATH=$(pwd) pip install --upgrade pip -pip install protobuf==3.4 +pip install protobuf==3.7 pip install asyncio-nats-client python tests/test.py diff --git a/setup.py b/setup.py index e3e0446..f109ee2 100644 --- a/setup.py +++ b/setup.py @@ -27,5 +27,5 @@ license='Apache 2 License', packages=['stan', 'stan.aio', 'stan.pb'], zip_safe=True, - install_requires=['protobuf>=3.4','asyncio-nats-client>=0.7.0'], + install_requires=['protobuf>=3.7','asyncio-nats-client>=0.7.0'], ) diff --git a/stan/aio/client.py b/stan/aio/client.py index 226fa90..b0c05cf 100644 --- a/stan/aio/client.py +++ b/stan/aio/client.py @@ -18,6 +18,7 @@ import stan.pb.protocol_pb2 as protocol from stan.aio.errors import * from time import time as now +from nats.aio.errors import ErrConnectionClosed __version__ = '0.3.0' @@ -42,6 +43,13 @@ # to be processed on a single subscriptions. DEFAULT_PENDING_LIMIT = 8192 +PROTOCOL_ONE = 1 + +# Default interval (in seconds) at which a connection sends a PING to the server +DEFAULT_PING_INTERVAL = 5 +# Default number of PINGs without a response before the connection is considered lost. +DEFAULT_PING_MAX_OUT = 3 + logger = logging.getLogger(__name__) class Client: @@ -80,6 +88,16 @@ def __init__(self): # Map of subscriptions related to the NATS Streaming session. self._sub_map = {} + self._conn_lost_cb = None + self._ping_sub = None + self._ping_bytes = None + self._ping_requests = None + self._ping_inbox = None + self._ping_interval = None + self._ping_max_out = None + self._ping_out = 0 + self._ping_server_task = None + def __repr__(self): return "".format(__version__) @@ -87,6 +105,9 @@ async def connect(self, cluster_id, client_id, nats=None, connect_timeout=DEFAULT_CONNECT_TIMEOUT, max_pub_acks_inflight=DEFAULT_MAX_PUB_ACKS_INFLIGHT, + ping_interval=DEFAULT_PING_INTERVAL, + ping_max_out=DEFAULT_PING_MAX_OUT, + conn_lost_cb=None, loop=None, ): """ @@ -99,6 +120,8 @@ async def connect(self, cluster_id, client_id, self._client_id = client_id self._loop = loop self._connect_timeout = connect_timeout + self._conn_id = bytes(new_guid(), "utf-8") + self._conn_lost_cb = conn_lost_cb if nats is not None: self._nc = nats @@ -110,6 +133,7 @@ async def connect(self, cluster_id, client_id, self._discover_subject = DEFAULT_DISCOVER_SUBJECT % self._cluster_id self._hb_inbox = DEFAULT_INBOX_SUBJECT % new_guid() self._ack_subject = DEFAULT_ACKS_SUBJECT % new_guid() + self._ping_inbox = DEFAULT_INBOX_SUBJECT % new_guid() # Pending pub acks inflight self._pending_pub_acks_queue = asyncio.Queue( @@ -128,10 +152,20 @@ async def connect(self, cluster_id, client_id, ) await self._nc.flush() + # Ping subscription + self._ping_sub = await self._nc.subscribe( + self._ping_inbox, + cb=self._process_ping_response, + ) + # Start NATS Streaming session by sending ConnectRequest creq = protocol.ConnectRequest() creq.clientID = self._client_id creq.heartbeatInbox = self._hb_inbox + creq.connID = self._conn_id + creq.protocol = PROTOCOL_ONE + creq.pingInterval = ping_interval + creq.pingMaxOut = ping_max_out payload = creq.SerializeToString() msg = None @@ -162,6 +196,25 @@ async def connect(self, cluster_id, client_id, self._close_req_subject = resp.closeRequests self._sub_close_req_subject = resp.subCloseRequests + unsub_ping_sub = True + if resp.protocol >= PROTOCOL_ONE: + if resp.pingInterval != 0: + unsub_ping_sub = False + + self._ping_requests = resp.pingRequests + self._ping_interval = resp.pingInterval + self._ping_max_out = resp.pingMaxOut + ping = protocol.Ping() + ping.connID = self._conn_id + self._ping_bytes = ping.SerializeToString() + self._ping_server_task = self._loop.create_task( + self._ping_server()) + + if unsub_ping_sub: + await self._nc.unsubscribe(self._ping_sub) + self._ping_sub = None + self._conn_id = b'' + async def _process_heartbeats(self, msg): """ Receives heartbeats sent to the client by the server. @@ -190,6 +243,28 @@ async def _process_ack(self, msg): # TODO: Check for protocol error return + async def _ping_server(self): + """ + Sends a PING (contianing connection's ID) to the server at intervals specified + by ping_interval. Everytime a PING is sent, the number of outstanding PINGs is increased. + If the total number is > than the ping_max_out option, then the connection is closed, + and conn_lost_cb callback is invoked if one was specified. + """ + while True: + try: + await asyncio.sleep(self._ping_interval) + self._ping_out += 1 + if self._ping_out > self._ping_max_out: + await self._close_due_to_ping(StanError("stan: connection lost due to PING failure")) + break + try: + await self._nc.publish_request(self._ping_requests, self._ping_inbox, self._ping_bytes, ) + except ErrConnectionClosed as e: + await self._close_due_to_ping(StanError(e)) + break + except asyncio.CancelledError: + break + async def _process_msg(self, sub): """ Receives the msgs from the STAN subscriptions and replies. @@ -228,6 +303,20 @@ async def _process_msg(self, sub): ) continue + async def _process_ping_response(self, msg): + """ + Receives PING responses from the server. + If the response contains an error message, the connection is closed + and the conn_lost_cb callback is invoked if one was specified. + Otherwise _ping_out is reset to 0 indicating that connection is fine + """ + ping_resp = protocol.PingResponse() + ping_resp.ParseFromString(msg.data) + if ping_resp.error != "": + await self._close_due_to_ping(StanError(ping_resp.error)) + return + self._ping_out = 0 + async def ack(self, msg): """ Used to manually acks a message. @@ -260,6 +349,7 @@ async def publish(self, subject, payload, pe.guid = guid pe.subject = subject pe.data = payload + pe.connID = self._conn_id # Control max inflight pubs for the client with a buffered queue. await self._pending_pub_acks_queue.put(None) @@ -430,6 +520,13 @@ async def _close(self): # Remove the core NATS Streaming subscriptions. try: + if self._ping_sub is not None: + await self._nc.unsubscribe(self._ping_sub) + self._ping_sub = None + self._ping_inbox = None + if self._ping_server_task is not None: + self._ping_server_task.cancel() + self._ping_server_task = None if self._hb_inbox_sid is not None: await self._nc.unsubscribe(self._hb_inbox_sid) self._hb_inbox = None @@ -452,6 +549,12 @@ async def _close(self): continue self._sub_map = {} + async def _close_due_to_ping(self, err): + await self._close() + if self._conn_lost_cb is not None: + await self._conn_lost_cb(err) + self._conn_lost_cb = None + async def close(self): """ Close terminates a session with NATS Streaming. diff --git a/stan/pb/protocol_pb2.py b/stan/pb/protocol_pb2.py index a07d0bd..589247f 100644 --- a/stan/pb/protocol_pb2.py +++ b/stan/pb/protocol_pb2.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: protocol.proto @@ -8,7 +9,6 @@ from google.protobuf import message as _message from google.protobuf import reflection as _reflection from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() @@ -20,9 +20,9 @@ name='protocol.proto', package='pb', syntax='proto3', - serialized_pb=_b('\n\x0eprotocol.proto\x12\x02pb\"f\n\x06PubMsg\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\x12\x0c\n\x04guid\x18\x02 \x01(\t\x12\x0f\n\x07subject\x18\x03 \x01(\t\x12\r\n\x05reply\x18\x04 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x12\x0e\n\x06sha256\x18\n \x01(\x0c\"%\n\x06PubAck\x12\x0c\n\x04guid\x18\x01 \x01(\t\x12\r\n\x05\x65rror\x18\x02 \x01(\t\"\x81\x01\n\x08MsgProto\x12\x10\n\x08sequence\x18\x01 \x01(\x04\x12\x0f\n\x07subject\x18\x02 \x01(\t\x12\r\n\x05reply\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\x0c\x12\x11\n\ttimestamp\x18\x05 \x01(\x03\x12\x13\n\x0bredelivered\x18\x06 \x01(\x08\x12\r\n\x05\x43RC32\x18\n \x01(\r\"(\n\x03\x41\x63k\x12\x0f\n\x07subject\x18\x01 \x01(\t\x12\x10\n\x08sequence\x18\x02 \x01(\x04\":\n\x0e\x43onnectRequest\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\x12\x16\n\x0eheartbeatInbox\x18\x02 \x01(\t\"\xa3\x01\n\x0f\x43onnectResponse\x12\x11\n\tpubPrefix\x18\x01 \x01(\t\x12\x13\n\x0bsubRequests\x18\x02 \x01(\t\x12\x15\n\runsubRequests\x18\x03 \x01(\t\x12\x15\n\rcloseRequests\x18\x04 \x01(\t\x12\r\n\x05\x65rror\x18\x05 \x01(\t\x12\x18\n\x10subCloseRequests\x18\x06 \x01(\t\x12\x11\n\tpublicKey\x18\x64 \x01(\t\"\xf1\x01\n\x13SubscriptionRequest\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\x12\x0f\n\x07subject\x18\x02 \x01(\t\x12\x0e\n\x06qGroup\x18\x03 \x01(\t\x12\r\n\x05inbox\x18\x04 \x01(\t\x12\x13\n\x0bmaxInFlight\x18\x05 \x01(\x05\x12\x15\n\rackWaitInSecs\x18\x06 \x01(\x05\x12\x13\n\x0b\x64urableName\x18\x07 \x01(\t\x12(\n\rstartPosition\x18\n \x01(\x0e\x32\x11.pb.StartPosition\x12\x15\n\rstartSequence\x18\x0b \x01(\x04\x12\x16\n\x0estartTimeDelta\x18\x0c \x01(\x03\"7\n\x14SubscriptionResponse\x12\x10\n\x08\x61\x63kInbox\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"[\n\x12UnsubscribeRequest\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\x12\x0f\n\x07subject\x18\x02 \x01(\t\x12\r\n\x05inbox\x18\x03 \x01(\t\x12\x13\n\x0b\x64urableName\x18\x04 \x01(\t\" \n\x0c\x43loseRequest\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\"\x1e\n\rCloseResponse\x12\r\n\x05\x65rror\x18\x01 \x01(\t*`\n\rStartPosition\x12\x0b\n\x07NewOnly\x10\x00\x12\x10\n\x0cLastReceived\x10\x01\x12\x12\n\x0eTimeDeltaStart\x10\x02\x12\x11\n\rSequenceStart\x10\x03\x12\t\n\x05\x46irst\x10\x04\x62\x06proto3') + serialized_options=None, + serialized_pb=_b('\n\x0eprotocol.proto\x12\x02pb\"v\n\x06PubMsg\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\x12\x0c\n\x04guid\x18\x02 \x01(\t\x12\x0f\n\x07subject\x18\x03 \x01(\t\x12\r\n\x05reply\x18\x04 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x12\x0e\n\x06\x63onnID\x18\x06 \x01(\x0c\x12\x0e\n\x06sha256\x18\n \x01(\x0c\"%\n\x06PubAck\x12\x0c\n\x04guid\x18\x01 \x01(\t\x12\r\n\x05\x65rror\x18\x02 \x01(\t\"\x81\x01\n\x08MsgProto\x12\x10\n\x08sequence\x18\x01 \x01(\x04\x12\x0f\n\x07subject\x18\x02 \x01(\t\x12\r\n\x05reply\x18\x03 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x04 \x01(\x0c\x12\x11\n\ttimestamp\x18\x05 \x01(\x03\x12\x13\n\x0bredelivered\x18\x06 \x01(\x08\x12\r\n\x05\x43RC32\x18\n \x01(\r\"(\n\x03\x41\x63k\x12\x0f\n\x07subject\x18\x01 \x01(\t\x12\x10\n\x08sequence\x18\x02 \x01(\x04\"\x86\x01\n\x0e\x43onnectRequest\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\x12\x16\n\x0eheartbeatInbox\x18\x02 \x01(\t\x12\x10\n\x08protocol\x18\x03 \x01(\x05\x12\x0e\n\x06\x63onnID\x18\x04 \x01(\x0c\x12\x14\n\x0cpingInterval\x18\x05 \x01(\x05\x12\x12\n\npingMaxOut\x18\x06 \x01(\x05\"\xf5\x01\n\x0f\x43onnectResponse\x12\x11\n\tpubPrefix\x18\x01 \x01(\t\x12\x13\n\x0bsubRequests\x18\x02 \x01(\t\x12\x15\n\runsubRequests\x18\x03 \x01(\t\x12\x15\n\rcloseRequests\x18\x04 \x01(\t\x12\r\n\x05\x65rror\x18\x05 \x01(\t\x12\x18\n\x10subCloseRequests\x18\x06 \x01(\t\x12\x14\n\x0cpingRequests\x18\x07 \x01(\t\x12\x14\n\x0cpingInterval\x18\x08 \x01(\x05\x12\x12\n\npingMaxOut\x18\t \x01(\x05\x12\x10\n\x08protocol\x18\n \x01(\x05\x12\x11\n\tpublicKey\x18\x64 \x01(\t\"\x16\n\x04Ping\x12\x0e\n\x06\x63onnID\x18\x01 \x01(\x0c\"\x1d\n\x0cPingResponse\x12\r\n\x05\x65rror\x18\x01 \x01(\t\"\xf1\x01\n\x13SubscriptionRequest\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\x12\x0f\n\x07subject\x18\x02 \x01(\t\x12\x0e\n\x06qGroup\x18\x03 \x01(\t\x12\r\n\x05inbox\x18\x04 \x01(\t\x12\x13\n\x0bmaxInFlight\x18\x05 \x01(\x05\x12\x15\n\rackWaitInSecs\x18\x06 \x01(\x05\x12\x13\n\x0b\x64urableName\x18\x07 \x01(\t\x12(\n\rstartPosition\x18\n \x01(\x0e\x32\x11.pb.StartPosition\x12\x15\n\rstartSequence\x18\x0b \x01(\x04\x12\x16\n\x0estartTimeDelta\x18\x0c \x01(\x03\"7\n\x14SubscriptionResponse\x12\x10\n\x08\x61\x63kInbox\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"[\n\x12UnsubscribeRequest\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\x12\x0f\n\x07subject\x18\x02 \x01(\t\x12\r\n\x05inbox\x18\x03 \x01(\t\x12\x13\n\x0b\x64urableName\x18\x04 \x01(\t\" \n\x0c\x43loseRequest\x12\x10\n\x08\x63lientID\x18\x01 \x01(\t\"\x1e\n\rCloseResponse\x12\r\n\x05\x65rror\x18\x01 \x01(\t*`\n\rStartPosition\x12\x0b\n\x07NewOnly\x10\x00\x12\x10\n\x0cLastReceived\x10\x01\x12\x12\n\x0eTimeDeltaStart\x10\x02\x12\x11\n\rSequenceStart\x10\x03\x12\t\n\x05\x46irst\x10\x04\x62\x06proto3') ) -_sym_db.RegisterFileDescriptor(DESCRIPTOR) _STARTPOSITION = _descriptor.EnumDescriptor( name='StartPosition', @@ -32,29 +32,29 @@ values=[ _descriptor.EnumValueDescriptor( name='NewOnly', index=0, number=0, - options=None, + serialized_options=None, type=None), _descriptor.EnumValueDescriptor( name='LastReceived', index=1, number=1, - options=None, + serialized_options=None, type=None), _descriptor.EnumValueDescriptor( name='TimeDeltaStart', index=2, number=2, - options=None, + serialized_options=None, type=None), _descriptor.EnumValueDescriptor( name='SequenceStart', index=3, number=3, - options=None, + serialized_options=None, type=None), _descriptor.EnumValueDescriptor( name='First', index=4, number=4, - options=None, + serialized_options=None, type=None), ], containing_type=None, - options=None, - serialized_start=1025, - serialized_end=1121, + serialized_options=None, + serialized_start=1255, + serialized_end=1351, ) _sym_db.RegisterEnumDescriptor(_STARTPOSITION) @@ -80,56 +80,63 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='guid', full_name='pb.PubMsg.guid', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='subject', full_name='pb.PubMsg.subject', index=2, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='reply', full_name='pb.PubMsg.reply', index=3, number=4, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='data', full_name='pb.PubMsg.data', index=4, number=5, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='connID', full_name='pb.PubMsg.connID', index=5, + number=6, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='sha256', full_name='pb.PubMsg.sha256', index=5, + name='sha256', full_name='pb.PubMsg.sha256', index=6, number=10, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], serialized_start=22, - serialized_end=124, + serialized_end=140, ) @@ -146,28 +153,28 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='error', full_name='pb.PubAck.error', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=126, - serialized_end=163, + serialized_start=142, + serialized_end=179, ) @@ -184,63 +191,63 @@ has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='subject', full_name='pb.MsgProto.subject', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='reply', full_name='pb.MsgProto.reply', index=2, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='data', full_name='pb.MsgProto.data', index=3, number=4, type=12, cpp_type=9, label=1, has_default_value=False, default_value=_b(""), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='timestamp', full_name='pb.MsgProto.timestamp', index=4, number=5, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='redelivered', full_name='pb.MsgProto.redelivered', index=5, number=6, type=8, cpp_type=7, label=1, has_default_value=False, default_value=False, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='CRC32', full_name='pb.MsgProto.CRC32', index=6, number=10, type=13, cpp_type=3, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=166, - serialized_end=295, + serialized_start=182, + serialized_end=311, ) @@ -257,28 +264,28 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='sequence', full_name='pb.Ack.sequence', index=1, number=2, type=4, cpp_type=4, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=297, - serialized_end=337, + serialized_start=313, + serialized_end=353, ) @@ -295,28 +302,56 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='heartbeatInbox', full_name='pb.ConnectRequest.heartbeatInbox', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='protocol', full_name='pb.ConnectRequest.protocol', index=2, + number=3, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='connID', full_name='pb.ConnectRequest.connID', index=3, + number=4, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='pingInterval', full_name='pb.ConnectRequest.pingInterval', index=4, + number=5, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='pingMaxOut', full_name='pb.ConnectRequest.pingMaxOut', index=5, + number=6, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=339, - serialized_end=397, + serialized_start=356, + serialized_end=490, ) @@ -333,63 +368,153 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='subRequests', full_name='pb.ConnectResponse.subRequests', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='unsubRequests', full_name='pb.ConnectResponse.unsubRequests', index=2, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='closeRequests', full_name='pb.ConnectResponse.closeRequests', index=3, number=4, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='error', full_name='pb.ConnectResponse.error', index=4, number=5, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='subCloseRequests', full_name='pb.ConnectResponse.subCloseRequests', index=5, number=6, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( - name='publicKey', full_name='pb.ConnectResponse.publicKey', index=6, + name='pingRequests', full_name='pb.ConnectResponse.pingRequests', index=6, + number=7, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='pingInterval', full_name='pb.ConnectResponse.pingInterval', index=7, + number=8, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='pingMaxOut', full_name='pb.ConnectResponse.pingMaxOut', index=8, + number=9, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='protocol', full_name='pb.ConnectResponse.protocol', index=9, + number=10, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='publicKey', full_name='pb.ConnectResponse.publicKey', index=10, number=100, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=400, - serialized_end=563, + serialized_start=493, + serialized_end=738, +) + + +_PING = _descriptor.Descriptor( + name='Ping', + full_name='pb.Ping', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='connID', full_name='pb.Ping.connID', index=0, + number=1, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=740, + serialized_end=762, +) + + +_PINGRESPONSE = _descriptor.Descriptor( + name='PingResponse', + full_name='pb.PingResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='error', full_name='pb.PingResponse.error', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=764, + serialized_end=793, ) @@ -406,84 +531,84 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='subject', full_name='pb.SubscriptionRequest.subject', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='qGroup', full_name='pb.SubscriptionRequest.qGroup', index=2, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='inbox', full_name='pb.SubscriptionRequest.inbox', index=3, number=4, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='maxInFlight', full_name='pb.SubscriptionRequest.maxInFlight', index=4, number=5, type=5, cpp_type=1, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='ackWaitInSecs', full_name='pb.SubscriptionRequest.ackWaitInSecs', index=5, number=6, type=5, cpp_type=1, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='durableName', full_name='pb.SubscriptionRequest.durableName', index=6, number=7, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='startPosition', full_name='pb.SubscriptionRequest.startPosition', index=7, number=10, type=14, cpp_type=8, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='startSequence', full_name='pb.SubscriptionRequest.startSequence', index=8, number=11, type=4, cpp_type=4, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='startTimeDelta', full_name='pb.SubscriptionRequest.startTimeDelta', index=9, number=12, type=3, cpp_type=2, label=1, has_default_value=False, default_value=0, message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=566, - serialized_end=807, + serialized_start=796, + serialized_end=1037, ) @@ -500,28 +625,28 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='error', full_name='pb.SubscriptionResponse.error', index=1, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=809, - serialized_end=864, + serialized_start=1039, + serialized_end=1094, ) @@ -538,42 +663,42 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='subject', full_name='pb.UnsubscribeRequest.subject', index=1, number=2, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='inbox', full_name='pb.UnsubscribeRequest.inbox', index=2, number=3, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), _descriptor.FieldDescriptor( name='durableName', full_name='pb.UnsubscribeRequest.durableName', index=3, number=4, type=9, cpp_type=9, label=1, has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=866, - serialized_end=957, + serialized_start=1096, + serialized_end=1187, ) @@ -590,21 +715,21 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=959, - serialized_end=991, + serialized_start=1189, + serialized_end=1221, ) @@ -621,21 +746,21 @@ has_default_value=False, default_value=_b("").decode('utf-8'), message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, - options=None), + serialized_options=None, file=DESCRIPTOR), ], extensions=[ ], nested_types=[], enum_types=[ ], - options=None, + serialized_options=None, is_extendable=False, syntax='proto3', extension_ranges=[], oneofs=[ ], - serialized_start=993, - serialized_end=1023, + serialized_start=1223, + serialized_end=1253, ) _SUBSCRIPTIONREQUEST.fields_by_name['startPosition'].enum_type = _STARTPOSITION @@ -645,12 +770,15 @@ DESCRIPTOR.message_types_by_name['Ack'] = _ACK DESCRIPTOR.message_types_by_name['ConnectRequest'] = _CONNECTREQUEST DESCRIPTOR.message_types_by_name['ConnectResponse'] = _CONNECTRESPONSE +DESCRIPTOR.message_types_by_name['Ping'] = _PING +DESCRIPTOR.message_types_by_name['PingResponse'] = _PINGRESPONSE DESCRIPTOR.message_types_by_name['SubscriptionRequest'] = _SUBSCRIPTIONREQUEST DESCRIPTOR.message_types_by_name['SubscriptionResponse'] = _SUBSCRIPTIONRESPONSE DESCRIPTOR.message_types_by_name['UnsubscribeRequest'] = _UNSUBSCRIBEREQUEST DESCRIPTOR.message_types_by_name['CloseRequest'] = _CLOSEREQUEST DESCRIPTOR.message_types_by_name['CloseResponse'] = _CLOSERESPONSE DESCRIPTOR.enum_types_by_name['StartPosition'] = _STARTPOSITION +_sym_db.RegisterFileDescriptor(DESCRIPTOR) PubMsg = _reflection.GeneratedProtocolMessageType('PubMsg', (_message.Message,), dict( DESCRIPTOR = _PUBMSG, @@ -694,6 +822,20 @@ )) _sym_db.RegisterMessage(ConnectResponse) +Ping = _reflection.GeneratedProtocolMessageType('Ping', (_message.Message,), dict( + DESCRIPTOR = _PING, + __module__ = 'protocol_pb2' + # @@protoc_insertion_point(class_scope:pb.Ping) + )) +_sym_db.RegisterMessage(Ping) + +PingResponse = _reflection.GeneratedProtocolMessageType('PingResponse', (_message.Message,), dict( + DESCRIPTOR = _PINGRESPONSE, + __module__ = 'protocol_pb2' + # @@protoc_insertion_point(class_scope:pb.PingResponse) + )) +_sym_db.RegisterMessage(PingResponse) + SubscriptionRequest = _reflection.GeneratedProtocolMessageType('SubscriptionRequest', (_message.Message,), dict( DESCRIPTOR = _SUBSCRIPTIONREQUEST, __module__ = 'protocol_pb2' diff --git a/tests/client_test.py b/tests/client_test.py index 4558210..243a1ca 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -5,6 +5,7 @@ from stan.aio.client import Client as STAN from stan.aio.errors import * + import sys import nats import time @@ -446,10 +447,10 @@ async def cb(msg): # Start a subscription and wait to receive all the messages # which have been sent so far. - sub = await sc.subscribe("hi", cb=cb) + sub = await sc_2.subscribe("hi", cb=cb) for i in range(0, 10): - await sc.publish("hi", b'hello') + await sc_2.publish("hi", b'hello') try: await asyncio.wait_for(future, 2, loop=self.loop) @@ -546,6 +547,81 @@ async def cb(msg): self.assertEqual(len(msgs), 10) await sc.close() + @async_test + async def test_ping_responses_trigger_conn_lost_cb(self): + nc = NATS() + await nc.connect(loop=self.loop) + + class STAN2(STAN): + def __init__(self): + STAN.__init__(self) + async def _process_heartbeats(self, msg): + pass + + expected_client_replaced_str = "client has been replaced or is no longer registered" + received_error_str = "" + future = asyncio.Future(loop=self.loop) + async def conn_lost_cb(err): + nonlocal received_error_str + received_error_str = str(err) + future.set_result(True) + + sc = STAN2() + client_id = generate_client_id() + await sc.connect("test-cluster", client_id, nats=nc, ping_interval=1, ping_max_out=10, conn_lost_cb=conn_lost_cb) + + sc_2 = STAN() + await sc_2.connect("test-cluster", client_id, nats=nc) + + try: + await asyncio.wait_for(future, 4, loop=self.loop) + except: + pass + + self.assertEqual(received_error_str, expected_client_replaced_str) + + await sc_2.close() + + with self.assertRaises(StanError): + await sc.close() + + self.assertTrue(nc.is_connected) + + await nc.close() + self.assertFalse(nc.is_connected) + + @async_test + async def test_missing_ping_responses_trigger_conn_lost_cb(self): + nc = NATS() + await nc.connect(loop=self.loop) + + class STAN2(STAN): + def __init__(self): + STAN.__init__(self) + async def _process_ping_response(self, msg): + pass + + expected_ping_max_out_reached_str = "stan: connection lost due to PING failure" + received_error_str = "" + future = asyncio.Future(loop=self.loop) + async def conn_lost_cb(err): + nonlocal received_error_str + received_error_str = str(err) + future.set_result(True) + + sc = STAN2() + await sc.connect("test-cluster", generate_client_id(), nats=nc, ping_interval=1, ping_max_out=3, conn_lost_cb=conn_lost_cb) + + try: + await asyncio.wait_for(future, 5, loop=self.loop) + except: + pass + + self.assertEqual(received_error_str, expected_ping_max_out_reached_str) + + await nc.close() + self.assertFalse(nc.is_connected) + class SubscriptionsTest(SingleServerTestCase): @async_test diff --git a/tests/utils.py b/tests/utils.py index 12f34bd..91e2fc0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -130,7 +130,6 @@ def tearDown(self): gnatsd.stop() self.loop.close() - def start_nats_streaming(server: StanServer): server.start()