diff --git a/docs/conf.py b/docs/conf.py index a370246a74..cfa55f2a38 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -322,5 +322,5 @@ (_py_obj_role, 'proxy.core.work.threadless.T'), (_py_obj_role, 'proxy.core.work.work.T'), (_py_obj_role, 'proxy.core.base.tcp_server.T'), - (_py_obj_role, 'proxy.core.work.fd.T'), + (_py_obj_role, 'proxy.core.work.fd.fd.T'), ] diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 640595f331..0df4c2a555 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -23,8 +23,9 @@ from multiprocessing import connection from multiprocessing.reduction import recv_handle -from ..work import LocalExecutor, start_threaded_work, delegate_work_to_pool +from ..work import start_threaded_work, delegate_work_to_pool from ..event import EventQueue +from ..work.fd import LocalFdExecutor from ...common.flag import flags from ...common.types import HostPort from ...common.logger import Logger @@ -99,7 +100,7 @@ def __init__( # Internals self._total: Optional[int] = None self._local_work_queue: Optional['NonBlockingQueue'] = None - self._local: Optional[LocalExecutor] = None + self._local: Optional[LocalFdExecutor] = None self._lthread: Optional[threading.Thread] = None def accept( @@ -195,7 +196,7 @@ def _recv_and_setup_socks(self) -> None: def _start_local(self) -> None: assert self.socks self._local_work_queue = NonBlockingQueue() - self._local = LocalExecutor( + self._local = LocalFdExecutor( iid=self.idd, work_queue=self._local_work_queue, flags=self.flags, diff --git a/proxy/core/work/__init__.py b/proxy/core/work/__init__.py index a89add7841..d09f7d3dd4 100644 --- a/proxy/core/work/__init__.py +++ b/proxy/core/work/__init__.py @@ -14,8 +14,7 @@ """ from .pool import ThreadlessPool from .work import Work -from .local import LocalExecutor -from .remote import RemoteExecutor +from .local import BaseLocalExecutor from .delegate import delegate_work_to_pool from .threaded import start_threaded_work from .threadless import Threadless @@ -24,9 +23,8 @@ __all__ = [ 'Work', 'Threadless', - 'RemoteExecutor', - 'LocalExecutor', 'ThreadlessPool', 'delegate_work_to_pool', 'start_threaded_work', + 'BaseLocalExecutor', ] diff --git a/proxy/core/work/fd/__init__.py b/proxy/core/work/fd/__init__.py new file mode 100644 index 0000000000..f277bd5a8e --- /dev/null +++ b/proxy/core/work/fd/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +from .fd import ThreadlessFdExecutor +from .local import LocalFdExecutor +from .remote import RemoteFdExecutor + + +__all__ = [ + 'ThreadlessFdExecutor', + 'LocalFdExecutor', + 'RemoteFdExecutor', +] diff --git a/proxy/core/work/fd.py b/proxy/core/work/fd/fd.py similarity index 86% rename from proxy/core/work/fd.py rename to proxy/core/work/fd/fd.py index 797d478b23..71a52af1da 100644 --- a/proxy/core/work/fd.py +++ b/proxy/core/work/fd/fd.py @@ -12,9 +12,9 @@ import logging from typing import Any, TypeVar, Optional -from ..event import eventNames -from .threadless import Threadless -from ...common.types import HostPort, TcpOrTlsSocket +from ...event import eventNames +from ..threadless import Threadless +from ....common.types import HostPort, TcpOrTlsSocket T = TypeVar('T') @@ -23,12 +23,10 @@ class ThreadlessFdExecutor(Threadless[T]): + """A threadless executor which handles file descriptors + and works with read/write events over a socket.""" - def work( - self, - *args: Any, - **kwargs: Any, - ) -> None: + def work(self, **kwargs: Any) -> None: fileno: int = kwargs['fileno'] addr: Optional[HostPort] = kwargs.get('addr', None) conn: Optional[TcpOrTlsSocket] = \ diff --git a/proxy/core/work/fd/local.py b/proxy/core/work/fd/local.py new file mode 100644 index 0000000000..0c62d30080 --- /dev/null +++ b/proxy/core/work/fd/local.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +import queue +import asyncio +import contextlib +from typing import Any, Optional + +from .fd import ThreadlessFdExecutor +from ....common.backports import NonBlockingQueue + + +class LocalFdExecutor(ThreadlessFdExecutor[NonBlockingQueue]): + """A threadless executor implementation which uses a queue to receive new work.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self._loop: Optional[asyncio.AbstractEventLoop] = None + + @property + def loop(self) -> Optional[asyncio.AbstractEventLoop]: + if self._loop is None: + self._loop = asyncio.get_event_loop_policy().new_event_loop() + return self._loop + + def work_queue_fileno(self) -> Optional[int]: + return None + + def receive_from_work_queue(self) -> bool: + with contextlib.suppress(queue.Empty): + work = self.work_queue.get() + if isinstance(work, bool) and work is False: + return True + self.initialize(work) + return False + + def initialize(self, work: Any) -> None: + assert isinstance(work, tuple) + conn, addr = work + # NOTE: Here we are assuming to receive a connection object + # and not a fileno because we are a LocalExecutor. + fileno = conn.fileno() + self.work(fileno=fileno, addr=addr, conn=conn) diff --git a/proxy/core/work/remote.py b/proxy/core/work/fd/remote.py similarity index 93% rename from proxy/core/work/remote.py rename to proxy/core/work/fd/remote.py index 6168bb33f6..f7ad87a680 100644 --- a/proxy/core/work/remote.py +++ b/proxy/core/work/fd/remote.py @@ -9,7 +9,6 @@ :license: BSD, see LICENSE for more details. """ import asyncio -import logging from typing import Any, Optional from multiprocessing import connection from multiprocessing.reduction import recv_handle @@ -17,10 +16,7 @@ from .fd import ThreadlessFdExecutor -logger = logging.getLogger(__name__) - - -class RemoteExecutor(ThreadlessFdExecutor[connection.Connection]): +class RemoteFdExecutor(ThreadlessFdExecutor[connection.Connection]): """A threadless executor implementation which receives work over a connection. NOTE: RemoteExecutor uses ``recv_handle`` to accept file descriptors. @@ -40,12 +36,6 @@ def loop(self) -> Optional[asyncio.AbstractEventLoop]: self._loop = asyncio.get_event_loop_policy().get_event_loop() return self._loop - def work_queue_fileno(self) -> Optional[int]: - return self.work_queue.fileno() - - def close_work_queue(self) -> None: - self.work_queue.close() - def receive_from_work_queue(self) -> bool: # Acceptor will not send address for # unix socket domain environments. @@ -55,3 +45,9 @@ def receive_from_work_queue(self) -> bool: fileno = recv_handle(self.work_queue) self.work(fileno=fileno, addr=addr) return False + + def work_queue_fileno(self) -> Optional[int]: + return self.work_queue.fileno() + + def close_work_queue(self) -> None: + self.work_queue.close() diff --git a/proxy/core/work/local.py b/proxy/core/work/local.py index cdb26bd74f..9a72fabcf1 100644 --- a/proxy/core/work/local.py +++ b/proxy/core/work/local.py @@ -8,22 +8,14 @@ :copyright: (c) 2013-present by Abhinav Singh and contributors. :license: BSD, see LICENSE for more details. """ -import queue import asyncio -import logging -import contextlib from typing import Any, Optional -from .fd import ThreadlessFdExecutor -from ...common.backports import ( # noqa: W0611, F401 pylint: disable=unused-import - NonBlockingQueue, -) +from .threadless import Threadless +from ...common.backports import NonBlockingQueue -logger = logging.getLogger(__name__) - - -class LocalExecutor(ThreadlessFdExecutor['NonBlockingQueue']): +class BaseLocalExecutor(Threadless[NonBlockingQueue]): """A threadless executor implementation which uses a queue to receive new work.""" def __init__(self, *args: Any, **kwargs: Any) -> None: @@ -38,16 +30,3 @@ def loop(self) -> Optional[asyncio.AbstractEventLoop]: def work_queue_fileno(self) -> Optional[int]: return None - - def receive_from_work_queue(self) -> bool: - with contextlib.suppress(queue.Empty): - work = self.work_queue.get() - if isinstance(work, bool) and work is False: - return True - assert isinstance(work, tuple) - conn, addr = work - # NOTE: Here we are assuming to receive a connection object - # and not a fileno because we are a LocalExecutor. - fileno = conn.fileno() - self.work(fileno=fileno, addr=addr, conn=conn) - return False diff --git a/proxy/core/work/pool.py b/proxy/core/work/pool.py index 96c43a7228..5458f0a89d 100644 --- a/proxy/core/work/pool.py +++ b/proxy/core/work/pool.py @@ -11,16 +11,18 @@ import logging import argparse import multiprocessing -from typing import TYPE_CHECKING, Any, List, Optional +from typing import TYPE_CHECKING, Any, List, Type, TypeVar, Optional from multiprocessing import connection -from .remote import RemoteExecutor from ...common.flag import flags from ...common.constants import DEFAULT_THREADLESS, DEFAULT_NUM_WORKERS if TYPE_CHECKING: # pragma: no cover from ..event import EventQueue + from .threadless import Threadless + +T = TypeVar('T', bound='Threadless[Any]') logger = logging.getLogger(__name__) @@ -70,6 +72,7 @@ class ThreadlessPool: def __init__( self, flags: argparse.Namespace, + executor_klass: Type['T'], event_queue: Optional['EventQueue'] = None, ) -> None: self.flags = flags @@ -79,7 +82,9 @@ def __init__( self.work_pids: List[int] = [] self.work_locks: List['multiprocessing.synchronize.Lock'] = [] # List of threadless workers - self._workers: List[RemoteExecutor] = [] + self._executor_klass = executor_klass + # FIXME: Instead of Any type must be the executor klass + self._workers: List[Any] = [] self._processes: List[multiprocessing.Process] = [] def __enter__(self) -> 'ThreadlessPool': @@ -115,8 +120,8 @@ def _start_worker(self, index: int) -> None: self.work_locks.append(multiprocessing.Lock()) pipe = multiprocessing.Pipe() self.work_queues.append(pipe[0]) - w = RemoteExecutor( - iid=index, + w = self._executor_klass( + iid=str(index), work_queue=pipe[1], flags=self.flags, event_queue=self.event_queue, diff --git a/proxy/core/work/threadless.py b/proxy/core/work/threadless.py index ac76c7eed7..216a5bd1cf 100644 --- a/proxy/core/work/threadless.py +++ b/proxy/core/work/threadless.py @@ -120,7 +120,7 @@ def work_queue_fileno(self) -> Optional[int]: raise NotImplementedError() @abstractmethod - def work(self, *args: Any, **kwargs: Any) -> None: + def work(self, **kwargs: Any) -> None: raise NotImplementedError() def close_work_queue(self) -> None: diff --git a/proxy/core/work/work.py b/proxy/core/work/work.py index 08560103de..0cb4b673da 100644 --- a/proxy/core/work/work.py +++ b/proxy/core/work/work.py @@ -55,16 +55,14 @@ def create(**kwargs: Any) -> T: creation of externally defined work class objects.""" raise NotImplementedError() - @abstractmethod async def get_events(self) -> SelectableEvents: """Return sockets and events (read or write) that we are interested in.""" return {} # pragma: no cover - @abstractmethod async def handle_events( self, - readables: Readables, - writables: Writables, + _readables: Readables, + _writables: Writables, ) -> bool: """Handle readable and writable sockets. diff --git a/proxy/proxy.py b/proxy/proxy.py index fcf8f79e90..522ce41adf 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -21,6 +21,7 @@ from .core.event import EventManager from .common.flag import FlagParser, flags from .common.utils import bytes_ +from .core.work.fd import RemoteFdExecutor from .core.acceptor import AcceptorPool from .core.listener import ListenerPool from .common.constants import ( @@ -213,6 +214,7 @@ def setup(self) -> None: self.executors = ThreadlessPool( flags=self.flags, event_queue=event_queue, + executor_klass=RemoteFdExecutor, ) self.executors.setup() # Setup acceptors diff --git a/tests/test_main.py b/tests/test_main.py index ce341c8ac6..1d4c11538f 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -16,6 +16,7 @@ from proxy.proxy import main, entry_point from proxy.common.utils import bytes_ +from proxy.core.work.fd import RemoteFdExecutor from proxy.common.constants import ( # noqa: WPS450 DEFAULT_PORT, DEFAULT_PLUGINS, DEFAULT_TIMEOUT, DEFAULT_KEY_FILE, DEFAULT_LOG_FILE, DEFAULT_PAC_FILE, DEFAULT_PID_FILE, PLUGIN_DASHBOARD, @@ -119,6 +120,7 @@ def test_entry_point( mock_executor_pool.assert_called_once_with( flags=mock_initialize.return_value, event_queue=None, + executor_klass=RemoteFdExecutor, ) mock_acceptor_pool.assert_called_once_with( flags=mock_initialize.return_value, @@ -169,6 +171,7 @@ def test_main_with_no_flags( mock_executor_pool.assert_called_once_with( flags=mock_initialize.return_value, event_queue=None, + executor_klass=RemoteFdExecutor, ) mock_acceptor_pool.assert_called_once_with( flags=mock_initialize.return_value, @@ -214,6 +217,7 @@ def test_enable_events( mock_executor_pool.assert_called_once_with( flags=mock_initialize.return_value, event_queue=mock_event_manager.return_value.queue, + executor_klass=RemoteFdExecutor, ) mock_acceptor_pool.assert_called_once_with( flags=mock_initialize.return_value,