diff --git a/Lib/test/test_asyncgen.py b/Lib/test/test_asyncgen.py index 5bfd789185c675..4cb1bef8d0f15b 100644 --- a/Lib/test/test_asyncgen.py +++ b/Lib/test/test_asyncgen.py @@ -1,15 +1,14 @@ +import functools +import asyncio import inspect import types import unittest import contextlib from test.support.import_helper import import_module -from test.support import gc_collect, requires_working_socket -asyncio = import_module("asyncio") +from test.support import gc_collect, requires_working_socket, async_yield as _async_yield -requires_working_socket(module=True) - _no_default = object() @@ -17,12 +16,11 @@ class AwaitException(Exception): pass -@types.coroutine -def awaitable(*, throw=False): +async def awaitable(*, throw=False): if throw: - yield ('throw',) + await _async_yield(('throw',)) else: - yield ('result',) + await _async_yield(('result',)) def run_until_complete(coro): @@ -398,12 +396,6 @@ async def gen(): an.send(None) def test_async_gen_asend_throw_concurrent_with_send(self): - import types - - @types.coroutine - def _async_yield(v): - return (yield v) - class MyExc(Exception): pass @@ -431,11 +423,6 @@ async def agenfn(): gen2.send(None) def test_async_gen_athrow_throw_concurrent_with_send(self): - import types - - @types.coroutine - def _async_yield(v): - return (yield v) class MyExc(Exception): pass @@ -464,12 +451,6 @@ async def agenfn(): gen2.send(None) def test_async_gen_asend_throw_concurrent_with_throw(self): - import types - - @types.coroutine - def _async_yield(v): - return (yield v) - class MyExc(Exception): pass @@ -502,11 +483,6 @@ async def agenfn(): gen2.send(None) def test_async_gen_athrow_throw_concurrent_with_throw(self): - import types - - @types.coroutine - def _async_yield(v): - return (yield v) class MyExc(Exception): pass @@ -572,12 +548,6 @@ async def gen(): aclose.close() def test_async_gen_asend_close_runtime_error(self): - import types - - @types.coroutine - def _async_yield(v): - return (yield v) - async def agenfn(): try: await _async_yield(None) @@ -593,11 +563,6 @@ async def agenfn(): gen.close() def test_async_gen_athrow_close_runtime_error(self): - import types - - @types.coroutine - def _async_yield(v): - return (yield v) class MyExc(Exception): pass @@ -619,17 +584,188 @@ async def agenfn(): with self.assertRaisesRegex(RuntimeError, "coroutine ignored GeneratorExit"): gen.close() + def test_aiter_idempotent(self): + async def gen(): + yield 1 + applied_once = aiter(gen()) + applied_twice = aiter(applied_once) + self.assertIs(applied_once, applied_twice) + + def test_anext_iter(self): + class MyError(Exception): + pass + + async def agenfn(): + try: + await _async_yield(1) + except MyError: + await _async_yield(2) + return + yield + + def test1(anext): + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 1) + self.assertEqual(g.throw(MyError()), 2) + try: + g.send(None) + except StopIteration as e: + err = e + else: + self.fail('StopIteration was not raised') + self.assertEqual(err.value, "default") + + def test2(anext): + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 1) + self.assertEqual(g.throw(MyError()), 2) + with self.assertRaises(MyError): + g.throw(MyError()) + + def test3(anext): + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 1) + g.close() + with self.assertRaisesRegex(RuntimeError, 'cannot reuse'): + self.assertEqual(g.send(None), 1) + + def test4(anext): + async def yield_twice(v): + await _async_yield(v*10) + return await _async_yield(v*10 + 1) + + async def agenfn(): + try: + await yield_twice(1) + except MyError: + await yield_twice(2) + return + yield + + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 10) + self.assertEqual(g.throw(MyError()), 20) + with self.assertRaisesRegex(MyError, 'val'): + g.throw(MyError('val')) + + def test5(anext): + async def yield_twice(v): + await _async_yield(v*10) + return await _async_yield(v*10 + 1) + + async def agenfn(): + try: + await yield_twice(1) + except MyError: + return + yield 'aaa' + + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + self.assertEqual(g.send(None), 10) + with self.assertRaisesRegex(StopIteration, 'default'): + g.throw(MyError()) + + def test6(anext): + async def yield_twice(v): + await _async_yield(v*10) + return await _async_yield(v*10 + 1) + + async def agenfn(): + await yield_twice(1) + yield 'aaa' + + agen = agenfn() + with contextlib.closing(anext(agen, "default").__await__()) as g: + with self.assertRaises(MyError): + g.throw(MyError()) + + def run_test(test): + with self.subTest('pure-Python anext()'): + test(py_anext) + with self.subTest('builtin anext()'): + test(anext) + + run_test(test1) + run_test(test2) + run_test(test3) + run_test(test4) + run_test(test5) + run_test(test6) + + def test_async_gen_throw_custom_same_aclose_coro_twice(self): + async def async_iterate(): + yield 1 + yield 2 + + it = async_iterate() + + class MyException(Exception): + pass + + nxt = it.aclose() + with self.assertRaises(MyException): + nxt.throw(MyException) + + with self.assertRaisesRegex( + RuntimeError, + r"cannot reuse already awaited aclose\(\)/athrow\(\)" + ): + nxt.throw(MyException) + + def test_async_gen_throw_custom_same_athrow_coro_twice(self): + async def async_iterate(): + yield 1 + yield 2 + + it = async_iterate() + + class MyException(Exception): + pass + + nxt = it.athrow(MyException) + with self.assertRaises(MyException): + nxt.throw(MyException) + + with self.assertRaisesRegex( + RuntimeError, + r"cannot reuse already awaited aclose\(\)/athrow\(\)" + ): + nxt.throw(MyException) + + def test_async_gen_throw_same_aclose_coro_twice(self): + async def async_iterate(): + yield 1 + yield 2 + + it = async_iterate() + nxt = it.aclose() + with self.assertRaises(StopIteration): + nxt.throw(GeneratorExit) + + with self.assertRaisesRegex( + RuntimeError, + r"cannot reuse already awaited aclose\(\)/athrow\(\)" + ): + nxt.throw(GeneratorExit) +@requires_working_socket() class AsyncGenAsyncioTest(unittest.TestCase): + loop_used = False - def setUp(self): - self.loop = asyncio.new_event_loop() - asyncio._set_event_loop(None) + @functools.cached_property + def loop(self): + self.loop_used = True + loop = asyncio.EventLoop() + self.addCleanup(loop.close) + return loop def tearDown(self): - self.loop.close() - self.loop = None - asyncio._set_event_loop_policy(None) + self.assertTrue(self.loop_used) def check_async_iterator_anext(self, ait_class): with self.subTest(anext="pure-Python"): @@ -710,7 +846,6 @@ async def __anext__(self): self.check_async_iterator_anext(MyAsyncIter) def test_python_async_iterator_types_coroutine_anext(self): - import types class MyAsyncIterWithTypesCoro: """Asynchronously yield 1, then 2.""" def __init__(self): @@ -755,13 +890,6 @@ async def consume(): self.loop.run_until_complete(consume()) self.assertEqual(results, [1, 2]) - def test_aiter_idempotent(self): - async def gen(): - yield 1 - applied_once = aiter(gen()) - applied_twice = aiter(applied_once) - self.assertIs(applied_once, applied_twice) - def test_anext_bad_args(self): async def gen(): yield 1 @@ -851,166 +979,53 @@ async def do_test(): result = self.loop.run_until_complete(do_test()) self.assertEqual(result, "completed") - def test_anext_iter(self): - @types.coroutine - def _async_yield(v): - return (yield v) + def test_aiter_bad_args(self): + async def gen(): + yield 1 + async def call_with_too_few_args(): + await aiter() + async def call_with_too_many_args(): + await aiter(gen(), 1) + async def call_with_wrong_type_arg(): + await aiter(1) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_too_few_args()) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_too_many_args()) + with self.assertRaises(TypeError): + self.loop.run_until_complete(call_with_wrong_type_arg()) - class MyError(Exception): - pass + async def to_list(self, gen): + res = [] + async for i in gen: + res.append(i) + return res - async def agenfn(): - try: - await _async_yield(1) - except MyError: - await _async_yield(2) + def test_async_gen_asyncio_01(self): + async def gen(): + yield 1 + await asyncio.sleep(0.01) + yield 2 + await asyncio.sleep(0.01) return - yield - - def test1(anext): - agen = agenfn() - with contextlib.closing(anext(agen, "default").__await__()) as g: - self.assertEqual(g.send(None), 1) - self.assertEqual(g.throw(MyError()), 2) - try: - g.send(None) - except StopIteration as e: - err = e - else: - self.fail('StopIteration was not raised') - self.assertEqual(err.value, "default") + yield 3 - def test2(anext): - agen = agenfn() - with contextlib.closing(anext(agen, "default").__await__()) as g: - self.assertEqual(g.send(None), 1) - self.assertEqual(g.throw(MyError()), 2) - with self.assertRaises(MyError): - g.throw(MyError()) + res = self.loop.run_until_complete(self.to_list(gen())) + self.assertEqual(res, [1, 2]) - def test3(anext): - agen = agenfn() - with contextlib.closing(anext(agen, "default").__await__()) as g: - self.assertEqual(g.send(None), 1) - g.close() - with self.assertRaisesRegex(RuntimeError, 'cannot reuse'): - self.assertEqual(g.send(None), 1) + def test_async_gen_asyncio_02(self): + async def gen(): + yield 1 + await asyncio.sleep(0.01) + yield 2 + 1 / 0 + yield 3 - def test4(anext): - @types.coroutine - def _async_yield(v): - yield v * 10 - return (yield (v * 10 + 1)) + with self.assertRaises(ZeroDivisionError): + self.loop.run_until_complete(self.to_list(gen())) - async def agenfn(): - try: - await _async_yield(1) - except MyError: - await _async_yield(2) - return - yield - - agen = agenfn() - with contextlib.closing(anext(agen, "default").__await__()) as g: - self.assertEqual(g.send(None), 10) - self.assertEqual(g.throw(MyError()), 20) - with self.assertRaisesRegex(MyError, 'val'): - g.throw(MyError('val')) - - def test5(anext): - @types.coroutine - def _async_yield(v): - yield v * 10 - return (yield (v * 10 + 1)) - - async def agenfn(): - try: - await _async_yield(1) - except MyError: - return - yield 'aaa' - - agen = agenfn() - with contextlib.closing(anext(agen, "default").__await__()) as g: - self.assertEqual(g.send(None), 10) - with self.assertRaisesRegex(StopIteration, 'default'): - g.throw(MyError()) - - def test6(anext): - @types.coroutine - def _async_yield(v): - yield v * 10 - return (yield (v * 10 + 1)) - - async def agenfn(): - await _async_yield(1) - yield 'aaa' - - agen = agenfn() - with contextlib.closing(anext(agen, "default").__await__()) as g: - with self.assertRaises(MyError): - g.throw(MyError()) - - def run_test(test): - with self.subTest('pure-Python anext()'): - test(py_anext) - with self.subTest('builtin anext()'): - test(anext) - - run_test(test1) - run_test(test2) - run_test(test3) - run_test(test4) - run_test(test5) - run_test(test6) - - def test_aiter_bad_args(self): - async def gen(): - yield 1 - async def call_with_too_few_args(): - await aiter() - async def call_with_too_many_args(): - await aiter(gen(), 1) - async def call_with_wrong_type_arg(): - await aiter(1) - with self.assertRaises(TypeError): - self.loop.run_until_complete(call_with_too_few_args()) - with self.assertRaises(TypeError): - self.loop.run_until_complete(call_with_too_many_args()) - with self.assertRaises(TypeError): - self.loop.run_until_complete(call_with_wrong_type_arg()) - - async def to_list(self, gen): - res = [] - async for i in gen: - res.append(i) - return res - - def test_async_gen_asyncio_01(self): - async def gen(): - yield 1 - await asyncio.sleep(0.01) - yield 2 - await asyncio.sleep(0.01) - return - yield 3 - - res = self.loop.run_until_complete(self.to_list(gen())) - self.assertEqual(res, [1, 2]) - - def test_async_gen_asyncio_02(self): - async def gen(): - yield 1 - await asyncio.sleep(0.01) - yield 2 - 1 / 0 - yield 3 - - with self.assertRaises(ZeroDivisionError): - self.loop.run_until_complete(self.to_list(gen())) - - def test_async_gen_asyncio_03(self): - loop = self.loop + def test_async_gen_asyncio_03(self): + loop = self.loop class Gen: async def __aiter__(self): @@ -1686,87 +1701,6 @@ async def wait(): self.assertEqual(finalized, 2) - def test_async_gen_asyncio_shutdown_02(self): - messages = [] - - def exception_handler(loop, context): - messages.append(context) - - async def async_iterate(): - yield 1 - yield 2 - - it = async_iterate() - async def main(): - loop = asyncio.get_running_loop() - loop.set_exception_handler(exception_handler) - - async for i in it: - break - - asyncio.run(main()) - - self.assertEqual(messages, []) - - def test_async_gen_asyncio_shutdown_exception_01(self): - messages = [] - - def exception_handler(loop, context): - messages.append(context) - - async def async_iterate(): - try: - yield 1 - yield 2 - finally: - 1/0 - - it = async_iterate() - async def main(): - loop = asyncio.get_running_loop() - loop.set_exception_handler(exception_handler) - - async for i in it: - break - - asyncio.run(main()) - - message, = messages - self.assertEqual(message['asyncgen'], it) - self.assertIsInstance(message['exception'], ZeroDivisionError) - self.assertIn('an error occurred during closing of asynchronous generator', - message['message']) - - def test_async_gen_asyncio_shutdown_exception_02(self): - messages = [] - - def exception_handler(loop, context): - messages.append(context) - - async def async_iterate(): - try: - yield 1 - yield 2 - finally: - 1/0 - - async def main(): - loop = asyncio.get_running_loop() - loop.set_exception_handler(exception_handler) - - async for i in async_iterate(): - break - gc_collect() - - asyncio.run(main()) - - message, = messages - self.assertIsInstance(message['exception'], ZeroDivisionError) - self.assertIn('unhandled exception during asyncio.run() shutdown', - message['message']) - del message, messages - gc_collect() - def test_async_gen_expression_01(self): async def arange(n): for i in range(n): @@ -1798,29 +1732,6 @@ async def run(): res = self.loop.run_until_complete(run()) self.assertEqual(res, [i * 2 for i in range(1, 10)]) - def test_asyncgen_nonstarted_hooks_are_cancellable(self): - # See https://bugs.python.org/issue38013 - messages = [] - - def exception_handler(loop, context): - messages.append(context) - - async def async_iterate(): - yield 1 - yield 2 - - async def main(): - loop = asyncio.get_running_loop() - loop.set_exception_handler(exception_handler) - - async for i in async_iterate(): - break - - asyncio.run(main()) - - self.assertEqual([], messages) - gc_collect() - def test_async_gen_await_same_anext_coro_twice(self): async def async_iterate(): yield 1 @@ -1857,62 +1768,6 @@ async def run(): self.loop.run_until_complete(run()) - def test_async_gen_throw_same_aclose_coro_twice(self): - async def async_iterate(): - yield 1 - yield 2 - - it = async_iterate() - nxt = it.aclose() - with self.assertRaises(StopIteration): - nxt.throw(GeneratorExit) - - with self.assertRaisesRegex( - RuntimeError, - r"cannot reuse already awaited aclose\(\)/athrow\(\)" - ): - nxt.throw(GeneratorExit) - - def test_async_gen_throw_custom_same_aclose_coro_twice(self): - async def async_iterate(): - yield 1 - yield 2 - - it = async_iterate() - - class MyException(Exception): - pass - - nxt = it.aclose() - with self.assertRaises(MyException): - nxt.throw(MyException) - - with self.assertRaisesRegex( - RuntimeError, - r"cannot reuse already awaited aclose\(\)/athrow\(\)" - ): - nxt.throw(MyException) - - def test_async_gen_throw_custom_same_athrow_coro_twice(self): - async def async_iterate(): - yield 1 - yield 2 - - it = async_iterate() - - class MyException(Exception): - pass - - nxt = it.athrow(MyException) - with self.assertRaises(MyException): - nxt.throw(MyException) - - with self.assertRaisesRegex( - RuntimeError, - r"cannot reuse already awaited aclose\(\)/athrow\(\)" - ): - nxt.throw(MyException) - def test_async_gen_aclose_twice_with_different_coros(self): # Regression test for https://bugs.python.org/issue39606 async def async_iterate(): @@ -2010,10 +1865,6 @@ class MyException(Exception): gc_collect() # does not warn unawaited def test_asend_send_already_running(self): - @types.coroutine - def _async_yield(v): - return (yield v) - async def agenfn(): while True: await _async_yield(1) @@ -2034,10 +1885,6 @@ async def agenfn(): def test_athrow_send_already_running(self): - @types.coroutine - def _async_yield(v): - return (yield v) - async def agenfn(): while True: await _async_yield(1) @@ -2056,5 +1903,116 @@ async def agenfn(): del gen2 gc_collect() # does not warn unawaited + +_asyncio_run = functools.partial(asyncio.run, loop_factory=asyncio.EventLoop) + + +@requires_working_socket() +class AsyncGenAsyncioRunTestCase(unittest.TestCase): + def test_async_gen_asyncio_shutdown_02(self): + messages = [] + + def exception_handler(loop, context): + messages.append(context) + + async def async_iterate(): + yield 1 + yield 2 + + it = async_iterate() + async def main(): + loop = asyncio.get_running_loop() + loop.set_exception_handler(exception_handler) + + async for i in it: + break + + _asyncio_run(main()) + + self.assertEqual(messages, []) + + def test_async_gen_asyncio_shutdown_exception_01(self): + messages = [] + + def exception_handler(loop, context): + messages.append(context) + + async def async_iterate(): + try: + yield 1 + yield 2 + finally: + 1/0 + + it = async_iterate() + async def main(): + loop = asyncio.get_running_loop() + loop.set_exception_handler(exception_handler) + + async for i in it: + break + + _asyncio_run(main()) + + message, = messages + self.assertEqual(message['asyncgen'], it) + self.assertIsInstance(message['exception'], ZeroDivisionError) + self.assertIn('an error occurred during closing of asynchronous generator', + message['message']) + + def test_async_gen_asyncio_shutdown_exception_02(self): + messages = [] + + def exception_handler(loop, context): + messages.append(context) + + async def async_iterate(): + try: + yield 1 + yield 2 + finally: + 1/0 + + async def main(): + loop = asyncio.get_running_loop() + loop.set_exception_handler(exception_handler) + + async for i in async_iterate(): + break + gc_collect() + + _asyncio_run(main()) + + message, = messages + self.assertIsInstance(message['exception'], ZeroDivisionError) + self.assertIn('unhandled exception during asyncio.run() shutdown', + message['message']) + del message, messages + gc_collect() + + def test_asyncgen_nonstarted_hooks_are_cancellable(self): + # See https://bugs.python.org/issue38013 + messages = [] + + def exception_handler(loop, context): + messages.append(context) + + async def async_iterate(): + yield 1 + yield 2 + + async def main(): + loop = asyncio.get_running_loop() + loop.set_exception_handler(exception_handler) + + async for i in async_iterate(): + break + + _asyncio_run(main()) + + self.assertEqual([], messages) + gc_collect() + + if __name__ == "__main__": unittest.main()