### Description

### Feature or enhancement

**Proposal:**
`asyncio.to_thread` is a critical utility for running blocking operations without blocking the event loop. The current implementation always wraps the target function with `functools.partial(contextvars.copy_context().run, ...)`. This ensures context variables are propagated, but it incurs a performance penalty even when the context is empty.
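The current wrapping described above can be sketched standalone like this (the `work` function is just an illustration):

```python
# Sketch of what asyncio.to_thread currently does on every call: the target
# is wrapped in functools.partial(ctx.run, ...), even when no context
# variables are set.
import contextvars
import functools

def work():
    return 42

ctx = contextvars.copy_context()            # copies the current context
wrapped = functools.partial(ctx.run, work)  # extra call layer handed to the executor
print(wrapped())  # 42: the call succeeds, but goes through ctx.run
```

Every call thus pays for both the `partial` object and the `ctx.run` frame before the target function even starts.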
In high-frequency use cases, such as web servers or data processing pipelines, the context is often empty. The overhead from `copy_context()` and, more substantially, from the `ctx.run()` wrapper becomes a bottleneck. The cost is not just in the context management itself but also in the extra function-call layer (`functools.partial`) that is passed to the executor.
This proposal suggests a direct optimization: check whether the copied context is empty. If it is, call `loop.run_in_executor` with the original function directly. If the context is not empty, use `ctx.run` as the callable for the executor. This approach bypasses both the context-running and the `functools.partial` overhead in the common case of an empty context.
This change is fully backward-compatible and offers a measurable performance improvement by streamlining the execution path for the most frequent use case.
Here is the proposed implementation for `Lib/asyncio/threads.py`:
```python
# Lib/asyncio/threads.py

import contextvars

from . import events


async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propagated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    # Optimization: if the context is empty, we can avoid the overhead of
    # both `functools.partial` and `ctx.run()`. We pass the function
    # and its arguments directly to the executor.
    if not ctx:
        return await loop.run_in_executor(None, func, *args, **kwargs)
    # If the context is not empty, we use `ctx.run` as the callable
    # for the executor to ensure context propagation.
    return await loop.run_in_executor(
        None, ctx.run, func, *args, **kwargs)
```
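The fast-path check relies on `contextvars.Context` implementing the `Mapping` protocol, so `not ctx` is true exactly when no context variables are set. A standalone sketch of that behavior (the `request_id` variable is purely illustrative):

```python
# Verify the behavior behind `if not ctx:`: an empty Context is falsy,
# and a context carrying any ContextVar is truthy.
import contextvars

empty_ctx = contextvars.Context()   # a brand-new context holds no variables
print(not empty_ctx)                # True: falsy, so the fast path is taken

request_id = contextvars.ContextVar("request_id")  # hypothetical example var
request_id.set("abc123")
ctx = contextvars.copy_context()
print(bool(ctx))                    # True: truthy, so the ctx.run path is taken
print(ctx[request_id])              # abc123
```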
### Benchmark
To validate the performance gain, a benchmark was conducted to measure the theoretical maximum improvement by isolating the `to_thread` call with a no-op function. The test was run with 500,000 operations.
**Benchmark Results:**
The results show a clear and consistent performance gain.
- Original Implementation Best Time: 21.0525s (23,750 ops/sec)
- Optimized Implementation Best Time: 20.0661s (24,918 ops/sec)
- Theoretical Max Performance Improvement: 4.69%
This benchmark demonstrates that the optimization provides a tangible speedup by reducing overhead in the most common execution path.
**Benchmark Code:**
```python
import asyncio
import time
import contextvars
import functools


# Original `to_thread` implementation for comparison
async def original_to_thread(func, /, *args, **kwargs):
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)


# Optimized `to_thread` implementation
async def optimized_to_thread(func, /, *args, **kwargs):
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    if not ctx:
        return await loop.run_in_executor(None, func, *args, **kwargs)
    else:
        return await loop.run_in_executor(
            None, ctx.run, func, *args, **kwargs)


def blocking_noop():
    """A blocking function that does nothing, for measuring overhead."""
    pass


async def run_benchmark(to_thread_func, n_calls):
    """Runs a benchmark for a given to_thread implementation."""
    tasks = [to_thread_func(blocking_noop) for _ in range(n_calls)]
    start_time = time.monotonic()
    await asyncio.gather(*tasks)
    end_time = time.monotonic()
    return end_time - start_time


async def main():
    n_calls = 500_000
    print(f"--- Running benchmark for {n_calls} calls ---")

    # Benchmark original implementation
    original_duration = await run_benchmark(original_to_thread, n_calls)
    print(f"Original to_thread took: {original_duration:.4f} seconds")

    # Benchmark optimized implementation
    optimized_duration = await run_benchmark(optimized_to_thread, n_calls)
    print(f"Optimized to_thread took: {optimized_duration:.4f} seconds")

    # Calculate and display performance improvement
    improvement = (original_duration - optimized_duration) / original_duration * 100
    if improvement > 0:
        print(f"\nPerformance improvement: {improvement:.2f}%")
        print(f"Speedup: {original_duration / optimized_duration:.2f}x")
    else:
        print("\nNo significant performance improvement observed.")


if __name__ == "__main__":
    asyncio.run(main())
```
### Has this already been discussed elsewhere?

This is a minor feature which does not need previous discussion elsewhere.
**Links to previous discussion of this feature:**
No response
### Linked PRs