Skip to content

[Work] kwargs independent work_klass creation and work core #1051

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2265,8 +2265,8 @@ usage: -m [-h] [--tunnel-hostname TUNNEL_HOSTNAME] [--tunnel-port TUNNEL_PORT]
[--tunnel-username TUNNEL_USERNAME]
[--tunnel-ssh-key TUNNEL_SSH_KEY]
[--tunnel-ssh-key-passphrase TUNNEL_SSH_KEY_PASSPHRASE]
[--tunnel-remote-port TUNNEL_REMOTE_PORT] [--enable-events]
[--threadless] [--threaded] [--num-workers NUM_WORKERS]
[--tunnel-remote-port TUNNEL_REMOTE_PORT] [--threadless]
[--threaded] [--num-workers NUM_WORKERS] [--enable-events]
[--local-executor LOCAL_EXECUTOR] [--backlog BACKLOG]
[--hostname HOSTNAME] [--port PORT] [--ports PORTS [PORTS ...]]
[--port-file PORT_FILE] [--unix-socket-path UNIX_SOCKET_PATH]
Expand Down Expand Up @@ -2294,7 +2294,7 @@ usage: -m [-h] [--tunnel-hostname TUNNEL_HOSTNAME] [--tunnel-port TUNNEL_PORT]
[--filtered-client-ips FILTERED_CLIENT_IPS]
[--filtered-url-regex-config FILTERED_URL_REGEX_CONFIG]

proxy.py v2.4.0rc8.dev7+g1871027
proxy.py v2.4.0rc8.dev17+g59a4335.d20220123

