Skip to content

Commit dd2476f

Browse files
authored
Refactor into separate Work module (#977)
* work module * Fix imports * String based typing for multiprocessing.synchronize * Fix `test_accepts_client_from_server_socket` * Move staticmethod outside of threadless pool class * Fix doc build * Fix test mock * mp grouped together * pylint happy * import only for type checking * doc build * wrong import order
1 parent f0d39eb commit dd2476f

File tree

17 files changed

+148
-92
lines changed

17 files changed

+148
-92
lines changed

docs/conf.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,9 @@
296296
(_py_class_role, 'proxy.plugin.cache.store.base.CacheStore'),
297297
(_py_class_role, 'proxy.core.pool.AcceptorPool'),
298298
(_py_class_role, 'proxy.core.executors.ThreadlessPool'),
299-
(_py_class_role, 'proxy.core.acceptor.threadless.T'),
300-
(_py_class_role, 'proxy.core.acceptor.work.T'),
299+
(_py_class_role, 'proxy.core.work.threadless.T'),
300+
(_py_class_role, 'proxy.core.work.work.T'),
301+
(_py_class_role, 'proxy.core.acceptor.threadless.Threadless'),
301302
(_py_class_role, 'queue.Queue[Any]'),
302303
(_py_class_role, 'SelectableEvents'),
303304
(_py_class_role, 'TcpClientConnection'),
@@ -309,6 +310,8 @@
309310
(_py_class_role, 'Url'),
310311
(_py_class_role, 'WebsocketFrame'),
311312
(_py_class_role, 'Work'),
312-
(_py_obj_role, 'proxy.core.acceptor.threadless.T'),
313-
(_py_obj_role, 'proxy.core.acceptor.work.T'),
313+
(_py_class_role, 'proxy.core.acceptor.work.Work'),
314+
(_py_class_role, 'connection.Connection'),
315+
(_py_obj_role, 'proxy.core.work.threadless.T'),
316+
(_py_obj_role, 'proxy.core.work.work.T'),
314317
]

examples/web_scraper.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from proxy import Proxy
1414
from proxy.common.types import Readables, Writables, SelectableEvents
15-
from proxy.core.acceptor import Work
15+
from proxy.core.work import Work
1616
from proxy.core.connection import TcpClientConnection
1717

1818

proxy/core/acceptor/__init__.py

+2-12
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,12 @@
1212
1313
pre
1414
"""
15+
from .listener import Listener
1516
from .acceptor import Acceptor
1617
from .pool import AcceptorPool
17-
from .work import Work
18-
from .threadless import Threadless
19-
from .remote import RemoteExecutor
20-
from .local import LocalExecutor
21-
from .executors import ThreadlessPool
22-
from .listener import Listener
2318

2419
__all__ = [
20+
'Listener',
2521
'Acceptor',
2622
'AcceptorPool',
27-
'Work',
28-
'Threadless',
29-
'RemoteExecutor',
30-
'LocalExecutor',
31-
'ThreadlessPool',
32-
'Listener',
3323
]

proxy/core/acceptor/acceptor.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@
3232

3333
from ..event import EventQueue
3434

35-
from .local import LocalExecutor
36-
from .executors import ThreadlessPool
35+
from ..work import LocalExecutor, delegate_work_to_pool, start_threaded_work
3736

3837
logger = logging.getLogger(__name__)
3938

@@ -72,11 +71,11 @@ def __init__(
7271
idd: int,
7372
fd_queue: connection.Connection,
7473
flags: argparse.Namespace,
75-
lock: multiprocessing.synchronize.Lock,
74+
lock: 'multiprocessing.synchronize.Lock',
7675
# semaphore: multiprocessing.synchronize.Semaphore,
7776
executor_queues: List[connection.Connection],
7877
executor_pids: List[int],
79-
executor_locks: List[multiprocessing.synchronize.Lock],
78+
executor_locks: List['multiprocessing.synchronize.Lock'],
8079
event_queue: Optional[EventQueue] = None,
8180
) -> None:
8281
super().__init__()
@@ -214,7 +213,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
214213
# 1st workers. To randomize, we offset index by idd.
215214
index = (self._total + self.idd) % self.flags.num_workers
216215
thread = threading.Thread(
217-
target=ThreadlessPool.delegate,
216+
target=delegate_work_to_pool,
218217
args=(
219218
self.executor_pids[index],
220219
self.executor_queues[index],
@@ -231,7 +230,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
231230
),
232231
)
233232
else:
234-
_, thread = ThreadlessPool.start_threaded_work(
233+
_, thread = start_threaded_work(
235234
self.flags,
236235
conn,
237236
addr,

proxy/core/acceptor/pool.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class AcceptorPool:
5959
while True:
6060
time.sleep(1)
6161
62-
`flags.work_klass` must implement `work.Work` class.
62+
`flags.work_klass` must implement :py:class:`~proxy.core.work.Work` class.
6363
"""
6464

