Skip to content

[Work] Refactor as FD work #1049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,5 +322,5 @@
(_py_obj_role, 'proxy.core.work.threadless.T'),
(_py_obj_role, 'proxy.core.work.work.T'),
(_py_obj_role, 'proxy.core.base.tcp_server.T'),
(_py_obj_role, 'proxy.core.work.fd.T'),
(_py_obj_role, 'proxy.core.work.fd.fd.T'),
]
7 changes: 4 additions & 3 deletions proxy/core/acceptor/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
from multiprocessing import connection
from multiprocessing.reduction import recv_handle

from ..work import LocalExecutor, start_threaded_work, delegate_work_to_pool
from ..work import start_threaded_work, delegate_work_to_pool
from ..event import EventQueue
from ..work.fd import LocalFdExecutor
from ...common.flag import flags
from ...common.types import HostPort
from ...common.logger import Logger
Expand Down Expand Up @@ -99,7 +100,7 @@ def __init__(
# Internals
self._total: Optional[int] = None
self._local_work_queue: Optional['NonBlockingQueue'] = None
self._local: Optional[LocalExecutor] = None
self._local: Optional[LocalFdExecutor] = None
self._lthread: Optional[threading.Thread] = None

def accept(
Expand Down Expand Up @@ -195,7 +196,7 @@ def _recv_and_setup_socks(self) -> None:
def _start_local(self) -> None:
assert self.socks
self._local_work_queue = NonBlockingQueue()
self._local = LocalExecutor(
self._local = LocalFdExecutor(
iid=self.idd,
work_queue=self._local_work_queue,
flags=self.flags,
Expand Down
6 changes: 2 additions & 4 deletions proxy/core/work/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
"""
from .pool import ThreadlessPool
from .work import Work
from .local import LocalExecutor
from .remote import RemoteExecutor
from .local import BaseLocalExecutor
from .delegate import delegate_work_to_pool
from .threaded import start_threaded_work
from .threadless import Threadless
Expand All @@ -24,9 +23,8 @@
__all__ = [
'Work',
'Threadless',
'RemoteExecutor',
'LocalExecutor',
'ThreadlessPool',
'delegate_work_to_pool',
'start_threaded_work',
'BaseLocalExecutor',
]
20 changes: 20 additions & 0 deletions proxy/core/work/fd/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from .fd import ThreadlessFdExecutor
from .local import LocalFdExecutor
from .remote import RemoteFdExecutor


__all__ = [
'ThreadlessFdExecutor',
'LocalFdExecutor',
'RemoteFdExecutor',
]
14 changes: 6 additions & 8 deletions proxy/core/work/fd.py → proxy/core/work/fd/fd.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import logging
from typing import Any, TypeVar, Optional

from ..event import eventNames
from .threadless import Threadless
from ...common.types import HostPort, TcpOrTlsSocket
from ...event import eventNames
from ..threadless import Threadless
from ....common.types import HostPort, TcpOrTlsSocket


T = TypeVar('T')
Expand All @@ -23,12 +23,10 @@


class ThreadlessFdExecutor(Threadless[T]):
"""A threadless executor which handles file descriptors
and works with read/write events over a socket."""

def work(
self,
*args: Any,
**kwargs: Any,
) -> None:
def work(self, **kwargs: Any) -> None:
fileno: int = kwargs['fileno']
addr: Optional[HostPort] = kwargs.get('addr', None)
conn: Optional[TcpOrTlsSocket] = \
Expand Down
50 changes: 50 additions & 0 deletions proxy/core/work/fd/local.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import queue
import asyncio
import contextlib
from typing import Any, Optional

from .fd import ThreadlessFdExecutor
from ....common.backports import NonBlockingQueue


class LocalFdExecutor(ThreadlessFdExecutor[NonBlockingQueue]):
    """A threadless executor implementation which uses a queue to receive new work.

    Unlike the remote executor, work arrives in-process over a
    :class:`NonBlockingQueue` as ``(conn, addr)`` tuples, so no file
    descriptor passing is involved.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Event loop is created lazily on first access of ``loop``.
        self._loop: Optional[asyncio.AbstractEventLoop] = None

    @property
    def loop(self) -> Optional[asyncio.AbstractEventLoop]:
        """Return this executor's event loop, creating a fresh one on first use.

        A new loop is created via the policy (rather than reusing the
        thread's current loop) — presumably because this executor runs in
        its own dedicated thread; confirm against the acceptor's usage.
        """
        if self._loop is None:
            self._loop = asyncio.get_event_loop_policy().new_event_loop()
        return self._loop

    def work_queue_fileno(self) -> Optional[int]:
        # An in-memory queue has no file descriptor to select() upon;
        # ``None`` signals the base class to poll the queue instead.
        return None

    def receive_from_work_queue(self) -> bool:
        """Dequeue and initialize at most one unit of work.

        Returns ``True`` when the ``False`` shutdown sentinel is received
        (signaling the caller to stop), ``False`` otherwise.  An empty
        queue is expected and silently ignored.
        """
        with contextlib.suppress(queue.Empty):
            work = self.work_queue.get()
            # Only the ``False`` singleton satisfies an identity check,
            # so the prior ``isinstance(work, bool)`` guard was redundant.
            if work is False:
                return True
            self.initialize(work)
        return False

    def initialize(self, work: Any) -> None:
        """Unpack a ``(conn, addr)`` tuple and hand it to :meth:`work`."""
        assert isinstance(work, tuple)
        conn, addr = work
        # NOTE: Here we are assuming to receive a connection object
        # and not a fileno because we are a LocalExecutor.
        fileno = conn.fileno()
        self.work(fileno=fileno, addr=addr, conn=conn)
18 changes: 7 additions & 11 deletions proxy/core/work/remote.py → proxy/core/work/fd/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@
:license: BSD, see LICENSE for more details.
"""
import asyncio
import logging
from typing import Any, Optional
from multiprocessing import connection
from multiprocessing.reduction import recv_handle

from .fd import ThreadlessFdExecutor


logger = logging.getLogger(__name__)


class RemoteExecutor(ThreadlessFdExecutor[connection.Connection]):
class RemoteFdExecutor(ThreadlessFdExecutor[connection.Connection]):
"""A threadless executor implementation which receives work over a connection.

NOTE: RemoteExecutor uses ``recv_handle`` to accept file descriptors.
Expand All @@ -40,12 +36,6 @@ def loop(self) -> Optional[asyncio.AbstractEventLoop]:
self._loop = asyncio.get_event_loop_policy().get_event_loop()
return self._loop

def work_queue_fileno(self) -> Optional[int]:
return self.work_queue.fileno()

def close_work_queue(self) -> None:
self.work_queue.close()

def receive_from_work_queue(self) -> bool:
# Acceptor will not send address for
# unix socket domain environments.
Expand All @@ -55,3 +45,9 @@ def receive_from_work_queue(self) -> bool:
fileno = recv_handle(self.work_queue)
self.work(fileno=fileno, addr=addr)
return False

def work_queue_fileno(self) -> Optional[int]:
return self.work_queue.fileno()

def close_work_queue(self) -> None:
self.work_queue.close()
27 changes: 3 additions & 24 deletions proxy/core/work/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,14 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import queue
import asyncio
import logging
import contextlib
from typing import Any, Optional

from .fd import ThreadlessFdExecutor
from ...common.backports import ( # noqa: W0611, F401 pylint: disable=unused-import
NonBlockingQueue,
)
from .threadless import Threadless
from ...common.backports import NonBlockingQueue


logger = logging.getLogger(__name__)


class LocalExecutor(ThreadlessFdExecutor['NonBlockingQueue']):
class BaseLocalExecutor(Threadless[NonBlockingQueue]):
"""A threadless executor implementation which uses a queue to receive new work."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
Expand All @@ -38,16 +30,3 @@ def loop(self) -> Optional[asyncio.AbstractEventLoop]:

def work_queue_fileno(self) -> Optional[int]:
return None

def receive_from_work_queue(self) -> bool:
with contextlib.suppress(queue.Empty):
work = self.work_queue.get()
if isinstance(work, bool) and work is False:
return True
assert isinstance(work, tuple)
conn, addr = work
# NOTE: Here we are assuming to receive a connection object
# and not a fileno because we are a LocalExecutor.
fileno = conn.fileno()
self.work(fileno=fileno, addr=addr, conn=conn)
return False
15 changes: 10 additions & 5 deletions proxy/core/work/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
import logging
import argparse
import multiprocessing
from typing import TYPE_CHECKING, Any, List, Optional
from typing import TYPE_CHECKING, Any, List, Type, TypeVar, Optional
from multiprocessing import connection

from .remote import RemoteExecutor
from ...common.flag import flags
from ...common.constants import DEFAULT_THREADLESS, DEFAULT_NUM_WORKERS


if TYPE_CHECKING: # pragma: no cover
from ..event import EventQueue
from .threadless import Threadless

T = TypeVar('T', bound='Threadless[Any]')

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,6 +72,7 @@ class ThreadlessPool:
def __init__(
self,
flags: argparse.Namespace,
executor_klass: Type['T'],
event_queue: Optional['EventQueue'] = None,
) -> None:
self.flags = flags
Expand All @@ -79,7 +82,9 @@ def __init__(
self.work_pids: List[int] = []
self.work_locks: List['multiprocessing.synchronize.Lock'] = []
# List of threadless workers
self._workers: List[RemoteExecutor] = []
self._executor_klass = executor_klass
# FIXME: Instead of Any type must be the executor klass
self._workers: List[Any] = []
self._processes: List[multiprocessing.Process] = []

def __enter__(self) -> 'ThreadlessPool':
Expand Down Expand Up @@ -115,8 +120,8 @@ def _start_worker(self, index: int) -> None:
self.work_locks.append(multiprocessing.Lock())
pipe = multiprocessing.Pipe()
self.work_queues.append(pipe[0])
w = RemoteExecutor(
iid=index,
w = self._executor_klass(
iid=str(index),
work_queue=pipe[1],
flags=self.flags,
event_queue=self.event_queue,
Expand Down
2 changes: 1 addition & 1 deletion proxy/core/work/threadless.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def work_queue_fileno(self) -> Optional[int]:
raise NotImplementedError()

@abstractmethod
def work(self, *args: Any, **kwargs: Any) -> None:
def work(self, **kwargs: Any) -> None:
raise NotImplementedError()

def close_work_queue(self) -> None:
Expand Down
6 changes: 2 additions & 4 deletions proxy/core/work/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,14 @@ def create(**kwargs: Any) -> T:
creation of externally defined work class objects."""
raise NotImplementedError()

@abstractmethod
async def get_events(self) -> SelectableEvents:
"""Return sockets and events (read or write) that we are interested in."""
return {} # pragma: no cover

@abstractmethod
async def handle_events(
self,
readables: Readables,
writables: Writables,
_readables: Readables,
_writables: Writables,
) -> bool:
"""Handle readable and writable sockets.

Expand Down
2 changes: 2 additions & 0 deletions proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .core.event import EventManager
from .common.flag import FlagParser, flags
from .common.utils import bytes_
from .core.work.fd import RemoteFdExecutor
from .core.acceptor import AcceptorPool
from .core.listener import ListenerPool
from .common.constants import (
Expand Down Expand Up @@ -213,6 +214,7 @@ def setup(self) -> None:
self.executors = ThreadlessPool(
flags=self.flags,
event_queue=event_queue,
executor_klass=RemoteFdExecutor,
)
self.executors.setup()
# Setup acceptors
Expand Down
4 changes: 4 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from proxy.proxy import main, entry_point
from proxy.common.utils import bytes_
from proxy.core.work.fd import RemoteFdExecutor
from proxy.common.constants import ( # noqa: WPS450
DEFAULT_PORT, DEFAULT_PLUGINS, DEFAULT_TIMEOUT, DEFAULT_KEY_FILE,
DEFAULT_LOG_FILE, DEFAULT_PAC_FILE, DEFAULT_PID_FILE, PLUGIN_DASHBOARD,
Expand Down Expand Up @@ -119,6 +120,7 @@ def test_entry_point(
mock_executor_pool.assert_called_once_with(
flags=mock_initialize.return_value,
event_queue=None,
executor_klass=RemoteFdExecutor,
)
mock_acceptor_pool.assert_called_once_with(
flags=mock_initialize.return_value,
Expand Down Expand Up @@ -169,6 +171,7 @@ def test_main_with_no_flags(
mock_executor_pool.assert_called_once_with(
flags=mock_initialize.return_value,
event_queue=None,
executor_klass=RemoteFdExecutor,
)
mock_acceptor_pool.assert_called_once_with(
flags=mock_initialize.return_value,
Expand Down Expand Up @@ -214,6 +217,7 @@ def test_enable_events(
mock_executor_pool.assert_called_once_with(
flags=mock_initialize.return_value,
event_queue=mock_event_manager.return_value.queue,
executor_klass=RemoteFdExecutor,
)
mock_acceptor_pool.assert_called_once_with(
flags=mock_initialize.return_value,
Expand Down