Commit 1d0a764

Merge pull request #181 from njsmith/thread-limiter

Implement capacity limitation for run_in_worker_thread

2 parents: 4b9466c + 9f21a04

6 files changed: +803 −57 lines changed

docs/source/conf.py

Lines changed: 7 additions & 0 deletions
@@ -21,6 +21,13 @@
 # import sys
 # sys.path.insert(0, os.path.abspath('.'))

+# Warn about all references to unknown targets
+nitpicky = True
+# Except for these ones, which we expect to point to unknown targets:
+nitpick_ignore = [
+    ("py:obj", "CapacityLimiter-like object"),
+]
+
 # XX hack the RTD theme until
 # https://github.com/rtfd/sphinx_rtd_theme/pull/382
 # is shipped (should be in the release after 0.2.4)

docs/source/reference-core.rst

Lines changed: 136 additions & 1 deletion
@@ -1358,6 +1358,9 @@ synchronization logic. All of classes discussed in this section are
 implemented on top of the public APIs in :mod:`trio.hazmat`; they
 don't have any special access to trio's internals.)

+.. autoclass:: CapacityLimiter
+   :members:
+
 .. autoclass:: Semaphore
    :members:

@@ -1387,14 +1390,146 @@ like "blocking".
 In acknowledgment of this reality, Trio provides two useful utilities
 for working with real, operating-system level,
 :mod:`threading`\-module-style threads. First, if you're in Trio but
-need to push some work into a thread, there's
+need to push some blocking I/O into a thread, there's
 :func:`run_in_worker_thread`. And if you're in a thread and need to
 communicate back with trio, there's the closely related
 :func:`current_run_in_trio_thread` and
 :func:`current_await_in_trio_thread`.

+
+Trio's philosophy about managing worker threads
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If you've used other I/O frameworks, you may have encountered the
+concept of a "thread pool", which is most commonly implemented as a
+fixed-size collection of threads that hang around waiting for jobs to
+be assigned to them. Thread pools solve two different problems. First,
+re-using the same threads over and over is more efficient than
+starting and stopping a new thread for every job you need done;
+basically, the pool acts as a kind of cache for idle threads. And
+second, having a fixed size avoids getting into a situation where
+100,000 jobs are submitted simultaneously, and then 100,000 threads
+are spawned and the system gets overloaded and crashes. Instead, the N
+threads start executing the first N jobs, while the other
+(100,000 - N) jobs sit in a queue and wait their turn. This is
+generally what you want, and it's how
+:func:`trio.run_in_worker_thread` works by default.
+
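The fixed-size behavior described above (N threads run, everything else queues) can be sketched with the standard library's thread pool; trio itself isn't needed for the illustration, and the counters here are purely for demonstration:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
running = 0    # jobs currently executing
max_seen = 0   # highest concurrency observed

def job():
    global running, max_seen
    with lock:
        running += 1
        max_seen = max(max_seen, running)
    time.sleep(0.05)  # simulate blocking I/O
    with lock:
        running -= 1

# 6 jobs, but only 2 worker threads: the other 4 wait in the queue.
with ThreadPoolExecutor(max_workers=2) as pool:
    for _ in range(6):
        pool.submit(job)

print(max_seen)  # never exceeds the pool size of 2
```

All six jobs complete, but the observed concurrency is capped by the pool size, which is exactly the admission-control behavior the text describes.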
+The downside of this kind of thread pool is that sometimes you need
+more sophisticated logic for controlling how many threads are run at
+once. For example, you might want a policy like "at most 20 threads
+total, but no more than 3 of those can be running jobs associated with
+the same user account", or you might want a pool whose size is
+dynamically adjusted over time in response to system conditions.
+
+It's even possible for a fixed-size policy to cause unexpected
+`deadlocks <https://en.wikipedia.org/wiki/Deadlock>`__. Imagine a
+situation where you have two different types of blocking jobs that you
+want to run in the thread pool, type A and type B. Type A is pretty
+simple: it just runs and completes pretty quickly. But type B is more
+complicated: it has to stop in the middle and wait for some other work
+to finish, and that other work includes running a type A job. Now,
+suppose you submit N jobs of type B to the pool. They all start
+running, and then eventually end up submitting one or more jobs of
+type A. But since every thread in our pool is already busy, the type A
+jobs don't actually start running – they just sit in a queue waiting
+for the type B jobs to finish. But the type B jobs will never finish,
+because they're waiting for the type A jobs. Our system has
+deadlocked. The ideal solution to this problem is to avoid having type
+B jobs in the first place – generally it's better to keep complex
+synchronization logic in the main Trio thread. But if you can't do
+that, then you need a custom thread allocation policy that tracks
+separate limits for different types of jobs, and makes it impossible
+for type B jobs to fill up all the slots that type A jobs need to run.
+
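The type A / type B deadlock can be simulated with a one-thread stdlib pool; the `type_a`/`type_b` names are illustrative, and a timeout stands in for the infinite wait so the demonstration terminates instead of hanging:

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as PoolTimeout

# One worker thread: type B occupies it while waiting on type A,
# so type A can never start running.
pool = ThreadPoolExecutor(max_workers=1)

def type_a():
    return "a done"

def type_b():
    # Submits type A to the *same* pool and blocks on the result.
    # With every thread busy running type B, this would wait forever;
    # the timeout converts the deadlock into an observable error.
    return pool.submit(type_a).result(timeout=0.5)

outcome = None
try:
    pool.submit(type_b).result(timeout=2)
except PoolTimeout:
    outcome = "deadlocked"
pool.shutdown()
print(outcome)  # deadlocked
```

With two or more worker threads (or a separate limit for type A jobs, as the text suggests) the inner submission would run to completion instead.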
+So, we can see that it's important to be able to change the policy
+controlling the allocation of threads to jobs. But in many frameworks,
+this requires implementing a new thread pool from scratch, which is
+highly non-trivial; and if different types of jobs need different
+policies, then you may have to create multiple pools, which is
+inefficient because now you effectively have two different thread
+caches that aren't sharing resources.
+
+Trio's solution to this problem is to split worker thread management
+into two layers. The lower layer is responsible for taking blocking
+I/O jobs and arranging for them to run immediately on some worker
+thread. It takes care of solving the tricky concurrency problems
+involved in managing threads and is responsible for optimizations like
+re-using threads, but has no admission control policy: if you give it
+100,000 jobs, it will spawn 100,000 threads. The upper layer is
+responsible for providing the policy to make sure that this doesn't
+happen – but since it *only* has to worry about policy, it can be much
+simpler. In fact, all there is to it is the ``limiter=`` argument
+passed to :func:`run_in_worker_thread`. This defaults to a global
+:class:`CapacityLimiter` object, which gives us the classic fixed-size
+thread pool behavior. (See
+:func:`current_default_worker_thread_limiter`.) But if you want to use
+"separate pools" for type A jobs and type B jobs, then it's just a
+matter of creating two separate :class:`CapacityLimiter` objects and
+passing them in when running these jobs. Or here's an example of
+defining a custom policy that respects the global thread limit, while
+making sure that no individual user can use more than 3 threads at a
+time::
+
+    import weakref
+
+    import trio
+
+    class CombinedLimiter:
+        def __init__(self, first, second):
+            self._first = first
+            self._second = second
+
+        async def acquire_on_behalf_of(self, borrower):
+            # Acquire both, being careful to clean up properly on error
+            await self._first.acquire_on_behalf_of(borrower)
+            try:
+                await self._second.acquire_on_behalf_of(borrower)
+            except:
+                self._first.release_on_behalf_of(borrower)
+                raise
+
+        def release_on_behalf_of(self, borrower):
+            # Release both, being careful to clean up properly on error
+            try:
+                self._second.release_on_behalf_of(borrower)
+            finally:
+                self._first.release_on_behalf_of(borrower)
+
+
+    # Use a weak value dictionary, so that we don't waste memory holding
+    # limiter objects for users who don't have any worker threads running.
+    USER_LIMITERS = weakref.WeakValueDictionary()
+    MAX_THREADS_PER_USER = 3
+
+    def get_user_limiter(user_id):
+        try:
+            return USER_LIMITERS[user_id]
+        except KeyError:
+            per_user_limiter = trio.CapacityLimiter(MAX_THREADS_PER_USER)
+            global_limiter = trio.current_default_worker_thread_limiter()
+            # IMPORTANT: acquire the per_user_limiter before the global_limiter.
+            # If we get 100 jobs for a user at the same time, we want
+            # to only allow 3 of them at a time to even compete for the
+            # global thread slots.
+            combined_limiter = CombinedLimiter(per_user_limiter, global_limiter)
+            USER_LIMITERS[user_id] = combined_limiter
+            return combined_limiter
+
+
+    async def run_in_worker_thread_for_user(user_id, async_fn, *args, **kwargs):
+        # *args belong to async_fn; **kwargs belong to run_in_worker_thread
+        kwargs["limiter"] = get_user_limiter(user_id)
+        return await trio.run_in_worker_thread(async_fn, *args, **kwargs)
+
+
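The key subtlety in `CombinedLimiter.acquire_on_behalf_of` is that a failed second acquire must release the first limiter again. That cleanup logic can be exercised without trio by pairing it with a hypothetical `FakeLimiter` that implements the same CapacityLimiter-like interface (both class names here are stand-ins for illustration):

```python
import asyncio

class FakeLimiter:
    # Hypothetical stand-in for a "CapacityLimiter-like object"; it only
    # records which borrowers currently hold a token.
    def __init__(self, fail=False):
        self.fail = fail
        self.borrowers = set()

    async def acquire_on_behalf_of(self, borrower):
        if self.fail:
            raise RuntimeError("over capacity")
        self.borrowers.add(borrower)

    def release_on_behalf_of(self, borrower):
        self.borrowers.discard(borrower)

class CombinedLimiter:
    def __init__(self, first, second):
        self._first = first
        self._second = second

    async def acquire_on_behalf_of(self, borrower):
        # Acquire both, releasing the first if the second fails.
        await self._first.acquire_on_behalf_of(borrower)
        try:
            await self._second.acquire_on_behalf_of(borrower)
        except BaseException:
            self._first.release_on_behalf_of(borrower)
            raise

    def release_on_behalf_of(self, borrower):
        try:
            self._second.release_on_behalf_of(borrower)
        finally:
            self._first.release_on_behalf_of(borrower)

async def demo():
    per_user = FakeLimiter()
    broken_global = FakeLimiter(fail=True)  # second acquire always fails
    combined = CombinedLimiter(per_user, broken_global)
    try:
        await combined.acquire_on_behalf_of("job-1")
    except RuntimeError:
        pass
    # The failed second acquire released the first limiter again:
    return len(per_user.borrowers)

leaked = asyncio.run(demo())
print(leaked)  # 0
```

If the `except`/`release` step were omitted, the first limiter would leak a token on every failed combined acquire and eventually starve all jobs.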
+Putting blocking I/O into worker threads
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
 .. autofunction:: run_in_worker_thread

+.. autofunction:: current_default_worker_thread_limiter
+
+
+Getting back into the trio thread from another thread
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
 .. function:: current_run_in_trio_thread
               current_await_in_trio_thread
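As an aside on the `weakref.WeakValueDictionary` used by `get_user_limiter` in the diff above: entries vanish as soon as the last strong reference to the stored value goes away, which is what keeps the per-user limiter cache from growing forever. A small sketch (the `UserLimiter` class is an illustrative placeholder, not a trio API):

```python
import gc
import weakref

class UserLimiter:
    # Placeholder standing in for a per-user limiter object.
    def __init__(self, tokens):
        self.tokens = tokens

USER_LIMITERS = weakref.WeakValueDictionary()

limiter = UserLimiter(3)
USER_LIMITERS["alice"] = limiter
cached = "alice" in USER_LIMITERS   # True while a strong reference exists

del limiter
gc.collect()                        # force collection for non-refcounting runtimes
gone = "alice" not in USER_LIMITERS # entry disappeared with the last reference
print(cached, gone)
```

In the documented pattern, the worker-thread machinery holds the only strong reference while a user's job runs, so idle users cost no memory.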