Skip to content

Commit c06cb75

Browse files
authored
Enhancements (#763)
* Update `make lib-profile` * Optimize `utils.find_http_line` * Pass work to executors within their own multiprocessing lock * Fix tests * Add `(_py_class_role, Url)` to fix rtfd :D
1 parent 9b3b662 commit c06cb75

File tree

11 files changed

+52
-30
lines changed

11 files changed

+52
-30
lines changed

Makefile

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,16 @@ lib-coverage:
128128
$(OPEN) htmlcov/index.html
129129

130130
lib-profile:
131-
sudo py-spy record -o profile.svg -t -F -s -- python -m proxy
131+
sudo py-spy record \
132+
-o profile.svg \
133+
-t -F -s -- \
134+
python -m proxy \
135+
--num-acceptors 1 \
136+
--num-workers 1 \
137+
--disable-http-proxy \
138+
--enable-web-server \
139+
--plugin proxy.plugin.WebServerPlugin \
140+
--log-file /tmp/proxy.log
132141

133142
devtools:
134143
pushd dashboard && npm run devtools && popd

docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,4 +273,5 @@
273273
(_py_class_role, 'unittest.result.TestResult'),
274274
(_py_class_role, 'UUID'),
275275
(_py_class_role, 'WebsocketFrame'),
276+
(_py_class_role, 'Url'),
276277
]

proxy/common/utils.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,10 @@ def find_http_line(raw: bytes) -> Tuple[Optional[bytes], bytes]:
180180
"""Find and returns first line ending in CRLF along with following buffer.
181181
182182
If no ending CRLF is found, line is None."""
183-
pos = raw.find(CRLF)
184-
if pos == -1:
183+
parts = raw.split(CRLF)
184+
if len(parts) == 1:
185185
return None, raw
186-
line = raw[:pos]
187-
rest = raw[pos + len(CRLF):]
188-
return line, rest
186+
return parts[0], CRLF.join(parts[1:])
189187

190188

