From e32e9ef60ceb495b1afd775fc0a46a7b31594ffe Mon Sep 17 00:00:00 2001
From: Philippe Hausler
Date: Mon, 28 Jul 2025 16:50:48 -0700
Subject: [PATCH 01/14] [WIP] Draft pitch for share

---
 Evolution/NNNN-share.md | 178 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 178 insertions(+)
 create mode 100644 Evolution/NNNN-share.md

diff --git a/Evolution/NNNN-share.md b/Evolution/NNNN-share.md
new file mode 100644
index 00000000..deb8bdb7
--- /dev/null
+++ b/Evolution/NNNN-share.md
@@ -0,0 +1,178 @@
+# Share
+
+## Introduction
+
+Many AsyncSequence-adopting types permit only a single consumption. However, there are many situations in which the same produced values are useful in more than one place. A few approaches exist to share, distribute, and broadcast those values. This proposal focuses on one concept: sharing. With sharing, each consumer independently makes forward progress and receives the same values, but values are not replayed from the beginning of time.
+
+## Motivation
+
+There are many potential uses for the sharing concept applied to AsyncSequences.
+
+One example is a source of data, exposed as an asynchronous sequence, that needs to be consumed by UI updates, logging, and a network connection. What matters in this case is not the specific consumers but that they are independent of each other. Networking should not block or delay updates to the UI, and neither should logging. This example also illustrates that the isolation of each side may differ and that some sides may not tolerate coalescing or dropping values.
+
+Many other use cases have been requested for this family of algorithms. Since the release of AsyncAlgorithms it has perhaps been the most popularly requested set of behaviors as additions to the package.
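The fan-out scenario above can be sketched as follows. This is a minimal illustration assuming the `share()` operator proposed below; `makeEventSource`, `updateUI`, `log`, and `upload` are hypothetical placeholders, not part of the proposal.

```swift
// Hypothetical sketch: one event source, three independent consumers.
// `makeEventSource`, `updateUI`, `log`, and `upload` are placeholders.
let events = makeEventSource().share()

let ui = Task { @MainActor in
  for await event in events { updateUI(event) }           // drives the UI
}
let logging = Task {
  for await event in events { log(event) }                // records events
}
let networking = Task {
  for await event in events { try? await upload(event) }  // may be slow; does not delay the UI
}
```

Each task iterates the same shared source at its own pace; a slow network upload does not hold back the UI side.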
## Proposed solution

AsyncAlgorithms will introduce a new extension function on AsyncSequence that provides a shareable asynchronous sequence, producing the same values upon iteration from multiple instances of its AsyncIterator. Those iterations can take place in multiple isolations.

When values from a differing isolation cannot be coalesced, the two options available are awaiting (an exertion of back-pressure across the sequences) or buffering (an internal back-pressure to a buffer). Replaying the values from the point of creation of the sequence is a distinctly different behavior that should be considered a separate use case. This leaves the behavioral characteristic of this particular share operation as: sharing a buffer of values starting from the initialization of a new iteration of the sequence. Control over that buffer should then have options to determine the behavior, similar to the control AsyncStream allows. It should have options to be unbounded, to buffer the oldest count of elements, or to buffer the newest count of elements.

It is critical to identify that this is one algorithm in the family of algorithms for sharing values. It should not attempt to solve all behavioral requirements but instead serve a common set of them that make cohesive sense together. This proposal is not mutually exclusive with the other algorithms in the sharing family.

## Detailed design

It is not just likely but perhaps a certainty that other algorithms will end up needing the same concept of a buffering policy beyond just AsyncStream and the new sharing mechanism. A new type in AsyncAlgorithms will be introduced to handle this. [^BufferingPolicy]

```swift
/// A strategy that handles exhaustion of a buffer’s capacity.
public enum BufferingPolicy: Sendable {
  /// Continue to add to the buffer, without imposing a limit on the number
  /// of buffered elements.
  case unbounded

  /// When the buffer is full, discard the newly received element.
  ///
  /// This strategy enforces keeping at most the specified number of oldest
  /// values.
  case bufferingOldest(Int)

  /// When the buffer is full, discard the oldest element in the buffer.
  ///
  /// This strategy enforces keeping at most the specified number of newest
  /// values.
  case bufferingNewest(Int)
}
```

A new extension will be added to return a concrete type representing the share algorithm. This extension will take a buffering policy to identify how the buffer will be handled when iterations do not consume at the same rate.

A new AsyncSequence type will be introduced that is explicitly marked as `Sendable`. This annotation identifies to the developer that this sequence can be shared and stored. Because the type is intended to be stored, it cannot be returned by the extension as a `some AsyncSequence & Sendable`, since that cannot be assigned to a stored property. Additionally, `AsyncShareSequence`, since it is intended to be stored, will act as a quasi type-erasing barrier to the type information of previous sequences in the chain of algorithms, in that it will only expose the generic information of the `Element` and `Failure` as part of its public interface and not the "Base" asynchronous sequence it was created from.

```swift
extension AsyncSequence where Element: Sendable {
  public func share(
    bufferingPolicy: BufferingPolicy = .unbounded
  ) -> AsyncShareSequence<Element, Failure>
}

public struct AsyncShareSequence<Element: Sendable, Failure: Error>: AsyncSequence, Sendable {
  public struct Iterator: AsyncIteratorProtocol {
    public mutating func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element?
+ } + + public func makeAsyncIterator() -> Iterator +} + +@available(*, unavailable) +extension AsyncShareSequence.Iterator: Sendable { } +``` + +The buffer internally to the share algorithm will only extend back to the furthest element available but there will only be a singular buffer shared across all iterators. This ensures that with the application of the buffering policy the storage size is as minimal as possible while still allowing all iterations to avoid dropping values and keeping the memory usage in check. + +## Runtime Behavior + +The construction of the `AsyncShareSequence` will initially construct a shared iteration reference. This means that all instances of the structure of the `AsyncShareSequence` will reference to the same iteration. + +Upon creation of the `Iterator` via `makeAsyncIterator` a new "side" will be constructed to identify the specific iterator interacting with the shared iteration. Then when next is invoked is where the first actual action takes place. + +The next method will first checkout from a critical region the underlying AsyncIterator from the base. If that is successful (i.e. no other iteration sides have already checked it out) then it will invoke the next method of that iterator (forwarding in the actor isolation). If an element is produced then it enqueues the element to the shared buffer, checks in the iterator, adjusts the index in the buffer, and finds all pending continuations all in a shared critical region by a mutex. Then those continuations will be resumed with the given element. + +If no element is returned by the base iterator (signifying the terminal state);then the process is similar except it will instead mark the sequence as finished and resume with nil to any active continuations. Similarly with failures that will set the state as terminal but also store the error for further iteration points that need eventual termination. 
Then all sides are "drained" such that continuations are placed into the shared state and resumed when an element is available for that position.

Practically this all means that a given iteration may be "behind" another and can eventually catch up (provided it is within the buffer limit).

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share()

let t1 = Task {
  for await element in exampleSource {
    if element == 0 {
      try? await Task.sleep(for: .seconds(1))
    }
    print("Task 1", element)
  }
}

let t2 = Task {
  for await element in exampleSource {
    if element == 3 {
      try? await Task.sleep(for: .seconds(1))
    }
    print("Task 2", element)
  }
}

await t1.value
await t2.value
```

This example will print a possible ordering of the following:

```
Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 2 3
Task 2 4
Task 1 1
Task 1 2
Task 1 3
Task 1 4
```

The order of the interleaving of the prints is not guaranteed; however, the order of the elements per iteration is. Likewise, in this buffering case it is guaranteed that all values are represented in the output.

If the creation were altered to the following:

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingNewest(2))
```

The output would print a possible ordering of:

```
Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 2 4
Task 1 3
Task 1 4
```

Some values are dropped due to the buffering policy, but the iterations eventually reach consistency. The same applies to the following:

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingOldest(2))
```

```
Task 2 0
Task 2 1
Task 2 2
Task 1 0
Task 2 4
Task 1 1
Task 1 2
```

However, in this particular case the newest values are the dropped elements.
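Because the shared buffer only reaches back as far as the furthest-behind active side, an iteration that begins late does not replay earlier values. A hypothetical timing sketch, where `ticks` is assumed to be a sequence yielding 0, 1, 2, ... once per second:

```swift
// Hypothetical sketch: `ticks` is assumed to yield 0, 1, 2, ... once per second.
let shared = ticks.share()

let early = Task {
  for await value in shared { print("early:", value) }  // observes from the start
}

try? await Task.sleep(for: .seconds(3))

let late = Task {
  // Joins after iteration has begun: it starts from the current position,
  // not from the beginning of time.
  for await value in shared { print("late:", value) }
}

await early.value
await late.value
```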
+ +## Usage + +It is expected that this operator will be unlike other + +## Effect on API resilience + +This is an additive API and no existing systems are changed, however it will introduce a few new types that will need to be maintained as ABI interfaces. Since the intent of this is to provide a mechanism to store AsyncSequences to a shared context the type must be exposed as ABI (for type sizing). + +## Alternatives considered + +[^BufferingPolicy] It has been considered that this particular policy would be nested inside the `AsyncShareSequence` type. However since this seems to be something that will be useful for other types it makes sense to expose it as a top level type. However if it is determined that a general form of a buffering policy would require additional behaviors this might be a debatable placement to move back to an interior type similar to AsyncStream. + + From 8447eef7863062aa928cdf47ccbff52b4829b363 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Thu, 31 Jul 2025 12:49:31 -0700 Subject: [PATCH 02/14] Update the proposal with some initial feedback and add a first draft implementation of share --- Evolution/NNNN-share.md | 64 +-- .../AsyncAlgorithms/AsyncShareSequence.swift | 437 ++++++++++++++++++ 2 files changed, 451 insertions(+), 50 deletions(-) create mode 100644 Sources/AsyncAlgorithms/AsyncShareSequence.swift diff --git a/Evolution/NNNN-share.md b/Evolution/NNNN-share.md index deb8bdb7..9fa5ba83 100644 --- a/Evolution/NNNN-share.md +++ b/Evolution/NNNN-share.md @@ -22,65 +22,33 @@ It is critical to identify that this is one algorithm in the family of algorithm ## Detailed design -It is not just likely but perhaps a certainty that other algorithms will end up needing the same concept of a buffering policy beyond just AsyncStream and the new sharing mechanism. A new type in AsyncAlgorithms will be introduced to handle this. [^BufferingPolicy] +A new extension will be added to return a `Sendable` `AsyncSequence`. 
This extension will take a buffering policy to identify how the buffer will be handled when iterations do not consume at the same rate.

-```swift
-/// A strategy that handles exhaustion of a buffer’s capacity.
-public enum BufferingPolicy: Sendable {
-  /// Continue to add to the buffer, without imposing a limit on the number
-  /// of buffered elements.
-  case unbounded
-
-  /// When the buffer is full, discard the newly received element.
-  ///
-  /// This strategy enforces keeping at most the specified number of oldest
-  /// values.
-  case bufferingOldest(Int)
-
-  /// When the buffer is full, discard the oldest element in the buffer.
-  ///
-  /// This strategy enforces keeping at most the specified number of newest
-  /// values.
-  case bufferingNewest(Int)
-}
-```
-
-A new extension will be added to return a concrete type representing the share algorithm. This extension will take a buffering policy to identify how the buffer will be handled when iterations do not consume at the same rate.
-
-A new AsyncSequence type will be introduced that is explicitly marked as `Sendable`. This annotation identifies to the developer that this sequence can be shared and stored. Because the type is intended to be stored it cannot be returned by the extension as a `some AsyncSequence & Sendable` since that cannot be assigned to a stored property. Additionally the type of `AsyncShareSequence`, since indented to be stored, will act as a quasi erasing-barrier to the type information of previous sequences in the chain of algorithms in that it will only hold the generic information of the `Element` and `Failure` as part of it's public interface and not the "Base" asynchronous sequence it was created from.
+The `Sendable` annotation identifies to the developer that this sequence can be shared and stored in an existential `any`.
```swift
extension AsyncSequence where Element: Sendable {
  public func share(
-    bufferingPolicy: BufferingPolicy = .unbounded
-  ) -> AsyncShareSequence<Element, Failure>
+    bufferingPolicy: AsyncBufferSequencePolicy = .unbounded
+  ) -> some AsyncSequence & Sendable
}
-
-public struct AsyncShareSequence<Element: Sendable, Failure: Error>: AsyncSequence, Sendable {
-  public struct Iterator: AsyncIteratorProtocol {
-    public mutating func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element?
-  }
-
-  public func makeAsyncIterator() -> Iterator
-}
-
-@available(*, unavailable)
-extension AsyncShareSequence.Iterator: Sendable { }
```

The buffer internal to the share algorithm only extends back as far as the furthest-behind iteration requires, and there is only a single buffer shared across all iterators. This ensures that, with the application of the buffering policy, the storage size is as minimal as possible while still allowing all iterations to avoid dropping values and keeping memory usage in check. The signature reuses the existing `AsyncBufferSequencePolicy` type to specify the buffering behavior: either how emitting into the buffer should be limited, or what should happen when the buffer is exceeded.

## Runtime Behavior

-The construction of the `AsyncShareSequence` will initially construct a shared iteration reference. This means that all instances of the structure of the `AsyncShareSequence` will reference to the same iteration.
+The runtime behaviors fall into a few categories: ordering, iteration isolation, cancellation, and lifetimes. To understand these behaviors, a few terms are useful to define.
Each creation of the AsyncIterator of the sequence, and each invocation of next, is referred to as a side of the share iteration. The back-pressure on the system to fetch a new element or a termination is referred to as demand. The limit is the gate that makes the iteration await until the buffer has been serviced; it is used for the `AsyncBufferSequencePolicy.bounded(_:)` policy. The last special definition is that of the extent, which in this case is the lifetime of the asynchronous sequence itself.

-Upon creation of the `Iterator` via `makeAsyncIterator` a new "side" will be constructed to identify the specific iterator interacting with the shared iteration. Then when next is invoked is where the first actual action takes place.
+When the underlying type backing the share algorithm is constructed, a new extent is created; this is used for tracking the reference lifetime under the hood, both to house the iteration and to identify the point at which no more sides can be constructed. When no more sides can be constructed and no sides are left to iterate, the backing iteration is canceled. This prevents any un-referenced task backing the iteration from being leaked by the algorithm itself.

-The next method will first checkout from a critical region the underlying AsyncIterator from the base. If that is successful (i.e. no other iteration sides have already checked it out) then it will invoke the next method of that iterator (forwarding in the actor isolation). If an element is produced then it enqueues the element to the shared buffer, checks in the iterator, adjusts the index in the buffer, and finds all pending continuations all in a shared critical region by a mutex. Then those continuations will be resumed with the given element.
+That construction then creates an initial shared state and buffer.
No task is started initially; only upon the first demand is the task backing the iteration started. This means that on the first call to next, a task is spun up servicing all potential sides. The order in which the sides are serviced is not specified and cannot be relied upon; however, the order of delivery within a side is always guaranteed. The singular task servicing the iteration is the only place holding any sort of iterator from the base `AsyncSequence`, so that iterator is isolated and never sent from one isolation to another. That iteration first awaits any limit availability, then awaits a demand given by a side, after which it awaits an element or terminal event from the iterator and enqueues the elements to the buffer.

The buffer itself is held in only one location; each side, however, has a cursor index into that buffer, and when values are consumed the indexes are adjusted accordingly, leaving the buffer usage only as big as the largest deficit. This means that new sides started after the initial start-up will not have a "replay" effect; that is a similar but distinct algorithm and is not addressed by this proposal. Buffer-size-sensitive systems that wish to adjust behavior should be aware that specifying a policy is a suggested step. However, in common usage, similar to other such systems servicing desktop and mobile applications, the default and common behavior is to be unbounded. This allows for a progressive disclosure from common usage that just works out of the box with no configuration, to more advanced cases that need finer grained control.
Furthermore, there are scenarios where one might want a way of identifying dropped-value events within the iteration of a side; this is something that will be addressed in an upcoming proposal.

As previously stated, the iteration of the upstream/base AsyncSequence is isolated to a detached task; this ensures that individual sides can have independent cancellation. Those cancellations have the effect of removing that side from the shared iteration and cleaning up accordingly (including adjusting the trimming of the internal buffer).

It is difficult to express all potential examples of concurrent access, but a few cases are included with this proposal to illustrate some of the behaviors. If a more comprehensive behavioral analysis is needed, it is strongly suggested to try out the pending pull request to identify how specific behaviors work. Please keep in mind that the ordering between tasks is not specified; only the order within one side of iteration is.

Practically this all means that a given iteration may be "behind" another and can eventually catch up (provided it is within the buffer limit).

@@ -127,7 +95,7 @@ Task 1 4
The order of the interleaving of the prints is not guaranteed; however the order of the elements per iteration is. Likewise in this buffering case it is guaranteed that all values are represented in the output.

-If the creation were altered to the following:
+If the creation were instead altered to the following:

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingNewest(2))
@@ -163,16 +131,12 @@ Task 1 2
However in this particular case the newest values are the dropped elements.
-## Usage
-
-It is expected that this operator will be unlike other
-
## Effect on API resilience

This is an additive API and no existing systems are changed, however it will introduce a few new types that will need to be maintained as ABI interfaces. Since the intent of this is to provide a mechanism to store AsyncSequences to a shared context the type must be exposed as ABI (for type sizing).

## Alternatives considered

-[^BufferingPolicy] It has been considered that this particular policy would be nested inside the `AsyncShareSequence` type. However since this seems to be something that will be useful for other types it makes sense to expose it as a top level type. However if it is determined that a general form of a buffering policy would require additional behaviors this might be a debatable placement to move back to an interior type similar to AsyncStream.
+It has been considered that the buffering policy would be nested inside the `AsyncShareSequence` type. However, since this seems to be something that will be useful for other types, it makes sense to reuse an existing top-level type. If it is determined that a general form of a buffering policy would require additional behaviors, this placement could be debated and moved back to an interior type, similar to AsyncStream.
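For completeness, a sketch of the back-pressure variant discussed above: the `.bounded(_:)` case of `AsyncBufferSequencePolicy` (assumed here as described in this proposal) makes the shared iteration await rather than drop when the buffer is full, so even a slow side observes every element, in order. The exact interleaving remains, as noted, unspecified.

```swift
let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bounded(2))

let slow = Task {
  for await element in exampleSource {
    // Simulate a slow consumer; the bounded policy holds back the base
    // iteration instead of discarding elements.
    try? await Task.sleep(for: .milliseconds(100))
    print("slow:", element)
  }
}

await slow.value
```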
diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift new file mode 100644 index 00000000..f893db75 --- /dev/null +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -0,0 +1,437 @@ +import Synchronization + +@available(macOS 26.0, *) +extension AsyncSequence where Element: Sendable { + @available(macOS 26.0, *) // TODO: fix the availability for this to be defined as @available(AsyncAlgorithms 1.1, *) + public func share(bufferingPolicy: AsyncBufferSequencePolicy = .unbounded) -> some AsyncSequence & Sendable { + return AsyncShareSequence(self, bufferingPolicy: bufferingPolicy) + } +} + +@available(macOS 26.0, *) +struct AsyncShareSequence: Sendable where Base.Element: Sendable { + final class Side { + struct State { + var continuaton: CheckedContinuation, Never>? + var position = 0 + + func offset(_ adjustment: Int) -> State { + State(continuaton: continuaton, position: position - adjustment) + } + } + + let iteration: Iteration + let id: Int + + init(_ iteration: Iteration) { + self.iteration = iteration + id = iteration.registerSide() + } + + deinit { + iteration.unregisterSide(id) + } + + func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? { + try await iteration.next(isolation: actor, id: id) + } + } + + final class Iteration: Sendable { + // this is the swapped state of transferring the base to the iterating task + // it does send the Base... 
but only one transfer + enum IteratingTask: @unchecked Sendable { + case pending(Base) + case starting + case running(Task) + case cancelled + + var isStarting: Bool { + switch self { + case .starting: true + default: false + } + } + + func cancel() { + switch self { + case .running(let task): + task.cancel() + default: + break + } + } + } + struct State: Sendable { + enum StoragePolicy: Sendable { + case unbounded + case bufferingOldest(Int) + case bufferingNewest(Int) + } + + var generation = 0 + var sides = [Int: Side.State]() + var iteratingTask: IteratingTask + var buffer = [Element]() + var finished = false + var failure: Failure? + var limit: CheckedContinuation? + var demand: CheckedContinuation? + + let storagePolicy: StoragePolicy + + init(_ base: Base, bufferingPolicy: AsyncBufferSequencePolicy) { + self.iteratingTask = .pending(base) + switch bufferingPolicy.policy { + case .bounded: self.storagePolicy = .unbounded + case .bufferingOldest(let bound): self.storagePolicy = .bufferingOldest(bound) + case .bufferingNewest(let bound): self.storagePolicy = .bufferingNewest(bound) + case .unbounded: self.storagePolicy = .unbounded + } + } + + mutating func trimBuffer() { + if let minimumIndex = sides.values.map({ $0.position }).min(), minimumIndex > 0 { + buffer.removeFirst(minimumIndex) + sides = sides.mapValues { + $0.offset(minimumIndex) + } + } + } + + mutating func emit(_ value: T) -> (T, CheckedContinuation?, CheckedContinuation?, Bool) { + defer { + limit = nil + demand = nil + } + if case .cancelled = iteratingTask { + return (value, limit, demand, true) + } else { + return (value, limit, demand, false) + } + } + + mutating func enqueue(_ element: Element) { + let count = buffer.count + + switch storagePolicy { + case .unbounded: + buffer.append(element) + case .bufferingOldest(let limit): + if count < limit { + buffer.append(element) + } + case .bufferingNewest(let limit): + if count < limit { + buffer.append(element) + } else if count > 0 { + 
buffer.removeFirst() + buffer.append(element) + } + } + } + + mutating func finish() { + finished = true + } + + mutating func fail(_ error: Failure) { + finished = true + failure = error + } + } + + let state: Mutex + let limit: Int? + + init(_ base: Base, bufferingPolicy: AsyncBufferSequencePolicy) { + state = Mutex(State(base, bufferingPolicy: bufferingPolicy)) + switch bufferingPolicy.policy { + case .bounded(let limit): + self.limit = limit + default: + self.limit = nil + } + } + + func cancel() { + // TODO: this currently is a hard cancel, it should be refined to only cancel when everything is terminal + let (task, limit, demand, cancelled) = state.withLock { state -> (IteratingTask?, CheckedContinuation?, CheckedContinuation?, Bool) in + defer { + state.iteratingTask = .cancelled + state.limit = nil + state.demand = nil + } + return state.emit(state.iteratingTask) + } + task?.cancel() + limit?.resume(returning: cancelled) + demand?.resume() + } + + func registerSide() -> Int { + state.withLock { state in + defer { state.generation += 1 } + state.sides[state.generation] = Side.State() + return state.generation + } + } + + func unregisterSide(_ id: Int) { + let (side, continuation, cancelled) = state.withLock { state -> (Side.State?, CheckedContinuation?, Bool) in + let side = state.sides.removeValue(forKey: id) + state.trimBuffer() + if let limit, state.buffer.count < limit { + defer { state.limit = nil } + if case .cancelled = state.iteratingTask { + return (side, state.limit, true) + } else { + return (side, state.limit, false) + } + } else { + if case .cancelled = state.iteratingTask { + return (side, nil, true) + } else { + return (side, nil, false) + } + } + } + if let continuation { + continuation.resume(returning: cancelled) + } + if let side { + side.continuaton?.resume(returning: .success(nil)) + } + } + + func iterate() async -> Bool { + if let limit { + let cancelled = await withCheckedContinuation { (continuation: CheckedContinuation) in + let 
(resume, cancelled) = state.withLock { state -> (CheckedContinuation?, Bool) in + if state.buffer.count >= limit { + state.limit = continuation + if case .cancelled = state.iteratingTask { + return (nil, true) + } else { + return (nil, false) + } + } else { + assert(state.limit == nil) + if case .cancelled = state.iteratingTask { + return (continuation, true) + } else { + return (continuation, false) + } + } + } + if let resume { + resume.resume(returning: cancelled) + } + } + if cancelled { + return false + } + } + + // await a demand + await withCheckedContinuation { (continuation: CheckedContinuation) in + let hasPendingDemand = state.withLock { state in + for (_, side) in state.sides { + if side.continuaton != nil { + return true + } + } + state.demand = continuation + return false + } + if hasPendingDemand { + continuation.resume() + } + } + return state.withLock { state in + switch state.iteratingTask { + case .cancelled: + return false + default: + return true + } + } + } + + func cancel(id: Int) { + unregisterSide(id) // doubly unregistering is idempotent but has a side effect of emitting nil if present + } + + struct Resumption { + let continuation: CheckedContinuation, Never> + let result: Result + + func resume() { + continuation.resume(returning: result) + } + } + + func emit(_ result: Result) { + let (resumptions, demandContinuation) = state.withLock { state -> ([Resumption], CheckedContinuation?) 
in + var resumptions = [Resumption]() + switch result { + case .success(let element): + if let element { + state.enqueue(element) + } else { + state.finished = true + } + case .failure(let failure): + state.finished = true + state.failure = failure + } + for (id, side) in state.sides { + if let continuation = side.continuaton { + if side.position < state.buffer.count { + resumptions.append(Resumption(continuation: continuation, result: .success(state.buffer[side.position]))) + state.sides[id]?.position += 1 + state.sides[id]?.continuaton = nil + } else if state.finished { + state.sides[id]?.continuaton = nil + if let failure = state.failure { + resumptions.append(Resumption(continuation: continuation, result: .failure(failure))) + } else { + resumptions.append(Resumption(continuation: continuation, result: .success(nil))) + } + } + } + } + state.trimBuffer() + if let limit, state.buffer.count < limit { + defer { + state.demand = nil + } + return (resumptions, state.demand) + } else { + return (resumptions, nil) + } + } + if let demandContinuation { + demandContinuation.resume() + } + for resumption in resumptions { + resumption.resume() + } + } + + func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Element? { + let (base, cancelled) = state.withLock { state -> (Base?, Bool) in + switch state.iteratingTask { + case .pending(let base): + state.iteratingTask = .starting + return (base, false) + case .cancelled: + return (nil, true) + default: + return (nil, false) + } + } + if cancelled { return nil } + if let base { + nonisolated(unsafe) let transfer = base.makeAsyncIterator() + let task = Task.detached { [transfer, self] in + var iterator = transfer + do { + while await iterate() { + if let element = try await iterator.next() { + emit(.success(element)) + } else { + emit(.success(nil)) + } + } + } catch { + emit(.failure(error as! 
Failure)) + } + } + state.withLock { state in + precondition(state.iteratingTask.isStarting) + state.iteratingTask = .running(task) + } + } + let result: Result = await withTaskCancellationHandler { + await withCheckedContinuation { continuation in + let (res, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> (Result?, CheckedContinuation?, CheckedContinuation?, Bool) in + let side = state.sides[id]! + if side.position < state.buffer.count { + // There's an element available at this position + let element = state.buffer[side.position] + state.sides[id]?.position += 1 + state.trimBuffer() + return state.emit(.success(element)) + } else { + // Position is beyond the buffer + if let failure = state.failure { + return state.emit(.failure(failure)) + } else if state.finished { + return state.emit(.success(nil)) + } else { + state.sides[id]?.continuaton = continuation + return state.emit(nil) + } + } + } + if let limitContinuation { + limitContinuation.resume(returning: cancelled) + } + if let demandContinuation { + demandContinuation.resume() + } + if let res { + continuation.resume(returning: res) + } + } + } onCancel: { + cancel(id: id) + } + + return try result.get() + } + } + + final class Extent: Sendable { + let iteration: Iteration + + init(_ base: Base, bufferingPolicy: AsyncBufferSequencePolicy) { + iteration = Iteration(base, bufferingPolicy: bufferingPolicy) + } + + deinit { + iteration.cancel() + } + } + + let extent: Extent + + init(_ base: Base, bufferingPolicy: AsyncBufferSequencePolicy) { + extent = Extent(base, bufferingPolicy: bufferingPolicy) + } +} + +@available(macOS 26.0, *) +extension AsyncShareSequence: AsyncSequence { + typealias Element = Base.Element + typealias Failure = Base.Failure + + struct Iterator: AsyncIteratorProtocol { + + + let side: Side + + init(_ iteration: Iteration) { + side = Side(iteration) + } + + mutating func next(isolation actor: isolated (any Actor)?) async throws(Failure) -> Element? 
{ + try await side.next(isolation: actor) + } + } + + func makeAsyncIterator() -> Iterator { + Iterator(extent.iteration) + } +} From c8de4d05b2a86427db586df4709efea1a9c48bd0 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Thu, 31 Jul 2025 14:22:57 -0700 Subject: [PATCH 03/14] Fix the remaining todo on hard cancellation vs soft cancellation --- .../AsyncAlgorithms/AsyncShareSequence.swift | 40 ++++++++++++++----- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index f893db75..36dc702d 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -75,6 +75,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen var buffer = [Element]() var finished = false var failure: Failure? + var cancelled = false var limit: CheckedContinuation? var demand: CheckedContinuation? @@ -155,14 +156,17 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } func cancel() { - // TODO: this currently is a hard cancel, it should be refined to only cancel when everything is terminal let (task, limit, demand, cancelled) = state.withLock { state -> (IteratingTask?, CheckedContinuation?, CheckedContinuation?, Bool) in - defer { - state.iteratingTask = .cancelled - state.limit = nil - state.demand = nil + if state.sides.count == 0 { + defer { + state.iteratingTask = .cancelled + state.cancelled = true + } + return state.emit(state.iteratingTask) + } else { + state.cancelled = true + return state.emit(nil) } - return state.emit(state.iteratingTask) } task?.cancel() limit?.resume(returning: cancelled) @@ -178,21 +182,32 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } func unregisterSide(_ id: Int) { - let (side, continuation, cancelled) = state.withLock { state -> (Side.State?, CheckedContinuation?, Bool) in + let (side, continuation, cancelled, iteratingTaskToCancel) = state.withLock 
{ state -> (Side.State?, CheckedContinuation?, Bool, IteratingTask?) in let side = state.sides.removeValue(forKey: id) state.trimBuffer() + let cancelRequested = state.sides.count == 0 && state.cancelled if let limit, state.buffer.count < limit { defer { state.limit = nil } if case .cancelled = state.iteratingTask { - return (side, state.limit, true) + return (side, state.limit, true, nil) } else { - return (side, state.limit, false) + defer { + if cancelRequested { + state.iteratingTask = .cancelled + } + } + return (side, state.limit, false, cancelRequested ? state.iteratingTask : nil) } } else { if case .cancelled = state.iteratingTask { - return (side, nil, true) + return (side, nil, true, nil) } else { - return (side, nil, false) + defer { + if cancelRequested { + state.iteratingTask = .cancelled + } + } + return (side, nil, false, cancelRequested ? state.iteratingTask : nil) } } } @@ -202,6 +217,9 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen if let side { side.continuaton?.resume(returning: .success(nil)) } + if let iteratingTaskToCancel { + iteratingTaskToCancel.cancel() + } } func iterate() async -> Bool { From 954eb9951c4b1d4f2e6e2fae2630e8752e6e03c9 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Sat, 2 Aug 2025 20:06:07 -0700 Subject: [PATCH 04/14] Update Sources/AsyncAlgorithms/AsyncShareSequence.swift Co-authored-by: Jamie <2119834+jamieQ@users.noreply.github.com> --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 36dc702d..a7baf1a0 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -12,7 +12,7 @@ extension AsyncSequence where Element: Sendable { struct AsyncShareSequence: Sendable where Base.Element: Sendable { final class Side { struct State { - var continuaton: CheckedContinuation, 
Never>? + var continuation: CheckedContinuation, Never>? var position = 0 func offset(_ adjustment: Int) -> State { From 4322be78dd0a023a28e2b35b95438ec0d337e3e4 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Mon, 11 Aug 2025 14:51:05 -0700 Subject: [PATCH 05/14] Simplify the version definitions and add a new definition for 1.1 (#359) --- Package.swift | 31 ++++--------------------------- 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/Package.swift b/Package.swift index c97097b4..b34c676a 100644 --- a/Package.swift +++ b/Package.swift @@ -4,34 +4,11 @@ import PackageDescription import CompilerPluginSupport // Availability Macros -let availabilityTags = [Availability("AsyncAlgorithms")] -let versionNumbers = ["1.0"] -// Availability Macro Utilities -enum OSAvailability: String { - // This should match the package's deployment target - case initialIntroduction = "macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0" - case pending = "macOS 9999, iOS 9999, tvOS 9999, watchOS 9999" - // Use 10000 for future availability to avoid compiler magic around - // the 9999 version number but ensure it is greater than 9999 - case future = "macOS 10000, iOS 10000, tvOS 10000, watchOS 10000" -} - -struct Availability { - let name: String - let osAvailability: OSAvailability - - init(_ name: String, availability: OSAvailability = .initialIntroduction) { - self.name = name - self.osAvailability = availability - } -} - -let availabilityMacros: [SwiftSetting] = versionNumbers.flatMap { version in - availabilityTags.map { - .enableExperimentalFeature("AvailabilityMacro=\($0.name) \(version):\($0.osAvailability.rawValue)") - } -} +let availabilityMacros: [SwiftSetting] = [ + .enableExperimentalFeature("AvailabilityMacro=AsyncAlgorithms 1.0:macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0"), + .enableExperimentalFeature("AvailabilityMacro=AsyncAlgorithms 1.1:macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0"), +] let package = Package( name: 
"swift-async-algorithms",
From e342eb0a066a1203978c5199db21ab4a2b999d60 Mon Sep 17 00:00:00 2001
From: Philippe Hausler 
Date: Tue, 12 Aug 2025 17:22:16 -0700
Subject: [PATCH 06/14] Update the implementation of share to handle
 sendability requirements, address some edge cases, and update to the latest
 version of the discussion around buffering behaviors, as well as adding some
 documentation and commentary

---
 Evolution/NNNN-share.md                       |  51 ++++-
 .../AsyncAlgorithms/AsyncShareSequence.swift  | 320 ++++++++++++++----
 2 files changed, 307 insertions(+), 64 deletions(-)

diff --git a/Evolution/NNNN-share.md b/Evolution/NNNN-share.md
index 9fa5ba83..a5d5d969 100644
--- a/Evolution/NNNN-share.md
+++ b/Evolution/NNNN-share.md
@@ -29,7 +29,7 @@ The `Sendable` annotation identifies to the developer that this sequence can be
 ```swift
 extension AsyncSequence where Element: Sendable {
   public func share(
-    bufferingPolicy: AsyncBufferSequencePolicy = .unbounded
+    bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1)
   ) -> some AsyncSequence & Sendable
 }
 ```
@@ -44,7 +44,7 @@ When the underlying type backing the share algorithm is constructed a new extent
 
 That construction then creates an initial shared state and buffer. No task is started initially; it is only upon the first demand that the task backing the iteration is started; this means on the first call to next a task is spun up servicing all potential sides. The order in which the sides are serviced is not specified and cannot be relied upon; however, the order of delivery within a side is always guaranteed. The singular task servicing the iteration will be the only place holding any sort of iterator from the base `AsyncSequence`, so that iterator is isolated and not sent from one isolation to another. That iteration first awaits any limit availability and then awaits a demand given by a side. After which, it then awaits an element or terminal event from the iterator and enqueues the elements to the buffer.
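The buffer-and-cursor bookkeeping this design relies on can be modeled in miniature. The following is an illustrative sketch with hypothetical names (`SharedBufferModel`, `register`, `next(for:)`), not the implementation in this patch: each side keeps an index into one shared buffer, and the buffer is trimmed by the minimum index across all sides.

```swift
// Illustrative model of a shared buffer with per-side cursors.
// Not thread-safe; the real implementation guards this state with a lock.
struct SharedBufferModel<Element> {
  private(set) var buffer: [Element] = []
  private(set) var positions: [Int: Int] = [:]  // side id -> cursor into buffer

  // A new side starts at the current front of the buffer (no replay).
  mutating func register(_ id: Int) { positions[id] = 0 }

  mutating func enqueue(_ element: Element) { buffer.append(element) }

  // A side consumes the element at its cursor, if one is available.
  mutating func next(for id: Int) -> Element? {
    guard let position = positions[id], position < buffer.count else { return nil }
    let element = buffer[position]
    positions[id] = position + 1
    trim()
    return element
  }

  // Drop elements every side has already consumed; the buffer then only
  // stays as large as the largest deficit between fastest and slowest side.
  private mutating func trim() {
    guard let minimum = positions.values.min(), minimum > 0 else { return }
    buffer.removeFirst(minimum)
    positions = positions.mapValues { $0 - minimum }
  }
}
```

Here a slow side grows the buffer only by its deficit; once every side has consumed an element it is dropped, which is the trimming behavior described below.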
-The buffer itself is only held in one location, each side however has a cursor index into that buffer and when values are consumed it adjusts the indexes accordingly; leaving the buffer usage only as big as the largest deficit. This means that new sides that are started post initial start up will not have a "replay" effect; that is a similar but distinct algorithm and is not addressed by this proposal. Any buffer size sensitive systems that wish to adjust behavior should be aware that specifying a policy is a suggested step. However in common usage similar to other such systems servicing desktop and mobile applications the default and common behavior is to be unbounded. This allows for a progressive disclosure from common usage that just works out of the box with no configuration, to more advanced cases that need finer grained control. Furthermore there are scenarios where one might want ways of identifing dropped value events within the iteration of a side, this is something that will be addressed later in an upcoming proposal.
+The buffer itself is only held in one location; each side, however, has a cursor index into that buffer, and when values are consumed it adjusts the indexes accordingly, leaving the buffer usage only as big as the largest deficit. This means that new sides that are started after initial start-up will not have a "replay" effect; that is a similar but distinct algorithm and is not addressed by this proposal. Any buffer-size-sensitive systems that wish to adjust behavior should be aware that specifying a policy is a suggested step. However, in common usage, similar to other such systems servicing desktop and mobile applications, the common behavior is often unbounded. Alternatively, desktop or mobile applications will often want `.bounded(1)` since that ensures the slowest consumer drives the forward progress with at most 1 buffered element. All of the use cases (mobile, desktop, and server side) have a reasonable default of `.bounded(1)`. Leaving this as the default parameter keeps the progressive disclosure of the behaviors, such that the easiest thing to write is correct for all uses, and then more advanced control can be adjusted by passing in a specific policy. This default argument diverges slightly from AsyncStream, but follows a similar behavior to that of Combine's `share`.
 
 As previously stated, the isolation of the iteration of the upstream/base AsyncSequence is to a detached task; this ensures that individual sides can have independent cancellation. Those cancellations will have the effect of removing that side from the shared iteration and cleaning up accordingly (including adjusting the trimming of the internal buffer).
 
@@ -53,7 +53,7 @@ Representing concurrent access is difficult to express all potential examples bu
 Practically this all means that a given iteration may be "behind" another and can eventually catch up (provided it is within the buffer limit).
 
 ```swift
-let exampleSource = [0, 1, 2, 3, 4].async.share()
+let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .unbounded)
 
 let t1 = Task {
   for await element in exampleSource {
@@ -131,6 +131,51 @@ Task 1 2
 
 However in this particular case the newest values are the dropped elements.
 
+The `.bounded(N)` policy enforces consumption to prevent any side from being beyond a given amount away from other sides' consumption.
+
+```swift
+let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bounded(1))
+
+let t1 = Task {
+  for await element in exampleSource {
+    if element == 0 {
+      try? await Task.sleep(for: .seconds(1))
+    }
+    print("Task 1", element)
+  }
+}
+
+let t2 = Task {
+  for await element in exampleSource {
+    if element == 3 {
+      try?
await Task.sleep(for: .seconds(1))
+    }
+    print("Task 2", element)
+  }
+}
+
+await t1.value
+await t2.value
+```
+
+Will have a potential ordering output of:
+
+```
+Task 2 0
+Task 2 1
+Task 1 0
+Task 1 1
+Task 2 2
+Task 1 2
+Task 1 3
+Task 1 4
+Task 2 3
+Task 2 4
+```
+
+In that example output, Task 2 can get elements 0 and 1 but must await until Task 1 has caught up to within the specified buffer limit. The limit means that no additional iteration occurs (and consequently no values are dropped) until the buffer count falls below the specified value.
+
+
 ## Effect on API resilience
 
 This is an additive API and no existing systems are changed; however, it will introduce a few new types that will need to be maintained as ABI interfaces. Since the intent of this is to provide a mechanism to store AsyncSequences in a shared context, the type must be exposed as ABI (for type sizing).
diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift
index a7baf1a0..d54dff45 100644
--- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift
+++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift
@@ -1,20 +1,147 @@
+//===----------------------------------------------------------------------===//
+//
+// This source file is part of the Swift Async Algorithms open source project
+//
+// Copyright (c) 2022 Apple Inc.
and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + import Synchronization -@available(macOS 26.0, *) -extension AsyncSequence where Element: Sendable { - @available(macOS 26.0, *) // TODO: fix the availability for this to be defined as @available(AsyncAlgorithms 1.1, *) - public func share(bufferingPolicy: AsyncBufferSequencePolicy = .unbounded) -> some AsyncSequence & Sendable { - return AsyncShareSequence(self, bufferingPolicy: bufferingPolicy) +@available(AsyncAlgorithms 1.1, *) +extension AsyncSequence where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype { + /// Creates a shared async sequence that allows multiple concurrent iterations over a single source. + /// + /// The `share` method transforms an async sequence into a shareable sequence that can be safely + /// iterated by multiple concurrent tasks. This is useful when you want to broadcast elements from + /// a single source to multiple consumers without duplicating work or creating separate iterations. + /// + /// - Important: Each element from the source sequence is delivered to all active iterators. + /// Elements are buffered according to the specified buffering policy to handle timing differences + /// between consumers. + /// + /// - Parameter bufferingPolicy: The policy controlling how elements are buffered when consumers + /// iterate at different rates. Defaults to `.bounded(1)`. 
+ /// - `.bounded(n)`: Limits the buffer to `n` elements, applying backpressure to the source when that limit is reached + /// - `.bufferingOldest(n)`: Keeps the oldest `n` elements, discarding newer ones when full + /// - `.bufferingNewest(n)`: Keeps the newest `n` elements, discarding older ones when full + /// - `.unbounded`: Allows unlimited buffering (use with caution) + /// + /// - Returns: A sendable async sequence that can be safely shared across multiple concurrent tasks. + /// + /// ## Example Usage + /// + /// ```swift + /// let numbers = AsyncStream { continuation in + /// Task { + /// for i in 1...5 { + /// continuation.yield(i) + /// try await Task.sleep(for: .seconds(1)) + /// } + /// continuation.finish() + /// } + /// } + /// + /// let shared = numbers.share() + /// + /// // Multiple tasks can iterate concurrently + /// async let consumer1 = Task { + /// for await value in shared { + /// print("Consumer 1: \(value)") + /// } + /// } + /// + /// async let consumer2 = Task { + /// for await value in shared { + /// print("Consumer 2: \(value)") + /// } + /// } + /// + /// await consumer1.value + /// await consumer2.value + /// ``` + /// + /// ## Buffering Behavior + /// + /// The buffering policy determines how the shared sequence handles elements when consumers + /// iterate at different speeds: + /// + /// - **Bounded**: Applies backpressure to slow down the source when the buffer is full + /// - **Buffering Oldest**: Drops new elements when the buffer is full, preserving older ones + /// - **Buffering Newest**: Drops old elements when the buffer is full, preserving newer ones + /// - **Unbounded**: Never drops elements but may consume unbounded memory + /// + /// - Note: The source async sequence's iterator is consumed only once, regardless of how many + /// concurrent consumers are active. This makes sharing efficient for expensive-to-produce sequences. 
+ public func share(bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1)) -> some AsyncSequence & Sendable { + // the iterator is transferred to the isolation of the iterating task + // this has to be done "unsafely" since we cannot annotate the transfer + // however since iterating an AsyncSequence types twice has been defined + // as invalid and one creation of the iterator is virtually a consuming + // operation so this is safe at runtime. + nonisolated(unsafe) let iterator = makeAsyncIterator() + return AsyncShareSequence( { + iterator + }, bufferingPolicy: bufferingPolicy) } } -@available(macOS 26.0, *) -struct AsyncShareSequence: Sendable where Base.Element: Sendable { +// An async sequence that enables safe concurrent sharing of a single source sequence. +// +// `AsyncShareSequence` wraps a base async sequence and allows multiple concurrent iterators +// to consume elements from the same source. It handles all the complexity of coordinating +// between multiple consumers, buffering elements, and managing the lifecycle of the underlying +// iteration. +// +// ## Key Features +// +// - **Single Source Iteration**: The base sequence's iterator is created and consumed only once +// - **Concurrent Safe**: Multiple tasks can safely iterate simultaneously +// - **Configurable Buffering**: Supports various buffering strategies for different use cases +// - **Automatic Cleanup**: Properly manages resources and cancellation across all consumers +// +// ## Internal Architecture +// +// The implementation uses several key components: +// - `Side`: Represents a single consumer's iteration state +// - `Iteration`: Coordinates all consumers and manages the shared buffer +// - `Extent`: Manages the overall lifecycle and cleanup +// +// This type is typically not used directly; instead, use the `share()` method on any +// async sequence that meets the sendability requirements. 
+@available(AsyncAlgorithms 1.1, *) +struct AsyncShareSequence: Sendable where Base.Element: Sendable, Base: SendableMetatype, Base.AsyncIterator: SendableMetatype { + // Represents a single consumer's connection to the shared sequence. + // + // Each iterator of the shared sequence creates its own `Side` instance, which tracks + // that consumer's position in the shared buffer and manages its continuation for + // async iteration. The `Side` automatically registers itself with the central + // `Iteration` coordinator and cleans up when deallocated. + // + // ## Lifecycle + // + // - **Creation**: Automatically registers with the iteration coordinator + // - **Usage**: Tracks buffer position and manages async continuations + // - **Cleanup**: Automatically unregisters and cancels pending operations on deinit final class Side { + // Tracks the state of a single consumer's iteration. + // + // - `continuaton`: The continuation waiting for the next element (nil if not waiting) + // - `position`: The consumer's current position in the shared buffer struct State { - var continuation: CheckedContinuation, Never>? + var continuaton: UnsafeContinuation, Never>? var position = 0 + // Creates a new state with the position adjusted by the given offset. + // + // This is used when the shared buffer is trimmed to maintain correct + // relative positioning for this consumer. + // + // - Parameter adjustment: The number of positions to subtract from the current position + // - Returns: A new `State` with the adjusted position func offset(_ adjustment: Int) -> State { State(continuaton: continuaton, position: position - adjustment) } @@ -37,11 +164,29 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } } + // The central coordinator that manages the shared iteration state. 
+ // + // `Iteration` is responsible for: + // - Managing the single background task that consumes the source sequence + // - Coordinating between multiple consumer sides + // - Buffering elements according to the specified policy + // - Handling backpressure and flow control + // - Managing cancellation and cleanup + // + // ## Thread Safety + // + // All operations are synchronized using a `Mutex` to ensure thread-safe access + // to the shared state across multiple concurrent consumers. final class Iteration: Sendable { - // this is the swapped state of transferring the base to the iterating task - // it does send the Base... but only one transfer - enum IteratingTask: @unchecked Sendable { - case pending(Base) + // Represents the state of the background task that consumes the source sequence. + // + // The iteration task goes through several states during its lifecycle: + // - `pending`: Initial state, holds the factory to create the iterator + // - `starting`: Transitional state while the task is being created + // - `running`: Active state with a running background task + // - `cancelled`: Terminal state when the iteration has been cancelled + enum IteratingTask { + case pending(@Sendable () -> sending Base.AsyncIterator) case starting case running(Task) case cancelled @@ -62,7 +207,17 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } } } + // The complete shared state for coordinating all aspects of the shared iteration. + // + // This state is protected by a mutex and contains all the information needed + // to coordinate between multiple consumers, manage buffering, and control + // the background iteration task. struct State: Sendable { + // Defines how elements are stored and potentially discarded in the shared buffer. 
+ // + // - `unbounded`: Store all elements without limit (may cause memory growth) + // - `bufferingOldest(Int)`: Keep only the oldest N elements, ignore newer ones when full + // - `bufferingNewest(Int)`: Keep only the newest N elements, discard older ones when full enum StoragePolicy: Sendable { case unbounded case bufferingOldest(Int) @@ -76,13 +231,13 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen var finished = false var failure: Failure? var cancelled = false - var limit: CheckedContinuation? - var demand: CheckedContinuation? + var limit: UnsafeContinuation? + var demand: UnsafeContinuation? let storagePolicy: StoragePolicy - init(_ base: Base, bufferingPolicy: AsyncBufferSequencePolicy) { - self.iteratingTask = .pending(base) + init(_ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator, bufferingPolicy: AsyncBufferSequencePolicy) { + self.iteratingTask = .pending(iteratorFactory) switch bufferingPolicy.policy { case .bounded: self.storagePolicy = .unbounded case .bufferingOldest(let bound): self.storagePolicy = .bufferingOldest(bound) @@ -91,6 +246,14 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } } + // Removes elements from the front of the buffer that all consumers have already processed. + // + // This method finds the minimum position across all active consumers and removes + // that many elements from the front of the buffer. It then adjusts all consumer + // positions to account for the removed elements, maintaining their relative positions. + // + // This optimization prevents the buffer from growing indefinitely when all consumers + // are keeping pace with each other. 
mutating func trimBuffer() { if let minimumIndex = sides.values.map({ $0.position }).min(), minimumIndex > 0 { buffer.removeFirst(minimumIndex) @@ -100,18 +263,43 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } } - mutating func emit(_ value: T) -> (T, CheckedContinuation?, CheckedContinuation?, Bool) { + // Private state machine transitions for the emission of a given value. + // + // This method ensures the continuations are properly consumed when emitting values + // and returns those continuations for resumption. + private mutating func _emit(_ value: T, limit: Int) -> (T, UnsafeContinuation?, UnsafeContinuation?, Bool) { + let belowLimit = buffer.count < limit || limit == 0 defer { - limit = nil + if belowLimit { + self.limit = nil + } demand = nil } if case .cancelled = iteratingTask { - return (value, limit, demand, true) + return (value, belowLimit ? self.limit : nil, demand, true) } else { - return (value, limit, demand, false) + return (value, belowLimit ? self.limit : nil, demand, false) } } + // Internal state machine transitions for the emission of a given value. + // + // This method ensures the continuations are properly consumed when emitting values + // and returns those continuations for resumption. + // + // If no limit is specified it interprets that as an unbounded limit. + mutating func emit(_ value: T, limit: Int?) -> (T, UnsafeContinuation?, UnsafeContinuation?, Bool) { + return _emit(value, limit: limit ?? .max) + } + + // Adds an element to the buffer according to the configured storage policy. 
+ // + // The behavior depends on the storage policy: + // - **Unbounded**: Always appends the element + // - **Buffering Oldest**: Appends only if under the limit, otherwise ignores the element + // - **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends + // + // - Parameter element: The element to add to the buffer mutating func enqueue(_ element: Element) { let count = buffer.count @@ -145,8 +333,8 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen let state: Mutex let limit: Int? - init(_ base: Base, bufferingPolicy: AsyncBufferSequencePolicy) { - state = Mutex(State(base, bufferingPolicy: bufferingPolicy)) + init(_ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator, bufferingPolicy: AsyncBufferSequencePolicy) { + state = Mutex(State(iteratorFactory, bufferingPolicy: bufferingPolicy)) switch bufferingPolicy.policy { case .bounded(let limit): self.limit = limit @@ -156,20 +344,20 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } func cancel() { - let (task, limit, demand, cancelled) = state.withLock { state -> (IteratingTask?, CheckedContinuation?, CheckedContinuation?, Bool) in + let (task, limitContinuation, demand, cancelled) = state.withLock { state -> (IteratingTask?, UnsafeContinuation?, UnsafeContinuation?, Bool) in if state.sides.count == 0 { defer { state.iteratingTask = .cancelled state.cancelled = true } - return state.emit(state.iteratingTask) + return state.emit(state.iteratingTask, limit: limit) } else { state.cancelled = true - return state.emit(nil) + return state.emit(nil, limit: limit) } } task?.cancel() - limit?.resume(returning: cancelled) + limitContinuation?.resume(returning: cancelled) demand?.resume() } @@ -182,7 +370,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } func unregisterSide(_ id: Int) { - let (side, continuation, cancelled, iteratingTaskToCancel) = state.withLock { state -> (Side.State?, CheckedContinuation?, Bool, 
IteratingTask?) in + let (side, continuation, cancelled, iteratingTaskToCancel) = state.withLock { state -> (Side.State?, UnsafeContinuation?, Bool, IteratingTask?) in let side = state.sides.removeValue(forKey: id) state.trimBuffer() let cancelRequested = state.sides.count == 0 && state.cancelled @@ -224,8 +412,8 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen func iterate() async -> Bool { if let limit { - let cancelled = await withCheckedContinuation { (continuation: CheckedContinuation) in - let (resume, cancelled) = state.withLock { state -> (CheckedContinuation?, Bool) in + let cancelled = await withUnsafeContinuation { (continuation: UnsafeContinuation) in + let (resume, cancelled) = state.withLock { state -> (UnsafeContinuation?, Bool) in if state.buffer.count >= limit { state.limit = continuation if case .cancelled = state.iteratingTask { @@ -252,7 +440,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } // await a demand - await withCheckedContinuation { (continuation: CheckedContinuation) in + await withUnsafeContinuation { (continuation: UnsafeContinuation) in let hasPendingDemand = state.withLock { state in for (_, side) in state.sides { if side.continuaton != nil { @@ -281,7 +469,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } struct Resumption { - let continuation: CheckedContinuation, Never> + let continuation: UnsafeContinuation, Never> let result: Result func resume() { @@ -290,7 +478,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } func emit(_ result: Result) { - let (resumptions, demandContinuation) = state.withLock { state -> ([Resumption], CheckedContinuation?) 
in + let (resumptions, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> ([Resumption], UnsafeContinuation?, UnsafeContinuation?, Bool) in var resumptions = [Resumption]() switch result { case .success(let element): @@ -320,14 +508,11 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } } state.trimBuffer() - if let limit, state.buffer.count < limit { - defer { - state.demand = nil - } - return (resumptions, state.demand) - } else { - return (resumptions, nil) - } + return state.emit(resumptions, limit: limit) + } + + if let limitContinuation { + limitContinuation.resume(returning: cancelled) } if let demandContinuation { demandContinuation.resume() @@ -338,11 +523,11 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Element? { - let (base, cancelled) = state.withLock { state -> (Base?, Bool) in + let (factory, cancelled) = state.withLock { state -> ((@Sendable () -> sending Base.AsyncIterator)?, Bool) in switch state.iteratingTask { - case .pending(let base): + case .pending(let factory): state.iteratingTask = .starting - return (base, false) + return (factory, false) case .cancelled: return (nil, true) default: @@ -350,10 +535,13 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } } if cancelled { return nil } - if let base { - nonisolated(unsafe) let transfer = base.makeAsyncIterator() - let task = Task.detached { [transfer, self] in - var iterator = transfer + if let factory { + // this has to be interfaced as detached since we want the priority inference + // from the creator to not have a direct effect on the iteration. + // This might be improved later by passing on the creation context's task + // priority. 
+ let task = Task.detached(name: "Share Iteration") { [factory, self] in + var iterator = factory() do { while await iterate() { if let element = try await iterator.next() { @@ -372,24 +560,26 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } } let result: Result = await withTaskCancellationHandler { - await withCheckedContinuation { continuation in - let (res, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> (Result?, CheckedContinuation?, CheckedContinuation?, Bool) in - let side = state.sides[id]! + await withUnsafeContinuation { continuation in + let (res, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> (Result?, UnsafeContinuation?, UnsafeContinuation?, Bool) in + guard let side = state.sides[id] else { + return state.emit(.success(nil), limit: limit) + } if side.position < state.buffer.count { // There's an element available at this position let element = state.buffer[side.position] state.sides[id]?.position += 1 state.trimBuffer() - return state.emit(.success(element)) + return state.emit(.success(element), limit: limit) } else { // Position is beyond the buffer if let failure = state.failure { - return state.emit(.failure(failure)) + return state.emit(.failure(failure), limit: limit) } else if state.finished { - return state.emit(.success(nil)) + return state.emit(.success(nil), limit: limit) } else { state.sides[id]?.continuaton = continuation - return state.emit(nil) + return state.emit(nil, limit: limit) } } } @@ -411,11 +601,20 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } } + // Manages the lifecycle of the shared iteration. + // + // `Extent` serves as the ownership boundary for the shared sequence. When the + // `AsyncShareSequence` itself is deallocated, the `Extent` ensures that the + // background iteration task is properly cancelled and all resources are cleaned up. 
+ // + // This design allows multiple iterators to safely reference the same underlying + // iteration coordinator while ensuring proper cleanup when the shared sequence + // is no longer needed. final class Extent: Sendable { let iteration: Iteration - init(_ base: Base, bufferingPolicy: AsyncBufferSequencePolicy) { - iteration = Iteration(base, bufferingPolicy: bufferingPolicy) + init(_ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator, bufferingPolicy: AsyncBufferSequencePolicy) { + iteration = Iteration(iteratorFactory, bufferingPolicy: bufferingPolicy) } deinit { @@ -425,19 +624,18 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen let extent: Extent - init(_ base: Base, bufferingPolicy: AsyncBufferSequencePolicy) { - extent = Extent(base, bufferingPolicy: bufferingPolicy) + + init(_ iteratorFactory: @escaping @Sendable () -> sending Base.AsyncIterator, bufferingPolicy: AsyncBufferSequencePolicy) { + extent = Extent(iteratorFactory, bufferingPolicy: bufferingPolicy) } } -@available(macOS 26.0, *) +@available(AsyncAlgorithms 1.1, *) extension AsyncShareSequence: AsyncSequence { typealias Element = Base.Element typealias Failure = Base.Failure struct Iterator: AsyncIteratorProtocol { - - let side: Side init(_ iteration: Iteration) { From 9184733946b3912381b702b4cc66387a9f2e80e5 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Mon, 11 Aug 2025 14:26:55 -0700 Subject: [PATCH 07/14] Add some first drafts at unit tests for verifying share behaviors --- Evolution/NNNN-share.md | 2 +- .../AsyncAlgorithms/AsyncShareSequence.swift | 11 +- .../Support/GatedSequence.swift | 10 + Tests/AsyncAlgorithmsTests/TestShare.swift | 595 ++++++++++++++++++ 4 files changed, 611 insertions(+), 7 deletions(-) create mode 100644 Tests/AsyncAlgorithmsTests/TestShare.swift diff --git a/Evolution/NNNN-share.md b/Evolution/NNNN-share.md index a5d5d969..494c0e7f 100644 --- a/Evolution/NNNN-share.md +++ b/Evolution/NNNN-share.md @@ -98,7 +98,7 @@ The order 
of the interleaving of the prints are not guaranteed; however the orde If the creation were instead altered to the following: ```swift -let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingNewest(2)) +let exampleSource = [0, 1, 2, 3, 4].async.share(bufferingPolicy: .bufferingLatest(2)) ``` The output would print the possible ordering of: diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index d54dff45..64f771ad 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -227,9 +227,9 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen var generation = 0 var sides = [Int: Side.State]() var iteratingTask: IteratingTask - var buffer = [Element]() - var finished = false - var failure: Failure? + private(set) var buffer = [Element]() + private(set) var finished = false + private(set) var failure: Failure? var cancelled = false var limit: UnsafeContinuation? var demand: UnsafeContinuation? 
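The storage-policy behavior that these `private(set)` accessors and the new `enqueue`/`finish`/`fail` helpers centralize can be modeled standalone. The following is a minimal illustrative sketch; the names `StoragePolicySketch` and `BufferSketch` are hypothetical and are not the patch's actual types:

```swift
// Hypothetical standalone model of the buffering policies described in the
// doc comments: unbounded always appends; bufferingOldest ignores new
// elements once full; bufferingNewest discards the oldest to make room.
enum StoragePolicySketch {
  case unbounded
  case bufferingOldest(Int)
  case bufferingNewest(Int)
}

struct BufferSketch<Element> {
  var buffer: [Element] = []
  let policy: StoragePolicySketch

  mutating func enqueue(_ element: Element) {
    switch policy {
    case .unbounded:
      buffer.append(element)
    case .bufferingOldest(let limit):
      // Ignore new elements once the limit is reached.
      if buffer.count < limit { buffer.append(element) }
    case .bufferingNewest(let limit):
      // Drop the oldest element to keep the newest.
      if buffer.count >= limit, !buffer.isEmpty { buffer.removeFirst() }
      buffer.append(element)
    }
  }
}

var oldest = BufferSketch<Int>(policy: .bufferingOldest(2))
var newest = BufferSketch<Int>(policy: .bufferingNewest(2))
for i in 1...5 {
  oldest.enqueue(i)
  newest.enqueue(i)
}
// oldest.buffer == [1, 2], newest.buffer == [4, 5]
```

The real implementation additionally trims consumed prefixes per side and applies back-pressure for `.bounded`; this sketch only shows the overflow behavior of the three storage policies.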
@@ -485,11 +485,10 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen if let element { state.enqueue(element) } else { - state.finished = true + state.finish() } case .failure(let failure): - state.finished = true - state.failure = failure + state.fail(failure) } for (id, side) in state.sides { if let continuation = side.continuaton { diff --git a/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift b/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift index 9167a4c1..160b0db0 100644 --- a/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift +++ b/Tests/AsyncAlgorithmsTests/Support/GatedSequence.swift @@ -10,6 +10,7 @@ //===----------------------------------------------------------------------===// public struct GatedSequence { + public typealias Failure = Never let elements: [Element] let gates: [Gate] var index = 0 @@ -44,6 +45,15 @@ extension GatedSequence: AsyncSequence { await gate.enter() return element } + + public mutating func next(isolation actor: isolated (any Actor)?) async throws(Never) -> Element? { + guard gatedElements.count > 0 else { + return nil + } + let (element, gate) = gatedElements.removeFirst() + await gate.enter() + return element + } } public func makeAsyncIterator() -> Iterator { diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift new file mode 100644 index 00000000..5b57950d --- /dev/null +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -0,0 +1,595 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. 
and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +import XCTest +import AsyncAlgorithms +import Synchronization + +@available(macOS 15.0, *) +final class TestShare: XCTestCase { + + // MARK: - Basic Functionality Tests + + func test_share_delivers_elements_to_multiple_consumers() async { + let source = [1, 2, 3, 4, 5] + let shared = source.async.share() + let gate1 = Gate() + let gate2 = Gate() + + async let consumer1 = Task.detached { + var results = [Int]() + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + while let value = await iterator.next(isolation: nil) { + results.append(value) + } + return results + } + + async let consumer2 = Task.detached { + var results = [Int]() + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = await iterator.next(isolation: nil) { + results.append(value) + } + return results + } + let results1 = await consumer1.value + let results2 = await consumer2.value + + XCTAssertEqual(results1, [1, 2, 3, 4, 5]) + XCTAssertEqual(results2, [1, 2, 3, 4, 5]) + } + + func test_share_with_single_consumer() async { + let source = [1, 2, 3, 4, 5] + let shared = source.async.share() + + var results = [Int]() + for await value in shared { + results.append(value) + } + + XCTAssertEqual(results, [1, 2, 3, 4, 5]) + } + + func test_share_with_empty_source() async { + let source = [Int]() + let shared = source.async.share() + + var results = [Int]() + for await value in shared { + results.append(value) + } + + XCTAssertEqual(results, []) + } + + // MARK: - Buffering Policy Tests + + func test_share_with_bounded_buffering() async { + var gated = GatedSequence([1, 2, 3, 4, 5]) + let shared = gated.share(bufferingPolicy: .bounded(2)) + + let results1 = Mutex([Int]()) + let 
results2 = Mutex([Int]()) + let gate1 = Gate() + let gate2 = Gate() + + let consumer1 = Task { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + // Consumer 1 reads first element + if let value = await iterator.next(isolation: nil) { + results1.withLock { $0.append(value) } + } + // Delay to allow consumer 2 to get ahead + try? await Task.sleep(for: .milliseconds(10)) + // Continue reading + while let value = await iterator.next(isolation: nil) { + results1.withLock { $0.append(value) } + } + } + + let consumer2 = Task { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + // Consumer 2 reads all elements quickly + while let value = await iterator.next(isolation: nil) { + results2.withLock { $0.append(value) } + } + } + + // Advance the gated sequence to make elements available + gated.advance() // 1 + gated.advance() // 2 + gated.advance() // 3 + gated.advance() // 4 + gated.advance() // 5 + + await consumer1.value + await consumer2.value + + // Both consumers should receive all elements + XCTAssertEqual(results1.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) + XCTAssertEqual(results2.withLock { $0 }.sorted(), [1, 2, 3, 4, 5]) + } + + func test_share_with_unbounded_buffering() async { + let source = [1, 2, 3, 4, 5] + let shared = source.async.share(bufferingPolicy: .unbounded) + + let results1 = Mutex([Int]()) + let results2 = Mutex([Int]()) + let gate1 = Gate() + let gate2 = Gate() + + let consumer1 = Task { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = await iterator.next(isolation: nil) { + results1.withLock { $0.append(value) } + // Add some delay to consumer 1 + try? 
await Task.sleep(for: .milliseconds(1)) + } + } + + let consumer2 = Task { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + while let value = await iterator.next(isolation: nil) { + results2.withLock { $0.append(value) } + } + } + + await consumer1.value + await consumer2.value + + XCTAssertEqual(results1.withLock { $0 }, [1, 2, 3, 4, 5]) + XCTAssertEqual(results2.withLock { $0 }, [1, 2, 3, 4, 5]) + } + + func test_share_with_bufferingLatest_buffering() async { + var gated = GatedSequence([1, 2, 3, 4, 5]) + let shared = gated.share(bufferingPolicy: .bufferingLatest(2)) + + let fastResults = Mutex([Int]()) + let slowResults = Mutex([Int]()) + let gate1 = Gate() + let gate2 = Gate() + + let fastConsumer = Task.detached { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = await iterator.next(isolation: nil) { + fastResults.withLock { $0.append(value) } + } + } + + let slowConsumer = Task.detached { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + // Read first element immediately + if let value = await iterator.next(isolation: nil) { + slowResults.withLock { $0.append(value) } + } + // Add significant delay to let buffer fill up and potentially overflow + try? await Task.sleep(for: .milliseconds(50)) + // Continue reading remaining elements + while let value = await iterator.next(isolation: nil) { + slowResults.withLock { $0.append(value) } + } + } + + // Release all elements quickly to test buffer overflow behavior + gated.advance() // 1 + try? await Task.sleep(for: .milliseconds(5)) + gated.advance() // 2 + try? await Task.sleep(for: .milliseconds(5)) + gated.advance() // 3 + try? await Task.sleep(for: .milliseconds(5)) + gated.advance() // 4 + try? 
await Task.sleep(for: .milliseconds(5)) + gated.advance() // 5 + + await fastConsumer.value + await slowConsumer.value + + let slowResultsArray = slowResults.withLock { $0 } + + // Slow consumer should get the first element plus the latest elements in buffer + // With bufferingLatest(2), when buffer overflows, older elements are discarded + XCTAssertTrue(slowResultsArray.count >= 1, "Should have at least the first element") + XCTAssertEqual(slowResultsArray.first, 1, "Should start with first element") + + // Due to bufferingLatest policy, the slow consumer should favor newer elements + // It may miss some middle elements but should get the latest ones + let receivedSet = Set(slowResultsArray) + XCTAssertTrue(receivedSet.isSubset(of: Set([1, 2, 3, 4, 5]))) + + // With bufferingLatest, we expect the slow consumer to get newer elements + // when it finally catches up after the delay + if slowResultsArray.count > 1 { + let laterElements = Set(slowResultsArray.dropFirst()) + // Should have received some of the later elements (4, 5) due to bufferingLatest + XCTAssertTrue(laterElements.contains(4) || laterElements.contains(5) || + laterElements.contains(3), + "BufferingLatest should favor keeping newer elements") + } + } + + func test_share_with_bufferingOldest_buffering() async { + var gated = GatedSequence([1, 2, 3, 4, 5]) + let shared = gated.share(bufferingPolicy: .bufferingOldest(2)) + + let fastResults = Mutex([Int]()) + let slowResults = Mutex([Int]()) + let gate1 = Gate() + let gate2 = Gate() + + let fastConsumer = Task { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = await iterator.next(isolation: nil) { + fastResults.withLock { $0.append(value) } + } + } + + let slowConsumer = Task { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + // Read first element immediately + if let value = await iterator.next(isolation: nil) { + slowResults.withLock { $0.append(value) } + } + // Add 
significant delay to let buffer fill up and potentially overflow + try? await Task.sleep(for: .milliseconds(50)) + // Continue reading remaining elements + while let value = await iterator.next(isolation: nil) { + slowResults.withLock { $0.append(value) } + } + } + + // Release all elements quickly to test buffer overflow behavior + gated.advance() // 1 + try? await Task.sleep(for: .milliseconds(5)) + gated.advance() // 2 + try? await Task.sleep(for: .milliseconds(5)) + gated.advance() // 3 + try? await Task.sleep(for: .milliseconds(5)) + gated.advance() // 4 + try? await Task.sleep(for: .milliseconds(5)) + gated.advance() // 5 + + await fastConsumer.value + await slowConsumer.value + + let slowResultsArray = slowResults.withLock { $0 } + + // Slow consumer should get the first element plus the oldest elements that fit in buffer + // With bufferingOldest(2), when buffer overflows, newer elements are ignored + XCTAssertTrue(slowResultsArray.count >= 1, "Should have at least the first element") + XCTAssertEqual(slowResultsArray.first, 1, "Should start with first element") + + // Due to bufferingOldest policy, the slow consumer should favor older elements + let receivedSet = Set(slowResultsArray) + XCTAssertTrue(receivedSet.isSubset(of: Set([1, 2, 3, 4, 5]))) + + // With bufferingOldest, when the buffer is full, newer elements are ignored + // So the slow consumer should be more likely to receive earlier elements + if slowResultsArray.count > 1 { + let laterElements = Array(slowResultsArray.dropFirst()) + // Should have received earlier elements due to bufferingOldest policy + // Elements 4 and 5 are less likely to be received since they're newer + let hasEarlierElements = laterElements.contains(2) || laterElements.contains(3) + let hasLaterElements = laterElements.contains(4) && laterElements.contains(5) + + // BufferingOldest should favor keeping older elements when buffer is full + // So we should be more likely to see earlier elements than later ones + 
XCTAssertTrue(hasEarlierElements || !hasLaterElements, + "BufferingOldest should favor keeping older elements over newer ones") + } + } + + // MARK: - Cancellation Tests + + func test_share_cancellation_of_single_consumer() async { + let shared = Indefinite(value: 42).async.share() + + let finished = expectation(description: "finished") + let iterated = expectation(description: "iterated") + + let task = Task { + var firstIteration = false + for await _ in shared { + if !firstIteration { + firstIteration = true + iterated.fulfill() + } + } + finished.fulfill() + } + + // Wait for the task to start iterating + await fulfillment(of: [iterated], timeout: 1.0) + + // Cancel the task + task.cancel() + + // Verify the task finishes + await fulfillment(of: [finished], timeout: 1.0) + } + + func test_share_cancellation_with_multiple_consumers() async { + let shared = Indefinite(value: 42).async.share() + + let consumer1Finished = expectation(description: "consumer1Finished") + let consumer2Finished = expectation(description: "consumer2Finished") + let consumer1Iterated = expectation(description: "consumer1Iterated") + let consumer2Iterated = expectation(description: "consumer2Iterated") + + let consumer1 = Task { + var firstIteration = false + for await _ in shared { + if !firstIteration { + firstIteration = true + consumer1Iterated.fulfill() + } + } + consumer1Finished.fulfill() + } + + let consumer2 = Task { + var firstIteration = false + for await _ in shared { + if !firstIteration { + firstIteration = true + consumer2Iterated.fulfill() + } + } + consumer2Finished.fulfill() + } + + // Wait for both consumers to start + await fulfillment(of: [consumer1Iterated, consumer2Iterated], timeout: 1.0) + + // Cancel only consumer1 + consumer1.cancel() + + // Consumer1 should finish + await fulfillment(of: [consumer1Finished], timeout: 1.0) + + // Consumer2 should still be running, so cancel it too + consumer2.cancel() + await fulfillment(of: [consumer2Finished], timeout: 1.0) + 
} + + func test_share_cancellation_cancels_source_when_no_consumers() async { + let source = Indefinite(value: 1).async + let shared = source.share() + + let finished = expectation(description: "finished") + let iterated = expectation(description: "iterated") + + let task = Task { + var iterator = shared.makeAsyncIterator() + if await iterator.next(isolation: nil) != nil { + iterated.fulfill() + } + // Task will be cancelled here, so iteration should stop + while await iterator.next(isolation: nil) != nil { + // Continue iterating until cancelled + } + finished.fulfill() + } + + await fulfillment(of: [iterated], timeout: 1.0) + task.cancel() + await fulfillment(of: [finished], timeout: 1.0) + } + + // MARK: - Error Handling Tests + + func test_share_propagates_errors_to_all_consumers() async { + let source = [1, 2, 3, 4, 5].async.map { value in + if value == 3 { + throw TestError.failure + } + return value + } + let shared = source.share() + + let consumer1Results = Mutex([Int]()) + let consumer2Results = Mutex([Int]()) + let consumer1Error = Mutex(nil) + let consumer2Error = Mutex(nil) + let gate1 = Gate() + let gate2 = Gate() + + let consumer1 = Task { + do { + var iterator = shared.makeAsyncIterator() + gate2.open() + await gate1.enter() + while let value = try await iterator.next() { + consumer1Results.withLock { $0.append(value) } + } + } catch { + consumer1Error.withLock { $0 = error } + } + } + + let consumer2 = Task { + do { + var iterator = shared.makeAsyncIterator() + gate1.open() + await gate2.enter() + while let value = try await iterator.next() { + consumer2Results.withLock { $0.append(value) } + } + } catch { + consumer2Error.withLock { $0 = error } + } + } + + await consumer1.value + await consumer2.value + + // Both consumers should receive the first two elements + XCTAssertEqual(consumer1Results.withLock { $0 }, [1, 2]) + XCTAssertEqual(consumer2Results.withLock { $0 }, [1, 2]) + + // Both consumers should receive the error + 
XCTAssertTrue(consumer1Error.withLock { $0 is TestError }) + XCTAssertTrue(consumer2Error.withLock { $0 is TestError }) + } + + // MARK: - Timing and Race Condition Tests + + func test_share_with_late_joining_consumer() async { + var gated = GatedSequence([1, 2, 3, 4, 5]) + let shared = gated.share(bufferingPolicy: .unbounded) + + let earlyResults = Mutex([Int]()) + let lateResults = Mutex([Int]()) + + // Start early consumer + let earlyConsumer = Task { + var iterator = shared.makeAsyncIterator() + while let value = await iterator.next(isolation: nil) { + earlyResults.withLock { $0.append(value) } + } + } + + // Advance some elements + gated.advance() // 1 + gated.advance() // 2 + + // Give early consumer time to consume + try? await Task.sleep(for: .milliseconds(10)) + + // Start late consumer + let lateConsumer = Task { + var iterator = shared.makeAsyncIterator() + while let value = await iterator.next(isolation: nil) { + lateResults.withLock { $0.append(value) } + } + } + + // Advance remaining elements + gated.advance() // 3 + gated.advance() // 4 + gated.advance() // 5 + + await earlyConsumer.value + await lateConsumer.value + + // Early consumer gets all elements + XCTAssertEqual(earlyResults.withLock { $0 }, [1, 2, 3, 4, 5]) + // Late consumer only gets elements from when it joined + XCTAssertTrue(lateResults.withLock { $0.count <= 5 }) + } + + func test_share_iterator_independence() async { + let source = [1, 2, 3, 4, 5] + let shared = source.async.share() + + var iterator1 = shared.makeAsyncIterator() + var iterator2 = shared.makeAsyncIterator() + + // Both iterators should independently get the same elements + let value1a = await iterator1.next(isolation: nil) + let value2a = await iterator2.next(isolation: nil) + + let value1b = await iterator1.next(isolation: nil) + let value2b = await iterator2.next(isolation: nil) + + XCTAssertEqual(value1a, 1) + XCTAssertEqual(value2a, 1) + XCTAssertEqual(value1b, 2) + XCTAssertEqual(value2b, 2) + } + + // MARK: - 
Memory and Resource Management Tests + + func test_share_cleans_up_when_all_consumers_finish() async { + let source = [1, 2, 3] + let shared = source.async.share() + + var results = [Int]() + for await value in shared { + results.append(value) + } + + XCTAssertEqual(results, [1, 2, 3]) + + // Create a new iterator after the sequence finished + var newIterator = shared.makeAsyncIterator() + let value = await newIterator.next(isolation: nil) + XCTAssertNil(value) // Should return nil since source is exhausted + } + + // MARK: - Edge Cases + + func test_share_with_immediate_cancellation() async { + let shared = Indefinite(value: 42).async.share() + + let task = Task { + for await _ in shared { + // This should not execute since we cancel immediately + XCTFail("Should not execute due to immediate cancellation") + } + } + + // Cancel immediately + task.cancel() + + // Task should complete without issues + await task.value + } + + func test_share_multiple_sequential_consumers() async { + let source = [1, 2, 3, 4, 5] + let shared = source.async.share(bufferingPolicy: .unbounded) + + // First consumer + var results1 = [Int]() + for await value in shared { + results1.append(value) + } + + // Second consumer (starting after first finished) + var results2 = [Int]() + for await value in shared { + results2.append(value) + } + + XCTAssertEqual(results1, [1, 2, 3, 4, 5]) + XCTAssertEqual(results2, []) // Should be empty since source is exhausted + } +} + +// MARK: - Helper Types + +enum TestError: Error, Equatable { + case failure +} From 1848fb27b6b5a2c4c18a0829414f945313b93cdf Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Tue, 12 Aug 2025 17:20:20 -0700 Subject: [PATCH 08/14] Cleanup some of the documentation and address feedback for implementation details --- .../AsyncAlgorithms/AsyncShareSequence.swift | 247 ++++++++++-------- Tests/AsyncAlgorithmsTests/TestShare.swift | 21 +- 2 files changed, 145 insertions(+), 123 deletions(-) diff --git 
a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 64f771ad..932d4be2 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -10,6 +10,7 @@ //===----------------------------------------------------------------------===// import Synchronization +import DequeModule @available(AsyncAlgorithms 1.1, *) extension AsyncSequence where Element: Sendable, Self: SendableMetatype, AsyncIterator: SendableMetatype { @@ -19,69 +20,62 @@ extension AsyncSequence where Element: Sendable, Self: SendableMetatype, AsyncIt /// iterated by multiple concurrent tasks. This is useful when you want to broadcast elements from /// a single source to multiple consumers without duplicating work or creating separate iterations. /// - /// - Important: Each element from the source sequence is delivered to all active iterators. - /// Elements are buffered according to the specified buffering policy to handle timing differences - /// between consumers. + /// Each element from the source sequence is delivered to all active iterators. + /// Elements are buffered according to the specified buffering policy to handle timing differences + /// between consumers. /// - /// - Parameter bufferingPolicy: The policy controlling how elements are buffered when consumers - /// iterate at different rates. Defaults to `.bounded(1)`. - /// - `.bounded(n)`: Limits the buffer to `n` elements, applying backpressure to the source when that limit is reached - /// - `.bufferingOldest(n)`: Keeps the oldest `n` elements, discarding newer ones when full - /// - `.bufferingNewest(n)`: Keeps the newest `n` elements, discarding older ones when full - /// - `.unbounded`: Allows unlimited buffering (use with caution) - /// - /// - Returns: A sendable async sequence that can be safely shared across multiple concurrent tasks. 
+ /// The base sequence is iterated in its own task to ensure that cancellation is not polluted from + /// one side of iteration to another. /// /// ## Example Usage /// /// ```swift - /// let numbers = AsyncStream { continuation in - /// Task { - /// for i in 1...5 { - /// continuation.yield(i) - /// try await Task.sleep(for: .seconds(1)) - /// } - /// continuation.finish() - /// } + /// let numbers = [1, 2, 3, 4, 5].async.map { + /// try? await Task.sleep(for: .seconds(1)) + /// return $0 /// } /// /// let shared = numbers.share() /// /// // Multiple tasks can iterate concurrently - /// async let consumer1 = Task { - /// for await value in shared { - /// print("Consumer 1: \(value)") - /// } + /// let consumer1 = Task { + /// for await value in shared { + /// print("Consumer 1: \(value)") + /// } /// } /// - /// async let consumer2 = Task { - /// for await value in shared { - /// print("Consumer 2: \(value)") - /// } + /// let consumer2 = Task { + /// for await value in shared { + /// print("Consumer 2: \(value)") + /// } /// } /// /// await consumer1.value /// await consumer2.value /// ``` /// - /// ## Buffering Behavior - /// - /// The buffering policy determines how the shared sequence handles elements when consumers - /// iterate at different speeds: + /// - Parameter bufferingPolicy: The policy controlling how elements are enqueued to the shared buffer. Defaults to `.bounded(1)`. 
+ /// - `.bounded(n)`: Limits the buffer to `n` elements, applying backpressure to the source when that limit is reached + /// - `.bufferingOldest(n)`: Keeps the oldest `n` elements, discarding newer ones when full + /// - `.bufferingNewest(n)`: Keeps the newest `n` elements, discarding older ones when full + /// - `.unbounded`: Allows unlimited buffering (use with caution) /// - /// - **Bounded**: Applies backpressure to slow down the source when the buffer is full - /// - **Buffering Oldest**: Drops new elements when the buffer is full, preserving older ones - /// - **Buffering Newest**: Drops old elements when the buffer is full, preserving newer ones - /// - **Unbounded**: Never drops elements but may consume unbounded memory + /// - Returns: A sendable async sequence that can be safely shared across multiple concurrent tasks. /// - /// - Note: The source async sequence's iterator is consumed only once, regardless of how many - /// concurrent consumers are active. This makes sharing efficient for expensive-to-produce sequences. public func share(bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1)) -> some AsyncSequence & Sendable { - // the iterator is transferred to the isolation of the iterating task + // The iterator is transferred to the isolation of the iterating task // this has to be done "unsafely" since we cannot annotate the transfer // however since iterating an AsyncSequence type twice has been defined // as invalid and one creation of the iterator is virtually a consuming // operation so this is safe at runtime. + // The general principle of `.share()` is to provide a mechanism for non- + // shared AsyncSequence types to be shared. The parlance for those is + // that the base AsyncSequence type is not Sendable. 
If the iterator + // is not marked as `nonisolated(unsafe)` the compiler will claim that + // the value is "Capture of 'iterator' with non-Sendable type 'Self.AsyncIterator' in a '@Sendable' closure;" + // Since the closure returns a disconnected non-sendable value there is no + // distinct problem here and the compiler just needs to be informed + // that the diagnostic is overly pessimistic. nonisolated(unsafe) let iterator = makeAsyncIterator() return AsyncShareSequence( { iterator @@ -98,17 +92,17 @@ extension AsyncSequence where Element: Sendable, Self: SendableMetatype, AsyncIt // // ## Key Features // -// - **Single Source Iteration**: The base sequence's iterator is created and consumed only once -// - **Concurrent Safe**: Multiple tasks can safely iterate simultaneously -// - **Configurable Buffering**: Supports various buffering strategies for different use cases -// - **Automatic Cleanup**: Properly manages resources and cancellation across all consumers +// **Single Source Iteration**: The base sequence's iterator is created and consumed only once +// **Concurrent Safe**: Multiple tasks can safely iterate simultaneously +// **Configurable Buffering**: Supports various buffering strategies for different use cases +// **Automatic Cleanup**: Properly manages resources and cancellation across all consumers // // ## Internal Architecture // // The implementation uses several key components: -// - `Side`: Represents a single consumer's iteration state -// - `Iteration`: Coordinates all consumers and manages the shared buffer -// - `Extent`: Manages the overall lifecycle and cleanup +// `Side`: Represents a single consumer's iteration state +// `Iteration`: Coordinates all consumers and manages the shared buffer +// `Extent`: Manages the overall lifecycle and cleanup // // This type is typically not used directly; instead, use the `share()` method on any // async sequence that meets the sendability requirements. 
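The consumer-facing behavior that the documentation above describes can be sketched from the call site. This is a usage sketch assuming the `share(bufferingPolicy:)` API as pitched (the `demo` wrapper is hypothetical, not part of the patch):

```swift
import AsyncAlgorithms

// Two concurrent iterations of one shared source. Each task gets its own
// iterator from the shared sequence; both observe the same elements.
func demo() async {
  let shared = [1, 2, 3, 4, 5].async.share(bufferingPolicy: .bounded(1))

  let consumer1 = Task {
    for await value in shared {
      print("Consumer 1: \(value)")
    }
  }
  let consumer2 = Task {
    for await value in shared {
      print("Consumer 2: \(value)")
    }
  }

  // With a bounded buffer the source is back-pressured until every active
  // side has consumed, so neither consumer is starved by the other.
  await consumer1.value
  await consumer2.value
}
```

The interleaving of the two print streams is not deterministic, but each side independently sees the elements in source order.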
@@ -123,9 +117,9 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen // // ## Lifecycle // - // - **Creation**: Automatically registers with the iteration coordinator - // - **Usage**: Tracks buffer position and manages async continuations - // - **Cleanup**: Automatically unregisters and cancels pending operations on deinit + // **Creation**: Automatically registers with the iteration coordinator + // **Usage**: Tracks buffer position and manages async continuations + // **Cleanup**: Automatically unregisters and cancels pending operations on deinit final class Side { // Tracks the state of a single consumer's iteration. // @@ -167,11 +161,11 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen // The central coordinator that manages the shared iteration state. // // `Iteration` is responsible for: - // - Managing the single background task that consumes the source sequence - // - Coordinating between multiple consumer sides - // - Buffering elements according to the specified policy - // - Handling backpressure and flow control - // - Managing cancellation and cleanup + // Managing the single background task that consumes the source sequence + // Coordinating between multiple consumer sides + // Buffering elements according to the specified policy + // Handling backpressure and flow control + // Managing cancellation and cleanup // // ## Thread Safety // @@ -181,10 +175,10 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen // Represents the state of the background task that consumes the source sequence. 
// // The iteration task goes through several states during its lifecycle: - // - `pending`: Initial state, holds the factory to create the iterator - // - `starting`: Transitional state while the task is being created - // - `running`: Active state with a running background task - // - `cancelled`: Terminal state when the iteration has been cancelled + // `pending`: Initial state, holds the factory to create the iterator + // `starting`: Transitional state while the task is being created + // `running`: Active state with a running background task + // `cancelled`: Terminal state when the iteration has been cancelled enum IteratingTask { case pending(@Sendable () -> sending Base.AsyncIterator) case starting @@ -215,9 +209,9 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen struct State: Sendable { // Defines how elements are stored and potentially discarded in the shared buffer. // - // - `unbounded`: Store all elements without limit (may cause memory growth) - // - `bufferingOldest(Int)`: Keep only the oldest N elements, ignore newer ones when full - // - `bufferingNewest(Int)`: Keep only the newest N elements, discard older ones when full + // `unbounded`: Store all elements without limit (may cause memory growth) + // `bufferingOldest(Int)`: Keep only the oldest N elements, ignore newer ones when full + // `bufferingNewest(Int)`: Keep only the newest N elements, discard older ones when full enum StoragePolicy: Sendable { case unbounded case bufferingOldest(Int) @@ -227,7 +221,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen var generation = 0 var sides = [Int: Side.State]() var iteratingTask: IteratingTask - private(set) var buffer = [Element]() + private(set) var buffer = Deque<Element>() private(set) var finished = false private(set) var failure: Failure? var cancelled = false @@ -295,9 +289,9 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen // Adds an element to the buffer according to the configured storage policy. 
// // The behavior depends on the storage policy: - // - **Unbounded**: Always appends the element - // - **Buffering Oldest**: Appends only if under the limit, otherwise ignores the element - // - **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends + // **Unbounded**: Always appends the element + // **Buffering Oldest**: Appends only if under the limit, otherwise ignores the element + // **Buffering Newest**: Appends if under the limit, otherwise removes the oldest and appends // // - Parameter element: The element to add to the buffer mutating func enqueue(_ element: Element) { @@ -521,44 +515,8 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } } - func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Element? { - let (factory, cancelled) = state.withLock { state -> ((@Sendable () -> sending Base.AsyncIterator)?, Bool) in - switch state.iteratingTask { - case .pending(let factory): - state.iteratingTask = .starting - return (factory, false) - case .cancelled: - return (nil, true) - default: - return (nil, false) - } - } - if cancelled { return nil } - if let factory { - // this has to be interfaced as detached since we want the priority inference - // from the creator to not have a direct effect on the iteration. - // This might be improved later by passing on the creation context's task - // priority. - let task = Task.detached(name: "Share Iteration") { [factory, self] in - var iterator = factory() - do { - while await iterate() { - if let element = try await iterator.next() { - emit(.success(element)) - } else { - emit(.success(nil)) - } - } - } catch { - emit(.failure(error as! 
Failure)) - } - } - state.withLock { state in - precondition(state.iteratingTask.isStarting) - state.iteratingTask = .running(task) - } - } - let result: Result = await withTaskCancellationHandler { + private func nextIteration(_ id: Int) async -> Result.Element?, AsyncShareSequence.Failure> { + return await withTaskCancellationHandler { await withUnsafeContinuation { continuation in let (res, limitContinuation, demandContinuation, cancelled) = state.withLock { state -> (Result?, UnsafeContinuation?, UnsafeContinuation?, Bool) in guard let side = state.sides[id] else { @@ -595,8 +553,91 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } onCancel: { cancel(id: id) } + } + + private func iterationLoop(factory: @Sendable () -> sending Base.AsyncIterator) async { + var iterator = factory() + do { + while await iterate() { + if let element = try await iterator.next() { + emit(.success(element)) + } else { + emit(.success(nil)) + } + } + } catch { + emit(.failure(error as! Failure)) + } + } + + func next(isolation actor: isolated (any Actor)?, id: Int) async throws(Failure) -> Element? 
{ + let (factory, cancelled) = state.withLock { state -> ((@Sendable () -> sending Base.AsyncIterator)?, Bool) in + switch state.iteratingTask { + case .pending(let factory): + state.iteratingTask = .starting + return (factory, false) + case .cancelled: + return (nil, true) + default: + return (nil, false) + } + } + if cancelled { return nil } + if let factory { + let task: Task + // for the fancy dance of availability and canImport see the comment on the next check for details +#if canImport(_Concurrency, _version: 6.2) + if #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) { + task = Task(name: "Share Iteration") { [factory, self] in + await iterationLoop(factory: factory) + } + } else { + task = Task.detached(name: "Share Iteration") { [factory, self] in + await iterationLoop(factory: factory) + } + } +#else + task = Task.detached(name: "Share Iteration") { [factory, self] in + await iterationLoop(factory: factory) + } +#endif + // Known Issue: there is a very small race where the task may not get a priority escalation during startup; + // this unfortunately cannot be avoided since the task should ideally not be formed within the critical + // region of the state, as that could lead to potential deadlocks on low-core-count systems. + // That window is relatively small and can be revisited if a suitable proof of safe behavior can be + // determined. + state.withLock { state in + precondition(state.iteratingTask.isStarting) + state.iteratingTask = .running(task) + } + } + + // withTaskPriorityEscalationHandler is only available for the '26 releases and the 6.2 version of + // the _Concurrency library. This means for Darwin-based OSes we have to have a fallback at runtime, + // and for non-Darwin OSes we need to verify against the ability to import that version. + // Using this priority escalation means that the base task can avoid being detached.
+#if canImport(_Concurrency, _version: 6.2) + if #available(macOS 26.0, iOS 26.0, tvOS 26.0, visionOS 26.0, *) { + return try await withTaskPriorityEscalationHandler { + return await nextIteration(id) + } onPriorityEscalated: { old, new in + let task = state.withLock { state -> Task? in + switch state.iteratingTask { + case .running(let task): + return task + default: + return nil + } + } + task?.escalatePriority(to: new) + }.get() + } else { + return try await nextIteration(id).get() + } +#else + return try await nextIteration(id).get() +#endif - return try result.get() } } diff --git a/Tests/AsyncAlgorithmsTests/TestShare.swift b/Tests/AsyncAlgorithmsTests/TestShare.swift index 5b57950d..372c9cd4 100644 --- a/Tests/AsyncAlgorithmsTests/TestShare.swift +++ b/Tests/AsyncAlgorithmsTests/TestShare.swift @@ -547,25 +547,6 @@ final class TestShare: XCTestCase { let value = await newIterator.next(isolation: nil) XCTAssertNil(value) // Should return nil since source is exhausted } - - // MARK: - Edge Cases - - func test_share_with_immediate_cancellation() async { - let shared = Indefinite(value: 42).async.share() - - let task = Task { - for await _ in shared { - // This should not execute since we cancel immediately - XCTFail("Should not execute due to immediate cancellation") - } - } - - // Cancel immediately - task.cancel() - - // Task should complete without issues - await task.value - } func test_share_multiple_sequential_consumers() async { let source = [1, 2, 3, 4, 5] @@ -590,6 +571,6 @@ final class TestShare: XCTestCase { // MARK: - Helper Types -enum TestError: Error, Equatable { +fileprivate enum TestError: Error, Equatable { case failure } From ed15653c0eec7db618950ceeaab3862404955e36 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Tue, 12 Aug 2025 17:27:04 -0700 Subject: [PATCH 09/14] Fix merge damage by restoring the changes from @jamieQ --- .../AsyncAlgorithms/AsyncShareSequence.swift | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 
deletions(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 932d4be2..9367f678 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -123,10 +123,10 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen final class Side { // Tracks the state of a single consumer's iteration. // - // - `continuaton`: The continuation waiting for the next element (nil if not waiting) + // - `continuation`: The continuation waiting for the next element (nil if not waiting) // - `position`: The consumer's current position in the shared buffer struct State { - var continuaton: UnsafeContinuation, Never>? + var continuation: UnsafeContinuation, Never>? var position = 0 // Creates a new state with the position adjusted by the given offset. @@ -137,7 +137,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen // - Parameter adjustment: The number of positions to subtract from the current position // - Returns: A new `State` with the adjusted position func offset(_ adjustment: Int) -> State { - State(continuaton: continuaton, position: position - adjustment) + State(continuation: continuation, position: position - adjustment) } } @@ -397,7 +397,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen continuation.resume(returning: cancelled) } if let side { - side.continuaton?.resume(returning: .success(nil)) + side.continuation?.resume(returning: .success(nil)) } if let iteratingTaskToCancel { iteratingTaskToCancel.cancel() @@ -437,7 +437,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen await withUnsafeContinuation { (continuation: UnsafeContinuation) in let hasPendingDemand = state.withLock { state in for (_, side) in state.sides { - if side.continuaton != nil { + if side.continuation != nil { return true } } @@ -485,13 +485,13 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen state.fail(failure) } 
for (id, side) in state.sides { - if let continuation = side.continuaton { + if let continuation = side.continuation { if side.position < state.buffer.count { resumptions.append(Resumption(continuation: continuation, result: .success(state.buffer[side.position]))) state.sides[id]?.position += 1 - state.sides[id]?.continuaton = nil + state.sides[id]?.continuation = nil } else if state.finished { - state.sides[id]?.continuaton = nil + state.sides[id]?.continuation = nil if let failure = state.failure { resumptions.append(Resumption(continuation: continuation, result: .failure(failure))) } else { @@ -535,7 +535,7 @@ struct AsyncShareSequence: Sendable where Base.Element: Sen } else if state.finished { return state.emit(.success(nil), limit: limit) } else { - state.sides[id]?.continuaton = continuation + state.sides[id]?.continuation = continuation return state.emit(nil, limit: limit) } } From 185ab5cfcd48042c2ae6c8f88075367ff64c8e30 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Wed, 20 Aug 2025 11:07:14 -0700 Subject: [PATCH 10/14] Typo fixes in proposal Co-authored-by: r3econ <505054+r3econ@users.noreply.github.com> --- Evolution/NNNN-share.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Evolution/NNNN-share.md b/Evolution/NNNN-share.md index 494c0e7f..f620abfe 100644 --- a/Evolution/NNNN-share.md +++ b/Evolution/NNNN-share.md @@ -38,7 +38,7 @@ The buffer internally to the share algorithm will only extend back to the furthe ## Runtime Behavior -The runtime behaviors fall into a few categories; ordering, iteration isolation, cancellation, and lifetimes. To understand the beahviors there are a terms useful to define. Each creation of the AsyncIterator of the sequence and invocation of next will be referred to a side of the share iteration. The back pressure to the system to fetch a new element or termination is refered to as demand. 
The limit which is the pending gate for awaiting until the buffer has been serviced used for the `AsyncBufferSequencePolicy.bounded(_ : Int)` policy. The last special definition is that of the extent which is specifically in this case the lifetime of the asynchronous sequence itself. +The runtime behaviors fall into a few categories: ordering, iteration isolation, cancellation, and lifetimes. To understand the behaviors, there are a few terms useful to define. Each creation of the AsyncIterator of the sequence and invocation of next will be referred to as a side of the share iteration. The back pressure to the system to fetch a new element or termination is referred to as demand. The limit is the pending gate, used by the `AsyncBufferSequencePolicy.bounded(_ : Int)` policy, that awaits until the buffer has been serviced. The last special definition is the extent, which in this case is the lifetime of the asynchronous sequence itself. When the underlying type backing the share algorithm is constructed a new extent is created; this is used for tracking the reference lifetime under the hood and is used to both house the iteration but also to identify the point at which no more sides can be constructed. When no more sides can be constructed and no sides are left to iterate then the backing iteration is canceled. This prevents any un-referenced task backing the iteration to not be leaked by the algorith itself. 
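The side and demand terminology above can be illustrated with a short sketch. This uses the `share(bufferingPolicy:)` API pitched by this proposal (its exact spelling may change) and AsyncAlgorithms' `.async` Sequence-to-AsyncSequence helper; it is an illustration, not a guarantee of delivery semantics.

```swift
import AsyncAlgorithms

// Two sides of one shared iteration: each side independently demands
// elements, and the shared buffer services both.
let shared = [1, 2, 3].async.share(bufferingPolicy: .bounded(1))

await withTaskGroup(of: [Int].self) { group in
  for _ in 0..<2 {
    group.addTask {
      var seen = [Int]()
      for await value in shared { seen.append(value) }
      return seen
    }
  }
  for await seen in group {
    // Values arrive in order on every side; a side whose iteration
    // begins late may miss leading elements, since there is no replay.
    print(seen)
  }
}
```

Because the sequence is a shared buffer rather than a replaying one, whether both sides observe all three values depends on when each side's first `next` lands relative to the backing iteration.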
From 15da3f9223b46e54966963129b99a08b0292d013 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Wed, 20 Aug 2025 11:07:48 -0700 Subject: [PATCH 11/14] Some space fixes for the proposal Co-authored-by: r3econ <505054+r3econ@users.noreply.github.com> --- Evolution/NNNN-share.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Evolution/NNNN-share.md b/Evolution/NNNN-share.md index f620abfe..9c87664e 100644 --- a/Evolution/NNNN-share.md +++ b/Evolution/NNNN-share.md @@ -8,7 +8,7 @@ Many of the AsyncSequence adopting types only permit a one singular consumption. There are many potential usages for the sharing concept of AsyncSequences. -One such example is the case where a source of data as an asynchronous sequence needs to be consumed by updating UI, logging, and additionally a network connection. This particular case does not matter on which uses but instead that those uses are independent of each other. It would not be expected for networking to block or delay the updates to UI, nor should logging. This example case also illustrates that the isolation of each side might be different and that some of the sides may not tolerate coalescing or dropping values. +One such example is the case where a source of data as an asynchronous sequence needs to be consumed by updating UI, logging, and additionally a network connection. Which uses are involved does not matter; what matters is that those uses are independent of each other. It would not be expected for networking to block or delay the updates to UI, nor should logging. This example case also illustrates that the isolation of each side might be different and that some of the sides may not tolerate coalescing or dropping values. There are many other use cases that have been requested for this family of algorithms. Since the release of AsyncAlgorithms it has perhaps been the most popularly requested set of behaviors as additions to the package. 
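The UI/logging/networking fan-out motivating this proposal can be sketched as follows. The `share(bufferingPolicy:)` call is the API being pitched here, and the event stream is a stand-in for a real app's source; the three consumers run in distinct isolations.

```swift
import AsyncAlgorithms

// A hypothetical event source standing in for a real producer.
func makeEventStream() -> AsyncStream<String> {
  AsyncStream { continuation in
    for event in ["launch", "tap", "scroll"] { continuation.yield(event) }
    continuation.finish()
  }
}

// One shared source; each consumer makes forward progress independently.
let events = makeEventStream().share(bufferingPolicy: .bounded(16))

let ui = Task { @MainActor in
  for await event in events { print("UI:", event) }   // main-actor isolated
}
let logging = Task.detached {
  for await event in events { print("log:", event) }  // background isolation
}
let network = Task.detached {
  for await event in events { print("net:", event) }  // another isolation
}
await ui.value
await logging.value
await network.value
```

With a bounded policy, a slow consumer exerts back pressure on the shared iteration rather than forcing the other sides to drop values, matching the "networking should not delay UI" requirement only up to the chosen buffer depth.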
From 929dc0f536e46977f164bdb7972cc4ff9d8127f7 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Wed, 20 Aug 2025 11:08:09 -0700 Subject: [PATCH 12/14] More typo fixes Co-authored-by: r3econ <505054+r3econ@users.noreply.github.com> --- Evolution/NNNN-share.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Evolution/NNNN-share.md b/Evolution/NNNN-share.md index 9c87664e..d8650786 100644 --- a/Evolution/NNNN-share.md +++ b/Evolution/NNNN-share.md @@ -14,7 +14,7 @@ There are many other use cases that have been requested for this family of algor ## Proposed solution -AsyncAlgorithms will introduce a new extension function on AsyncSequence that will provide a shareable asynchronous sequence that will produce the same values upon iteration from multiple instances of it's AsyncIterator. Those iterations can take place in multiple isolations. +AsyncAlgorithms will introduce a new extension function on AsyncSequence that will provide a shareable asynchronous sequence that will produce the same values upon iteration from multiple instances of its AsyncIterator. Those iterations can take place in multiple isolations. When values from a differing isolation cannot be coalesced, the two options available are either awaiting (an exertion of back-pressure across the sequences) or buffering (an internal back-pressure to a buffer). Replaying the values from the beginning of the creation of the sequence is a distinctly different behavior that should be considered a different use case. This then leaves the behavioral characteristic of this particular operation of share as; sharing a buffer of values started from the initialization of a new iteration of the sequence. Control over that buffer should then have options to determine the behavior, similar to how AsyncStream allows that control. It should have options to be unbounded, buffering the oldest count of elements, or buffering the newest count of elements. 
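The unbounded/oldest/newest buffering behaviors discussed above can be sketched as a standalone policy. This mirrors the semantics described in the proposal but is not the library's internal implementation; the names are illustrative.

```swift
// Illustrative storage policies matching the proposal's description.
enum StoragePolicy {
  case unbounded
  case bufferingOldest(Int)
  case bufferingNewest(Int)
}

func enqueue<Element>(_ element: Element, into buffer: inout [Element], policy: StoragePolicy) {
  switch policy {
  case .unbounded:
    buffer.append(element)  // no limit; may grow without bound
  case .bufferingOldest(let limit):
    if buffer.count < limit { buffer.append(element) }  // ignore newer when full
  case .bufferingNewest(let limit):
    if buffer.count >= limit { buffer.removeFirst() }   // discard oldest when full
    buffer.append(element)
  }
}

var oldest = [Int]()
for i in 1...5 { enqueue(i, into: &oldest, policy: .bufferingOldest(3)) }
// oldest == [1, 2, 3] — elements 4 and 5 were ignored

var newest = [Int]()
for i in 1...5 { enqueue(i, into: &newest, policy: .bufferingNewest(3)) }
// newest == [3, 4, 5] — elements 1 and 2 were discarded
```

The trade-off is the same one `AsyncStream.Continuation.BufferingPolicy` exposes: `bufferingOldest` favors values already accepted, while `bufferingNewest` favors recency.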
From 909e1a0af6526c0b7e59acfeaab7d598529bfc86 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Wed, 20 Aug 2025 11:08:25 -0700 Subject: [PATCH 13/14] More space fixes Co-authored-by: r3econ <505054+r3econ@users.noreply.github.com> --- Evolution/NNNN-share.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Evolution/NNNN-share.md b/Evolution/NNNN-share.md index d8650786..c496b27d 100644 --- a/Evolution/NNNN-share.md +++ b/Evolution/NNNN-share.md @@ -2,7 +2,7 @@ ## Introduction -Many of the AsyncSequence adopting types only permit a one singular consumption. However there are many times that the same produced values are useful in more than one place. Out of that mechanism there are a few approaches to share, distribute, and broadcast those values. This proposal will focus on one concept; sharing. Sharing is where each consumption independently can make forward progress and get the same values but do not replay from the beginning of time. +Many of the AsyncSequence-adopting types only permit a single consumption. However, there are many times that the same produced values are useful in more than one place. Out of that need there are a few approaches to share, distribute, and broadcast those values. This proposal will focus on one concept: sharing. Sharing is where each consumption can independently make forward progress and get the same values but does not replay from the beginning of time. 
## Motivation From 607ff7b1e2825fdbce198ac6dca934498c7e4042 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Wed, 20 Aug 2025 11:09:11 -0700 Subject: [PATCH 14/14] Formatting fix for comments Co-authored-by: Franz Busch --- Sources/AsyncAlgorithms/AsyncShareSequence.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Sources/AsyncAlgorithms/AsyncShareSequence.swift b/Sources/AsyncAlgorithms/AsyncShareSequence.swift index 9367f678..c66df0e1 100644 --- a/Sources/AsyncAlgorithms/AsyncShareSequence.swift +++ b/Sources/AsyncAlgorithms/AsyncShareSequence.swift @@ -61,7 +61,6 @@ extension AsyncSequence where Element: Sendable, Self: SendableMetatype, AsyncIt /// - `.unbounded`: Allows unlimited buffering (use with caution) /// /// - Returns: A sendable async sequence that can be safely shared across multiple concurrent tasks. - /// public func share(bufferingPolicy: AsyncBufferSequencePolicy = .bounded(1)) -> some AsyncSequence & Sendable { // The iterator is transferred to the isolation of the iterating task // this has to be done "unsafely" since we cannot annotate the transfer