6565
def __init__(
@@ -68,7 +68,7 @@ def __init__(
6868
listener: Listener,
6969
executor_queues: List[connection.Connection],
7070
executor_pids: List[int],
71-
executor_locks: List[multiprocessing.synchronize.Lock],
71+
executor_locks: List['multiprocessing.synchronize.Lock'],
7272
event_queue: Optional[EventQueue] = None,
7373
) -> None:
7474
self.flags = flags
@@ -77,7 +77,7 @@ def __init__(
7777
# Available executors
7878
self.executor_queues: List[connection.Connection] = executor_queues
7979
self.executor_pids: List[int] = executor_pids
80-
self.executor_locks: List[multiprocessing.synchronize.Lock] = executor_locks
80+
self.executor_locks: List['multiprocessing.synchronize.Lock'] = executor_locks
8181
# Eventing core queue
8282
self.event_queue: Optional[EventQueue] = event_queue
8383
# Acceptor process instances

proxy/core/base/tcp_server.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from abc import abstractmethod
1919
from typing import Any, Optional
2020

21-
from ...core.acceptor import Work
21+
from ...core.work import Work
2222
from ...core.connection import TcpClientConnection
2323
from ...common.types import Readables, SelectableEvents, Writables
2424

proxy/core/connection/pool.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from ...common.flag import flags
2222
from ...common.types import Readables, SelectableEvents, Writables
2323

24-
from ..acceptor.work import Work
24+
from ..work import Work
2525

2626
from .server import TcpServerConnection
2727

proxy/core/work/__init__.py

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
11+
.. spelling::
12+
13+
pre
14+
"""
15+
from .work import Work
16+
from .threadless import Threadless
17+
from .remote import RemoteExecutor
18+
from .local import LocalExecutor
19+
from .pool import ThreadlessPool
20+
from .delegate import delegate_work_to_pool
21+
from .threaded import start_threaded_work
22+
23+
__all__ = [
24+
'Work',
25+
'Threadless',
26+
'RemoteExecutor',
27+
'LocalExecutor',
28+
'ThreadlessPool',
29+
'delegate_work_to_pool',
30+
'start_threaded_work',
31+
]

proxy/core/work/delegate.py

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
from typing import TYPE_CHECKING, Optional, Tuple
12+
from multiprocessing.reduction import send_handle
13+
14+
if TYPE_CHECKING:
15+
import socket
16+
import multiprocessing
17+
from multiprocessing import connection
18+
19+
20+
def delegate_work_to_pool(
21+
worker_pid: int,
22+
work_queue: 'connection.Connection',
23+
work_lock: 'multiprocessing.synchronize.Lock',
24+
conn: 'socket.socket',
25+
addr: Optional[Tuple[str, int]],
26+
unix_socket_path: Optional[str] = None,
27+
) -> None:
28+
"""Utility method to delegate a work to threadless executor pool."""
29+
with work_lock:
30+
# Accepted client address is empty string for
31+
# unix socket domain, avoid sending empty string
32+
# for optimization.
33+
if not unix_socket_path:
34+
work_queue.send(addr)
35+
send_handle(
36+
work_queue,
37+
conn.fileno(),
38+
worker_pid,
39+
)
40+
conn.close()
File renamed without changes.

proxy/core/acceptor/executors.py renamed to proxy/core/work/pool.py

+3-61
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,17 @@
88
:copyright: (c) 2013-present by Abhinav Singh and contributors.
99
:license: BSD, see LICENSE for more details.
1010
"""
11-
import socket
1211
import logging
1312
import argparse
14-
import threading
1513
import multiprocessing
1614

1715
from multiprocessing import connection
18-
from multiprocessing.reduction import send_handle
1916

20-
from typing import Any, Optional, List, Tuple
17+
from typing import Any, Optional, List
2118

22-
from .work import Work
2319
from .remote import RemoteExecutor
2420

25-
from ..connection import TcpClientConnection
26-
from ..event import EventQueue, eventNames
21+
from ..event import EventQueue
2722

2823
from ...common.flag import flags
2924
from ...common.constants import DEFAULT_NUM_WORKERS, DEFAULT_THREADLESS
@@ -83,7 +78,7 @@ def __init__(
8378
# Threadless worker communication states
8479
self.work_queues: List[connection.Connection] = []
8580
self.work_pids: List[int] = []
86-
self.work_locks: List[multiprocessing.synchronize.Lock] = []
81+
self.work_locks: List['multiprocessing.synchronize.Lock'] = []
8782
# List of threadless workers
8883
self._workers: List[RemoteExecutor] = []
8984
self._processes: List[multiprocessing.Process] = []
@@ -95,59 +90,6 @@ def __enter__(self) -> 'ThreadlessPool':
9590
def __exit__(self, *args: Any) -> None:
9691
self.shutdown()
9792

98-
@staticmethod
99-
def delegate(
100-
worker_pid: int,
101-
work_queue: connection.Connection,
102-
work_lock: multiprocessing.synchronize.Lock,
103-
conn: socket.socket,
104-
addr: Optional[Tuple[str, int]],
105-
unix_socket_path: Optional[str] = None,
106-
) -> None:
107-
"""Utility method to delegate a work to threadless executor pool."""
108-
with work_lock:
109-
# Accepted client address is empty string for
110-
# unix socket domain, avoid sending empty string
111-
# for optimization.
112-
if not unix_socket_path:
113-
work_queue.send(addr)
114-
send_handle(
115-
work_queue,
116-
conn.fileno(),
117-
worker_pid,
118-
)
119-
conn.close()
120-
121-
@staticmethod
122-
def start_threaded_work(
123-
flags: argparse.Namespace,
124-
conn: socket.socket,
125-
addr: Optional[Tuple[str, int]],
126-
event_queue: Optional[EventQueue] = None,
127-
publisher_id: Optional[str] = None,
128-
) -> Tuple[Work[TcpClientConnection], threading.Thread]:
129-
"""Utility method to start a work in a new thread."""
130-
work = flags.work_klass(
131-
TcpClientConnection(conn, addr),
132-
flags=flags,
133-
event_queue=event_queue,
134-
upstream_conn_pool=None,
135-
)
136-
# TODO: Keep reference to threads and join during shutdown.
137-
# This will ensure connections are not abruptly closed on shutdown
138-
# for threaded execution mode.
139-
thread = threading.Thread(target=work.run)
140-
thread.daemon = True
141-
thread.start()
142-
work.publish_event(
143-
event_name=eventNames.WORK_STARTED,
144-
event_payload={'fileno': conn.fileno(), 'addr': addr},
145-
publisher_id=publisher_id or 'thread#{0}'.format(
146-
thread.ident,
147-
),
148-
)
149-
return (work, thread)
150-
15193
def setup(self) -> None:
15294
"""Setup threadless processes."""
15395
if self.flags.threadless:
File renamed without changes.

proxy/core/work/threaded.py

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import socket
12+
import argparse
13+
import threading
14+
from typing import Optional, Tuple
15+
16+
from .work import Work
17+
18+
from ..connection import TcpClientConnection
19+
from ..event import EventQueue, eventNames
20+
21+
22+
def start_threaded_work(
23+
flags: argparse.Namespace,
24+
conn: socket.socket,
25+
addr: Optional[Tuple[str, int]],
26+
event_queue: Optional[EventQueue] = None,
27+
publisher_id: Optional[str] = None,
28+
) -> Tuple[Work[TcpClientConnection], threading.Thread]:
29+
"""Utility method to start a work in a new thread."""
30+
work = flags.work_klass(
31+
TcpClientConnection(conn, addr),
32+
flags=flags,
33+
event_queue=event_queue,
34+
upstream_conn_pool=None,
35+
)
36+
# TODO: Keep reference to threads and join during shutdown.
37+
# This will ensure connections are not abruptly closed on shutdown
38+
# for threaded execution mode.
39+
thread = threading.Thread(target=work.run)
40+
thread.daemon = True
41+
thread.start()
42+
work.publish_event(
43+
event_name=eventNames.WORK_STARTED,
44+
event_payload={'fileno': conn.fileno(), 'addr': addr},
45+
publisher_id=publisher_id or 'thread#{0}'.format(
46+
thread.ident,
47+
),
48+
)
49+
return (work, thread)

proxy/core/acceptor/threadless.py renamed to proxy/core/work/threadless.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ async def _selected_events(self) -> Tuple[
260260
Returned boolean value indicates whether there is
261261
a newly accepted work waiting to be received and
262262
queued for processing. This is only applicable when
263-
:class:`~proxy.core.acceptor.threadless.Threadless.work_queue_fileno`
263+
:class:`~proxy.core.work.threadless.Threadless.work_queue_fileno`
264264
returns a valid fd.
265265
"""
266266
assert self.selector is not None
File renamed without changes.

proxy/proxy.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
from typing import List, Optional, Any
1818

19-
from .core.acceptor import AcceptorPool, ThreadlessPool, Listener
19+
from .core.work import ThreadlessPool
2020
from .core.event import EventManager
21+
from .core.acceptor import AcceptorPool, Listener
22+
2123
from .common.utils import bytes_
2224
from .common.flag import FlagParser, flags
2325
from .common.constants import DEFAULT_LOCAL_EXECUTOR, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL, IS_WINDOWS

tests/core/test_acceptor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def test_continues_when_no_events(
6363
sock.accept.assert_not_called()
6464
self.flags.work_klass.assert_not_called()
6565

66-
@mock.patch('proxy.core.acceptor.executors.TcpClientConnection')
66+
@mock.patch('proxy.core.work.threaded.TcpClientConnection')
6767
@mock.patch('threading.Thread')
6868
@mock.patch('selectors.DefaultSelector')
6969
@mock.patch('socket.fromfd')

0 commit comments

Comments
 (0)