Commit da23c7f

Work (#693)
* Refactor work acceptor and executor
* Lint fixes
* Fix expression-not-assigned pylint error
1 parent 462624f commit da23c7f

File tree: 5 files changed, +115 −91 lines

proxy/core/acceptor/acceptor.py
proxy/core/acceptor/threadless.py
proxy/core/base/tcp_server.py
proxy/http/handler.py
proxy/http/proxy/server.py

proxy/core/acceptor/acceptor.py

+60 −40
@@ -42,19 +42,25 @@


 class Acceptor(multiprocessing.Process):
-    """Socket server acceptor process.
+    """Work acceptor process.

-    Accepts a server socket fd over `work_queue` and start listening for client
-    connections over the passed server socket. By default, it spawns a separate thread
-    to handle each client request.
+    On start-up, `Acceptor` accepts a file descriptor which will be used to
+    accept new work. File descriptor is accepted over a `work_queue` which is
+    closed immediately after receiving the descriptor.

-    However, if `--threadless` option is enabled, Acceptor process will also pre-spawns a `Threadless`
-    process at startup. Accepted client connections are then passed to the `Threadless` process
-    which internally uses asyncio event loop to handle client connections.
+    `Acceptor` goes on to listen for new work over the received server socket.
+    By default, `Acceptor` will spawn a new thread to handle each work.

-    TODO(abhinavsingh): Instead of starting `Threadless` process, can we work with a `Threadless` thread?
-    What are the performance implications of sharing fds between threads vs processes? How much performance
-    degradation happen when processes are running on separate CPU cores?
+    However, when `--threadless` option is enabled, `Acceptor` process will also pre-spawn a
+    `Threadless` process during start-up. Accepted work is passed to these `Threadless` processes.
+    `Acceptor` process shares accepted work with a `Threadless` process over its dedicated pipe.
+
+    TODO(abhinavsingh): Open questions:
+    1) Instead of starting `Threadless` process, can we work with a `Threadless` thread?
+    2) What are the performance implications of sharing fds between threads vs processes?
+    3) How much performance degradation happens when acceptor and threadless processes are
+       running on separate CPU cores?
+    4) Can we ensure both acceptor and threadless process are pinned to the same CPU core?
     """

     def __init__(
@@ -67,18 +73,26 @@ def __init__(
             event_queue: Optional[EventQueue] = None,
     ) -> None:
         super().__init__()
+        self.flags = flags
+        # Lock shared by all acceptor processes
+        # to avoid concurrent accept over server socket
+        self.lock = lock
+        # Index assigned by `AcceptorPool`
         self.idd = idd
+        # Queue over which server socket fd is received on start-up
         self.work_queue: connection.Connection = work_queue
-        self.flags = flags
+        # Worker class
         self.work_klass = work_klass
-        self.lock = lock
+        # Eventing core queue
         self.event_queue = event_queue
-
+        # Selector & threadless states
         self.running = multiprocessing.Event()
         self.selector: Optional[selectors.DefaultSelector] = None
-        self.sock: Optional[socket.socket] = None
         self.threadless_process: Optional[Threadless] = None
         self.threadless_client_queue: Optional[connection.Connection] = None
+        # File descriptor used to accept new work
+        # Currently, a socket fd is assumed.
+        self.sock: Optional[socket.socket] = None

     def start_threadless_process(self) -> None:
         pipe = multiprocessing.Pipe()
@@ -99,31 +113,30 @@ def shutdown_threadless_process(self) -> None:
         self.threadless_process.join()
         self.threadless_client_queue.close()

-    def start_work(self, conn: socket.socket, addr: Tuple[str, int]) -> None:
-        if self.flags.threadless and \
-                self.threadless_client_queue and \
-                self.threadless_process:
-            self.threadless_client_queue.send(addr)
-            send_handle(
-                self.threadless_client_queue,
-                conn.fileno(),
-                self.threadless_process.pid,
-            )
-            conn.close()
-        else:
-            work = self.work_klass(
-                TcpClientConnection(conn, addr),
-                flags=self.flags,
-                event_queue=self.event_queue,
-            )
-            work_thread = threading.Thread(target=work.run)
-            work_thread.daemon = True
-            work.publish_event(
-                event_name=eventNames.WORK_STARTED,
-                event_payload={'fileno': conn.fileno(), 'addr': addr},
-                publisher_id=self.__class__.__name__,
-            )
-            work_thread.start()
+    def _start_threadless_work(self, conn: socket.socket, addr: Tuple[str, int]) -> None:
+        assert self.threadless_process and self.threadless_client_queue
+        self.threadless_client_queue.send(addr)
+        send_handle(
+            self.threadless_client_queue,
+            conn.fileno(),
+            self.threadless_process.pid,
+        )
+        conn.close()
+
+    def _start_threaded_work(self, conn: socket.socket, addr: Tuple[str, int]) -> None:
+        work = self.work_klass(
+            TcpClientConnection(conn, addr),
+            flags=self.flags,
+            event_queue=self.event_queue,
+        )
+        work_thread = threading.Thread(target=work.run)
+        work_thread.daemon = True
+        work.publish_event(
+            event_name=eventNames.WORK_STARTED,
+            event_payload={'fileno': conn.fileno(), 'addr': addr},
+            publisher_id=self.__class__.__name__,
+        )
+        work_thread.start()

     def run_once(self) -> None:
         with self.lock:
@@ -132,7 +145,14 @@ def run_once(self) -> None:
             if len(events) == 0:
                 return
             conn, addr = self.sock.accept()
-        self.start_work(conn, addr)
+        if (
+            self.flags.threadless and
+            self.threadless_client_queue and
+            self.threadless_process
+        ):
+            self._start_threadless_work(conn, addr)
+        else:
+            self._start_threaded_work(conn, addr)

     def run(self) -> None:
         setup_logger(
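
For readers unfamiliar with the mechanism behind `_start_threadless_work`, the sketch below shows, in isolation, how one process can hand an accepted socket to another over a `multiprocessing.Pipe` using `send_handle`/`recv_handle`. It is a minimal illustration under assumed names (`executor`, the echo behaviour), not proxy.py code.

# Minimal sketch (not proxy.py code): duplicate an accepted socket's fd into a
# child process over a Pipe, the same primitive Acceptor uses for Threadless.
import socket
import multiprocessing
from multiprocessing.reduction import send_handle, recv_handle


def executor(work_queue) -> None:
    addr = work_queue.recv()                 # metadata arrives first
    fd = recv_handle(work_queue)             # then the duplicated descriptor
    conn = socket.socket(fileno=fd)          # rebuild a socket object around it
    conn.sendall(b'handled in pid %d\n' % multiprocessing.current_process().pid)
    conn.close()


if __name__ == '__main__':
    parent_end, child_end = multiprocessing.Pipe()
    worker = multiprocessing.Process(target=executor, args=(child_end,))
    worker.start()

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(('127.0.0.1', 0))
    listener.listen(1)
    # A real acceptor would loop here; for the sketch, accept a single client.
    conn, addr = listener.accept()

    parent_end.send(addr)                               # 1. send metadata
    send_handle(parent_end, conn.fileno(), worker.pid)  # 2. send the fd itself
    conn.close()                                        # parent's copy is no longer needed

    worker.join()
    listener.close()

Note how the address is sent before the descriptor; `_start_threadless_work` follows the same ordering, so the receiving `Threadless` process knows which peer the incoming fd belongs to.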

proxy/core/acceptor/threadless.py

+21 −19
@@ -34,21 +34,22 @@


 class Threadless(multiprocessing.Process):
-    """Threadless process provides an event loop.
+    """Work executor process.

-    Internally, for each client connection, an instance of `work_klass`
-    is created. Threadless will invoke necessary lifecycle of the `Work` class
-    allowing implementations to handle accepted client connections as they wish.
+    Threadless process provides an event loop, which is shared across
+    multiple `Work` instances to handle work.

-    Note that, all `Work` implementations share the same underlying event loop.
+    Threadless takes as input a `work_klass` and an `event_queue`. `work_klass`
+    must conform to the `Work` protocol. Work is received over the
+    `event_queue`.

-    When --threadless option is enabled, each Acceptor process also
-    spawns one Threadless process. And instead of spawning new thread
-    for each accepted client connection, Acceptor process sends
-    accepted client connection to Threadless process over a pipe.
+    When work is accepted, threadless creates a new instance of `work_klass`.
+    Threadless will then invoke necessary lifecycle of the `Work` protocol,
+    allowing `work_klass` implementation to handle the assigned work.

-    Example, HttpProtocolHandler implements Work class to hooks into the
-    event loop provided by Threadless process.
+    Example, `BaseTcpServerHandler` implements `Work` protocol. It expects
+    a client connection as work payload and hooks into the threadless
+    event loop to handle the client connection.
     """

     def __init__(
@@ -82,13 +83,10 @@ def selected_events(self) -> Generator[
             for fd in worker_events:
                 # Can throw ValueError: Invalid file descriptor: -1
                 #
-                # Work classes must handle the exception and shutdown
-                # gracefully otherwise this will result in bringing down the
-                # entire threadless process
-                #
-                # This is only possible when work.get_events pass
-                # an invalid file descriptor. Example, because of bad
-                # exception handling within the work implementation class.
+                # A guard within Work classes may not help here due to
+                # asynchronous nature. Hence, threadless will handle
+                # ValueError exceptions raised by selector.register
+                # for invalid fd.
                 self.selector.register(fd, worker_events[fd])
         ev = self.selector.select(timeout=1)
         readables = []
@@ -180,6 +178,10 @@ def run_once(self) -> None:
         # Note that selector from now on is idle,
         # until all the logic below completes.
         #
+        # This is where one process per CPU architecture shines,
+        # as other threadless processes can continue processing work
+        # within their context.
+        #
         # Invoke Threadless.handle_events
         #
         # TODO: Only send readable / writables that client originally
@@ -194,7 +196,7 @@ def run_once(self) -> None:
             self.accept_client()
         # Wait for Threadless.handle_events to complete
         self.loop.run_until_complete(self.wait_for_tasks(tasks))
-        # Remove and shutdown inactive connections
+        # Remove and shutdown inactive workers
        self.cleanup_inactive()

     def run(self) -> None:
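
The reworded comment above says the threadless executor, rather than the `Work` implementations, has to absorb a `ValueError` from `selector.register` when a work hands back an already-closed fd. The snippet below is a hedged sketch of one way to do that; `worker_events` mirrors the variable name from the diff, but the helper itself is illustrative and not part of proxy.py.

# Sketch only: register fds requested by Work instances, tolerating stale
# descriptors instead of letting one bad fd bring the whole process down.
import selectors
from typing import Dict


def register_worker_events(
    selector: selectors.DefaultSelector,
    worker_events: Dict[int, int],
) -> None:
    for fd, events in worker_events.items():
        try:
            selector.register(fd, events)
        except ValueError:
            # "Invalid file descriptor: -1" -- the work closed this fd after
            # reporting it via get_events(); skip it and let the executor's
            # cleanup of inactive works reclaim the rest.
            continue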

proxy/core/base/tcp_server.py

-8
@@ -102,7 +102,6 @@ def handle_readables(self, readables: Readables) -> bool:
         if self.client.connection in readables:
             data = self.client.recv(self.flags.client_recvbuf_size)
             if data is None:
-                # Client closed connection, signal shutdown
                 logger.debug(
                     'Connection closed by client {0}'.format(
                         self.client.addr,
@@ -126,11 +125,4 @@ def handle_readables(self, readables: Readables) -> bool:
                         self.must_flush_before_shutdown = True
                     else:
                         teardown = True
-        # except ConnectionResetError:
-        #     logger.debug(
-        #         'Connection reset by client {0}'.format(
-        #             self.client.addr,
-        #         ),
-        #     )
-        #     teardown = True
         return teardown
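
For context on the hunk above: in proxy.py's `Work` lifecycle, `handle_readables` returns a boolean `teardown` flag, and a `recv()` that yields `None` is the signal that the client has gone away. The fragment below is a simplified, hypothetical handler showing that shape; it is not `BaseTcpServerHandler` itself.

# Sketch: a readable-event handler that maps "client closed" onto a teardown
# flag instead of raising, in the spirit of BaseTcpServerHandler.handle_readables.
import socket
from typing import List, Optional


def handle_readables(client: socket.socket, readables: List[socket.socket]) -> bool:
    teardown = False
    if client in readables:
        try:
            data: Optional[bytes] = client.recv(128 * 1024) or None
        except ConnectionResetError:
            data = None                      # treat a reset like a normal close
        if data is None:
            teardown = True                  # caller shuts the work down
        else:
            ...                              # hand `data` to the protocol layer
    return teardown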

proxy/http/handler.py

+1 −3
@@ -234,9 +234,7 @@ def handle_data(self, data: memoryview) -> Optional[bool]:
             elif isinstance(upgraded_sock, bool) and upgraded_sock is True:
                 return True
         except HttpProtocolException as e:
-            logger.debug(
-                'HttpProtocolException type raised',
-            )
+            logger.debug('HttpProtocolException raised')
             response: Optional[memoryview] = e.response(self.request)
             if response:
                 self.client.queue(response)
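
The surrounding code (unchanged here) turns a caught `HttpProtocolException` into a response for the client via `e.response(self.request)`. Below is a hedged sketch of what such an exception type can look like; it is loosely modelled on proxy.py's rejection exceptions, but the concrete subclass and status line are invented for illustration.

# Sketch: an exception that can render itself as a raw HTTP response, matching
# the e.response(request) call site in handler.py (class body is illustrative).
from typing import Any, Optional


class HttpProtocolException(Exception):
    def response(self, request: Any) -> Optional[memoryview]:
        return None          # base class has nothing to send back


class HttpRequestRejected(HttpProtocolException):
    def __init__(self, status_code: int = 400, reason: bytes = b'Bad Request') -> None:
        super().__init__()
        self.status_code = status_code
        self.reason = reason

    def response(self, request: Any) -> Optional[memoryview]:
        return memoryview(
            b'HTTP/1.1 %d %s\r\nConnection: close\r\n\r\n'
            % (self.status_code, self.reason),
        )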

proxy/http/proxy/server.py

+33 −21
@@ -151,10 +151,18 @@ def get_descriptors(

         r: List[socket.socket] = []
         w: List[socket.socket] = []
-        if self.upstream and not self.upstream.closed and self.upstream.connection:
+        if (
+            self.upstream and
+            not self.upstream.closed and
+            self.upstream.connection
+        ):
             r.append(self.upstream.connection)
-        if self.upstream and not self.upstream.closed and \
-                self.upstream.has_buffer() and self.upstream.connection:
+        if (
+            self.upstream and
+            not self.upstream.closed and
+            self.upstream.has_buffer() and
+            self.upstream.connection
+        ):
             w.append(self.upstream.connection)

         # TODO(abhinavsingh): We need to keep a mapping of plugin and
@@ -658,19 +666,11 @@ def generate_upstream_certificate(

     def intercept(self) -> Union[socket.socket, bool]:
         # Perform SSL/TLS handshake with upstream
-        teardown = self.wrap_server()
-        if teardown:
-            raise HttpProtocolException(
-                'Exception when wrapping server for interception',
-            )
+        self.wrap_server()
         # Generate certificate and perform handshake with client
         # wrap_client also flushes client data before wrapping
         # sending to client can raise, handle expected exceptions
-        teardown = self.wrap_client()
-        if teardown:
-            raise HttpProtocolException(
-                'Exception when wrapping client for interception',
-            )
+        self.wrap_client()
         # Update all plugin connection reference
         # TODO(abhinavsingh): Is this required?
         for plugin in self.plugins.values():
@@ -680,6 +680,7 @@ def intercept(self) -> Union[socket.socket, bool]:
     def wrap_server(self) -> bool:
         assert self.upstream is not None
         assert isinstance(self.upstream.connection, socket.socket)
+        do_close = False
         try:
             self.upstream.wrap(text_(self.request.host), self.flags.ca_file)
         except ssl.SSLCertVerificationError:    # Server raised certificate verification error
@@ -692,7 +693,7 @@ def wrap_server(self) -> bool:
                     self.upstream.addr[0],
                 ),
             )
-            return True
+            do_close = True
         except ssl.SSLError as e:
             if e.reason == 'SSLV3_ALERT_HANDSHAKE_FAILURE':
                 logger.warning(
@@ -707,13 +708,19 @@ def wrap_server(self) -> bool:
                         self.upstream.addr[0],
                     ), exc_info=e,
                 )
-            return True
+            do_close = True
+        finally:
+            if do_close:
+                raise HttpProtocolException(
+                    'Exception when wrapping server for interception',
+                )
         assert isinstance(self.upstream.connection, ssl.SSLSocket)
         return False

     def wrap_client(self) -> bool:
         assert self.upstream is not None and self.flags.ca_signing_key_file is not None
         assert isinstance(self.upstream.connection, ssl.SSLSocket)
+        do_close = False
         try:
             # TODO: Perform async certificate generation
             generated_cert = self.generate_upstream_certificate(
@@ -724,7 +731,7 @@ def wrap_client(self) -> bool:
             logger.exception(
                 'TimeoutExpired during certificate generation', exc_info=e,
             )
-            return True
+            do_close = True
         except ssl.SSLCertVerificationError:    # Client raised certificate verification error
             # When --disable-interception-on-ssl-cert-verification-error flag is on,
             # we will cache such upstream hosts and avoid intercepting them for future
@@ -735,14 +742,14 @@ def wrap_client(self) -> bool:
                     self.upstream.addr[0],
                 ),
             )
-            return True
+            do_close = True
         except ssl.SSLEOFError as e:
             logger.warning(
                 'ssl.SSLEOFError {0} when wrapping client for upstream: {1}'.format(
                     str(e), self.upstream.addr[0],
                 ),
             )
-            return True
+            do_close = True
         except ssl.SSLError as e:
             if e.reason in ('TLSV1_ALERT_UNKNOWN_CA', 'UNSUPPORTED_PROTOCOL'):
                 logger.warning(
@@ -757,21 +764,26 @@ def wrap_client(self) -> bool:
                         self.upstream.addr[0],
                     ), exc_info=e,
                 )
-            return True
+            do_close = True
         except BrokenPipeError:
             logger.error(
                 'BrokenPipeError when wrapping client for upstream: {0}'.format(
                     self.upstream.addr[0],
                 ),
             )
-            return True
+            do_close = True
         except OSError as e:
             logger.exception(
                 'OSError when wrapping client for upstream: {0}'.format(
                     self.upstream.addr[0],
                 ), exc_info=e,
            )
-            return True
+            do_close = True
+        finally:
+            if do_close:
+                raise HttpProtocolException(
+                    'Exception when wrapping client for interception',
+                )
         logger.debug('TLS intercepting using %s', generated_cert)
         return False
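
Both `wrap_server` and `wrap_client` now funnel their many error paths through a `do_close` flag and raise `HttpProtocolException` from a `finally` block, so `intercept` can simply call them without checking a return value. The sketch below isolates that control-flow pattern with a made-up `wrap_upstream` helper and standard-library `ssl` calls; it is not the proxy.py implementation.

# Sketch of the do_close / finally pattern: several handled errors collapse
# into one raise point, and callers never inspect a boolean result.
import socket
import ssl


class HttpProtocolException(Exception):
    """Stand-in for proxy.py's HttpProtocolException."""


def wrap_upstream(sock: socket.socket, hostname: str) -> ssl.SSLSocket:
    do_close = False
    tls_sock = None
    try:
        ctx = ssl.create_default_context()
        tls_sock = ctx.wrap_socket(sock, server_hostname=hostname)
    except ssl.SSLCertVerificationError:
        do_close = True      # upstream certificate could not be verified
    except ssl.SSLError:
        do_close = True      # handshake failed for some other TLS reason
    except OSError:
        do_close = True      # network-level failure during the handshake
    finally:
        # Raising here replaces the older `return True` convention: the caller
        # only ever sees a successful socket or an HttpProtocolException.
        if do_close:
            raise HttpProtocolException('Exception when wrapping upstream for interception')
    assert tls_sock is not None
    return tls_sock

With this shape, a caller such as `intercept` reduces to plain calls, exactly as the hunk above shows.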

0 commit comments