Skip to content

DescriptorsHandlerMixin and Descriptors, SelectableEvents types #938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ lib-clean:
lib-dep:
pip install --upgrade pip && \
pip install \
-r requirements.txt \
-r requirements-testing.txt \
-r requirements-release.txt \
-r requirements-tunnel.txt && \
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@
(_py_class_role, 'proxy.core.acceptor.threadless.T'),
(_py_class_role, 'proxy.core.acceptor.work.T'),
(_py_class_role, 'queue.Queue[Any]'),
(_py_class_role, 'SelectableEvents'),
(_py_class_role, 'TcpClientConnection'),
(_py_class_role, 'TcpServerConnection'),
(_py_class_role, 'unittest.case.TestCase'),
Expand Down
6 changes: 2 additions & 4 deletions examples/web_scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
"""
import time

from typing import Dict

from proxy import Proxy
from proxy.core.acceptor import Work
from proxy.core.connection import TcpClientConnection
from proxy.common.types import Readables, Writables
from proxy.common.types import Readables, SelectableEvents, Writables


class WebScraper(Work[TcpClientConnection]):
Expand All @@ -40,7 +38,7 @@ class WebScraper(Work[TcpClientConnection]):
only PUBSUB protocol.
"""

async def get_events(self) -> Dict[int, int]:
async def get_events(self) -> SelectableEvents:
"""Return sockets and events (read or write) that we are interested in."""
return {}

Expand Down
6 changes: 3 additions & 3 deletions helper/monitor_open_files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do
OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l)
echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR"

pgrep -P "$acceptorPid" | while read -r threadlessPid; do
OPEN_FILES_BY_THREADLESS=$(lsof -p "$threadlessPid" | wc -l)
echo " [$threadlessPid] Threadless process: $OPEN_FILES_BY_THREADLESS"
pgrep -P "$acceptorPid" | while read -r childPid; do
OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l)
echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC"
done
done
28 changes: 13 additions & 15 deletions proxy/common/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
:license: BSD, see LICENSE for more details.
"""
import os
import abc
import logging
import inspect
import itertools
Expand Down Expand Up @@ -72,20 +71,19 @@ def load(
klass, module_name = Plugins.importer(plugin_)
assert klass and module_name
mro = list(inspect.getmro(klass))
mro.reverse()
iterator = iter(mro)
try:
while next(iterator) is not abc.ABC:
pass
base_klass = next(iterator)
if klass not in p[bytes_(base_klass.__name__)]:
p[bytes_(base_klass.__name__)].append(klass)
logger.info('Loaded plugin %s.%s', module_name, klass.__name__)
except StopIteration:
logger.warn(
'%s is NOT a valid plugin',
text_(plugin_),
)
# Find the base plugin class that
# this plugin_ is implementing
found = False
for base_klass in mro:
if bytes_(base_klass.__name__) in p:
found = True
break
if not found:
raise ValueError('%s is NOT a valid plugin' % text_(plugin_))
if klass not in p[bytes_(base_klass.__name__)]:
p[bytes_(base_klass.__name__)].append(klass)
logger.info('Loaded plugin %s.%s', module_name, klass.__name__)
# print(p)
return p

@staticmethod
Expand Down
23 changes: 7 additions & 16 deletions proxy/common/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,8 @@
"""
import queue
import ipaddress
import sys

from typing import TYPE_CHECKING, Dict, Any, List, Union

# NOTE: Using try/except causes linting problems which is why it's necessary
# NOTE: to use this mypy/pylint idiom for py36-py38 compatibility
# Ref: https://github.com/python/typeshed/issues/3500#issuecomment-560958608
if sys.version_info >= (3, 8):
from typing import Protocol
else:
from typing_extensions import Protocol
from typing import TYPE_CHECKING, Dict, Any, List, Tuple, Union


if TYPE_CHECKING:
Expand All @@ -29,11 +20,11 @@
DictQueueType = queue.Queue


class HasFileno(Protocol):
def fileno(self) -> int:
... # pragma: no cover

Selectable = int
Selectables = List[Selectable]
SelectableEvents = Dict[Selectable, int] # Values are event masks
Readables = Selectables
Writables = Selectables
Descriptors = Tuple[Readables, Writables]

Readables = List[Union[int, HasFileno]]
Writables = List[Union[int, HasFileno]]
IpAddress = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
8 changes: 4 additions & 4 deletions proxy/core/acceptor/threadless.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import Any, Dict, Optional, Tuple, List, Set, Generic, TypeVar, Union