191189
def wrap_socket(

proxy/core/acceptor/acceptor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ def __init__(
7878
lock: multiprocessing.synchronize.Lock,
7979
executor_queues: List[connection.Connection],
8080
executor_pids: List[int],
81+
executor_locks: List[multiprocessing.synchronize.Lock],
8182
event_queue: Optional[EventQueue] = None,
8283
) -> None:
8384
super().__init__()
@@ -94,6 +95,7 @@ def __init__(
9495
# Available executors
9596
self.executor_queues = executor_queues
9697
self.executor_pids = executor_pids
98+
self.executor_locks = executor_locks
9799
# Selector
98100
self.running = multiprocessing.Event()
99101
self.selector: Optional[selectors.DefaultSelector] = None
@@ -152,6 +154,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
152154
ThreadlessPool.delegate(
153155
self.executor_pids[index],
154156
self.executor_queues[index],
157+
self.executor_locks[index],
155158
conn,
156159
addr,
157160
self.flags.unix_socket_path,

proxy/core/acceptor/executors.py

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ def __init__(
9494
# Threadless worker communication states
9595
self.work_queues: List[connection.Connection] = []
9696
self.work_pids: List[int] = []
97+
self.work_locks: List[multiprocessing.synchronize.Lock] = []
9798
# List of threadless workers
9899
self._workers: List[Threadless] = []
99100

@@ -108,22 +109,24 @@ def __exit__(self, *args: Any) -> None:
108109
def delegate(
109110
worker_pid: int,
110111
work_queue: connection.Connection,
112+
work_lock: multiprocessing.synchronize.Lock,
111113
conn: socket.socket,
112114
addr: Optional[Tuple[str, int]],
113115
unix_socket_path: Optional[str] = None,
114116
) -> None:
115117
"""Utility method to delegate a work to threadless executor pool."""
116-
# Accepted client address is empty string for
117-
# unix socket domain, avoid sending empty string
118-
# for optimization.
119-
if not unix_socket_path:
120-
work_queue.send(addr)
121-
send_handle(
122-
work_queue,
123-
conn.fileno(),
124-
worker_pid,
125-
)
126-
conn.close()
118+
with work_lock:
119+
# Accepted client address is empty string for
120+
# unix socket domain, avoid sending empty string
121+
# for optimization.
122+
if not unix_socket_path:
123+
work_queue.send(addr)
124+
send_handle(
125+
work_queue,
126+
conn.fileno(),
127+
worker_pid,
128+
)
129+
conn.close()
127130

128131
@staticmethod
129132
def start_threaded_work(
@@ -168,8 +171,7 @@ def setup(self) -> None:
168171
def shutdown(self) -> None:
169172
"""Shutdown threadless processes."""
170173
if is_threadless(self.flags.threadless, self.flags.threaded):
171-
for _ in range(self.flags.num_workers):
172-
self._shutdown_worker()
174+
self._shutdown_workers()
173175
logger.info(
174176
'Stopped {0} threadless workers'.format(
175177
self.flags.num_workers,
@@ -178,6 +180,7 @@ def shutdown(self) -> None:
178180

179181
def _start_worker(self, index: int) -> None:
180182
"""Starts a threadless worker."""
183+
self.work_locks.append(multiprocessing.Lock())
181184
pipe = multiprocessing.Pipe()
182185
self.work_queues.append(pipe[0])
183186
w = Threadless(
@@ -191,12 +194,14 @@ def _start_worker(self, index: int) -> None:
191194
self.work_pids.append(w.pid)
192195
logger.debug('Started threadless#%d process#%d', index, w.pid)
193196

194-
def _shutdown_worker(self) -> None:
197+
def _shutdown_workers(self) -> None:
195198
"""Pop a running threadless worker and clean it up."""
196-
w = self._workers.pop()
197-
pid = w.pid
198-
w.running.set()
199-
w.join()
200-
self.work_pids.pop()
201-
self.work_queues.pop().close()
202-
logger.debug('Stopped threadless process#%d', pid)
199+
for index in range(self.flags.num_workers):
200+
self._workers[index].running.set()
201+
for index in range(self.flags.num_workers):
202+
pid = self._workers[index].pid
203+
self._workers[index].join()
204+
self.work_pids.pop()
205+
self.work_queues.pop().close()
206+
logger.debug('Stopped threadless process#%d', pid)
207+
self.work_locks = []

proxy/core/acceptor/pool.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def __init__(
7272
listener: Listener,
7373
executor_queues: List[connection.Connection],
7474
executor_pids: List[int],
75+
executor_locks: List[multiprocessing.synchronize.Lock],
7576
event_queue: Optional[EventQueue] = None,
7677
) -> None:
7778
self.flags = flags
@@ -80,6 +81,7 @@ def __init__(
8081
# Available executors
8182
self.executor_queues: List[connection.Connection] = executor_queues
8283
self.executor_pids: List[int] = executor_pids
84+
self.executor_locks: List[multiprocessing.synchronize.Lock] = executor_locks
8385
# Eventing core queue
8486
self.event_queue: Optional[EventQueue] = event_queue
8587
# Acceptor process instances
@@ -128,6 +130,7 @@ def _start(self) -> None:
128130
event_queue=self.event_queue,
129131
executor_queues=self.executor_queues,
130132
executor_pids=self.executor_pids,
133+
executor_locks=self.executor_locks,
131134
)
132135
acceptor.start()
133136
logger.debug(

proxy/plugin/reverse_proxy.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ class ReverseProxyPlugin(TcpUpstreamConnectionHandler, HttpWebServerBasePlugin):
5252
"url": "http://localhost/get"
5353
}
5454
```
55-
5655
"""
5756

5857
# TODO: We must use nginx python parser and
@@ -69,7 +68,6 @@ class ReverseProxyPlugin(TcpUpstreamConnectionHandler, HttpWebServerBasePlugin):
6968

7069
def __init__(self, *args: Any, **kwargs: Any):
7170
super().__init__(*args, **kwargs)
72-
# Chosen upstream proxy_pass url
7371
self.choice: Optional[Url] = None
7472

7573
def handle_upstream_data(self, raw: memoryview) -> None:

proxy/proxy.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ def setup(self) -> None:
179179
listener=self.listener,
180180
executor_queues=self.executors.work_queues,
181181
executor_pids=self.executors.work_pids,
182+
executor_locks=self.executors.work_locks,
182183
event_queue=event_queue,
183184
)
184185
self.acceptors.setup()

tests/core/test_acceptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ def setUp(self) -> None:
3434
lock=multiprocessing.Lock(),
3535
executor_queues=[],
3636
executor_pids=[],
37+
executor_locks=[],
3738
)
3839

3940
@mock.patch('selectors.DefaultSelector')

tests/core/test_acceptor_pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def test_setup_and_shutdown(
4242

4343
pool = AcceptorPool(
4444
flags=flags, listener=mock_listener.return_value,
45-
executor_queues=[], executor_pids=[],
45+
executor_queues=[], executor_pids=[], executor_locks=[],
4646
)
4747
pool.setup()
4848

tests/test_main.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ def test_entry_point(
111111
listener=mock_listener.return_value,
112112
executor_queues=mock_executor_pool.return_value.work_queues,
113113
executor_pids=mock_executor_pool.return_value.work_pids,
114+
executor_locks=mock_executor_pool.return_value.work_locks,
114115
event_queue=None,
115116
)
116117
mock_acceptor_pool.return_value.setup.assert_called_once()
@@ -156,6 +157,7 @@ def test_main_with_no_flags(
156157
listener=mock_listener.return_value,
157158
executor_queues=mock_executor_pool.return_value.work_queues,
158159
executor_pids=mock_executor_pool.return_value.work_pids,
160+
executor_locks=mock_executor_pool.return_value.work_locks,
159161
event_queue=None,
160162
)
161163
mock_acceptor_pool.return_value.setup.assert_called_once()
@@ -197,6 +199,7 @@ def test_enable_events(
197199
event_queue=mock_event_manager.return_value.queue,
198200
executor_queues=mock_executor_pool.return_value.work_queues,
199201
executor_pids=mock_executor_pool.return_value.work_pids,
202+
executor_locks=mock_executor_pool.return_value.work_locks,
200203
)
201204
mock_acceptor_pool.return_value.setup.assert_called_once()
202205
mock_acceptor_pool.return_value.shutdown.assert_called_once()

0 commit comments

Comments
 (0)