Skip to content
This repository was archived by the owner on Apr 14, 2022. It is now read-only.

Allow closing connection pools and responses #4

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions trio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@


async def main():
http = urllib3.PoolManager(TrioBackend())
r = await http.request('GET', 'http://httpbin.org/robots.txt', preload_content=False)
print(r.status) # prints "200"
print(await r.read()) # prints "User-agent: *\nDisallow: /deny\n"
async with urllib3.PoolManager(TrioBackend()) as http:
r = await http.request(
'GET', 'http://httpbin.org/robots.txt', preload_content=False)
print(r.status) # prints "200"
print(await r.read()) # prints "User-agent: *\nDisallow: /deny\n"
await r.aclose()

trio.run(main)
4 changes: 2 additions & 2 deletions urllib3/_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ def __len__(self):
def __iter__(self):
raise NotImplementedError('Iteration over this class is unlikely to be threadsafe.')

def clear(self):
async def clear(self):
with self.lock:
# Copy pointers to all values, then wipe the mapping
values = list(itervalues(self._container))
self._container.clear()

if self.dispose_func:
for value in values:
self.dispose_func(value)
await self.dispose_func(value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately I think RecentlyUsedContainer will need a larger rewrite, because dispose_func also gets called in other places... and in particular from __setitem__ and __delitem__, which can't be marked async. Fortunately, it seems to be a private interface, so I guess we can give it a new interface and update the callers. (I'm guessing urllib3 doesn't actually use the full MutableMapping interface here...)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the attribute PoolManager.pools I guess is nominally public (no underscore). I think our options are to either add an underscore, or else go back and make closing things synchronous after all. It doesn't seem like it's intended to be a public interface, so maybe adding an underscore makes sense, dunno. Since we're still in proof-of-concept mode we should probably pick whichever is easiest for now.


def keys(self):
with self.lock:
Expand Down
2 changes: 1 addition & 1 deletion urllib3/backends/sync_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ async def send_and_receive_for_a_while(self, produce_bytes, consume_bytes):
except LoopAbort:
pass

def forceful_close(self):
async def aclose_forcefully(self):
self._sock.close()

def is_readable(self):
Expand Down
2 changes: 1 addition & 1 deletion urllib3/backends/trio_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def receiver():
except LoopAbort:
pass

async def forceful_close(self):
async def aclose_forcefully(self):
await trio.aclose_forcefully(self._stream)

def is_readable(self):
Expand Down
4 changes: 2 additions & 2 deletions urllib3/connectionpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
# Return False to re-raise any potential exceptions
return False

async def close(self):
async def aclose(self):
"""
Close all pooled connections and disable the pool.
"""
Expand Down Expand Up @@ -421,7 +421,7 @@ async def _make_request(
def _absolute_url(self, path):
return Url(scheme=self.scheme, host=self.host, port=self.port, path=path).url

async def close(self):
async def aclose(self):
"""
Close all pooled connections and disable the pool.
"""
Expand Down
16 changes: 9 additions & 7 deletions urllib3/poolmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,22 @@ class PoolManager(RequestMethods):
def __init__(self, backend=None, num_pools=10, headers=None, **connection_pool_kw):
RequestMethods.__init__(self, headers)
self.connection_pool_kw = connection_pool_kw
self.pools = RecentlyUsedContainer(num_pools,
dispose_func=lambda p: p.close())

async def aclose(p):
await p.aclose()
self.pools = RecentlyUsedContainer(num_pools, dispose_func=aclose)

# Locally set the pool classes and keys so other PoolManagers can
# override them.
self.pool_classes_by_scheme = pool_classes_by_scheme
self.key_fn_by_scheme = key_fn_by_scheme.copy()
self.backend = backend

def __enter__(self):
async def __aenter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.clear()
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.clear()
# Return False to re-raise any potential exceptions
return False

Expand Down Expand Up @@ -187,14 +189,14 @@ def _new_pool(self, scheme, host, port, request_context=None):

return pool_cls(host, port, **request_context, backend=self.backend)

def clear(self):
async def clear(self):
"""
Empty our store of pools and direct them all to close.

This will not affect in-flight connections, but they will not be
re-used after completion.
"""
self.pools.clear()
await self.pools.clear()

def connection_from_host(self, host, port=None, scheme='http', pool_kwargs=None):
"""
Expand Down
5 changes: 2 additions & 3 deletions urllib3/response.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,14 @@ def getheaders(self):
def getheader(self, name, default=None):
return self.headers.get(name, default)

# Overrides from io.IOBase
def close(self):
async def aclose(self):
if not self.closed:
self._fp.close()
self._buffer = b''
self._fp = None

if self._connection:
self._connection.close()
await self._connection.aclose()

@property
def closed(self):
Expand Down
6 changes: 3 additions & 3 deletions urllib3/sync_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ async def _tunnel(self, conn):
tunnel_response = _response_from_h11(h11_response, self)

if h11_response.status_code != 200:
conn.forceful_close()
await conn.aclose_forcefully()
raise FailedTunnelError(
"Unable to establish CONNECT tunnel", tunnel_response
)
Expand Down Expand Up @@ -430,14 +430,14 @@ async def connect(self, ssl_context=None,
# XX We should pick one of these names and use it consistently...
self._sock = conn

async def close(self):
async def aclose(self):
"""
Close this connection.
"""
if self._sock is not None:
# Make sure self._sock is None even if closing raises an exception
sock, self._sock = self._sock, None
await sock.forceful_close()
await sock.aclose_forcefully()

def is_dropped(self):
"""
Expand Down