Skip to content

Max connection pool size #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 57 additions & 21 deletions neo4j/bolt/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,27 @@
from select import select
from socket import socket, SOL_SOCKET, SO_KEEPALIVE, SHUT_RDWR, error as SocketError, timeout as SocketTimeout, AF_INET, AF_INET6
from struct import pack as struct_pack, unpack as struct_unpack
from threading import RLock
from threading import RLock, Condition

from neo4j.addressing import SocketAddress, is_ip_address
from neo4j.bolt.cert import KNOWN_HOSTS
from neo4j.bolt.response import InitResponse, AckFailureResponse, ResetResponse
from neo4j.compat.ssl import SSL_AVAILABLE, HAS_SNI, SSLError
from neo4j.exceptions import ProtocolError, SecurityError, ServiceUnavailable
from neo4j.exceptions import ClientError, ProtocolError, SecurityError, ServiceUnavailable
from neo4j.meta import version
from neo4j.packstream import Packer, Unpacker
from neo4j.util import import_best as _import_best
from time import clock

ChunkedInputBuffer = _import_best("neo4j.bolt._io", "neo4j.bolt.io").ChunkedInputBuffer
ChunkedOutputBuffer = _import_best("neo4j.bolt._io", "neo4j.bolt.io").ChunkedOutputBuffer


INFINITE = -1
DEFAULT_MAX_CONNECTION_LIFETIME = INFINITE
DEFAULT_MAX_CONNECTION_POOL_SIZE = INFINITE
DEFAULT_CONNECTION_TIMEOUT = 5.0
DEFAULT_CONNECTION_ACQUISITION_TIMEOUT = 60
DEFAULT_PORT = 7687
DEFAULT_USER_AGENT = "neo4j-python/%s" % version

Expand Down Expand Up @@ -178,6 +183,8 @@ def __init__(self, address, sock, error_handler, **config):
self.packer = Packer(self.output_buffer)
self.unpacker = Unpacker()
self.responses = deque()
self._max_connection_lifetime = config.get("max_connection_lifetime", DEFAULT_MAX_CONNECTION_LIFETIME)
self._creation_timestamp = clock()

# Determine the user agent and ensure it is a Unicode value
user_agent = config.get("user_agent", DEFAULT_USER_AGENT)
Expand All @@ -201,6 +208,7 @@ def __init__(self, address, sock, error_handler, **config):
# Pick up the server certificate, if any
self.der_encoded_server_certificate = config.get("der_encoded_server_certificate")

def Init(self):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init

response = InitResponse(self)
self.append(INIT, (self.user_agent, self.auth_dict), response=response)
self.sync()
Expand Down Expand Up @@ -360,6 +368,9 @@ def _unpack(self):
more = False
return details, summary_signature, summary_metadata

def timedout(self):
return 0 <= self._max_connection_lifetime <= clock() - self._creation_timestamp

