Skip to content

Pre-evaluate args.threadless = is_threadless #881

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions helper/benchmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ run_benchmark

POST_RUN_OPEN_FILES=$(./helper/monitor_open_files.sh)

echo $output

echo "Open files diff:"
diff <( echo "$PRE_RUN_OPEN_FILES" ) <( echo "$POST_RUN_OPEN_FILES" )

Expand Down
28 changes: 18 additions & 10 deletions proxy/common/flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ._compat import IS_WINDOWS # noqa: WPS436
from .plugins import Plugins
from .types import IpAddress
from .utils import bytes_, is_py2, set_open_file_limit
from .utils import bytes_, is_py2, is_threadless, set_open_file_limit
from .constants import COMMA, DEFAULT_DATA_DIRECTORY_PATH, DEFAULT_NUM_ACCEPTORS, DEFAULT_NUM_WORKERS
from .constants import DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_DISABLE_HEADERS, PY2_DEPRECATION_MESSAGE
from .constants import PLUGIN_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL, DEFAULT_MIN_COMPRESSION_LIMIT
Expand Down Expand Up @@ -186,7 +186,7 @@ def initialize(
for p in FlagParser.get_default_plugins(args)
]
requested_plugins = Plugins.resolve_plugin_flag(
args.plugins, opts.get('plugins', None),
args.plugins, opts.get('plugins'),
)
plugins = Plugins.load(
default_plugins + auth_plugins + requested_plugins,
Expand Down Expand Up @@ -340,21 +340,29 @@ def initialize(
),
)
args.timeout = cast(int, opts.get('timeout', args.timeout))
args.threadless = cast(bool, opts.get('threadless', args.threadless))
args.threaded = cast(bool, opts.get('threaded', args.threaded))
args.pid_file = cast(
Optional[str], opts.get(
'pid_file',
args.pid_file,
),
)
args.local_executor = cast(
bool,
opts.get(
'local_executor',
args.local_executor,
),
)
args.threaded = cast(bool, opts.get('threaded', args.threaded))
# Pre-evaluate threadless values based upon environment and config
#
# --threadless is now default mode of execution
# but we still have exceptions based upon OS config.
# Make sure executors are not started if is_threadless
# evaluates to False.
args.threadless = cast(bool, opts.get('threadless', args.threadless))
args.threadless = is_threadless(args.threadless, args.threaded)

args.pid_file = cast(
Optional[str], opts.get(
'pid_file',
args.pid_file,
),
)

args.proxy_py_data_dir = DEFAULT_DATA_DIRECTORY_PATH
os.makedirs(args.proxy_py_data_dir, exist_ok=True)
Expand Down
20 changes: 10 additions & 10 deletions proxy/common/pki.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,16 +298,16 @@ def run_openssl_command(command: List[str], timeout: int) -> bool:
', '.join(available_actions),
)
sys.exit(1)
if args.action in ('gen_private_key', 'gen_public_key'):
if args.private_key_path is None:
logger.error('--private-key-path is required for ' + args.action)
sys.exit(1)
if args.action == 'gen_public_key':
if args.public_key_path is None:
logger.error(
'--public-key-file is required for private key generation',
)
sys.exit(1)
if args.action in ('gen_private_key', 'gen_public_key') and \
args.private_key_path is None:
logger.error('--private-key-path is required for ' + args.action)
sys.exit(1)
if args.action == 'gen_public_key' and \
args.public_key_path is None:
logger.error(
'--public-key-file is required for private key generation',
)
sys.exit(1)

