diff --git a/changelog.d/1172.added.rst b/changelog.d/1172.added.rst new file mode 100644 index 00000000..27f260b4 --- /dev/null +++ b/changelog.d/1172.added.rst @@ -0,0 +1 @@ +Pyright type checking support in tox configuration to improve type safety and compatibility. diff --git a/changelog.d/1177.fixed.rst b/changelog.d/1177.fixed.rst new file mode 100644 index 00000000..698f8a5d --- /dev/null +++ b/changelog.d/1177.fixed.rst @@ -0,0 +1 @@ +``RuntimeError: There is no current event loop in thread 'MainThread'`` when using shared event loops after any test unsets the event loop (such as when using ``asyncio.run`` and ``asyncio.Runner``). diff --git a/pytest_asyncio/plugin.py b/pytest_asyncio/plugin.py index ec52ee4c..4808af23 100644 --- a/pytest_asyncio/plugin.py +++ b/pytest_asyncio/plugin.py @@ -49,6 +49,16 @@ PytestPluginManager, ) +_seen_markers: set[int] = set() + + +def _warn_scope_deprecation_once(marker_id: int) -> None: + """Issues deprecation warning exactly once per marker ID.""" + if marker_id not in _seen_markers: + _seen_markers.add(marker_id) + warnings.warn(PytestDeprecationWarning(_MARKER_SCOPE_KWARG_DEPRECATION_WARNING)) + + if sys.version_info >= (3, 10): from typing import ParamSpec else: @@ -62,7 +72,9 @@ _ScopeName = Literal["session", "package", "module", "class", "function"] _R = TypeVar("_R", bound=Union[Awaitable[Any], AsyncIterator[Any]]) _P = ParamSpec("_P") +T = TypeVar("T") FixtureFunction = Callable[_P, _R] +CoroutineFunction = Callable[_P, Awaitable[T]] class PytestAsyncioError(Exception): @@ -291,7 +303,7 @@ def _asyncgen_fixture_wrapper( gen_obj = fixture_function(*args, **kwargs) async def setup(): - res = await gen_obj.__anext__() # type: ignore[union-attr] + res = await gen_obj.__anext__() return res context = contextvars.copy_context() @@ -304,7 +316,7 @@ def finalizer() -> None: async def async_finalizer() -> None: try: - await gen_obj.__anext__() # type: ignore[union-attr] + await gen_obj.__anext__() except StopAsyncIteration: pass else: @@ -333,8 +345,7 @@ def _wrap_async_fixture( runner: Runner, request: FixtureRequest, ) -> Callable[AsyncFixtureParams, AsyncFixtureReturnType]: - - @functools.wraps(fixture_function) # type: ignore[arg-type] + @functools.wraps(fixture_function) def _async_fixture_wrapper( *args: AsyncFixtureParams.args, **kwargs: AsyncFixtureParams.kwargs, @@ -447,7 +458,7 @@ def _can_substitute(item: Function) -> bool: return inspect.iscoroutinefunction(func) def runtest(self) -> None: - synchronized_obj = wrap_in_sync(self.obj) + synchronized_obj = get_async_test_wrapper(self, self.obj) with MonkeyPatch.context() as c: c.setattr(self, "obj", synchronized_obj) super().runtest() @@ -489,7 +500,7 @@ def _can_substitute(item: Function) -> bool: ) def runtest(self) -> None: - synchronized_obj = wrap_in_sync(self.obj) + synchronized_obj = get_async_test_wrapper(self, self.obj) with MonkeyPatch.context() as c: c.setattr(self, "obj", synchronized_obj) super().runtest() @@ -511,7 +522,10 @@ def _can_substitute(item: Function) -> bool: ) def runtest(self) -> None: - synchronized_obj = wrap_in_sync(self.obj.hypothesis.inner_test) + synchronized_obj = get_async_test_wrapper( + self, + self.obj.hypothesis.inner_test, + ) with MonkeyPatch.context() as c: c.setattr(self.obj.hypothesis, "inner_test", synchronized_obj) super().runtest() @@ -602,6 +616,62 @@ def _set_event_loop(loop: AbstractEventLoop | None) -> None: asyncio.set_event_loop(loop) +_session_loop: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = ( + contextvars.ContextVar( + "_session_loop", + default=None, + ) +) +_package_loop: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = ( + contextvars.ContextVar( + "_package_loop", + default=None, + ) +) +_module_loop: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = ( + contextvars.ContextVar( + "_module_loop", + default=None, + ) +) +_class_loop: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = ( + contextvars.ContextVar( + "_class_loop", + default=None, + ) +) +_function_loop: contextvars.ContextVar[asyncio.AbstractEventLoop | None] = ( + contextvars.ContextVar( + "_function_loop", + default=None, + ) +) + +_SCOPE_TO_CONTEXTVAR = { + "session": _session_loop, + "package": _package_loop, + "module": _module_loop, + "class": _class_loop, + "function": _function_loop, +} + + +def _get_or_restore_event_loop(loop_scope: _ScopeName) -> asyncio.AbstractEventLoop: + """ + Get or restore the appropriate event loop for the given scope. + + If we have a shared loop for this scope, restore and return it. + Otherwise, get the current event loop or create a new one. + """ + shared_loop = _SCOPE_TO_CONTEXTVAR[loop_scope].get() + if shared_loop is not None: + policy = _get_event_loop_policy() + policy.set_event_loop(shared_loop) + return shared_loop + else: + return _get_event_loop_no_warn() + + @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_pyfunc_call(pyfuncitem: Function) -> object | None: """ @@ -652,9 +722,22 @@ def pytest_pyfunc_call(pyfuncitem: Function) -> object | None: return None -def wrap_in_sync( - func: Callable[..., Awaitable[Any]], -): +def get_async_test_wrapper( + item: Function, + func: CoroutineFunction[_P, T], +) -> Callable[_P, None]: + """Returns a synchronous wrapper for the specified async test function.""" + marker = item.get_closest_marker("asyncio") + assert marker is not None + default_loop_scope = _get_default_test_loop_scope(item.config) + loop_scope = _get_marked_loop_scope(marker, default_loop_scope) + return _wrap_in_sync(func, loop_scope) + + +def _wrap_in_sync( + func: CoroutineFunction[_P, T], + loop_scope: _ScopeName, +) -> Callable[_P, None]: """ Return a sync wrapper around an async function executing it in the current event loop. @@ -663,7 +746,7 @@ def wrap_in_sync( @functools.wraps(func) def inner(*args, **kwargs): coro = func(*args, **kwargs) - _loop = _get_event_loop_no_warn() + _loop = _get_or_restore_event_loop(loop_scope) task = asyncio.ensure_future(coro, loop=_loop) try: _loop.run_until_complete(task) @@ -746,7 +829,7 @@ def _get_marked_loop_scope( if "scope" in asyncio_marker.kwargs: if "loop_scope" in asyncio_marker.kwargs: raise pytest.UsageError(_DUPLICATE_LOOP_SCOPE_DEFINITION_ERROR) - warnings.warn(PytestDeprecationWarning(_MARKER_SCOPE_KWARG_DEPRECATION_WARNING)) + _warn_scope_deprecation_once(id(asyncio_marker)) scope = asyncio_marker.kwargs.get("loop_scope") or asyncio_marker.kwargs.get( "scope" ) @@ -756,7 +839,7 @@ def _get_marked_loop_scope( return scope -def _get_default_test_loop_scope(config: Config) -> _ScopeName: +def _get_default_test_loop_scope(config: Config) -> Any: return config.getini("asyncio_default_test_loop_scope") @@ -784,6 +867,8 @@ def _scoped_runner( debug_mode = _get_asyncio_debug(request.config) with _temporary_event_loop_policy(new_loop_policy): runner = Runner(debug=debug_mode).__enter__() + shared_loop = runner.get_loop() + _SCOPE_TO_CONTEXTVAR[scope].set(shared_loop) try: yield runner except Exception as e: diff --git a/tests/test_set_event_loop.py b/tests/test_set_event_loop.py new file mode 100644 index 00000000..1ef8a4b6 --- /dev/null +++ b/tests/test_set_event_loop.py @@ -0,0 +1,384 @@ +from __future__ import annotations + +import sys +from textwrap import dedent + +import pytest +from pytest import Pytester + + +@pytest.mark.parametrize( + "test_loop_scope", + ("function", "module", "package", "session"), +) +@pytest.mark.parametrize( + "loop_breaking_action", + [ + "asyncio.set_event_loop(None)", + "asyncio.run(asyncio.sleep(0))", + pytest.param( + "with asyncio.Runner(): pass", + marks=pytest.mark.skipif( + sys.version_info < (3, 11), + reason="asyncio.Runner requires Python 3.11+", + ), + ), + ], +) +def test_set_event_loop_none( + pytester: Pytester, + test_loop_scope: str, + loop_breaking_action: str, +): + pytester.makeini( + dedent( + f"""\ + [pytest] + asyncio_default_test_loop_scope = {test_loop_scope} + asyncio_default_fixture_loop_scope = function + """ + ) + ) + pytester.makepyfile( + dedent( + f"""\ + import asyncio + import pytest + + pytest_plugins = "pytest_asyncio" + + + @pytest.mark.asyncio + async def test_before(): + pass + + + def test_set_event_loop_none(): + {loop_breaking_action} + + + @pytest.mark.asyncio + async def test_after(): + pass + """ + ) + ) + result = pytester.runpytest_subprocess() + result.assert_outcomes(passed=3) + + +@pytest.mark.parametrize( + "loop_breaking_action", + [ + "asyncio.set_event_loop(None)", + "asyncio.run(asyncio.sleep(0))", + pytest.param( + "with asyncio.Runner(): pass", + marks=pytest.mark.skipif( + sys.version_info < (3, 11), + reason="asyncio.Runner requires Python 3.11+", + ), + ), + ], +) +def test_set_event_loop_none_class(pytester: Pytester, loop_breaking_action: str): + pytester.makeini( + dedent( + """\ + [pytest] + asyncio_default_test_loop_scope = class + asyncio_default_fixture_loop_scope = function + """ + ) + ) + pytester.makepyfile( + dedent( + f"""\ + import asyncio + import pytest + + pytest_plugins = "pytest_asyncio" + + + class TestClass: + @pytest.mark.asyncio + async def test_before(self): + pass + + + def test_set_event_loop_none(self): + {loop_breaking_action} + + + @pytest.mark.asyncio + async def test_after(self): + pass + """ + ) + ) + result = pytester.runpytest_subprocess() + result.assert_outcomes(passed=3) + + +@pytest.mark.parametrize("test_loop_scope", ("module", "package", "session")) +@pytest.mark.parametrize( + "loop_breaking_action", + [ + "asyncio.set_event_loop(None)", + "asyncio.run(asyncio.sleep(0))", + pytest.param( + "with asyncio.Runner(): pass", + marks=pytest.mark.skipif( + sys.version_info < (3, 11), + reason="asyncio.Runner requires Python 3.11+", + ), + ), + ], +) +def test_original_shared_loop_is_reinstated_not_fresh_loop( + pytester: Pytester, + test_loop_scope: str, + loop_breaking_action: str, +): + pytester.makeini( + dedent( + f"""\ + [pytest] + asyncio_default_test_loop_scope = {test_loop_scope} + asyncio_default_fixture_loop_scope = function + """ + ) + ) + pytester.makepyfile( + dedent( + f"""\ + import asyncio + import pytest + + pytest_plugins = "pytest_asyncio" + + original_shared_loop: asyncio.AbstractEventLoop = None + + @pytest.mark.asyncio + async def test_store_original_shared_loop(): + global original_shared_loop + original_shared_loop = asyncio.get_running_loop() + original_shared_loop._custom_marker = "original_loop_marker" + + + def test_unset_event_loop(): + {loop_breaking_action} + + + @pytest.mark.asyncio + async def test_verify_original_loop_reinstated(): + global original_shared_loop + current_loop = asyncio.get_running_loop() + assert current_loop is original_shared_loop + assert hasattr(current_loop, '_custom_marker') + assert current_loop._custom_marker == "original_loop_marker" + """ + ) + ) + result = pytester.runpytest_subprocess("--asyncio-mode=strict") + result.assert_outcomes(passed=3) + + +@pytest.mark.parametrize("test_loop_scope", ("module", "package", "session")) +@pytest.mark.parametrize( + "loop_breaking_action", + [ + "asyncio.set_event_loop(None)", + "asyncio.run(asyncio.sleep(0))", + pytest.param( + "with asyncio.Runner(): pass", + marks=pytest.mark.skipif( + sys.version_info < (3, 11), + reason="asyncio.Runner requires Python 3.11+", + ), + ), + ], +) +def test_shared_loop_with_fixture_preservation( + pytester: Pytester, + test_loop_scope: str, + loop_breaking_action: str, +): + pytester.makeini( + dedent( + f"""\ + [pytest] + asyncio_default_test_loop_scope = {test_loop_scope} + asyncio_default_fixture_loop_scope = {test_loop_scope} + """ + ) + ) + pytester.makepyfile( + dedent( + f"""\ + import asyncio + import pytest + import pytest_asyncio + + pytest_plugins = "pytest_asyncio" + + fixture_loop: asyncio.AbstractEventLoop = None + long_running_task = None + + @pytest_asyncio.fixture + async def webserver(): + global fixture_loop, long_running_task + fixture_loop = asyncio.get_running_loop() + + async def background_task(): + while True: + await asyncio.sleep(1) + + long_running_task = asyncio.create_task(background_task()) + yield + long_running_task.cancel() + + + @pytest.mark.asyncio + async def test_before(webserver): + global fixture_loop, long_running_task + assert asyncio.get_running_loop() is fixture_loop + assert not long_running_task.done() + + + def test_set_event_loop_none(): + {loop_breaking_action} + + + @pytest.mark.asyncio + async def test_after(webserver): + global fixture_loop, long_running_task + current_loop = asyncio.get_running_loop() + assert current_loop is fixture_loop + assert not long_running_task.done() + """ + ) + ) + result = pytester.runpytest_subprocess("--asyncio-mode=strict") + result.assert_outcomes(passed=3) + + +@pytest.mark.parametrize( + "first_scope,second_scope", + [ + ("module", "session"), + ("session", "module"), + ("package", "session"), + ("session", "package"), + ("package", "module"), + ("module", "package"), + ], +) +@pytest.mark.parametrize( + "loop_breaking_action", + [ + "asyncio.set_event_loop(None)", + "asyncio.run(asyncio.sleep(0))", + pytest.param( + "with asyncio.Runner(): pass", + marks=pytest.mark.skipif( + sys.version_info < (3, 11), + reason="asyncio.Runner requires Python 3.11+", + ), + ), + ], +) +def test_shared_loop_with_multiple_fixtures_preservation( + pytester: Pytester, + first_scope: str, + second_scope: str, + loop_breaking_action: str, +): + pytester.makeini( + dedent( + """\ + [pytest] + asyncio_default_test_loop_scope = session + asyncio_default_fixture_loop_scope = session + """ + ) + ) + pytester.makepyfile( + dedent( + f"""\ + import asyncio + import pytest + import pytest_asyncio + + pytest_plugins = "pytest_asyncio" + + first_fixture_loop: asyncio.AbstractEventLoop = None + second_fixture_loop: asyncio.AbstractEventLoop = None + first_long_running_task = None + second_long_running_task = None + + @pytest_asyncio.fixture(scope="{first_scope}", loop_scope="{first_scope}") + async def first_webserver(): + global first_fixture_loop, first_long_running_task + first_fixture_loop = asyncio.get_running_loop() + + async def background_task(): + while True: + await asyncio.sleep(0.1) + + first_long_running_task = asyncio.create_task(background_task()) + yield + first_long_running_task.cancel() + + + @pytest_asyncio.fixture(scope="{second_scope}", loop_scope="{second_scope}") + async def second_webserver(): + global second_fixture_loop, second_long_running_task + second_fixture_loop = asyncio.get_running_loop() + + async def background_task(): + while True: + await asyncio.sleep(0.1) + + second_long_running_task = asyncio.create_task(background_task()) + yield + second_long_running_task.cancel() + + + @pytest.mark.asyncio(loop_scope="{first_scope}") + async def test_before_first(first_webserver): + global first_fixture_loop, first_long_running_task + assert asyncio.get_running_loop() is first_fixture_loop + assert not first_long_running_task.done() + + + @pytest.mark.asyncio(loop_scope="{second_scope}") + async def test_before_second(second_webserver): + global second_fixture_loop, second_long_running_task + assert asyncio.get_running_loop() is second_fixture_loop + assert not second_long_running_task.done() + + + def test_set_event_loop_none(): + {loop_breaking_action} + + + @pytest.mark.asyncio(loop_scope="{first_scope}") + async def test_after_first(first_webserver): + global first_fixture_loop, first_long_running_task + current_loop = asyncio.get_running_loop() + assert current_loop is first_fixture_loop + assert not first_long_running_task.done() + + + @pytest.mark.asyncio(loop_scope="{second_scope}") + async def test_after_second(second_webserver): + global second_fixture_loop, second_long_running_task + current_loop = asyncio.get_running_loop() + assert current_loop is second_fixture_loop + assert not second_long_running_task.done() + """ + ) + ) + result = pytester.runpytest_subprocess("--asyncio-mode=strict") + result.assert_outcomes(passed=5)