Skip to content

Enhancements #763

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged — 6 commits merged on Nov 19, 2021 (source and target branch names not captured in this page extract).
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,16 @@ lib-coverage:
$(OPEN) htmlcov/index.html

lib-profile:
sudo py-spy record -o profile.svg -t -F -s -- python -m proxy
sudo py-spy record \
-o profile.svg \
-t -F -s -- \
python -m proxy \
--num-acceptors 1 \
--num-workers 1 \
--disable-http-proxy \
--enable-web-server \
--plugin proxy.plugin.WebServerPlugin \
--log-file /tmp/proxy.log

devtools:
pushd dashboard && npm run devtools && popd
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,4 +273,5 @@
(_py_class_role, 'unittest.result.TestResult'),
(_py_class_role, 'UUID'),
(_py_class_role, 'WebsocketFrame'),
(_py_class_role, 'Url'),
]
8 changes: 3 additions & 5 deletions proxy/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,10 @@ def find_http_line(raw: bytes) -> Tuple[Optional[bytes], bytes]:
"""Find and returns first line ending in CRLF along with following buffer.

If no ending CRLF is found, line is None."""
pos = raw.find(CRLF)
if pos == -1:
parts = raw.split(CRLF)
if len(parts) == 1:
return None, raw
line = raw[:pos]
rest = raw[pos + len(CRLF):]
return line, rest
return parts[0], CRLF.join(parts[1:])


