Skip to content
This repository was archived by the owner on Apr 14, 2022. It is now read-only.

Commit 42a61ef

Browse files
authored
Merge pull request #3 from pquentin/bleach-spike
Scaffolding to make a simple trio request work
2 parents 74518f6 + 49a15ee commit 42a61ef

9 files changed

+114
-69
lines changed

trio_test.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import trio
2+
import urllib3
3+
from urllib3.backends.trio_backend import TrioBackend
4+
5+
6+
async def main():
7+
http = urllib3.PoolManager(TrioBackend())
8+
r = await http.request('GET', 'http://httpbin.org/robots.txt', preload_content=False)
9+
print(r.status) # prints "200"
10+
print(await r.read()) # prints "User-agent: *\nDisallow: /deny\n"
11+
12+
trio.run(main)

urllib3/backends/_util.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
else:
1616
DEFAULT_SELECTOR = selectors.PollSelector
1717

18+
1819
def is_readable(sock):
1920
s = DEFAULT_SELECTOR()
2021
s.register(sock, selectors.EVENT_READ)

urllib3/backends/sync_backend.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import errno
2+
import select
3+
import socket
14
import ssl
25
from ..util.connection import create_connection
36
from ..util.ssl_ import ssl_wrap_socket
@@ -10,6 +13,7 @@
1013

1114
BUFSIZE = 65536
1215

16+
1317
class SyncBackend(object):
1418
def __init__(self, connect_timeout, read_timeout):
1519
self._connect_timeout = connect_timeout
@@ -72,7 +76,7 @@ async def receive_some(self):
7276
else:
7377
raise
7478

75-
async def send_and_receive_for_a_while(produce_bytes, consume_bytes):
79+
async def send_and_receive_for_a_while(self, produce_bytes, consume_bytes):
7680
outgoing_finished = False
7781
outgoing = b""
7882
try:

urllib3/backends/trio_backend.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
from . import LoopAbort
44
from ._util import is_readable
55

