|
25 | 25 | from . import events
|
26 | 26 | from . import exceptions
|
27 | 27 | from . import futures
|
| 28 | +from . import queues |
28 | 29 | from . import timeouts
|
29 | 30 |
|
30 | 31 | # Helper to generate new task names
|
@@ -564,62 +565,125 @@ async def _cancel_and_wait(fut):
|
564 | 565 | fut.remove_done_callback(cb)
|
565 | 566 |
|
566 | 567 |
|
567 |
| -# This is *not* a @coroutine! It is just an iterator (yielding Futures). |
| 568 | +class _AsCompletedIterator: |
| 569 | + """Iterator of awaitables representing tasks of asyncio.as_completed. |
| 570 | +
|
| 571 | + As an asynchronous iterator, iteration yields futures as they finish. As a |
| 572 | + plain iterator, new coroutines are yielded that will return or raise the |
| 573 | + result of the next underlying future to complete. |
| 574 | + """ |
| 575 | + def __init__(self, aws, timeout): |
| 576 | + self._done = queues.Queue() |
| 577 | + self._timeout_handle = None |
| 578 | + |
| 579 | + loop = events.get_event_loop() |
| 580 | + todo = {ensure_future(aw, loop=loop) for aw in set(aws)} |
| 581 | + for f in todo: |
| 582 | + f.add_done_callback(self._handle_completion) |
| 583 | + if todo and timeout is not None: |
| 584 | + self._timeout_handle = ( |
| 585 | + loop.call_later(timeout, self._handle_timeout) |
| 586 | + ) |
| 587 | + self._todo = todo |
| 588 | + self._todo_left = len(todo) |
| 589 | + |
| 590 | + def __aiter__(self): |
| 591 | + return self |
| 592 | + |
| 593 | + def __iter__(self): |
| 594 | + return self |
| 595 | + |
| 596 | + async def __anext__(self): |
| 597 | + if not self._todo_left: |
| 598 | + raise StopAsyncIteration |
| 599 | + assert self._todo_left > 0 |
| 600 | + self._todo_left -= 1 |
| 601 | + return await self._wait_for_one() |
| 602 | + |
| 603 | + def __next__(self): |
| 604 | + if not self._todo_left: |
| 605 | + raise StopIteration |
| 606 | + assert self._todo_left > 0 |
| 607 | + self._todo_left -= 1 |
| 608 | + return self._wait_for_one(resolve=True) |
| 609 | + |
| 610 | + def _handle_timeout(self): |
| 611 | + for f in self._todo: |
| 612 | + f.remove_done_callback(self._handle_completion) |
| 613 | + self._done.put_nowait(None) # Sentinel for _wait_for_one(). |
| 614 | + self._todo.clear() # Can't do todo.remove(f) in the loop. |
| 615 | + |
| 616 | + def _handle_completion(self, f): |
| 617 | + if not self._todo: |
| 618 | + return # _handle_timeout() was here first. |
| 619 | + self._todo.remove(f) |
| 620 | + self._done.put_nowait(f) |
| 621 | + if not self._todo and self._timeout_handle is not None: |
| 622 | + self._timeout_handle.cancel() |
| 623 | + |
| 624 | + async def _wait_for_one(self, resolve=False): |
| 625 | + # Wait for the next future to be done and return it unless resolve is |
| 626 | + # set, in which case return either the result of the future or raise |
| 627 | + # an exception. |
| 628 | + f = await self._done.get() |
| 629 | + if f is None: |
| 630 | + # Dummy value from _handle_timeout(). |
| 631 | + raise exceptions.TimeoutError |
| 632 | + return f.result() if resolve else f |
| 633 | + |
| 634 | + |
568 | 635 | def as_completed(fs, *, timeout=None):
|
569 |
| - """Return an iterator whose values are coroutines. |
| 636 | + """Create an iterator of awaitables or their results in completion order. |
570 | 637 |
|
571 |
| - When waiting for the yielded coroutines you'll get the results (or |
572 |
| - exceptions!) of the original Futures (or coroutines), in the order |
573 |
| - in which and as soon as they complete. |
| 638 | + Run the supplied awaitables concurrently. The returned object can be |
| 639 | + iterated to obtain the results of the awaitables as they finish. |
574 | 640 |
|
575 |
| - This differs from PEP 3148; the proper way to use this is: |
| 641 | + The object returned can be iterated as an asynchronous iterator or a plain |
| 642 | + iterator. When asynchronous iteration is used, the originally-supplied |
| 643 | + awaitables are yielded if they are tasks or futures. This makes it easy to |
| 644 | + correlate previously-scheduled tasks with their results: |
576 | 645 |
|
577 |
| - for f in as_completed(fs): |
578 |
| - result = await f # The 'await' may raise. |
579 |
| - # Use result. |
| 646 | + ipv4_connect = create_task(open_connection("127.0.0.1", 80)) |
| 647 | + ipv6_connect = create_task(open_connection("::1", 80)) |
| 648 | + tasks = [ipv4_connect, ipv6_connect] |
580 | 649 |
|
581 |
| - If a timeout is specified, the 'await' will raise |
582 |
| - TimeoutError when the timeout occurs before all Futures are done. |
| 650 | + async for earliest_connect in as_completed(tasks): |
| 651 | + # earliest_connect is done. The result can be obtained by |
| 652 | + # awaiting it or calling earliest_connect.result() |
| 653 | + reader, writer = await earliest_connect |
583 | 654 |
|
584 |
| - Note: The futures 'f' are not necessarily members of fs. |
585 |
| - """ |
586 |
| - if futures.isfuture(fs) or coroutines.iscoroutine(fs): |
587 |
| - raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}") |
| 655 | + if earliest_connect is ipv6_connect: |
| 656 | + print("IPv6 connection established.") |
| 657 | + else: |
| 658 | + print("IPv4 connection established.") |
588 | 659 |
|
589 |
| - from .queues import Queue # Import here to avoid circular import problem. |
590 |
| - done = Queue() |
| 660 | + During asynchronous iteration, implicitly-created tasks will be yielded for |
| 661 | + supplied awaitables that aren't tasks or futures. |
591 | 662 |
|
592 |
| - loop = events.get_event_loop() |
593 |
| - todo = {ensure_future(f, loop=loop) for f in set(fs)} |
594 |
| - timeout_handle = None |
| 663 | + When used as a plain iterator, each iteration yields a new coroutine that |
| 664 | + returns the result or raises the exception of the next completed awaitable. |
| 665 | + This pattern is compatible with Python versions older than 3.13: |
595 | 666 |
|
596 |
| - def _on_timeout(): |
597 |
| - for f in todo: |
598 |
| - f.remove_done_callback(_on_completion) |
599 |
| - done.put_nowait(None) # Queue a dummy value for _wait_for_one(). |
600 |
| - todo.clear() # Can't do todo.remove(f) in the loop. |
| 667 | + ipv4_connect = create_task(open_connection("127.0.0.1", 80)) |
| 668 | + ipv6_connect = create_task(open_connection("::1", 80)) |
| 669 | + tasks = [ipv4_connect, ipv6_connect] |
601 | 670 |
|
602 |
| - def _on_completion(f): |
603 |
| - if not todo: |
604 |
| - return # _on_timeout() was here first. |
605 |
| - todo.remove(f) |
606 |
| - done.put_nowait(f) |
607 |
| - if not todo and timeout_handle is not None: |
608 |
| - timeout_handle.cancel() |
| 671 | + for next_connect in as_completed(tasks): |
| 672 | + # next_connect is not one of the original task objects. It must be |
| 673 | + # awaited to obtain the result value or raise the exception of the |
| 674 | + # awaitable that finishes next. |
| 675 | + reader, writer = await next_connect |
609 | 676 |
|
610 |
| - async def _wait_for_one(): |
611 |
| - f = await done.get() |
612 |
| - if f is None: |
613 |
| - # Dummy value from _on_timeout(). |
614 |
| - raise exceptions.TimeoutError |
615 |
| - return f.result() # May raise f.exception(). |
| 677 | + A TimeoutError is raised if the timeout occurs before all awaitables are |
| 678 | + done. This is raised by the async for loop during asynchronous iteration or |
| 679 | + by the coroutines yielded during plain iteration. |
| 680 | + """ |
| 681 | + if inspect.isawaitable(fs): |
| 682 | + raise TypeError( |
| 683 | + f"expects an iterable of awaitables, not {type(fs).__name__}" |
| 684 | + ) |
616 | 685 |
|
617 |
| - for f in todo: |
618 |
| - f.add_done_callback(_on_completion) |
619 |
| - if todo and timeout is not None: |
620 |
| - timeout_handle = loop.call_later(timeout, _on_timeout) |
621 |
| - for _ in range(len(todo)): |
622 |
| - yield _wait_for_one() |
| 686 | + return _AsCompletedIterator(fs, timeout) |
623 | 687 |
|
624 | 688 |
|
625 | 689 | @types.coroutine
|
|
0 commit comments