Skip to content

Optimize #780

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ lib-profile:
--plugin proxy.plugin.WebServerPlugin \
--local-executor \
--backlog 65536 \
--open-file-limit 65536
--open-file-limit 65536 \
--log-file /dev/null

devtools:
Expand Down
56 changes: 28 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,47 +128,47 @@
```console
# On Macbook Pro 2019 / 2.4 GHz 8-Core Intel Core i9 / 32 GB RAM
❯ ./helper/benchmark.sh
CONCURRENCY: 100 workers, TOTAL REQUESTS: 100000 req, QPS: 5000 req/sec, TIMEOUT: 1 sec
CONCURRENCY: 100 workers, TOTAL REQUESTS: 100000 req, QPS: 8000 req/sec, TIMEOUT: 1 sec

Summary:
Total: 3.1560 secs
Slowest: 0.0375 secs
Fastest: 0.0006 secs
Average: 0.0031 secs
Requests/sec: 31685.9140
Total: 3.1217 secs
Slowest: 0.0499 secs
Fastest: 0.0004 secs
Average: 0.0030 secs
Requests/sec: 32033.7261

Total data: 1900000 bytes
Size/request: 19 bytes

Response time histogram:
0.001 [1] |
0.004 [91680] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.008 [7929] |■■■
0.012 [263] |
0.015 [29] |
0.019 [8] |
0.023 [23] |
0.026 [15] |
0.030 [27] |
0.034 [16] |
0.037 [9] |
0.000 [1] |
0.005 [92268] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■
0.010 [7264] |■■■
0.015 [318] |
0.020 [102] |
0.025 [32] |
0.030 [6] |
0.035 [4] |
0.040 [1] |
0.045 [2] |
0.050 [2] |


Latency distribution:
10% in 0.0022 secs
25% in 0.0025 secs
50% in 0.0029 secs
75% in 0.0034 secs
90% in 0.0041 secs
95% in 0.0048 secs
99% in 0.0066 secs
10% in 0.0017 secs
25% in 0.0020 secs
50% in 0.0025 secs
75% in 0.0036 secs
90% in 0.0050 secs
95% in 0.0060 secs
99% in 0.0087 secs

Details (average, fastest, slowest):
DNS+dialup: 0.0000 secs, 0.0006 secs, 0.0375 secs
DNS+dialup: 0.0000 secs, 0.0004 secs, 0.0499 secs
DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs
req write: 0.0000 secs, 0.0000 secs, 0.0046 secs
resp wait: 0.0030 secs, 0.0006 secs, 0.0320 secs
resp read: 0.0000 secs, 0.0000 secs, 0.0029 secs
req write: 0.0000 secs, 0.0000 secs, 0.0020 secs
resp wait: 0.0030 secs, 0.0004 secs, 0.0462 secs
resp read: 0.0000 secs, 0.0000 secs, 0.0027 secs

Status code distribution:
[200] 100000 responses
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@
(_py_class_role, 'HttpWebServerBasePlugin'),
(_py_class_role, 'multiprocessing.context.Process'),
(_py_class_role, 'multiprocessing.synchronize.Lock'),
(_py_class_role, 'NonBlockingQueue'),
(_py_class_role, 'paramiko.channel.Channel'),
(_py_class_role, 'proxy.http.parser.parser.T'),
(_py_class_role, 'proxy.plugin.cache.store.base.CacheStore'),
Expand Down
11 changes: 1 addition & 10 deletions helper/benchmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ if [ $(basename $PWD) != "proxy.py" ]; then
fi

TIMEOUT=1
QPS=20000
QPS=8000
CONCURRENCY=100
TOTAL_REQUESTS=100000
OPEN_FILE_LIMIT=65536
Expand All @@ -41,15 +41,6 @@ PID_FILE=/tmp/proxy.pid

ulimit -n $OPEN_FILE_LIMIT

# time python -m \
# proxy \
# --enable-web-server \
# --plugin proxy.plugin.WebServerPlugin \
# --backlog $BACKLOG \
# --open-file-limit $OPEN_FILE_LIMIT \
# --pid-file $PID_FILE \
# --log-file /dev/null

PID=$(cat $PID_FILE)
if [[ -z "$PID" ]]; then
echo "Either pid file doesn't exist or no pid found in the pid file"
Expand Down
38 changes: 37 additions & 1 deletion proxy/common/backports.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
:license: BSD, see LICENSE for more details.
"""
import time
import threading

from typing import Any
from typing import Any, Deque
from queue import Empty
from collections import deque


class cached_property:
Expand Down Expand Up @@ -80,3 +83,36 @@ def __get__(self, inst: Any, owner: Any) -> Any:
finally:
cache[self.__name__] = (value, now)
return value


class NonBlockingQueue:
    '''Unbounded, non-blocking FIFO queue.

    Multiple producers may call :meth:`put`, but only a single
    consumer thread may call :meth:`get`.

    NOTE: Python 3.7+ ships :class:`queue.SimpleQueue` with these
    semantics.  This backport exists because proxy.py still
    supports Python 3.6.
    '''

    def __init__(self) -> None:
        # Underlying storage; deque gives O(1) append/popleft.
        self._queue: Deque[Any] = deque()
        # Counts items available for consumption; released on put,
        # acquired (non-blocking) on get.
        self._count: threading.Semaphore = threading.Semaphore(0)

    def put(self, item: Any) -> None:
        '''Append *item* to the queue and signal availability.'''
        self._queue.append(item)
        self._count.release()

    def get(self) -> Any:
        '''Remove and return the oldest item.

        Raises :class:`queue.Empty` when no item is available.
        '''
        if self._count.acquire(blocking=False):
            return self._queue.popleft()
        raise Empty

    def empty(self) -> bool:
        '''Return True if the queue is empty, False otherwise (not reliable!).'''
        return not self._queue

    def qsize(self) -> int:
        '''Return the approximate number of queued items (not reliable!).'''
        return len(self._queue)
5 changes: 4 additions & 1 deletion proxy/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ def _env_threadless_compliant() -> bool:
DEFAULT_MAX_SEND_SIZE = 16 * 1024
DEFAULT_WORK_KLASS = 'proxy.http.HttpProtocolHandler'
DEFAULT_ENABLE_PROXY_PROTOCOL = False
DEFAULT_SELECTOR_SELECT_TIMEOUT = 0.1
# 25 milliseconds to keep the loops hot
# Will consume ~0.3-0.6% CPU when idle.
DEFAULT_SELECTOR_SELECT_TIMEOUT = 25 / 1000
DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT = 1 # in seconds

DEFAULT_DEVTOOLS_DOC_URL = 'http://proxy'
DEFAULT_DEVTOOLS_FRAME_ID = secrets.token_hex(8)
Expand Down
8 changes: 4 additions & 4 deletions proxy/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,10 @@ def find_http_line(raw: bytes) -> Tuple[Optional[bytes], bytes]:
"""Find and returns first line ending in CRLF along with following buffer.

If no ending CRLF is found, line is None."""
parts = raw.split(CRLF)
if len(parts) == 1:
return None, raw
return parts[0], CRLF.join(parts[1:])
parts = raw.split(CRLF, 1)
return (None, raw) \
if len(parts) == 1 \
else (parts[0], parts[1])


def wrap_socket(
Expand Down
9 changes: 4 additions & 5 deletions proxy/core/acceptor/acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
acceptor
pre
"""
import queue
import socket
import logging
import argparse
Expand All @@ -26,11 +25,11 @@
from multiprocessing.reduction import recv_handle

from typing import List, Optional, Tuple
from typing import Any # noqa: W0611 pylint: disable=unused-import

from ...common.flag import flags
from ...common.utils import is_threadless
from ...common.logger import Logger
from ...common.backports import NonBlockingQueue
from ...common.constants import DEFAULT_LOCAL_EXECUTOR

from ..event import EventQueue
Expand Down Expand Up @@ -103,7 +102,7 @@ def __init__(
self.sock: Optional[socket.socket] = None
# Internals
self._total: Optional[int] = None
self._local_work_queue: Optional['queue.Queue[Any]'] = None
self._local_work_queue: Optional['NonBlockingQueue'] = None
self._local: Optional[LocalExecutor] = None
self._lthread: Optional[threading.Thread] = None

Expand All @@ -118,7 +117,7 @@ def accept(self, events: List[Tuple[selectors.SelectorKey, int]]) -> None:
work = (conn, addr or None)
if self.flags.local_executor:
assert self._local_work_queue
self._local_work_queue.put_nowait(work)
self._local_work_queue.put(work)
else:
self._work(*work)

Expand Down Expand Up @@ -171,7 +170,7 @@ def run(self) -> None:

def _start_local(self) -> None:
assert self.sock
self._local_work_queue = queue.Queue()
self._local_work_queue = NonBlockingQueue()
self._local = LocalExecutor(
work_queue=self._local_work_queue,
flags=self.flags,
Expand Down
2 changes: 2 additions & 0 deletions proxy/core/acceptor/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,15 @@ def shutdown(self) -> None:
def _listen_unix_socket(self) -> None:
self._socket = socket.socket(self.flags.family, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._socket.bind(self.flags.unix_socket_path)
self._socket.listen(self.flags.backlog)
self._socket.setblocking(False)

def _listen_server_port(self) -> None:
self._socket = socket.socket(self.flags.family, socket.SOCK_STREAM)
self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._socket.bind((str(self.flags.hostname), self.flags.port))
self._socket.listen(self.flags.backlog)
self._socket.setblocking(False)
Expand Down
8 changes: 5 additions & 3 deletions proxy/core/acceptor/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
import contextlib

from typing import Optional
from typing import Any # noqa: W0611 pylint: disable=unused-import
from typing import Any

from ...common.backports import NonBlockingQueue # noqa: W0611, F401 pylint: disable=unused-import

from .threadless import Threadless

logger = logging.getLogger(__name__)


class LocalExecutor(Threadless['queue.Queue[Any]']):
class LocalExecutor(Threadless['NonBlockingQueue']):
"""A threadless executor implementation which uses a queue to receive new work."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
Expand All @@ -44,7 +46,7 @@ def work_queue_fileno(self) -> Optional[int]:

def receive_from_work_queue(self) -> bool:
with contextlib.suppress(queue.Empty):
work = self.work_queue.get(block=False)
work = self.work_queue.get()
if isinstance(work, bool) and work is False:
return True
assert isinstance(work, tuple)
Expand Down
44 changes: 26 additions & 18 deletions proxy/core/acceptor/threadless.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from ...common.logger import Logger
from ...common.types import Readables, Writables
from ...common.constants import DEFAULT_SELECTOR_SELECT_TIMEOUT
from ...common.constants import DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT, DEFAULT_SELECTOR_SELECT_TIMEOUT

from ..connection import TcpClientConnection
from ..event import eventNames, EventQueue
Expand Down Expand Up @@ -86,6 +86,7 @@ def __init__(
Dict[int, int],
] = {}
self.wait_timeout: float = DEFAULT_SELECTOR_SELECT_TIMEOUT
self.cleanup_inactive_timeout: float = DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT

@property
@abstractmethod
Expand Down Expand Up @@ -216,12 +217,9 @@ async def _selected_events(self) -> Tuple[
work_by_ids[key.data][1].append(key.fileobj)
return (work_by_ids, new_work_available)

async def _wait_for_tasks(
self,
pending: Set['asyncio.Task[bool]'],
) -> None:
async def _wait_for_tasks(self) -> None:
finished, self.unfinished = await asyncio.wait(
pending,
self.unfinished,
timeout=self.wait_timeout,
return_when=asyncio.FIRST_COMPLETED,
)
Expand All @@ -236,10 +234,6 @@ def _fromfd(self, fileno: int) -> socket.socket:
type=socket.SOCK_STREAM,
)

# TODO: Use cached property to avoid execution repeatedly
# within a second interval. Note that our selector timeout
# is 0.1 second which can unnecessarily result in cleanup
# checks within a second boundary.
def _cleanup_inactive(self) -> None:
inactive_works: List[int] = []
for work_id in self.works:
Expand Down Expand Up @@ -294,21 +288,35 @@ async def _run_once(self) -> bool:
if teardown:
return teardown
if len(work_by_ids) == 0:
self._cleanup_inactive()
return False
# Invoke Threadless.handle_events
self.unfinished.update(self._create_tasks(work_by_ids))
# logger.debug('Executing {0} works'.format(len(self.unfinished)))
await self._wait_for_tasks(self.unfinished)
await self._wait_for_tasks()
# logger.debug(
# 'Done executing works, {0} pending, {1} registered'.format(
# len(self.unfinished), len(self.registered_events_by_work_ids),
# ),
# )
# Remove and shutdown inactive workers
self._cleanup_inactive()
return False

    async def _run_forever(self) -> None:
        """Drive the work loop: execute :meth:`_run_once` repeatedly until
        it requests teardown or the shutdown event is set, then stop the
        asyncio loop so ``run_forever()`` in :meth:`run` returns."""
        tick = 0
        try:
            while True:
                # _run_once returns True when teardown was requested.
                if await self._run_once():
                    break
                # Check for inactive connections and for the shutdown
                # signal only roughly once per cleanup_inactive_timeout
                # (~1 second), not on every selector timeout, keeping
                # the hot loop cheap.
                if (tick * DEFAULT_SELECTOR_SELECT_TIMEOUT) > self.cleanup_inactive_timeout:
                    self._cleanup_inactive()
                    if self.running.is_set():
                        break
                    tick = 0
                tick += 1
        finally:
            # Ensure the event loop stops even if _run_once raised.
            if self.loop:
                self.loop.stop()

def run(self) -> None:
Logger.setup(
self.flags.log_file, self.flags.log_level,
Expand All @@ -324,10 +332,9 @@ def run(self) -> None:
data=wqfileno,
)
assert self.loop
while not self.running.is_set():
# logger.debug('Working on {0} works'.format(len(self.works)))
if self.loop.run_until_complete(self._run_once()):
break
# logger.debug('Working on {0} works'.format(len(self.works)))
self.loop.create_task(self._run_forever())
self.loop.run_forever()
except KeyboardInterrupt:
pass
finally:
Expand All @@ -336,4 +343,5 @@ def run(self) -> None:
self.selector.unregister(wqfileno)
self.close_work_queue()
assert self.loop is not None
self.loop.run_until_complete(self.loop.shutdown_asyncgens())
self.loop.close()
Loading