from ...common.logger import Logger
from ...common.types import Readables, Writables
from ...common.types import Readables, SelectableEvents, Writables
from ...common.constants import DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT, DEFAULT_SELECTOR_SELECT_TIMEOUT
from ...common.constants import DEFAULT_WAIT_FOR_TASKS_TIMEOUT

Expand Down Expand Up @@ -82,7 +82,7 @@ def __init__(
# work_id
int,
# fileno, mask
Dict[int, int],
SelectableEvents,
] = {}
self.wait_timeout: float = DEFAULT_WAIT_FOR_TASKS_TIMEOUT
self.cleanup_inactive_timeout: float = DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT
Expand Down Expand Up @@ -288,9 +288,9 @@ async def _selected_events(self) -> Tuple[
if key.data not in work_by_ids:
work_by_ids[key.data] = ([], [])
if mask & selectors.EVENT_READ:
work_by_ids[key.data][0].append(key.fileobj)
work_by_ids[key.data][0].append(key.fd)
if mask & selectors.EVENT_WRITE:
work_by_ids[key.data][1].append(key.fileobj)
work_by_ids[key.data][1].append(key.fd)
return (work_by_ids, new_work_available)

async def _wait_for_tasks(self) -> Set['asyncio.Task[bool]']:
Expand Down
6 changes: 3 additions & 3 deletions proxy/core/acceptor/work.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
"""
import argparse

from abc import ABC, abstractmethod
from uuid import uuid4
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, TypeVar, Generic, TYPE_CHECKING

from ..event import eventNames, EventQueue
from ...common.types import Readables, Writables
from ...common.types import Readables, SelectableEvents, Writables

if TYPE_CHECKING:
from ..connection import UpstreamConnectionPool
Expand Down Expand Up @@ -48,7 +48,7 @@ def __init__(
self.upstream_conn_pool = upstream_conn_pool

@abstractmethod
async def get_events(self) -> Dict[int, int]:
async def get_events(self) -> SelectableEvents:
"""Return sockets and events (read or write) that we are interested in."""
return {} # pragma: no cover

Expand Down
6 changes: 3 additions & 3 deletions proxy/core/base/tcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
import selectors

from abc import abstractmethod
from typing import Dict, Any, Optional
from typing import Any, Optional

from ...core.acceptor import Work
from ...core.connection import TcpClientConnection
from ...common.types import Readables, Writables
from ...common.types import Readables, SelectableEvents, Writables

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -61,7 +61,7 @@ def handle_data(self, data: memoryview) -> Optional[bool]:
"""Optionally return True to close client connection."""
pass # pragma: no cover

async def get_events(self) -> Dict[int, int]:
async def get_events(self) -> SelectableEvents:
events = {}
# We always want to read from client
# Register for EVENT_READ events
Expand Down
8 changes: 4 additions & 4 deletions proxy/core/base/tcp_tunnel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import selectors

from abc import abstractmethod
from typing import Any, Optional, Dict
from typing import Any, Optional

from ...http.parser import HttpParser, httpParserTypes
from ...common.types import Readables, Writables
from ...common.types import Readables, SelectableEvents, Writables
from ...common.utils import text_

from ..connection import TcpServerConnection
Expand Down Expand Up @@ -60,9 +60,9 @@ def shutdown(self) -> None:
self.upstream.close()
super().shutdown()

async def get_events(self) -> Dict[int, int]:
async def get_events(self) -> SelectableEvents:
# Get default client events
ev: Dict[int, int] = await super().get_events()
ev: SelectableEvents = await super().get_events()
# Read from server if we are connected
if self.upstream and self.upstream._conn is not None:
ev[self.upstream.connection.fileno()] = selectors.EVENT_READ
Expand Down
10 changes: 5 additions & 5 deletions proxy/core/base/tcp_upstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import logging

from abc import ABC, abstractmethod
from typing import Tuple, List, Optional, Any
from typing import Optional, Any

from ...common.types import Readables, Writables
from ...common.types import Readables, Writables, Descriptors
from ...core.connection import TcpServerConnection

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,15 +62,15 @@ def handle_upstream_data(self, raw: memoryview) -> None:
def initialize_upstream(self, addr: str, port: int) -> None:
self.upstream = TcpServerConnection(addr, port)

def get_descriptors(self) -> Tuple[List[int], List[int]]:
async def get_descriptors(self) -> Descriptors:
if not self.upstream:
return [], []
return [self.upstream.connection.fileno()], \
[self.upstream.connection.fileno()] \
if self.upstream.has_buffer() \
else []

def read_from_descriptors(self, r: Readables) -> bool:
async def read_from_descriptors(self, r: Readables) -> bool:
if self.upstream and \
self.upstream.connection.fileno() in r:
try:
Expand All @@ -89,7 +89,7 @@ def read_from_descriptors(self, r: Readables) -> bool:
return True
return False

def write_to_descriptors(self, w: Writables) -> bool:
async def write_to_descriptors(self, w: Writables) -> bool:
if self.upstream and \
self.upstream.connection.fileno() in w and \
self.upstream.has_buffer():
Expand Down
4 changes: 2 additions & 2 deletions proxy/core/connection/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import TYPE_CHECKING, Set, Dict, Tuple

from ...common.flag import flags
from ...common.types import Readables, Writables
from ...common.types import Readables, SelectableEvents, Writables

from ..acceptor.work import Work

Expand Down Expand Up @@ -129,7 +129,7 @@ def release(self, conn: TcpServerConnection) -> None:
# Reset for reusability
conn.reset()

async def get_events(self) -> Dict[int, int]:
async def get_events(self) -> SelectableEvents:
"""Returns read event flag for all reusable connections in the pool."""
events = {}
for connections in self.pools.values():
Expand Down
40 changes: 40 additions & 0 deletions proxy/http/descriptors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
from ..common.types import Readables, Writables, Descriptors


# Since 3.4.0
class DescriptorsHandlerMixin:
"""DescriptorsHandlerMixin provides abstraction used by several core HTTP modules
include web and proxy plugins. By using DescriptorsHandlerMixin, class
becomes complaint with core event loop."""

# @abstractmethod
async def get_descriptors(self) -> Descriptors:
"""Implementations must return a list of descriptions that they wish to
read from and write into."""
return [], [] # pragma: no cover

# @abstractmethod
async def write_to_descriptors(self, w: Writables) -> bool:
"""Implementations must now write/flush data over the socket.

Note that buffer management is in-build into the connection classes.
Hence implementations MUST call
:meth:`~proxy.core.connection.connection.TcpConnection.flush`
here, to send any buffered data over the socket.
"""
return False # pragma: no cover

# @abstractmethod
async def read_from_descriptors(self, r: Readables) -> bool:
"""Implementations must now read data over the socket."""
return False # pragma: no cover
16 changes: 8 additions & 8 deletions proxy/http/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import logging
import selectors

from typing import Tuple, List, Type, Union, Optional, Dict, Any
from typing import Tuple, List, Type, Union, Optional, Any

from ..common.flag import flags
from ..common.utils import wrap_socket
from ..core.base import BaseTcpServerHandler
from ..common.types import Readables, Writables
from ..core.connection import TcpClientConnection
from ..common.types import Readables, SelectableEvents, Writables
from ..common.constants import DEFAULT_CLIENT_RECVBUF_SIZE, DEFAULT_KEY_FILE
from ..common.constants import DEFAULT_SELECTOR_SELECT_TIMEOUT, DEFAULT_TIMEOUT

Expand Down Expand Up @@ -142,12 +142,12 @@ def shutdown(self) -> None:
logger.debug('Client connection closed')
super().shutdown()

async def get_events(self) -> Dict[int, int]:
async def get_events(self) -> SelectableEvents:
# Get default client events
events: Dict[int, int] = await super().get_events()
events: SelectableEvents = await super().get_events()
# HttpProtocolHandlerPlugin.get_descriptors
if self.plugin:
plugin_read_desc, plugin_write_desc = self.plugin.get_descriptors()
plugin_read_desc, plugin_write_desc = await self.plugin.get_descriptors()
for rfileno in plugin_read_desc:
if rfileno not in events:
events[rfileno] = selectors.EVENT_READ
Expand Down Expand Up @@ -400,7 +400,7 @@ async def _run_once(self) -> bool:
# FIXME: Returning events is only necessary because we cannot use async context manager
# for < Python 3.8. As a reason, this method is no longer a context manager and caller
# is responsible for unregistering the descriptors.
async def _selected_events(self) -> Tuple[Dict[int, int], Readables, Writables]:
async def _selected_events(self) -> Tuple[SelectableEvents, Readables, Writables]:
assert self.selector
events = await self.get_events()
for fd in events:
Expand All @@ -410,9 +410,9 @@ async def _selected_events(self) -> Tuple[Dict[int, int], Readables, Writables]:
writables = []
for key, mask in ev:
if mask & selectors.EVENT_READ:
readables.append(key.fileobj)
readables.append(key.fd)
if mask & selectors.EVENT_WRITE:
writables.append(key.fileobj)
writables.append(key.fd)
return (events, readables, writables)

def _flush(self) -> None:
Expand Down
Loading