Skip to content

Support for PINGs from client to server to detect connection loss #30

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions pb/protocol.proto
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
// Copyright 2017 Apcera Inc. All rights reserved.
// Copyright 2016-2018 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// compiled via `protoc --proto_path=pb --python_out=stan/pb pb/protocol.proto`
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Uses https://github.com/gogo/protobuf
// compiled via `protoc -I=. -I=$GOPATH/src --gogofaster_out=. protocol.proto`

syntax = "proto3";
package pb;

// import "github.com/gogo/protobuf/gogoproto/gogo.proto";

// option (gogoproto.marshaler_all) = true;
// option (gogoproto.sizer_all) = true;
// option (gogoproto.unmarshaler_all) = true;
// option (gogoproto.goproto_getters_all) = false;

// How messages are delivered to the STAN cluster
message PubMsg {
string clientID = 1; // ClientID
string guid = 2; // guid
string subject = 3; // subject
string reply = 4; // optional reply
bytes data = 5; // payload
bytes connID = 6; // Connection ID. For servers that know about this field, clientID can be omitted

bytes sha256 = 10; // optional sha256 of data
}
Expand Down Expand Up @@ -45,6 +65,10 @@ message Ack {
message ConnectRequest {
string clientID = 1; // Client name/identifier.
string heartbeatInbox = 2; // Inbox for server initiated heartbeats.
int32 protocol = 3; // Protocol the client is at.
bytes connID = 4; // Connection ID, a way to uniquely identify a connection (no connection should ever have the same)
int32 pingInterval = 5; // Interval at which client wishes to send PINGs (expressed in seconds).
int32 pingMaxOut = 6; // Maximum number of PINGs without a response after which the connection can be considered lost.
}

// Response to a client connect
Expand All @@ -55,10 +79,24 @@ message ConnectResponse {
string closeRequests = 4; // Subject for closing the stan connection
string error = 5; // err string, empty/omitted if no error
string subCloseRequests = 6; // Subject to use for subscription close requests
string pingRequests = 7; // Subject to use for PING requests
int32 pingInterval = 8; // Interval at which client should send PINGs (expressed in seconds).
int32 pingMaxOut = 9; // Maximum number of PINGs without a response after which the connection can be considered lost
int32 protocol = 10; // Protocol version the server is at

string publicKey = 100; // Possibly used to sign acks, etc.
}

// PING from client to server
message Ping {
bytes connID = 1; // Connection ID
}

// PING response from the server
message PingResponse {
string error = 1; // Error string, empty/omitted if no error
}

// Enum for start position type.
enum StartPosition {
NewOnly = 0;
Expand Down Expand Up @@ -104,4 +142,4 @@ message CloseRequest {
// Response for CloseRequest
message CloseResponse {
string error = 1; // err string, empty/omitted if no error
}
}
2 changes: 1 addition & 1 deletion script/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

export PYTHONPATH=$(pwd)
pip install --upgrade pip
pip install protobuf==3.4
pip install protobuf==3.7
pip install asyncio-nats-client
python tests/test.py
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@
license='Apache 2 License',
packages=['stan', 'stan.aio', 'stan.pb'],
zip_safe=True,
install_requires=['protobuf>=3.4','asyncio-nats-client>=0.7.0'],
install_requires=['protobuf>=3.7','asyncio-nats-client>=0.7.0'],
)
103 changes: 103 additions & 0 deletions stan/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import stan.pb.protocol_pb2 as protocol
from stan.aio.errors import *
from time import time as now
from nats.aio.errors import ErrConnectionClosed

__version__ = '0.3.0'

Expand All @@ -42,6 +43,13 @@
# to be processed on a single subscriptions.
DEFAULT_PENDING_LIMIT = 8192

PROTOCOL_ONE = 1

# Default interval (in seconds) at which a connection sends a PING to the server
DEFAULT_PING_INTERVAL = 5
# Default number of PINGs without a response before the connection is considered lost.
DEFAULT_PING_MAX_OUT = 3

logger = logging.getLogger(__name__)

class Client:
Expand Down Expand Up @@ -80,13 +88,26 @@ def __init__(self):
# Map of subscriptions related to the NATS Streaming session.
self._sub_map = {}

self._conn_lost_cb = None
self._ping_sub = None
self._ping_bytes = None
self._ping_requests = None
self._ping_inbox = None
self._ping_interval = None
self._ping_max_out = None
self._ping_out = 0
self._ping_server_task = None

def __repr__(self):
return "<nats streaming client v{}>".format(__version__)

async def connect(self, cluster_id, client_id,
nats=None,
connect_timeout=DEFAULT_CONNECT_TIMEOUT,
max_pub_acks_inflight=DEFAULT_MAX_PUB_ACKS_INFLIGHT,
ping_interval=DEFAULT_PING_INTERVAL,
ping_max_out=DEFAULT_PING_MAX_OUT,
conn_lost_cb=None,
loop=None,
):
"""
Expand All @@ -99,6 +120,8 @@ async def connect(self, cluster_id, client_id,
self._client_id = client_id
self._loop = loop
self._connect_timeout = connect_timeout
self._conn_id = bytes(new_guid(), "utf-8")
self._conn_lost_cb = conn_lost_cb

if nats is not None:
self._nc = nats
Expand All @@ -110,6 +133,7 @@ async def connect(self, cluster_id, client_id,
self._discover_subject = DEFAULT_DISCOVER_SUBJECT % self._cluster_id
self._hb_inbox = DEFAULT_INBOX_SUBJECT % new_guid()
self._ack_subject = DEFAULT_ACKS_SUBJECT % new_guid()
self._ping_inbox = DEFAULT_INBOX_SUBJECT % new_guid()

# Pending pub acks inflight
self._pending_pub_acks_queue = asyncio.Queue(
Expand All @@ -128,10 +152,20 @@ async def connect(self, cluster_id, client_id,
)
await self._nc.flush()

# Ping subscription
self._ping_sub = await self._nc.subscribe(
self._ping_inbox,
cb=self._process_ping_response,
)

# Start NATS Streaming session by sending ConnectRequest
creq = protocol.ConnectRequest()
creq.clientID = self._client_id
creq.heartbeatInbox = self._hb_inbox
creq.connID = self._conn_id
creq.protocol = PROTOCOL_ONE
creq.pingInterval = ping_interval
creq.pingMaxOut = ping_max_out
payload = creq.SerializeToString()

msg = None
Expand Down Expand Up @@ -162,6 +196,25 @@ async def connect(self, cluster_id, client_id,
self._close_req_subject = resp.closeRequests
self._sub_close_req_subject = resp.subCloseRequests

unsub_ping_sub = True
if resp.protocol >= PROTOCOL_ONE:
if resp.pingInterval != 0:
unsub_ping_sub = False

self._ping_requests = resp.pingRequests
self._ping_interval = resp.pingInterval
self._ping_max_out = resp.pingMaxOut
ping = protocol.Ping()
ping.connID = self._conn_id
self._ping_bytes = ping.SerializeToString()
self._ping_server_task = self._loop.create_task(
self._ping_server())

if unsub_ping_sub:
await self._nc.unsubscribe(self._ping_sub)
self._ping_sub = None
self._conn_id = b''

async def _process_heartbeats(self, msg):
"""
Receives heartbeats sent to the client by the server.
Expand Down Expand Up @@ -190,6 +243,28 @@ async def _process_ack(self, msg):
# TODO: Check for protocol error
return

async def _ping_server(self):
"""
Sends a PING (contianing connection's ID) to the server at intervals specified
by ping_interval. Everytime a PING is sent, the number of outstanding PINGs is increased.
If the total number is > than the ping_max_out option, then the connection is closed,
and conn_lost_cb callback is invoked if one was specified.
"""
while True:
try:
await asyncio.sleep(self._ping_interval)
self._ping_out += 1
if self._ping_out > self._ping_max_out:
await self._close_due_to_ping(StanError("stan: connection lost due to PING failure"))
break
try:
await self._nc.publish_request(self._ping_requests, self._ping_inbox, self._ping_bytes, )
except ErrConnectionClosed as e:
await self._close_due_to_ping(StanError(e))
break
except asyncio.CancelledError:
break

async def _process_msg(self, sub):
"""
Receives the msgs from the STAN subscriptions and replies.
Expand Down Expand Up @@ -228,6 +303,20 @@ async def _process_msg(self, sub):
)
continue

async def _process_ping_response(self, msg):
"""
Receives PING responses from the server.
If the response contains an error message, the connection is closed
and the conn_lost_cb callback is invoked if one was specified.
Otherwise _ping_out is reset to 0 indicating that connection is fine
"""
ping_resp = protocol.PingResponse()
ping_resp.ParseFromString(msg.data)
if ping_resp.error != "":
await self._close_due_to_ping(StanError(ping_resp.error))
return
self._ping_out = 0

async def ack(self, msg):
"""
Used to manually acks a message.
Expand Down Expand Up @@ -260,6 +349,7 @@ async def publish(self, subject, payload,
pe.guid = guid
pe.subject = subject
pe.data = payload
pe.connID = self._conn_id

# Control max inflight pubs for the client with a buffered queue.
await self._pending_pub_acks_queue.put(None)
Expand Down Expand Up @@ -430,6 +520,13 @@ async def _close(self):

# Remove the core NATS Streaming subscriptions.
try:
if self._ping_sub is not None:
await self._nc.unsubscribe(self._ping_sub)
self._ping_sub = None
self._ping_inbox = None
if self._ping_server_task is not None:
self._ping_server_task.cancel()
self._ping_server_task = None
if self._hb_inbox_sid is not None:
await self._nc.unsubscribe(self._hb_inbox_sid)
self._hb_inbox = None
Expand All @@ -452,6 +549,12 @@ async def _close(self):
continue
self._sub_map = {}

async def _close_due_to_ping(self, err):
await self._close()
if self._conn_lost_cb is not None:
await self._conn_lost_cb(err)
self._conn_lost_cb = None

async def close(self):
"""
Close terminates a session with NATS Streaming.
Expand Down
Loading