6+
BUFSIZE = 65536
7+
8+
69
class TrioBackend:
710
async def connect(
811
self, host, port, source_address=None, socket_options=None):
@@ -23,6 +26,8 @@ async def connect(
2326
# cancellation, but we probably should do something to detect when the stream
2427
# has been broken by cancellation (e.g. a timeout) and make is_readable return
2528
# True so the connection won't be reused.
29+
30+
2631
class TrioSocket:
2732
def __init__(self, stream):
2833
self._stream = stream
@@ -40,7 +45,7 @@ def getpeercert(self, binary=False):
4045
async def receive_some(self):
4146
return await self._stream.receive_some(BUFSIZE)
4247

43-
async def send_and_receive_for_a_while(produce_bytes, consume_bytes):
48+
async def send_and_receive_for_a_while(self, produce_bytes, consume_bytes):
4449
async def sender():
4550
while True:
4651
outgoing = await produce_bytes()
@@ -50,18 +55,18 @@ async def sender():
5055

5156
async def receiver():
5257
while True:
53-
incoming = await stream.receive_some(BUFSIZE)
58+
incoming = await self._stream.receive_some(BUFSIZE)
5459
consume_bytes(incoming)
5560

5661
try:
5762
async with trio.open_nursery() as nursery:
58-
nursery.spawn(sender)
59-
nursery.spawn(receiver)
63+
nursery.start_soon(sender)
64+
nursery.start_soon(receiver)
6065
except LoopAbort:
6166
pass
6267

63-
def forceful_close(self):
64-
self._stream.forceful_close()
68+
async def forceful_close(self):
69+
await trio.aclose_forcefully(self._stream)
6570

6671
def is_readable(self):
6772
# This is a bit of a hack, but I can't think of a better API that trio
@@ -73,6 +78,6 @@ def is_readable(self):
7378
sock_stream = sock_stream.transport_stream
7479
sock = sock_stream.socket
7580
return is_readable(sock)
76-
81+
7782
def set_readable_watch_state(self, enabled):
7883
pass

urllib3/backends/twisted_backend.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import socket
22
import OpenSSL.crypto
3-
from twisted.internet import protocol
3+
from twisted.internet import protocol, ssl
4+
from twisted.internet.interfaces import IHandshakeListener
45
from twisted.internet.endpoints import HostnameEndpoint, connectProtocol
56
from twisted.internet.defer import (
67
Deferred, DeferredList, CancelledError, ensureDeferred,
@@ -11,6 +12,8 @@
1112
from . import LoopAbort
1213

1314
# XX need to add timeout support, esp. on connect
15+
16+
1417
class TwistedBackend:
1518
def __init__(self, reactor):
1619
self._reactor = reactor
@@ -21,7 +24,7 @@ async def connect(self, host, port, source_address=None, socket_options=None):
2124
raise NotImplementedError(
2225
"twisted backend doesn't support setting source_address")
2326

24-
factory = protocol.Factory.forProtocol(TwistedSocketProtocol)
27+
# factory = protocol.Factory.forProtocol(TwistedSocketProtocol)
2528
endpoint = HostnameEndpoint(self._reactor, host, port)
2629
d = connectProtocol(endpoint, TwistedSocketProtocol())
2730
# XX d.addTimeout(...)
@@ -39,8 +42,12 @@ async def connect(self, host, port, source_address=None, socket_options=None):
3942
# enums
4043
class _DATA_RECEIVED:
4144
pass
45+
46+
4247
class _RESUME_PRODUCING:
4348
pass
49+
50+
4451
class _HANDSHAKE_COMPLETED:
4552
pass
4653

@@ -71,6 +78,7 @@ async def _wait_for(self, event):
7178
d = Deferred()
7279
# We might get callbacked, we might get cancelled; either way we want
7380
# to clean up then pass through the result:
81+
7482
def cleanup(obj):
7583
del self._events[event]
7684
return obj
@@ -138,6 +146,7 @@ def set_readable_watch_state(self, enabled):
138146
else:
139147
self.transport.pauseProducing()
140148

149+
141150
class DoubleError(Exception):
142151
def __init__(self, exc1, exc2):
143152
self.exc1 = exc1
@@ -173,7 +182,7 @@ def getpeercert(self, binary=False):
173182
async def receive_some(self):
174183
return await self._protocol.receive_some()
175184

176-
async def send_and_receive_for_a_while(produce_bytes, consume_bytes):
185+
async def send_and_receive_for_a_while(self, produce_bytes, consume_bytes):
177186
async def sender():
178187
while True:
179188
outgoing = await produce_bytes()
@@ -209,7 +218,7 @@ def receive_loop_allback(result):
209218

210219
# Wait for both to finish, and then figure out if we need to raise an
211220
# exception.
212-
results = await DeferredList([d1, d2])
221+
results = await DeferredList([send_loop, receive_loop])
213222
# First, find the failure objects - but since we've almost always
214223
# cancelled one of the deferreds, which causes it to raise
215224
# CancelledError, we can't treat these at face value.

urllib3/connectionpool.py

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
123123
# Return False to re-raise any potential exceptions
124124
return False
125125

126-
def close(self):
126+
async def close(self):
127127
"""
128128
Close all pooled connections and disable the pool.
129129
"""
@@ -248,7 +248,7 @@ def _new_conn(self):
248248
**self.conn_kw)
249249
return conn
250250

251-
def _get_conn(self, timeout=None):
251+
async def _get_conn(self, timeout=None):
252252
"""
253253
Get a connection. Will return a pooled connection if one is available.
254254
@@ -277,11 +277,11 @@ def _get_conn(self, timeout=None):
277277
# If this is a persistent connection, check if it got disconnected
278278
if conn and is_connection_dropped(conn):
279279
log.debug("Resetting dropped connection: %s", self.host)
280-
conn.close()
280+
await conn.close()
281281

282282
return conn or self._new_conn()
283283

284-
def _put_conn(self, conn):
284+
async def _put_conn(self, conn):
285285
"""
286286
Put a connection back into the pool.
287287
@@ -309,13 +309,13 @@ def _put_conn(self, conn):
309309

310310
# Connection never got put back into the pool, close it.
311311
if conn:
312-
conn.close()
312+
await conn.close()
313313

314-
def _start_conn(self, conn, connect_timeout):
314+
async def _start_conn(self, conn, connect_timeout):
315315
"""
316316
Called right before a request is made, after the socket is created.
317317
"""
318-
conn.connect(connect_timeout=connect_timeout)
318+
await conn.connect(connect_timeout=connect_timeout)
319319

320320
def _get_timeout(self, timeout):
321321
""" Helper that always returns a :class:`urllib3.util.Timeout` """
@@ -347,8 +347,8 @@ def _raise_timeout(self, err, url, timeout_value):
347347
if 'timed out' in str(err) or 'did not complete (read)' in str(err): # Python 2.6
348348
raise ReadTimeoutError(self, url, "Read timed out. (read timeout=%s)" % timeout_value)
349349

350-
def _make_request(self, conn, method, url, timeout=_Default, body=None,
351-
headers=None):
350+
async def _make_request(
351+
self, conn, method, url, timeout=_Default, body=None, headers=None):
352352
"""
353353
Perform a request on a given urllib connection object taken from our
354354
pool.
@@ -370,7 +370,7 @@ def _make_request(self, conn, method, url, timeout=_Default, body=None,
370370

371371
# Trigger any extra validation we need to do.
372372
try:
373-
self._start_conn(conn, timeout_obj.connect_timeout)
373+
await self._start_conn(conn, timeout_obj.connect_timeout)
374374
except (SocketTimeout, BaseSSLError) as e:
375375
# Py2 raises this as a BaseSSLError, Py3 raises it as socket timeout.
376376
self._raise_timeout(err=e, url=url, timeout_value=conn.timeout)
@@ -405,7 +405,8 @@ def _make_request(self, conn, method, url, timeout=_Default, body=None,
405405

406406
# Receive the response from the server
407407
try:
408-
response = conn.send_request(request, read_timeout=read_timeout)
408+
response = await conn.send_request(
409+
request, read_timeout=read_timeout)
409410
except (SocketTimeout, BaseSSLError, SocketError) as e:
410411
self._raise_timeout(err=e, url=url, timeout_value=read_timeout)
411412
raise
@@ -420,7 +421,7 @@ def _make_request(self, conn, method, url, timeout=_Default, body=None,
420421
def _absolute_url(self, path):
421422
return Url(scheme=self.scheme, host=self.host, port=self.port, path=path).url
422423

423-
def close(self):
424+
async def close(self):
424425
"""
425426
Close all pooled connections and disable the pool.
426427
"""
@@ -431,7 +432,7 @@ def close(self):
431432
while True:
432433
conn = old_pool.get(block=False)
433434
if conn:
434-
conn.close()
435+
await conn.close()
435436

436437
except queue.Empty:
437438
pass # Done.
@@ -457,8 +458,9 @@ def is_same_host(self, url):
457458

458459
return (scheme, host, port) == (self.scheme, self.host, self.port)
459460

460-
def urlopen(self, method, url, body=None, headers=None, retries=None,
461-
timeout=_Default, pool_timeout=None, body_pos=None, **response_kw):
461+
async def urlopen(self, method, url, body=None, headers=None, retries=None,
462+
timeout=_Default, pool_timeout=None, body_pos=None,
463+
**response_kw):
462464
"""
463465
Get a connection from the pool and perform an HTTP request. This is the
464466
lowest level call for making a request, so you'll need to specify all
@@ -554,14 +556,15 @@ def urlopen(self, method, url, body=None, headers=None, retries=None,
554556
try:
555557
# Request a connection from the queue.
556558
timeout_obj = self._get_timeout(timeout)
557-
conn = self._get_conn(timeout=pool_timeout)
559+
conn = await self._get_conn(timeout=pool_timeout)
558560

559561
conn.timeout = timeout_obj.connect_timeout
560562

561563
# Make the request on the base connection object.
562-
base_response = self._make_request(conn, method, url,
563-
timeout=timeout_obj,
564-
body=body, headers=headers)
564+
base_response = await self._make_request(conn, method, url,
565+
timeout=timeout_obj,
566+
body=body,
567+
headers=headers)
565568

566569
# Pass method to Response for length checking
567570
response_kw['request_method'] = method
@@ -615,22 +618,23 @@ def urlopen(self, method, url, body=None, headers=None, retries=None,
615618
# to throw the connection away unless explicitly told not to.
616619
# Close the connection, set the variable to None, and make sure
617620
# we put the None back in the pool to avoid leaking it.
618-
conn = conn and conn.close()
621+
conn = conn and await conn.close()
619622
release_this_conn = True
620623

621624
if release_this_conn:
622625
# Put the connection back to be reused. If the connection is
623626
# expired then it will be None, which will get replaced with a
624627
# fresh connection during _get_conn.
625-
self._put_conn(conn)
628+
await self._put_conn(conn)
626629

627630
if not conn:
628631
# Try again
629632
log.warning("Retrying (%r) after connection "
630633
"broken by '%r': %s", retries, err, url)
631-
return self.urlopen(method, url, body, headers, retries,
632-
timeout=timeout, pool_timeout=pool_timeout,
633-
body_pos=body_pos, **response_kw)
634+
return await self.urlopen(method, url, body, headers, retries,
635+
timeout=timeout,
636+
pool_timeout=pool_timeout,
637+
body_pos=body_pos, **response_kw)
634638

635639
# Check if we should retry the HTTP response.
636640
has_retry_after = bool(response.getheader('Retry-After'))
@@ -646,7 +650,7 @@ def urlopen(self, method, url, body=None, headers=None, retries=None,
646650
return response
647651
retries.sleep(response)
648652
log.debug("Retry: %s", url)
649-
return self.urlopen(
653+
return await self.urlopen(
650654
method, url, body, headers,
651655
retries=retries, timeout=timeout, pool_timeout=pool_timeout,
652656
body_pos=body_pos, **response_kw)

urllib3/poolmanager.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class PoolManager(RequestMethods):
140140

141141
proxy = None
142142

143-
def __init__(self, num_pools=10, headers=None, **connection_pool_kw):
143+
def __init__(self, backend=None, num_pools=10, headers=None, **connection_pool_kw):
144144
RequestMethods.__init__(self, headers)
145145
self.connection_pool_kw = connection_pool_kw
146146
self.pools = RecentlyUsedContainer(num_pools,
@@ -150,6 +150,7 @@ def __init__(self, num_pools=10, headers=None, **connection_pool_kw):
150150
# override them.
151151
self.pool_classes_by_scheme = pool_classes_by_scheme
152152
self.key_fn_by_scheme = key_fn_by_scheme.copy()
153+
self.backend = backend
153154

154155
def __enter__(self):
155156
return self
@@ -184,7 +185,7 @@ def _new_pool(self, scheme, host, port, request_context=None):
184185
for kw in SSL_KEYWORDS:
185186
request_context.pop(kw, None)
186187

187-
return pool_cls(host, port, **request_context)
188+
return pool_cls(host, port, **request_context, backend=self.backend)
188189

189190
def clear(self):
190191
"""
@@ -290,7 +291,7 @@ def _merge_pool_kwargs(self, override):
290291
base_pool_kwargs[key] = value
291292
return base_pool_kwargs
292293

293-
def urlopen(self, method, url, redirect=True, **kw):
294+
async def urlopen(self, method, url, redirect=True, **kw):
294295
"""
295296
Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`
296297
with redirect logic and only sends the request-uri portion of the
@@ -312,9 +313,9 @@ def urlopen(self, method, url, redirect=True, **kw):
312313
kw['headers'] = self.headers
313314

314315
if self.proxy is not None and u.scheme == "http":
315-
response = conn.urlopen(method, url, **kw)
316+
response = await conn.urlopen(method, url, **kw)
316317
else:
317-
response = conn.urlopen(method, u.request_uri, **kw)
318+
response = await conn.urlopen(method, u.request_uri, **kw)
318319

319320
redirect_location = redirect and response.get_redirect_location()
320321
if not redirect_location:

0 commit comments

Comments
 (0)