Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0060af5

Browse files
committed Aug 18, 2022
Remove AsyncIterator: Sendable requirement from merge
# Motivation Currently a lot of the operator implementations in here that consume other `AsyncSequence`s require the `AsyncIterator` to be `Sendable`. This is mostly due to the fact that we are calling `makeAsyncIterator` on the upstream `AsyncSequence` and then passing that iterator around to various newly spawned `Task`s. This has two downsides: 1. It only allows users to use operators like `merge` if their `AsyncSequence.AsyncIterator` is `Sendable`. 2. In merge we are creating new `Task`s for every new demand. Creating `Task`s is not cheap. My main goal of this PR was to remove the `Sendable` constraint from `merge`. # Modification This PR overhauls the complete inner workings of the `AsyncMerge2Sequence`. It does a couple of things: 1. The main change is that instead of creating new `Task`s for every demand, we are creating one `Task` when the `AsyncIterator` is created. This task has a child task for every upstream sequence. 2. When calling `next` we are signalling the child tasks to demand from the upstream. 3. A new state machine that is synchronizing the various concurrent operations that can happen. 4. Handling cancellation since we are creating a bunch of continuations. # Result In the end, this PR swaps the implementation of `AsyncMerge2Sequence`, drops the `Sendable` constraint, and passes all tests. Furthermore, in my local performance testing I saw up to a 50% speed increase in throughput. # Open points 1. I need to make this sequence re-throwing, but before going down that rabbit hole I want to get buy-in on the implementation. 2. We should discuss and document whether `merge` and other operators are hot or cold, i.e. whether they only request elements once they have downstream demand. 3. I need to switch `AsyncMerge3Sequence` over to the same implementation.
1 parent c0fdfdb commit 0060af5

File tree

5 files changed

+1252
-194
lines changed

5 files changed

+1252
-194
lines changed
 

