Skip to content

Commit d642f4e

Browse files
committed
Merge branch '5.0' into async
2 parents 9e8a3c2 + 8af8661 commit d642f4e

File tree

2 files changed

+44
-4
lines changed

2 files changed

+44
-4
lines changed

neo4j/_async/io/_pool.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
defaultdict,
2222
deque,
2323
)
24+
import logging
2425
from logging import getLogger
2526
from random import choice
2627
from time import perf_counter
@@ -99,11 +100,19 @@ def time_remaining():
99100
# try to find a free connection in pool
100101
for connection in list(self.connections.get(address, [])):
101102
if (connection.closed() or connection.defunct()
102-
or connection.stale()):
103+
or (connection.stale() and not connection.in_use)):
103104
# `close` is a noop on already closed connections.
104-
# This is to make sure that the connection is gracefully
105-
# closed, e.g. if it's just marked as `stale` but still
106-
# alive.
105+
# This is to make sure that the connection is
106+
# gracefully closed, e.g. if it's just marked as
107+
# `stale` but still alive.
108+
if log.isEnabledFor(logging.DEBUG):
109+
log.debug(
110+
"[#%04X] C: <POOL> removing old connection "
111+
"(closed=%s, defunct=%s, stale=%s, in_use=%s)",
112+
connection.local_port,
113+
connection.closed(), connection.defunct(),
114+
connection.stale(), connection.in_use
115+
)
107116
await connection.close()
108117
try:
109118
self.connections.get(address, []).remove(connection)

tests/unit/async_/io/test_neo4j_pool.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,37 @@ def break_connection():
185185
assert cx2 in pool.connections[cx2.addr]
186186

187187

188+
@mark_async_test
189+
async def test_does_not_close_stale_connections_in_use(opener):
190+
pool = AsyncNeo4jPool(
191+
opener, PoolConfig(), WorkspaceConfig(), ROUTER_ADDRESS
192+
)
193+
cx1 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
194+
assert cx1 in pool.connections[cx1.addr]
195+
# simulate connection going stale (e.g. exceeding) while being in use
196+
cx1.stale.return_value = True
197+
cx2 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
198+
await pool.release(cx2)
199+
cx1.close.assert_not_called()
200+
assert cx2 is not cx1
201+
assert cx2.addr == cx1.addr
202+
assert cx1 in pool.connections[cx1.addr]
203+
assert cx2 in pool.connections[cx2.addr]
204+
205+
await pool.release(cx1)
206+
# now that cx1 is back in the pool and still stale,
207+
# it should be closed when trying to acquire the next connection
208+
cx1.close.assert_not_called()
209+
210+
cx3 = await pool.acquire(READ_ACCESS, 30, "test_db", None)
211+
await pool.release(cx3)
212+
cx1.close.assert_called_once()
213+
assert cx2 is cx3
214+
assert cx3.addr == cx1.addr
215+
assert cx1 not in pool.connections[cx1.addr]
216+
assert cx3 in pool.connections[cx2.addr]
217+
218+
188219
@mark_async_test
189220
async def test_release_resets_connections(opener):
190221
pool = AsyncNeo4jPool(

0 commit comments

Comments
 (0)