# Execute
if args.action == 'gen_private_key':
Expand Down
18 changes: 12 additions & 6 deletions proxy/common/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,18 @@ def load(
mro = list(inspect.getmro(klass))
mro.reverse()
iterator = iter(mro)
while next(iterator) is not abc.ABC:
pass
base_klass = next(iterator)
if klass not in p[bytes_(base_klass.__name__)]:
p[bytes_(base_klass.__name__)].append(klass)
logger.info('Loaded plugin %s.%s', module_name, klass.__name__)
try:
while next(iterator) is not abc.ABC:
pass
base_klass = next(iterator)
if klass not in p[bytes_(base_klass.__name__)]:
p[bytes_(base_klass.__name__)].append(klass)
logger.info('Loaded plugin %s.%s', module_name, klass.__name__)
except StopIteration:
logger.warn(
'%s is NOT a valid plugin',
text_(plugin_),
)
return p

@staticmethod
Expand Down
6 changes: 3 additions & 3 deletions proxy/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def build_http_response(
headers = {}
has_content_length = False
has_transfer_encoding = False
for k in headers:
for k, _ in headers.items():
if k.lower() == b'content-length':
has_content_length = True
if k.lower() == b'transfer-encoding':
Expand All @@ -124,8 +124,8 @@ def build_http_pkt(
"""Build and returns a HTTP request or response packet."""
pkt = WHITESPACE.join(line) + CRLF
if headers is not None:
for k in headers:
pkt += build_http_header(k, headers[k]) + CRLF
for k, v in headers.items():
pkt += build_http_header(k, v) + CRLF
pkt += CRLF
if body:
pkt += body
Expand Down
25 changes: 12 additions & 13 deletions proxy/core/acceptor/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from typing import List, Optional, Tuple

from ...common.flag import flags
from ...common.utils import is_threadless
from ...common.logger import Logger
from ...common.backports import NonBlockingQueue
from ...common.constants import DEFAULT_LOCAL_EXECUTOR
Expand Down Expand Up @@ -113,17 +112,17 @@ def accept(
) -> List[Tuple[socket.socket, Optional[Tuple[str, int]]]]:
works = []
for _, mask in events:
if mask & selectors.EVENT_READ:
if self.sock is not None:
try:
conn, addr = self.sock.accept()
logging.debug(
'Accepting new work#{0}'.format(conn.fileno()),
)
works.append((conn, addr or None))
except BlockingIOError:
# logger.info('blocking io error')
pass
if mask & selectors.EVENT_READ and \
self.sock is not None:
try:
conn, addr = self.sock.accept()
logging.debug(
'Accepting new work#{0}'.format(conn.fileno()),
)
works.append((conn, addr or None))
except BlockingIOError:
# logger.info('blocking io error')
pass
return works

def run_once(self) -> None:
Expand Down Expand Up @@ -207,7 +206,7 @@ def _stop_local(self) -> None:

def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None:
self._total = self._total or 0
if is_threadless(self.flags.threadless, self.flags.threaded):
if self.flags.threadless:
# Index of worker to which this work should be dispatched
# Use round-robin strategy by default.
#
Expand Down
5 changes: 2 additions & 3 deletions proxy/core/acceptor/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from ..event import EventQueue, eventNames

from ...common.flag import flags
from ...common.utils import is_threadless
from ...common.constants import DEFAULT_NUM_WORKERS, DEFAULT_THREADLESS

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -150,7 +149,7 @@ def start_threaded_work(

def setup(self) -> None:
"""Setup threadless processes."""
if is_threadless(self.flags.threadless, self.flags.threaded):
if self.flags.threadless:
for index in range(self.flags.num_workers):
self._start_worker(index)
logger.info(
Expand All @@ -161,7 +160,7 @@ def setup(self) -> None:

def shutdown(self) -> None:
"""Shutdown threadless processes."""
if is_threadless(self.flags.threadless, self.flags.threaded):
if self.flags.threadless:
self._shutdown_workers()
logger.info(
'Stopped {0} threadless workers'.format(
Expand Down
8 changes: 4 additions & 4 deletions proxy/core/base/tcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ async def handle_writables(self, writables: Writables) -> bool:
'Flushing buffer to client {0}'.format(self.work.address),
)
self.work.flush()
if self.must_flush_before_shutdown is True:
if not self.work.has_buffer():
teardown = True
self.must_flush_before_shutdown = False
if self.must_flush_before_shutdown is True and \
not self.work.has_buffer():
teardown = True
self.must_flush_before_shutdown = False
return teardown

async def handle_readables(self, readables: Readables) -> bool:
Expand Down
4 changes: 2 additions & 2 deletions proxy/http/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from .exception import HttpProtocolException

from ..common.types import Readables, Writables
from ..common.utils import wrap_socket, is_threadless
from ..common.utils import wrap_socket
from ..core.base import BaseTcpServerHandler
from ..core.connection import TcpClientConnection
from ..common.flag import flags
Expand Down Expand Up @@ -76,7 +76,7 @@ def __init__(self, *args: Any, **kwargs: Any):
enable_proxy_protocol=self.flags.enable_proxy_protocol,
)
self.selector: Optional[selectors.DefaultSelector] = None
if not is_threadless(self.flags.threadless, self.flags.threaded):
if not self.flags.threadless:
self.selector = selectors.DefaultSelector()
self.plugins: Dict[str, HttpProtocolHandlerPlugin] = {}

Expand Down
2 changes: 1 addition & 1 deletion proxy/http/websocket/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
Websocket
"""
import io
import hashlib
import base64
import struct
import hashlib
import secrets
import logging

Expand Down
21 changes: 13 additions & 8 deletions proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ def setup(self) -> None:
event_queue = self.event_manager.queue \
if self.event_manager is not None \
else None
# Setup remote executors
if not self.flags.local_executor:
# Setup remote executors only if
# --local-executor mode isn't enabled.
if self.remote_executors_enabled:
self.executors = ThreadlessPool(
flags=self.flags,
event_queue=event_queue,
Expand All @@ -193,7 +194,7 @@ def setup(self) -> None:
def shutdown(self) -> None:
assert self.acceptors
self.acceptors.shutdown()
if not self.flags.local_executor:
if self.remote_executors_enabled:
assert self.executors
self.executors.shutdown()
if self.flags.enable_events:
Expand All @@ -214,14 +215,18 @@ def _delete_pid_file(self) -> None:
if self.flags.pid_file and os.path.exists(self.flags.pid_file):
os.remove(self.flags.pid_file)

@property
def remote_executors_enabled(self) -> bool:
return self.flags.threadless and not self.flags.local_executor


def main(**opts: Any) -> None:
try:
with Proxy(sys.argv[1:], **opts):
while True:
with Proxy(sys.argv[1:], **opts):
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
pass
except KeyboardInterrupt:
break


def entry_point() -> None:
Expand Down
7 changes: 1 addition & 6 deletions proxy/testing/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TestCase(unittest.TestCase):
"""Base TestCase class that automatically setup and tear down proxy.py."""

DEFAULT_PROXY_PY_STARTUP_FLAGS = [
'--hostname', '127.0.0.1',
'--port', '0',
'--num-workers', '1',
'--num-acceptors', '1',
Expand All @@ -37,16 +38,10 @@ def setUpClass(cls) -> None:
cls.INPUT_ARGS = getattr(cls, 'PROXY_PY_STARTUP_FLAGS') \
if hasattr(cls, 'PROXY_PY_STARTUP_FLAGS') \
else cls.DEFAULT_PROXY_PY_STARTUP_FLAGS
cls.INPUT_ARGS.append('--hostname')
cls.INPUT_ARGS.append('0.0.0.0')
cls.INPUT_ARGS.append('--port')
cls.INPUT_ARGS.append('0')

cls.PROXY = Proxy(cls.INPUT_ARGS)
cls.PROXY.flags.plugins[b'HttpProxyBasePlugin'].append(
CacheResponsesPlugin,
)

cls.PROXY.__enter__()
assert cls.PROXY.acceptors
cls.wait_for_server(cls.PROXY.flags.port)
Expand Down
11 changes: 7 additions & 4 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from unittest import mock

from proxy.proxy import main, entry_point
from proxy.common.constants import _env_threadless_compliant # noqa: WPS450
from proxy.common.utils import bytes_

from proxy.common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_LOCAL_EXECUTOR, DEFAULT_LOG_LEVEL, DEFAULT_LOG_FILE
Expand Down Expand Up @@ -244,8 +245,9 @@ def test_enable_dashboard(
mock_event_manager.assert_called_once()
mock_event_manager.return_value.setup.assert_called_once()
mock_event_manager.return_value.shutdown.assert_called_once()
mock_executor_pool.assert_called_once()
mock_executor_pool.return_value.setup.assert_called_once()
if _env_threadless_compliant():
mock_executor_pool.assert_called_once()
mock_executor_pool.return_value.setup.assert_called_once()
mock_acceptor_pool.assert_called_once()
mock_acceptor_pool.return_value.setup.assert_called_once()
mock_listener.return_value.setup.assert_called_once()
Expand Down Expand Up @@ -283,8 +285,9 @@ def test_enable_devtools(
mock_parse_args.assert_called_once()
# Currently --enable-devtools flag alone doesn't enable eventing core
mock_event_manager.assert_not_called()
mock_executor_pool.assert_called_once()
mock_executor_pool.return_value.setup.assert_called_once()
if _env_threadless_compliant():
mock_executor_pool.assert_called_once()
mock_executor_pool.return_value.setup.assert_called_once()
mock_acceptor_pool.assert_called_once()
mock_acceptor_pool.return_value.setup.assert_called_once()
mock_listener.return_value.setup.assert_called_once()
Expand Down