options:
-h, --help show this help message and exit
Expand All @@ -2313,9 +2313,6 @@ options:
--tunnel-remote-port TUNNEL_REMOTE_PORT
Default: 8899. Remote port which will be forwarded
locally for proxy.
--enable-events Default: False. Enables core to dispatch lifecycle
events. Plugins can be used to subscribe for core
events.
--threadless Default: True. Enabled by default on Python 3.8+ (mac,
linux). When disabled a new thread is spawned to
handle each client connection.
Expand All @@ -2324,6 +2321,9 @@ options:
handle each client connection.
--num-workers NUM_WORKERS
Defaults to number of CPU cores.
--enable-events Default: False. Enables core to dispatch lifecycle
events. Plugins can be used to subscribe for core
events.
--local-executor LOCAL_EXECUTOR
Default: 1. Enabled by default. Use 0 to disable. When
enabled acceptors will make use of local (same
Expand Down Expand Up @@ -2422,8 +2422,9 @@ options:
Default: proxy.http.proxy.auth.AuthPlugin. Auth plugin
to use instead of default basic auth plugin.
--cache-dir CACHE_DIR
Default: A temporary directory. Flag only applicable
when cache plugin is used with on-disk storage.
Default: /Users/abhinavsingh/.proxy/cache. Flag only
applicable when cache plugin is used with on-disk
storage.
--proxy-pool PROXY_POOL
List of upstream proxies to use in the pool
--enable-web-server Default: False. Whether to enable
Expand Down
4 changes: 2 additions & 2 deletions examples/ssl_echo_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class EchoSSLServerHandler(BaseTcpServerHandler[TcpClientConnection]):
"""Wraps client socket during initialization."""

@staticmethod
def create(**kwargs: Any) -> TcpClientConnection: # pragma: no cover
return TcpClientConnection(**kwargs)
def create(*args: Any) -> TcpClientConnection: # pragma: no cover
return TcpClientConnection(*args)

def initialize(self) -> None:
# Acceptors don't perform TLS handshake. Perform the same
Expand Down
120 changes: 120 additions & 0 deletions examples/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import time
import argparse
import threading
import multiprocessing
from typing import Any

from proxy.core.work import (
Work, ThreadlessPool, BaseLocalExecutor, BaseRemoteExecutor,
)
from proxy.common.flag import FlagParser
from proxy.common.backports import NonBlockingQueue


class Task:
"""This will be our work object."""

def __init__(self, payload: bytes) -> None:
self.payload = payload
print(payload)


class TaskWork(Work[Task]):
"""This will be our handler class, created for each received work."""

@staticmethod
def create(*args: Any) -> Task:
"""Work core doesn't know how to create work objects for us, so
we must provide an implementation of create method here."""
return Task(*args)


class LocalTaskExecutor(BaseLocalExecutor):
"""We'll define a local executor which is capable of receiving
log lines over a non blocking queue."""

def work(self, *args: Any) -> None:
task_id = int(time.time())
uid = '%s-%s' % (self.iid, task_id)
self.works[task_id] = self.create(uid, *args)


class RemoteTaskExecutor(BaseRemoteExecutor):

def work(self, *args: Any) -> None:
task_id = int(time.time())
uid = '%s-%s' % (self.iid, task_id)
self.works[task_id] = self.create(uid, *args)


def start_local(flags: argparse.Namespace) -> None:
work_queue = NonBlockingQueue()
executor = LocalTaskExecutor(iid=1, work_queue=work_queue, flags=flags)

t = threading.Thread(target=executor.run)
t.daemon = True
t.start()

try:
i = 0
while True:
work_queue.put(('%d' % i).encode('utf-8'))
i += 1
except KeyboardInterrupt:
pass
finally:
executor.running.set()
t.join()


def start_remote(flags: argparse.Namespace) -> None:
pipe = multiprocessing.Pipe()
work_queue = pipe[0]
executor = RemoteTaskExecutor(iid=1, work_queue=pipe[1], flags=flags)

p = multiprocessing.Process(target=executor.run)
p.daemon = True
p.start()

try:
i = 0
while True:
work_queue.send(('%d' % i).encode('utf-8'))
i += 1
except KeyboardInterrupt:
pass
finally:
executor.running.set()
p.join()


def start_remote_pool(flags: argparse.Namespace) -> None:
with ThreadlessPool(flags=flags, executor_klass=RemoteTaskExecutor) as pool:
try:
i = 0
while True:
work_queue = pool.work_queues[i % flags.num_workers]
work_queue.send(('%d' % i).encode('utf-8'))
i += 1
except KeyboardInterrupt:
pass


if __name__ == '__main__':
flags = FlagParser.initialize(
['--disable-http-proxy'],
work_klass=TaskWork,
)
start_remote_pool(flags)
# start_remote(flags)
# start_local(flags)
4 changes: 2 additions & 2 deletions examples/tcp_echo_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ class EchoServerHandler(BaseTcpServerHandler[TcpClientConnection]):
"""Sets client socket to non-blocking during initialization."""

@staticmethod
def create(**kwargs: Any) -> TcpClientConnection: # pragma: no cover
return TcpClientConnection(**kwargs)
def create(*args: Any) -> TcpClientConnection: # pragma: no cover
return TcpClientConnection(*args)

def initialize(self) -> None:
self.work.connection.setblocking(False)
Expand Down
4 changes: 2 additions & 2 deletions proxy/core/base/tcp_tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ def handle_data(self, data: memoryview) -> Optional[bool]:
pass # pragma: no cover

@staticmethod
def create(**kwargs: Any) -> TcpClientConnection: # pragma: no cover
return TcpClientConnection(**kwargs)
def create(*args: Any) -> TcpClientConnection: # pragma: no cover
return TcpClientConnection(*args)

def initialize(self) -> None:
self.work.connection.setblocking(False)
Expand Down
6 changes: 3 additions & 3 deletions proxy/core/connection/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ def __init__(self) -> None:
self.pools: Dict[HostPort, Set[TcpServerConnection]] = {}

@staticmethod
def create(**kwargs: Any) -> TcpServerConnection: # pragma: no cover
return TcpServerConnection(**kwargs)
def create(*args: Any) -> TcpServerConnection: # pragma: no cover
return TcpServerConnection(*args)

def acquire(self, addr: HostPort) -> Tuple[bool, TcpServerConnection]:
"""Returns a reusable connection from the pool.
Expand Down Expand Up @@ -154,7 +154,7 @@ def add(self, addr: HostPort) -> TcpServerConnection:

NOTE: You must not use the returned connection, instead use `acquire`.
"""
new_conn = self.create(host=addr[0], port=addr[1])
new_conn = self.create(addr[0], addr[1])
new_conn.connect()
self._add(new_conn)
logger.debug(
Expand Down
20 changes: 5 additions & 15 deletions proxy/core/work/fd/fd.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,16 @@ class ThreadlessFdExecutor(Threadless[T]):
"""A threadless executor which handles file descriptors
and works with read/write events over a socket."""

def work(self, **kwargs: Any) -> None:
fileno: int = kwargs['fileno']
addr: Optional[HostPort] = kwargs.get('addr', None)
conn: Optional[TcpOrTlsSocket] = \
kwargs.get('conn', None)
def work(self, *args: Any) -> None:
fileno: int = args[0]
addr: Optional[HostPort] = args[1]
conn: Optional[TcpOrTlsSocket] = args[2]
conn = conn or socket.fromfd(
fileno, family=socket.AF_INET if self.flags.hostname.version == 4 else socket.AF_INET6,
type=socket.SOCK_STREAM,
)
uid = '%s-%s-%s' % (self.iid, self._total, fileno)
self.works[fileno] = self.flags.work_klass(
self.flags.work_klass.create(
conn=conn,
addr=addr,
),
flags=self.flags,
event_queue=self.event_queue,
uid=uid,
upstream_conn_pool=self._upstream_conn_pool,
)
self.works[fileno] = self.create(uid, conn, addr)
self.works[fileno].publish_event(
event_name=eventNames.WORK_STARTED,
event_payload={'fileno': fileno, 'addr': addr},
Expand Down
2 changes: 1 addition & 1 deletion proxy/core/work/fd/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,4 @@ def initialize(self, work: Any) -> None:
# NOTE: Here we are assuming to receive a connection object
# and not a fileno because we are a LocalExecutor.
fileno = conn.fileno()
self.work(fileno=fileno, addr=addr, conn=conn)
self.work(fileno, addr, conn)
2 changes: 1 addition & 1 deletion proxy/core/work/fd/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def receive_from_work_queue(self) -> bool:
if not self.flags.unix_socket_path:
addr = self.work_queue.recv()
fileno = recv_handle(self.work_queue)
self.work(fileno=fileno, addr=addr)
self.work(fileno, addr, None)
return False

def work_queue_fileno(self) -> Optional[int]:
Expand Down
10 changes: 10 additions & 0 deletions proxy/core/work/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import queue
import asyncio
import contextlib
from typing import Any, Optional

from .threadless import Threadless
Expand All @@ -30,3 +32,11 @@ def loop(self) -> Optional[asyncio.AbstractEventLoop]:

def work_queue_fileno(self) -> Optional[int]:
return None

def receive_from_work_queue(self) -> bool:
with contextlib.suppress(queue.Empty):
work = self.work_queue.get()
if isinstance(work, bool) and work is False:
return True
self.work(work)
return False
4 changes: 4 additions & 0 deletions proxy/core/work/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ def work_queue_fileno(self) -> Optional[int]:

def close_work_queue(self) -> None:
self.work_queue.close()

def receive_from_work_queue(self) -> bool:
self.work(self.work_queue.recv())
return False
2 changes: 1 addition & 1 deletion proxy/core/work/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def start_threaded_work(
) -> Tuple['Work[T]', threading.Thread]:
"""Utility method to start a work in a new thread."""
work = flags.work_klass(
flags.work_klass.create(conn=conn, addr=addr),
flags.work_klass.create(conn, addr),
flags=flags,
event_queue=event_queue,
upstream_conn_pool=None,
Expand Down
14 changes: 13 additions & 1 deletion proxy/core/work/threadless.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from abc import ABC, abstractmethod
from typing import (
TYPE_CHECKING, Any, Set, Dict, List, Tuple, Generic, TypeVar, Optional,
cast,
)

from ...common.types import Readables, Writables, SelectableEvents
Expand Down Expand Up @@ -120,9 +121,20 @@ def work_queue_fileno(self) -> Optional[int]:
raise NotImplementedError()

@abstractmethod
def work(self, **kwargs: Any) -> None:
def work(self, *args: Any) -> None:
raise NotImplementedError()

def create(self, uid: str, *args: Any) -> 'Work[T]':
return cast(
'Work[T]', self.flags.work_klass(
self.flags.work_klass.create(*args),
flags=self.flags,
event_queue=self.event_queue,
uid=uid,
upstream_conn_pool=self._upstream_conn_pool,
),
)

def close_work_queue(self) -> None:
"""Only called if ``work_queue_fileno`` returns an integer.
If an fd is select-able for work queue, make sure
Expand Down
2 changes: 1 addition & 1 deletion proxy/core/work/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(

@staticmethod
@abstractmethod
def create(**kwargs: Any) -> T:
def create(*args: Any) -> T:
"""Implementations are responsible for creation of work objects
from incoming args. This helps keep work core agnostic to
creation of externally defined work class objects."""
Expand Down
4 changes: 2 additions & 2 deletions proxy/http/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def __init__(self, *args: Any, **kwargs: Any):
##

@staticmethod
def create(**kwargs: Any) -> HttpClientConnection: # pragma: no cover
return HttpClientConnection(**kwargs)
def create(*args: Any) -> HttpClientConnection: # pragma: no cover
return HttpClientConnection(*args)

def initialize(self) -> None:
super().initialize()
Expand Down
4 changes: 2 additions & 2 deletions proxy/socks/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)

@staticmethod
def create(**kwargs: Any) -> SocksClientConnection:
return SocksClientConnection(**kwargs)
def create(*args: Any) -> SocksClientConnection:
return SocksClientConnection(*args)

def handle_data(self, data: memoryview) -> Optional[bool]:
return super().handle_data(data)
Loading