def wrap_socket(
Expand Down
3 changes: 3 additions & 0 deletions proxy/core/acceptor/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def __init__(
lock: multiprocessing.synchronize.Lock,
executor_queues: List[connection.Connection],
executor_pids: List[int],
executor_locks: List[multiprocessing.synchronize.Lock],
event_queue: Optional[EventQueue] = None,
) -> None:
super().__init__()
Expand All @@ -94,6 +95,7 @@ def __init__(
# Available executors
self.executor_queues = executor_queues
self.executor_pids = executor_pids
self.executor_locks = executor_locks
# Selector
self.running = multiprocessing.Event()
self.selector: Optional[selectors.DefaultSelector] = None
Expand Down Expand Up @@ -152,6 +154,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
ThreadlessPool.delegate(
self.executor_pids[index],
self.executor_queues[index],
self.executor_locks[index],
conn,
addr,
self.flags.unix_socket_path,
Expand Down
47 changes: 26 additions & 21 deletions proxy/core/acceptor/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def __init__(
# Threadless worker communication states
self.work_queues: List[connection.Connection] = []
self.work_pids: List[int] = []
self.work_locks: List[multiprocessing.synchronize.Lock] = []
# List of threadless workers
self._workers: List[Threadless] = []

Expand All @@ -108,22 +109,24 @@ def __exit__(self, *args: Any) -> None:
def delegate(
worker_pid: int,
work_queue: connection.Connection,
work_lock: multiprocessing.synchronize.Lock,
conn: socket.socket,
addr: Optional[Tuple[str, int]],
unix_socket_path: Optional[str] = None,
) -> None:
"""Utility method to delegate a work to threadless executor pool."""
# Accepted client address is empty string for
# unix socket domain, avoid sending empty string
# for optimization.
if not unix_socket_path:
work_queue.send(addr)
send_handle(
work_queue,
conn.fileno(),
worker_pid,
)
conn.close()
with work_lock:
# Accepted client address is empty string for
# unix socket domain, avoid sending empty string
# for optimization.
if not unix_socket_path:
work_queue.send(addr)
send_handle(
work_queue,
conn.fileno(),
worker_pid,
)
conn.close()

@staticmethod
def start_threaded_work(
Expand Down Expand Up @@ -168,8 +171,7 @@ def setup(self) -> None:
def shutdown(self) -> None:
"""Shutdown threadless processes."""
if is_threadless(self.flags.threadless, self.flags.threaded):
for _ in range(self.flags.num_workers):
self._shutdown_worker()
self._shutdown_workers()
logger.info(
'Stopped {0} threadless workers'.format(
self.flags.num_workers,
Expand All @@ -178,6 +180,7 @@ def shutdown(self) -> None:

def _start_worker(self, index: int) -> None:
"""Starts a threadless worker."""
self.work_locks.append(multiprocessing.Lock())
pipe = multiprocessing.Pipe()
self.work_queues.append(pipe[0])
w = Threadless(
Expand All @@ -191,12 +194,14 @@ def _start_worker(self, index: int) -> None:
self.work_pids.append(w.pid)
logger.debug('Started threadless#%d process#%d', index, w.pid)

def _shutdown_worker(self) -> None:
def _shutdown_workers(self) -> None:
"""Pop a running threadless worker and clean it up."""
w = self._workers.pop()
pid = w.pid
w.running.set()
w.join()
self.work_pids.pop()
self.work_queues.pop().close()
logger.debug('Stopped threadless process#%d', pid)
for index in range(self.flags.num_workers):
self._workers[index].running.set()
for index in range(self.flags.num_workers):
pid = self._workers[index].pid
self._workers[index].join()
self.work_pids.pop()
self.work_queues.pop().close()
logger.debug('Stopped threadless process#%d', pid)
self.work_locks = []
3 changes: 3 additions & 0 deletions proxy/core/acceptor/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(
listener: Listener,
executor_queues: List[connection.Connection],
executor_pids: List[int],
executor_locks: List[multiprocessing.synchronize.Lock],
event_queue: Optional[EventQueue] = None,
) -> None:
self.flags = flags
Expand All @@ -80,6 +81,7 @@ def __init__(
# Available executors
self.executor_queues: List[connection.Connection] = executor_queues
self.executor_pids: List[int] = executor_pids
self.executor_locks: List[multiprocessing.synchronize.Lock] = executor_locks
# Eventing core queue
self.event_queue: Optional[EventQueue] = event_queue
# Acceptor process instances
Expand Down Expand Up @@ -128,6 +130,7 @@ def _start(self) -> None:
event_queue=self.event_queue,
executor_queues=self.executor_queues,
executor_pids=self.executor_pids,
executor_locks=self.executor_locks,
)
acceptor.start()
logger.debug(
Expand Down
2 changes: 0 additions & 2 deletions proxy/plugin/reverse_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class ReverseProxyPlugin(TcpUpstreamConnectionHandler, HttpWebServerBasePlugin):
"url": "http://localhost/get"
}
```

"""

# TODO: We must use nginx python parser and
Expand All @@ -69,7 +68,6 @@ class ReverseProxyPlugin(TcpUpstreamConnectionHandler, HttpWebServerBasePlugin):

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
# Chosen upstream proxy_pass url
self.choice: Optional[Url] = None

def handle_upstream_data(self, raw: memoryview) -> None:
Expand Down
1 change: 1 addition & 0 deletions proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def setup(self) -> None:
listener=self.listener,
executor_queues=self.executors.work_queues,
executor_pids=self.executors.work_pids,
executor_locks=self.executors.work_locks,
event_queue=event_queue,
)
self.acceptors.setup()
Expand Down
1 change: 1 addition & 0 deletions tests/core/test_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def setUp(self) -> None:
lock=multiprocessing.Lock(),
executor_queues=[],
executor_pids=[],
executor_locks=[],
)

@mock.patch('selectors.DefaultSelector')
Expand Down
2 changes: 1 addition & 1 deletion tests/core/test_acceptor_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def test_setup_and_shutdown(

pool = AcceptorPool(
flags=flags, listener=mock_listener.return_value,
executor_queues=[], executor_pids=[],
executor_queues=[], executor_pids=[], executor_locks=[],
)
pool.setup()

Expand Down
3 changes: 3 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def test_entry_point(
listener=mock_listener.return_value,
executor_queues=mock_executor_pool.return_value.work_queues,
executor_pids=mock_executor_pool.return_value.work_pids,
executor_locks=mock_executor_pool.return_value.work_locks,
event_queue=None,
)
mock_acceptor_pool.return_value.setup.assert_called_once()
Expand Down Expand Up @@ -156,6 +157,7 @@ def test_main_with_no_flags(
listener=mock_listener.return_value,
executor_queues=mock_executor_pool.return_value.work_queues,
executor_pids=mock_executor_pool.return_value.work_pids,
executor_locks=mock_executor_pool.return_value.work_locks,
event_queue=None,
)
mock_acceptor_pool.return_value.setup.assert_called_once()
Expand Down Expand Up @@ -197,6 +199,7 @@ def test_enable_events(
event_queue=mock_event_manager.return_value.queue,
executor_queues=mock_executor_pool.return_value.work_queues,
executor_pids=mock_executor_pool.return_value.work_pids,
executor_locks=mock_executor_pool.return_value.work_locks,
)
mock_acceptor_pool.return_value.setup.assert_called_once()
mock_acceptor_pool.return_value.shutdown.assert_called_once()
Expand Down