def sync(self):
""" Send and fetch all outstanding messages.

Expand Down Expand Up @@ -396,11 +407,14 @@ class ConnectionPool(object):

_closed = False

def __init__(self, connector, connection_error_handler):
def __init__(self, connector, connection_error_handler, **config):
self.connector = connector
self.connection_error_handler = connection_error_handler
self.connections = {}
self.lock = RLock()
self.cond = Condition(self.lock)
self._max_connection_pool_size = config.get("max_connection_pool_size", DEFAULT_MAX_CONNECTION_POOL_SIZE)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make them public

self._connection_acquisition_timeout = config.get("connection_acquisition_timeout", DEFAULT_CONNECTION_ACQUISITION_TIMEOUT)

def __enter__(self):
return self
Expand All @@ -424,23 +438,42 @@ def acquire_direct(self, address):
connections = self.connections[address]
except KeyError:
connections = self.connections[address] = deque()
for connection in list(connections):
if connection.closed() or connection.defunct():
connections.remove(connection)
continue
if not connection.in_use:
connection.in_use = True
return connection
try:
connection = self.connector(address, self.connection_error_handler)
except ServiceUnavailable:
self.remove(address)
raise
else:
connection.pool = self
connection.in_use = True
connections.append(connection)
return connection

connection_acquisition_start_timestamp = clock()
while True:
# try to find a free connection in pool
for connection in list(connections):
if connection.closed() or connection.defunct() or connection.timedout():
connections.remove(connection)
continue
if not connection.in_use:
connection.in_use = True
return connection
# all connections in pool are in-use
can_create_new_connection = self._max_connection_pool_size == INFINITE or len(connections) < self._max_connection_pool_size
if can_create_new_connection:
try:
connection = self.connector(address, self.connection_error_handler)
except ServiceUnavailable:
self.remove(address)
raise
else:
connection.pool = self
connection.in_use = True
connections.append(connection)
return connection

# failed to obtain a connection from pool because the pool is full and no free connection in the pool
span_timeout = self._connection_acquisition_timeout - (clock() - connection_acquisition_start_timestamp)
if span_timeout > 0:
self.cond.wait(span_timeout)
# if timed out, then we throw error. This time computation is needed, as with python 2.7, we cannot
# tell if the condition is notified or timed out when we come to this line
if self._connection_acquisition_timeout <= (clock() - connection_acquisition_start_timestamp):
raise ClientError("Failed to obtain a connection from pool within {!r}s".format(
self._connection_acquisition_timeout))
else:
raise ClientError("Failed to obtain a connection from pool within {!r}s".format(self._connection_acquisition_timeout))

def acquire(self, access_mode=None):
""" Acquire a connection to a server that can satisfy a set of parameters.
Expand All @@ -454,6 +487,7 @@ def release(self, connection):
"""
with self.lock:
connection.in_use = False
self.cond.notify_all()

def in_use_connection_count(self, address):
""" Count the number of connections currently in use to a given
Expand Down Expand Up @@ -600,8 +634,10 @@ def connect(address, ssl_context=None, error_handler=None, **config):
s.shutdown(SHUT_RDWR)
s.close()
elif agreed_version == 1:
return Connection(address, s, der_encoded_server_certificate=der_encoded_server_certificate,
connection = Connection(address, s, der_encoded_server_certificate=der_encoded_server_certificate,
error_handler=error_handler, **config)
connection.Init()
return connection
elif agreed_version == 0x48545450:
log_error("S: [CLOSE]")
s.close()
Expand Down
3 changes: 0 additions & 3 deletions neo4j/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


from neo4j.exceptions import *

from .api import *
from .direct import *
from .exceptions import *
Expand Down
4 changes: 2 additions & 2 deletions neo4j/v1/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
from time import time, sleep
from warnings import warn

from neo4j.bolt import ProtocolError, ServiceUnavailable
from neo4j.compat import unicode, urlparse
from neo4j.exceptions import ProtocolError, ServiceUnavailable
from neo4j.compat import urlparse
from neo4j.exceptions import CypherError, TransientError

from .exceptions import DriverError, SessionError, SessionExpired, TransactionError
Expand Down
8 changes: 4 additions & 4 deletions neo4j/v1/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


from neo4j.addressing import SocketAddress, resolve
from neo4j.bolt import DEFAULT_PORT, ConnectionPool, connect, ConnectionErrorHandler
from neo4j.bolt.connection import DEFAULT_PORT, ConnectionPool, connect, ConnectionErrorHandler
from neo4j.exceptions import ServiceUnavailable
from neo4j.v1.api import Driver
from neo4j.v1.security import SecurityPlan
Expand All @@ -37,8 +37,8 @@ def __init__(self):

class DirectConnectionPool(ConnectionPool):

def __init__(self, connector, address):
super(DirectConnectionPool, self).__init__(connector, DirectConnectionErrorHandler())
def __init__(self, connector, address, **config):
super(DirectConnectionPool, self).__init__(connector, DirectConnectionErrorHandler(), **config)
self.address = address

def acquire(self, access_mode=None):
Expand Down Expand Up @@ -73,7 +73,7 @@ def __init__(self, uri, **config):
def connector(address, error_handler):
return connect(address, security_plan.ssl_context, error_handler, **config)

pool = DirectConnectionPool(connector, self.address)
pool = DirectConnectionPool(connector, self.address, **config)
pool.release(pool.acquire())
Driver.__init__(self, pool, **config)

Expand Down
6 changes: 3 additions & 3 deletions neo4j/v1/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = 0
LOAD_BALANCING_STRATEGY_ROUND_ROBIN = 1
LOAD_BALANCING_STRATEGY_DEFAULT = LOAD_BALANCING_STRATEGY_LEAST_CONNECTED
DEFAULT_LOAD_BALANCING_STRATEGY = LOAD_BALANCING_STRATEGY_LEAST_CONNECTED


class OrderedSet(MutableSet):
Expand Down Expand Up @@ -166,7 +166,7 @@ class LoadBalancingStrategy(object):

@classmethod
def build(cls, connection_pool, **config):
load_balancing_strategy = config.get("load_balancing_strategy", LOAD_BALANCING_STRATEGY_DEFAULT)
load_balancing_strategy = config.get("load_balancing_strategy", DEFAULT_LOAD_BALANCING_STRATEGY)
if load_balancing_strategy == LOAD_BALANCING_STRATEGY_LEAST_CONNECTED:
return LeastConnectedLoadBalancingStrategy(connection_pool)
elif load_balancing_strategy == LOAD_BALANCING_STRATEGY_ROUND_ROBIN:
Expand Down Expand Up @@ -265,7 +265,7 @@ class RoutingConnectionPool(ConnectionPool):
"""

def __init__(self, connector, initial_address, routing_context, *routers, **config):
super(RoutingConnectionPool, self).__init__(connector, RoutingConnectionErrorHandler(self))
super(RoutingConnectionPool, self).__init__(connector, RoutingConnectionErrorHandler(self), **config)
self.initial_address = initial_address
self.routing_context = routing_context
self.routing_table = RoutingTable(routers)
Expand Down
4 changes: 2 additions & 2 deletions test/integration/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
# limitations under the License.


from neo4j.v1 import GraphDatabase, ProtocolError, ServiceUnavailable

from neo4j.v1 import GraphDatabase, ServiceUnavailable
from neo4j.exceptions import ProtocolError
from test.integration.tools import IntegrationTestCase


Expand Down
3 changes: 2 additions & 1 deletion test/integration/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from ssl import SSLSocket
from unittest import skipUnless

from neo4j.v1 import GraphDatabase, SSL_AVAILABLE, TRUST_ON_FIRST_USE, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES, AuthError
from neo4j.v1 import GraphDatabase, SSL_AVAILABLE, TRUST_ON_FIRST_USE, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES
from neo4j.exceptions import AuthError

from test.integration.tools import IntegrationTestCase

Expand Down
3 changes: 2 additions & 1 deletion test/integration/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from neo4j.v1 import \
READ_ACCESS, WRITE_ACCESS, \
CypherError, SessionError, TransactionError, \
Node, Relationship, Path, CypherSyntaxError
Node, Relationship, Path
from neo4j.exceptions import CypherSyntaxError

from test.integration.tools import DirectIntegrationTestCase

Expand Down
3 changes: 2 additions & 1 deletion test/integration/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@

from boltkit.controller import WindowsController, UnixController

from neo4j.v1 import GraphDatabase, AuthError
from neo4j.v1 import GraphDatabase
from neo4j.exceptions import AuthError
from neo4j.util import ServerVersion

from test.env import NEO4J_SERVER_PACKAGE, NEO4J_USER, NEO4J_PASSWORD
Expand Down
3 changes: 2 additions & 1 deletion test/performance/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@

from boltkit.controller import WindowsController, UnixController

from neo4j.v1 import GraphDatabase, AuthError
from neo4j.v1 import GraphDatabase
from neo4j.exceptions import AuthError
from neo4j.util import ServerVersion

from test.env import NEO4J_SERVER_PACKAGE, NEO4J_USER, NEO4J_PASSWORD
Expand Down
3 changes: 2 additions & 1 deletion test/stub/test_routingdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@

from neo4j.v1 import GraphDatabase, READ_ACCESS, WRITE_ACCESS, SessionExpired, \
RoutingDriver, RoutingConnectionPool, LeastConnectedLoadBalancingStrategy, LOAD_BALANCING_STRATEGY_ROUND_ROBIN, \
RoundRobinLoadBalancingStrategy, TransientError, ClientError
RoundRobinLoadBalancingStrategy, TransientError
from neo4j.exceptions import ClientError
from neo4j.bolt import ProtocolError, ServiceUnavailable

from test.stub.tools import StubTestCase, StubCluster
Expand Down
Loading