‎Package.swift

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,16 @@ let package = Package(
1616
.library(name: "_CAsyncSequenceValidationSupport", type: .static, targets: ["AsyncSequenceValidation"]),
1717
.library(name: "AsyncAlgorithms_XCTest", targets: ["AsyncAlgorithms_XCTest"]),
1818
],
19-
dependencies: [],
19+
dependencies: [
20+
.package(url: "https://github.com/apple/swift-collections.git", from: "1.0.2"),
21+
],
2022
targets: [
21-
.target(name: "AsyncAlgorithms"),
23+
.target(
24+
name: "AsyncAlgorithms",
25+
dependencies: [
26+
.product(name: "DequeModule", package: "swift-collections")
27+
]
28+
),
2229
.target(
2330
name: "AsyncSequenceValidation",
2431
dependencies: ["_CAsyncSequenceValidationSupport", "AsyncAlgorithms"]),

‎Sources/AsyncAlgorithms/AsyncMerge2Sequence.swift

Lines changed: 1049 additions & 181 deletions
Large diffs are not rendered by default.

‎Sources/AsyncAlgorithms/Locking.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,27 @@ internal struct Lock {
8787
/// Releases the lock by forwarding this instance's `platformLock` to the
/// static `unlock(_:)` helper of the enclosing `Lock` type.
func unlock() {
  Self.unlock(platformLock)
}
90+
91+
/// Runs `body` while the lock is held and hands back whatever it produces.
///
/// Prefer this helper over pairing `lock` and `unlock` by hand: the `defer`
/// below releases the lock on every exit path, including when `body` throws.
///
/// - Parameter body: The work to perform while holding the lock.
/// - Returns: The value produced by `body`.
/// - Throws: Rethrows any error thrown by `body`.
public func withLock<T>(_ body: () throws -> T) rethrows -> T {
  lock()
  defer { unlock() }
  return try body()
}
106+
107+
/// `Void`-returning counterpart of `withLock(_:)`; it simply forwards `body`
/// to `withLock(_:)`. The original author's note states that this variant
/// exists as a specialisation of the `Void` return for performance.
///
/// - Parameter body: The work to perform while holding the lock.
/// - Throws: Rethrows any error thrown by `body`.
public func withLockVoid(_ body: () throws -> Void) rethrows -> Void {
  try withLock(body)
}
90111
}
91112

92113
struct ManagedCriticalState<State> {
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the Swift Async Algorithms open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the Swift project authors
6+
// Licensed under Apache License v2.0 with Runtime Library Exception
7+
//
8+
// See https://swift.org/LICENSE.txt for license information
9+
//
10+
//===----------------------------------------------------------------------===//
11+
12+
/// Bookkeeping for iterating two asynchronous sequences side by side.
///
/// Each side is tracked by a `PartialIteration` value (declared elsewhere in the
/// project) that this type observes in three shapes: `.idle(iterator)`,
/// `.pending(task)` and `.terminal`. `next()` inspects the pair, starts a task
/// for any idle side where needed, and returns the next element tagged with the
/// side it came from.
struct Merge2StateMachine<Base1: AsyncSequence, Base2: AsyncSequence>: Sendable
where
  Base1.AsyncIterator: Sendable,
  Base2.AsyncIterator: Sendable,
  Base1.Element: Sendable,
  Base2.Element: Sendable
{
  typealias Element1 = Base1.Element
  typealias Element2 = Base2.Element

  /// When `true`, a `nil` resolved for the first side ends the whole iteration.
  let iter1TerminatesOnNil : Bool
  /// When `true`, a `nil` resolved for the second side ends the whole iteration.
  let iter2terminatesOnNil : Bool

  /// Outcome of a single `next()` call on one side, bundled with the iterator
  /// that produced it so the iterator can be handed back to the state.
  enum Partial: @unchecked Sendable {
    case first(Result<Element1?, Error>, Base1.AsyncIterator)
    case second(Result<Element2?, Error>, Base2.AsyncIterator)
  }

  /// An element tagged with the side that produced it.
  enum Either {
    case first(Base1.Element)
    case second(Base2.Element)
  }

  /// Per-side iteration state: `.0` tracks the first base, `.1` the second.
  var state: (PartialIteration<Base1.AsyncIterator, Partial>, PartialIteration<Base2.AsyncIterator, Partial>)

  /// Creates a state machine with both sides idle.
  ///
  /// - Parameters:
  ///   - iterator1: Iterator over the first base sequence.
  ///   - iter1TerminatesOnNil: Whether the first side finishing ends the merge. Defaults to `false`.
  ///   - iterator2: Iterator over the second base sequence.
  ///   - iter2terminatesOnNil: Whether the second side finishing ends the merge. Defaults to `false`.
  init(_ iterator1: Base1.AsyncIterator, terminatesOnNil iter1TerminatesOnNil: Bool = false, _ iterator2: Base2.AsyncIterator, terminatesOnNil iter2terminatesOnNil: Bool = false) {
    self.iter1TerminatesOnNil = iter1TerminatesOnNil
    self.iter2terminatesOnNil = iter2terminatesOnNil
    self.state = (.idle(iterator1), .idle(iterator2))
  }

  /// Awaits the task chosen by `Task.select` among the non-`nil` arguments and
  /// folds its result into the matching side of `state`.
  ///
  /// `Task.select` is declared elsewhere in the project; presumably it yields
  /// the first task to complete (to be confirmed against its definition).
  ///
  /// - Returns: The resolved element tagged with its side, `nil` when a side
  ///   resolves to `nil` and is flagged as terminating, or otherwise the result
  ///   of another `next()` call.
  /// - Throws: Any error surfaced while resolving a side or by the nested
  ///   `next()` call; the opposite side is cancelled before rethrowing.
  mutating func apply(_ task1: Task<Partial, Never>?, _ task2: Task<Partial, Never>?) async rethrows -> Either? {
    let candidates = [task1, task2].compactMap { $0 }
    switch await Task.select(candidates).value {
    case .first(let outcome, let advancedIterator):
      do {
        if let element = try state.0.resolve(outcome, advancedIterator) {
          return .first(element)
        }
        // The first side resolved to `nil`: keep going unless it is flagged as terminating.
        guard iter1TerminatesOnNil else {
          return try await next()
        }
        state.1.cancel()
        return nil
      } catch {
        state.1.cancel()
        throw error
      }
    case .second(let outcome, let advancedIterator):
      do {
        if let element = try state.1.resolve(outcome, advancedIterator) {
          return .second(element)
        }
        // The second side resolved to `nil`: keep going unless it is flagged as terminating.
        guard iter2terminatesOnNil else {
          return try await next()
        }
        state.0.cancel()
        return nil
      } catch {
        state.0.cancel()
        throw error
      }
    }
  }

  /// Starts an unstructured task that advances a copy of `iterator1` once and
  /// reports the outcome, success or failure, together with the advanced iterator.
  func first(_ iterator1: Base1.AsyncIterator) -> Task<Partial, Never> {
    Task {
      var iterator = iterator1
      let outcome: Result<Element1?, Error>
      do {
        outcome = .success(try await iterator.next())
      } catch {
        outcome = .failure(error)
      }
      return .first(outcome, iterator)
    }
  }

  /// Starts an unstructured task that advances a copy of `iterator2` once and
  /// reports the outcome, success or failure, together with the advanced iterator.
  func second(_ iterator2: Base2.AsyncIterator) -> Task<Partial, Never> {
    Task {
      var iterator = iterator2
      let outcome: Result<Element2?, Error>
      do {
        outcome = .success(try await iterator.next())
      } catch {
        outcome = .failure(error)
      }
      return .second(outcome, iterator)
    }
  }

  /// Produces the next element from either side, or `nil` once iteration is over.
  ///
  /// If the current task is cancelled, both sides are cancelled and `nil` is returned.
  mutating func next() async rethrows -> Either? {
    guard !Task.isCancelled else {
      state.0.cancel()
      state.1.cancel()
      return nil
    }

    switch state {
    // Both sides finished.
    case (.terminal, .terminal):
      return nil

    // Tasks are already in flight for every live side: just wait on them.
    case (.pending(let task1), .pending(let task2)):
      return try await apply(task1, task2)
    case (.pending(let task1), .terminal):
      return try await apply(task1, nil)
    case (.terminal, .pending(let task2)):
      return try await apply(nil, task2)

    // At least one live side is idle while the other is idle or pending:
    // start the missing task(s), then wait on both.
    case (.idle(let iterator1), .idle(let iterator2)):
      let task1 = first(iterator1)
      let task2 = second(iterator2)
      state = (.pending(task1), .pending(task2))
      return try await apply(task1, task2)
    case (.idle(let iterator1), .pending(let task2)):
      let task1 = first(iterator1)
      state = (.pending(task1), .pending(task2))
      return try await apply(task1, task2)
    case (.pending(let task1), .idle(let iterator2)):
      let task2 = second(iterator2)
      state = (.pending(task1), .pending(task2))
      return try await apply(task1, task2)

    // Only one side is left: advance it inline, without spawning a task.
    case (.idle(var iterator1), .terminal):
      do {
        guard let element = try await iterator1.next() else {
          state = (.terminal, .terminal)
          return nil
        }
        state = (.idle(iterator1), .terminal)
        return .first(element)
      } catch {
        state = (.terminal, .terminal)
        throw error
      }
    case (.terminal, .idle(var iterator2)):
      do {
        guard let element = try await iterator2.next() else {
          state = (.terminal, .terminal)
          return nil
        }
        state = (.terminal, .idle(iterator2))
        return .second(element)
      } catch {
        state = (.terminal, .terminal)
        throw error
      }
    }
  }
}
153+
extension Merge2StateMachine.Either where Base1.Element == Base2.Element {
  /// The wrapped element, regardless of which side produced it.
  ///
  /// Only available when both base sequences share the same element type.
  var value : Base1.Element {
    switch self {
    case .first(let element): return element
    case .second(let element): return element
    }
  }
}

‎Tests/AsyncAlgorithmsTests/TestMerge.swift

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import AsyncAlgorithms
1414

1515
final class TestMerge2: XCTestCase {
16-
func test_merge_makes_sequence_with_elements_from_sources_when_all_have_same_size() async {
16+
func test_merge_makes_sequence_with_elements_from_sources_when_all_have_same_size() async throws {
1717
let first = [1, 2, 3]
1818
let second = [4, 5, 6]
1919

@@ -22,16 +22,16 @@ final class TestMerge2: XCTestCase {
2222
let expected = Set(first + second).sorted()
2323

2424
var iterator = merged.makeAsyncIterator()
25-
while let item = await iterator.next() {
25+
while let item = try await iterator.next() {
2626
collected.append(item)
2727
}
28-
let pastEnd = await iterator.next()
28+
let pastEnd = try await iterator.next()
2929

3030
XCTAssertNil(pastEnd)
3131
XCTAssertEqual(Set(collected).sorted(), expected)
3232
}
3333

34-
func test_merge_makes_sequence_with_elements_from_sources_when_first_is_longer() async {
34+
func test_merge_makes_sequence_with_elements_from_sources_when_first_is_longer() async throws {
3535
let first = [1, 2, 3, 4, 5, 6, 7]
3636
let second = [8, 9, 10]
3737

@@ -40,16 +40,16 @@ final class TestMerge2: XCTestCase {
4040
let expected = Set(first + second).sorted()
4141

4242
var iterator = merged.makeAsyncIterator()
43-
while let item = await iterator.next() {
43+
while let item = try await iterator.next() {
4444
collected.append(item)
4545
}
46-
let pastEnd = await iterator.next()
46+
let pastEnd = try await iterator.next()
4747

4848
XCTAssertNil(pastEnd)
4949
XCTAssertEqual(Set(collected).sorted(), expected)
5050
}
5151

52-
func test_merge_makes_sequence_with_elements_from_sources_when_second_is_longer() async {
52+
func test_merge_makes_sequence_with_elements_from_sources_when_second_is_longer() async throws {
5353
let first = [1, 2, 3]
5454
let second = [4, 5, 6, 7]
5555

@@ -58,10 +58,10 @@ final class TestMerge2: XCTestCase {
5858
let expected = Set(first + second).sorted()
5959

6060
var iterator = merged.makeAsyncIterator()
61-
while let item = await iterator.next() {
61+
while let item = try await iterator.next() {
6262
collected.append(item)
6363
}
64-
let pastEnd = await iterator.next()
64+
let pastEnd = try await iterator.next()
6565

6666
XCTAssertNil(pastEnd)
6767
XCTAssertEqual(Set(collected).sorted(), expected)
@@ -168,15 +168,15 @@ final class TestMerge2: XCTestCase {
168168
}
169169
}
170170

171-
func test_merge_finishes_when_iteration_task_is_cancelled() async {
171+
func test_merge_finishes_when_iteration_task_is_cancelled() async throws {
172172
let source1 = Indefinite(value: "test1")
173173
let source2 = Indefinite(value: "test2")
174174
let sequence = merge(source1.async, source2.async)
175175
let finished = expectation(description: "finished")
176176
let iterated = expectation(description: "iterated")
177177
let task = Task {
178178
var firstIteration = false
179-
for await _ in sequence {
179+
for try await _ in sequence {
180180
if !firstIteration {
181181
firstIteration = true
182182
iterated.fulfill()

0 commit comments

Comments
 (0)
Please sign in to comment.