From 43253299eaf55f7ee6b860b6b70aff8aa4ac6766 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Mon, 16 Sep 2024 17:58:51 -0700 Subject: [PATCH 01/25] EventStreams: add ability to customise the terminating byte sequence of a stream --- .../ServerSentEventsDecoding.swift | 74 +++++++++++++++++-- 1 file changed, 66 insertions(+), 8 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 421e5319..56323e89 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -28,9 +28,19 @@ where Upstream.Element == ArraySlice { /// The upstream sequence. private let upstream: Upstream + /// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// - Parameter: A byte chunk. + /// - Returns: `True` if the given byte sequence is the terminating byte sequence defined by the API. + private let terminate: (@Sendable (ArraySlice) -> Bool)? + /// Creates a new sequence. - /// - Parameter upstream: The upstream sequence of arbitrary byte chunks. - public init(upstream: Upstream) { self.upstream = upstream } + /// - Parameters: + /// - upstream: The upstream sequence of arbitrary byte chunks. + /// - terminate: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + public init(upstream: Upstream, terminate: (@Sendable (ArraySlice) -> Bool)?) { + self.upstream = upstream + self.terminate = terminate + } } extension ServerSentEventsDeserializationSequence: AsyncSequence { @@ -48,6 +58,17 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// The state machine of the iterator. var stateMachine: StateMachine = .init() + /// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// - Parameter: A byte chunk. + /// - Returns: `True` if the given byte sequence is the terminating byte sequence defined by the API. + let terminate: ((ArraySlice) -> Bool)? + + init(upstream: any AsyncIteratorProtocol, terminate: ((ArraySlice) -> Bool)?) { + self.upstream = upstream as! UpstreamIterator + self.stateMachine = .init(terminate: terminate) + self.terminate = terminate + } + /// Asynchronously advances to the next element and returns it, or ends the /// sequence if there is no next element. public mutating func next() async throws -> ServerSentEvent? { @@ -70,7 +91,7 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// Creates the asynchronous iterator that produces elements of this /// asynchronous sequence. public func makeAsyncIterator() -> Iterator { - Iterator(upstream: upstream.makeAsyncIterator()) + Iterator(upstream: upstream.makeAsyncIterator(), terminate: terminate) } } @@ -79,10 +100,16 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. /// /// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`. + /// - Parameter: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Returns: A sequence that provides the events. - public func asDecodedServerSentEvents() -> ServerSentEventsDeserializationSequence< + public func asDecodedServerSentEvents(terminate: (@Sendable (ArraySlice) -> Bool)? = nil) -> ServerSentEventsDeserializationSequence< ServerSentEventsLineDeserializationSequence - > { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self)) } + > { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self), terminate: terminate) } + + /// Convenience function for `asDecodedServerSentEvents` that directly receives the terminating byte sequence. + public func asDecodedServerSentEvents(terminatingSequence: ArraySlice) -> ServerSentEventsDeserializationSequence< + ServerSentEventsLineDeserializationSequence + > { asDecodedServerSentEvents(terminate: { incomingSequence in return incomingSequence == terminatingSequence }) } /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. /// @@ -90,15 +117,17 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// - Parameters: /// - dataType: The type to decode the JSON data into. /// - decoder: The JSON decoder to use. + /// - terminate: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Returns: A sequence that provides the events with the decoded JSON data. public func asDecodedServerSentEventsWithJSONData( of dataType: JSONDataType.Type = JSONDataType.self, - decoder: JSONDecoder = .init() + decoder: JSONDecoder = .init(), + terminate: (@Sendable (ArraySlice) -> Bool)? = nil ) -> AsyncThrowingMapSequence< ServerSentEventsDeserializationSequence>, ServerSentEventWithJSONData > { - asDecodedServerSentEvents() + asDecodedServerSentEvents(terminate: terminate) .map { event in ServerSentEventWithJSONData( event: event.event, @@ -110,6 +139,19 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { ) } } + + public func asDecodedServerSentEventsWithJSONData( + of dataType: JSONDataType.Type = JSONDataType.self, + decoder: JSONDecoder = .init(), + terminatingData: ArraySlice + ) -> AsyncThrowingMapSequence< + ServerSentEventsDeserializationSequence>, + ServerSentEventWithJSONData + > { + asDecodedServerSentEventsWithJSONData(of: dataType, decoder: decoder) { incomingData in + terminatingData == incomingData + } + } } extension ServerSentEventsDeserializationSequence.Iterator { @@ -133,8 +175,16 @@ extension ServerSentEventsDeserializationSequence.Iterator { /// The current state of the state machine. private(set) var state: State + + /// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// - Parameter: A sequence of byte chunks. + /// - Returns: `True` if the given byte sequence is the terminating byte sequence defined by the API. + let terminate: ((ArraySlice) -> Bool)? + /// Creates a new state machine. - init() { self.state = .accumulatingEvent(.init(), buffer: []) } + init(terminate: ((ArraySlice) -> Bool)? = nil) { + self.state = .accumulatingEvent(.init(), buffer: []) + self.terminate = terminate} /// An action returned by the `next` method. enum NextAction { @@ -165,6 +215,14 @@ extension ServerSentEventsDeserializationSequence.Iterator { state = .accumulatingEvent(.init(), buffer: buffer) // If the last character of data is a newline, strip it. if event.data?.hasSuffix("\n") ?? false { event.data?.removeLast() } + + if let terminate = terminate { + if let data = event.data { + if terminate(ArraySlice(Data(data.utf8))) { + return .returnNil + } + } + } return .emitEvent(event) } if line.first! == ASCII.colon { From 0fa72a85ea8a29b81824d7fa8b8038539cab3ed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Mon, 16 Sep 2024 17:59:44 -0700 Subject: [PATCH 02/25] EventStreams: add tests for custom terminating byte sequences --- .../Test_ServerSentEventsDecoding.swift | 136 +++++++++++++++++- 1 file changed, 132 insertions(+), 4 deletions(-) diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift index 79d645a5..d31179e2 100644 --- a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift @@ -16,17 +16,31 @@ import XCTest import Foundation final class Test_ServerSentEventsDecoding: Test_Runtime { - func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line) + func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, terminate: ((ArraySlice) -> Bool)? = nil, eventCountOffset: Int = 0) async throws { let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)).asDecodedServerSentEvents() let events = try await [ServerSentEvent](collecting: sequence) - XCTAssertEqual(events.count, output.count, file: file, line: line) + let eventCount = events.count + eventCountOffset + XCTAssertEqual(eventCount, output.count, file: file, line: line) for (index, linePair) in zip(events, output).enumerated() { let (actualEvent, expectedEvent) = linePair XCTAssertEqual(actualEvent, expectedEvent, "Event: \(index)", file: file, line: line) } } + + func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, terminatingData: ArraySlice, eventCountOffset: Int) + async throws { + try await _test( + input: input, + output: output, + file: file, + line: line, + terminate: { incomingData in incomingData == terminatingData }, + eventCountOffset: eventCountOffset + ) + } + func test() async throws { // Simple event. try await _test( @@ -83,15 +97,56 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { .init(id: "123", data: "This is a message with an ID."), ] ) + + try await _test( + input: #""" + data: hello + data: world + + data: [DONE] + + data: hello2 + data: world2 + + + """#, + output: [ + .init(data: "hello\nworld") + ], + terminate: { incomingData in + incomingData == ArraySlice(Data("[DONE]".utf8)) + }, + eventCountOffset: -2 + ) + + try await _test( + input: #""" + data: hello + data: world + + data: [DONE] + + data: hello2 + data: world2 + + + """#, + output: [ + .init(data: "hello\nworld") + ], + terminatingData: ArraySlice(Data("[DONE]".utf8)), + eventCountOffset: -2 + ) } func _testJSONData( input: String, output: [ServerSentEventWithJSONData], file: StaticString = #filePath, - line: UInt = #line + line: UInt = #line, + terminate: (@Sendable (ArraySlice) -> Bool)? = nil ) async throws { let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)) - .asDecodedServerSentEventsWithJSONData(of: JSONType.self) + .asDecodedServerSentEventsWithJSONData(of: JSONType.self, terminate: terminate) let events = try await [ServerSentEventWithJSONData](collecting: sequence) XCTAssertEqual(events.count, output.count, file: file, line: line) for (index, linePair) in zip(events, output).enumerated() { @@ -99,6 +154,23 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { XCTAssertEqual(actualEvent, expectedEvent, "Event: \(index)", file: file, line: line) } } + + func _testJSONData( + input: String, + output: [ServerSentEventWithJSONData], + file: StaticString = #filePath, + line: UInt = #line, + terminatingDataSequence: ArraySlice? + ) async throws { + try await _testJSONData( + input: input, + output: output, + file: file, + terminate: { incomingData in incomingData == ArraySlice(Data("[DONE]".utf8)) + } + ) + } + struct TestEvent: Decodable, Hashable, Sendable { var index: Int } func testJSONData() async throws { // Simple event. @@ -121,6 +193,62 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { .init(event: "event2", data: TestEvent(index: 2), id: "2"), ] ) + + try await _testJSONData( + input: #""" + event: event1 + id: 1 + data: {"index":1} + + event: event2 + id: 2 + data: { + data: "index": 2 + data: } + + data: [DONE] + + event: event3 + id: 1 + data: {"index":3} + + + """#, + output: [ + .init(event: "event1", data: TestEvent(index: 1), id: "1"), + .init(event: "event2", data: TestEvent(index: 2), id: "2"), + ], + terminate: { incomingData in + incomingData == ArraySlice(Data("[DONE]".utf8)) + } + ) + + try await _testJSONData( + input: #""" + event: event1 + id: 1 + data: {"index":1} + + event: event2 + id: 2 + data: { + data: "index": 2 + data: } + + data: [DONE] + + event: event3 + id: 1 + data: {"index":3} + + + """#, + output: [ + .init(event: "event1", data: TestEvent(index: 1), id: "1"), + .init(event: "event2", data: TestEvent(index: 2), id: "2"), + ], + terminatingDataSequence: ArraySlice(Data("[DONE]".utf8)) + ) } } From 5777ae0a76e175d07b09b33bc5141fe597ed788f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Fri, 20 Sep 2024 10:14:50 -0700 Subject: [PATCH 03/25] EventStreams: remove non-closure overload incl. its tests --- .../ServerSentEventsDecoding.swift | 18 ----- .../Test_ServerSentEventsDecoding.swift | 74 ------------------- 2 files changed, 92 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 56323e89..36dd0ffe 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -106,11 +106,6 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { ServerSentEventsLineDeserializationSequence > { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self), terminate: terminate) } - /// Convenience function for `asDecodedServerSentEvents` that directly receives the terminating byte sequence. - public func asDecodedServerSentEvents(terminatingSequence: ArraySlice) -> ServerSentEventsDeserializationSequence< - ServerSentEventsLineDeserializationSequence - > { asDecodedServerSentEvents(terminate: { incomingSequence in return incomingSequence == terminatingSequence }) } - /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. /// /// Use this method if the event's `data` field is JSON. @@ -139,19 +134,6 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { ) } } - - public func asDecodedServerSentEventsWithJSONData( - of dataType: JSONDataType.Type = JSONDataType.self, - decoder: JSONDecoder = .init(), - terminatingData: ArraySlice - ) -> AsyncThrowingMapSequence< - ServerSentEventsDeserializationSequence>, - ServerSentEventWithJSONData - > { - asDecodedServerSentEventsWithJSONData(of: dataType, decoder: decoder) { incomingData in - terminatingData == incomingData - } - } } extension ServerSentEventsDeserializationSequence.Iterator { diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift index d31179e2..1c6555e2 100644 --- a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift @@ -29,18 +29,6 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { } } - func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, terminatingData: ArraySlice, eventCountOffset: Int) - async throws { - try await _test( - input: input, - output: output, - file: file, - line: line, - terminate: { incomingData in incomingData == terminatingData }, - eventCountOffset: eventCountOffset - ) - } - func test() async throws { // Simple event. try await _test( @@ -118,25 +106,6 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { }, eventCountOffset: -2 ) - - try await _test( - input: #""" - data: hello - data: world - - data: [DONE] - - data: hello2 - data: world2 - - - """#, - output: [ - .init(data: "hello\nworld") - ], - terminatingData: ArraySlice(Data("[DONE]".utf8)), - eventCountOffset: -2 - ) } func _testJSONData( input: String, @@ -155,22 +124,6 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { } } - func _testJSONData( - input: String, - output: [ServerSentEventWithJSONData], - file: StaticString = #filePath, - line: UInt = #line, - terminatingDataSequence: ArraySlice? - ) async throws { - try await _testJSONData( - input: input, - output: output, - file: file, - terminate: { incomingData in incomingData == ArraySlice(Data("[DONE]".utf8)) - } - ) - } - struct TestEvent: Decodable, Hashable, Sendable { var index: Int } func testJSONData() async throws { // Simple event. @@ -222,33 +175,6 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { incomingData == ArraySlice(Data("[DONE]".utf8)) } ) - - try await _testJSONData( - input: #""" - event: event1 - id: 1 - data: {"index":1} - - event: event2 - id: 2 - data: { - data: "index": 2 - data: } - - data: [DONE] - - event: event3 - id: 1 - data: {"index":3} - - - """#, - output: [ - .init(event: "event1", data: TestEvent(index: 1), id: "1"), - .init(event: "event2", data: TestEvent(index: 2), id: "2"), - ], - terminatingDataSequence: ArraySlice(Data("[DONE]".utf8)) - ) } } From b329be3bc7d7d41414869812c4e42b269046889a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Fri, 20 Sep 2024 10:51:42 -0700 Subject: [PATCH 04/25] EventStreams: rename "terminate" to "while" Use `predicate` when referring to `while` internally, use `while` in the public API Invert the value of the return closure. --- .../ServerSentEventsDecoding.swift | 44 +++++++++---------- .../Test_ServerSentEventsDecoding.swift | 14 +++--- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 36dd0ffe..66dff5a5 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -30,16 +30,16 @@ where Upstream.Element == ArraySlice { /// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Parameter: A byte chunk. - /// - Returns: `True` if the given byte sequence is the terminating byte sequence defined by the API. - private let terminate: (@Sendable (ArraySlice) -> Bool)? + /// - Returns: `True` until the terminating byte sequence is received. + private let predicate: (@Sendable (ArraySlice) -> Bool)? /// Creates a new sequence. /// - Parameters: /// - upstream: The upstream sequence of arbitrary byte chunks. - /// - terminate: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. - public init(upstream: Upstream, terminate: (@Sendable (ArraySlice) -> Bool)?) { + /// - while: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + public init(upstream: Upstream, while predicate: (@Sendable (ArraySlice) -> Bool)?) { self.upstream = upstream - self.terminate = terminate + self.predicate = predicate } } @@ -60,13 +60,13 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Parameter: A byte chunk. - /// - Returns: `True` if the given byte sequence is the terminating byte sequence defined by the API. - let terminate: ((ArraySlice) -> Bool)? + /// - Returns: `True` until the terminating byte sequence is received. + let predicate: ((ArraySlice) -> Bool)? - init(upstream: any AsyncIteratorProtocol, terminate: ((ArraySlice) -> Bool)?) { + init(upstream: any AsyncIteratorProtocol, while predicate: ((ArraySlice) -> Bool)?) { self.upstream = upstream as! UpstreamIterator - self.stateMachine = .init(terminate: terminate) - self.terminate = terminate + self.stateMachine = .init(while: predicate) + self.predicate = predicate } /// Asynchronously advances to the next element and returns it, or ends the @@ -91,7 +91,7 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// Creates the asynchronous iterator that produces elements of this /// asynchronous sequence. public func makeAsyncIterator() -> Iterator { - Iterator(upstream: upstream.makeAsyncIterator(), terminate: terminate) + Iterator(upstream: upstream.makeAsyncIterator(), while: predicate) } } @@ -102,9 +102,9 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`. /// - Parameter: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Returns: A sequence that provides the events. - public func asDecodedServerSentEvents(terminate: (@Sendable (ArraySlice) -> Bool)? = nil) -> ServerSentEventsDeserializationSequence< + public func asDecodedServerSentEvents(while predicate: (@Sendable (ArraySlice) -> Bool)? = nil) -> ServerSentEventsDeserializationSequence< ServerSentEventsLineDeserializationSequence - > { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self), terminate: terminate) } + > { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self), while: predicate) } /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. /// @@ -112,17 +112,17 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// - Parameters: /// - dataType: The type to decode the JSON data into. /// - decoder: The JSON decoder to use. - /// - terminate: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// - while: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Returns: A sequence that provides the events with the decoded JSON data. public func asDecodedServerSentEventsWithJSONData( of dataType: JSONDataType.Type = JSONDataType.self, decoder: JSONDecoder = .init(), - terminate: (@Sendable (ArraySlice) -> Bool)? = nil + while predicate: (@Sendable (ArraySlice) -> Bool)? = nil ) -> AsyncThrowingMapSequence< ServerSentEventsDeserializationSequence>, ServerSentEventWithJSONData > { - asDecodedServerSentEvents(terminate: terminate) + asDecodedServerSentEvents(while: predicate) .map { event in ServerSentEventWithJSONData( event: event.event, @@ -160,13 +160,13 @@ extension ServerSentEventsDeserializationSequence.Iterator { /// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Parameter: A sequence of byte chunks. - /// - Returns: `True` if the given byte sequence is the terminating byte sequence defined by the API. - let terminate: ((ArraySlice) -> Bool)? + /// - Returns: `True` until the terminating byte sequence is received. + let predicate: ((ArraySlice) -> Bool)? /// Creates a new state machine. - init(terminate: ((ArraySlice) -> Bool)? = nil) { + init(while predicate: ((ArraySlice) -> Bool)? = nil) { self.state = .accumulatingEvent(.init(), buffer: []) - self.terminate = terminate} + self.predicate = predicate} /// An action returned by the `next` method. enum NextAction { @@ -198,9 +198,9 @@ extension ServerSentEventsDeserializationSequence.Iterator { // If the last character of data is a newline, strip it. if event.data?.hasSuffix("\n") ?? false { event.data?.removeLast() } - if let terminate = terminate { + if let predicate = predicate { if let data = event.data { - if terminate(ArraySlice(Data(data.utf8))) { + if !predicate(ArraySlice(Data(data.utf8))) { return .returnNil } } diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift index 1c6555e2..beb2614d 100644 --- a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift @@ -16,7 +16,7 @@ import XCTest import Foundation final class Test_ServerSentEventsDecoding: Test_Runtime { - func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, terminate: ((ArraySlice) -> Bool)? = nil, eventCountOffset: Int = 0) + func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, while predicate: ((ArraySlice) -> Bool)? = nil, eventCountOffset: Int = 0) async throws { let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)).asDecodedServerSentEvents() @@ -101,8 +101,8 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { output: [ .init(data: "hello\nworld") ], - terminate: { incomingData in - incomingData == ArraySlice(Data("[DONE]".utf8)) + while: { incomingData in + incomingData != ArraySlice(Data("[DONE]".utf8)) }, eventCountOffset: -2 ) @@ -112,10 +112,10 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { output: [ServerSentEventWithJSONData], file: StaticString = #filePath, line: UInt = #line, - terminate: (@Sendable (ArraySlice) -> Bool)? = nil + while predicate: (@Sendable (ArraySlice) -> Bool)? = nil ) async throws { let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)) - .asDecodedServerSentEventsWithJSONData(of: JSONType.self, terminate: terminate) + .asDecodedServerSentEventsWithJSONData(of: JSONType.self, while: predicate) let events = try await [ServerSentEventWithJSONData](collecting: sequence) XCTAssertEqual(events.count, output.count, file: file, line: line) for (index, linePair) in zip(events, output).enumerated() { @@ -171,8 +171,8 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { .init(event: "event1", data: TestEvent(index: 1), id: "1"), .init(event: "event2", data: TestEvent(index: 2), id: "2"), ], - terminate: { incomingData in - incomingData == ArraySlice(Data("[DONE]".utf8)) + while: { incomingData in + incomingData != ArraySlice(Data("[DONE]".utf8)) } ) } From 625b013a313001a74806a1f1f2f171a5433eb1d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Fri, 20 Sep 2024 11:31:12 -0700 Subject: [PATCH 05/25] EventStreams: make closure non-optional Default initialise with `{ _ in true }` --- .../ServerSentEventsDecoding.swift | 38 +++++++++---------- .../Test_ServerSentEventsDecoding.swift | 4 +- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 66dff5a5..6762264a 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -28,16 +28,16 @@ where Upstream.Element == ArraySlice { /// The upstream sequence. private let upstream: Upstream - /// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Parameter: A byte chunk. /// - Returns: `True` until the terminating byte sequence is received. - private let predicate: (@Sendable (ArraySlice) -> Bool)? + private let predicate: @Sendable (ArraySlice) -> Bool /// Creates a new sequence. /// - Parameters: /// - upstream: The upstream sequence of arbitrary byte chunks. - /// - while: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. - public init(upstream: Upstream, while predicate: (@Sendable (ArraySlice) -> Bool)?) { + /// - while: A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + public init(upstream: Upstream, while predicate: @escaping @Sendable (ArraySlice) -> Bool) { self.upstream = upstream self.predicate = predicate } @@ -56,14 +56,14 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { var upstream: UpstreamIterator /// The state machine of the iterator. - var stateMachine: StateMachine = .init() + var stateMachine: StateMachine - /// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Parameter: A byte chunk. /// - Returns: `True` until the terminating byte sequence is received. - let predicate: ((ArraySlice) -> Bool)? + let predicate: (ArraySlice) -> Bool - init(upstream: any AsyncIteratorProtocol, while predicate: ((ArraySlice) -> Bool)?) { + init(upstream: any AsyncIteratorProtocol, while predicate: @escaping ((ArraySlice) -> Bool)) { self.upstream = upstream as! UpstreamIterator self.stateMachine = .init(while: predicate) self.predicate = predicate @@ -100,9 +100,9 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. /// /// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`. - /// - Parameter: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// - Parameter: A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Returns: A sequence that provides the events. - public func asDecodedServerSentEvents(while predicate: (@Sendable (ArraySlice) -> Bool)? = nil) -> ServerSentEventsDeserializationSequence< + public func asDecodedServerSentEvents(while predicate: @escaping @Sendable (ArraySlice) -> Bool = { _ in true }) -> ServerSentEventsDeserializationSequence< ServerSentEventsLineDeserializationSequence > { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self), while: predicate) } @@ -112,12 +112,12 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// - Parameters: /// - dataType: The type to decode the JSON data into. /// - decoder: The JSON decoder to use. - /// - while: An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// - while: A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Returns: A sequence that provides the events with the decoded JSON data. public func asDecodedServerSentEventsWithJSONData( of dataType: JSONDataType.Type = JSONDataType.self, decoder: JSONDecoder = .init(), - while predicate: (@Sendable (ArraySlice) -> Bool)? = nil + while predicate: @escaping @Sendable (ArraySlice) -> Bool = { _ in true } ) -> AsyncThrowingMapSequence< ServerSentEventsDeserializationSequence>, ServerSentEventWithJSONData @@ -158,13 +158,13 @@ extension ServerSentEventsDeserializationSequence.Iterator { private(set) var state: State - /// An optional closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Parameter: A sequence of byte chunks. /// - Returns: `True` until the terminating byte sequence is received. - let predicate: ((ArraySlice) -> Bool)? + let predicate: (ArraySlice) -> Bool /// Creates a new state machine. - init(while predicate: ((ArraySlice) -> Bool)? = nil) { + init(while predicate: @escaping (ArraySlice) -> Bool) { self.state = .accumulatingEvent(.init(), buffer: []) self.predicate = predicate} @@ -198,11 +198,9 @@ extension ServerSentEventsDeserializationSequence.Iterator { // If the last character of data is a newline, strip it. if event.data?.hasSuffix("\n") ?? false { event.data?.removeLast() } - if let predicate = predicate { - if let data = event.data { - if !predicate(ArraySlice(Data(data.utf8))) { - return .returnNil - } + if let data = event.data { + if !predicate(ArraySlice(Data(data.utf8))) { + return .returnNil } } return .emitEvent(event) diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift index beb2614d..72b55d27 100644 --- a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift @@ -16,7 +16,7 @@ import XCTest import Foundation final class Test_ServerSentEventsDecoding: Test_Runtime { - func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, while predicate: ((ArraySlice) -> Bool)? = nil, eventCountOffset: Int = 0) + func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, while predicate: (ArraySlice) -> Bool = { _ in true }, eventCountOffset: Int = 0) async throws { let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)).asDecodedServerSentEvents() @@ -112,7 +112,7 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { output: [ServerSentEventWithJSONData], file: StaticString = #filePath, line: UInt = #line, - while predicate: (@Sendable (ArraySlice) -> Bool)? = nil + while predicate: @escaping @Sendable (ArraySlice) -> Bool = { _ in true } ) async throws { let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)) .asDecodedServerSentEventsWithJSONData(of: JSONType.self, while: predicate) From a7d37911835733cfd1545deab89814a3e395128f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Fri, 20 Sep 2024 11:49:37 -0700 Subject: [PATCH 06/25] Deprecated: mark `asDecodedServerSentEvents` as deprecated --- Sources/OpenAPIRuntime/Deprecated/Deprecated.swift | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift index c9f538ef..658665d6 100644 --- a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift +++ b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift @@ -59,3 +59,14 @@ extension Configuration { ) } } + +extension AsyncSequence where Element == ArraySlice, Self: Sendable { + /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. + /// + /// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`. + /// - Returns: A sequence that provides the events. + @available(*, deprecated, renamed: "asDecodedServerSentEvents(while:)") + @_disfavoredOverload public func asDecodedServerSentEvents() -> ServerSentEventsDeserializationSequence< + ServerSentEventsLineDeserializationSequence + > { asDecodedServerSentEvents(while: { _ in true }) } +} From 514bbd0d9f7b41ae14aec6dac4b7e3853a916b31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Fri, 20 Sep 2024 11:52:26 -0700 Subject: [PATCH 07/25] Deprecated: mark `asDecodedServerSentEventsWithJSONData` as deprecated --- .../OpenAPIRuntime/Deprecated/Deprecated.swift | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift index 658665d6..533ff30b 100644 --- a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift +++ b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift @@ -69,4 +69,22 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { @_disfavoredOverload public func asDecodedServerSentEvents() -> ServerSentEventsDeserializationSequence< ServerSentEventsLineDeserializationSequence > { asDecodedServerSentEvents(while: { _ in true }) } + + /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. + /// + /// Use this method if the event's `data` field is JSON. + /// - Parameters: + /// - dataType: The type to decode the JSON data into. + /// - decoder: The JSON decoder to use. + /// - Returns: A sequence that provides the events with the decoded JSON data. + @available(*, deprecated, renamed: "asDecodedServerSentEventsWithJSONData(of:decoder:while:)") + @_disfavoredOverload public func asDecodedServerSentEventsWithJSONData( + of dataType: JSONDataType.Type = JSONDataType.self, + decoder: JSONDecoder = .init() + ) -> AsyncThrowingMapSequence< + ServerSentEventsDeserializationSequence>, + ServerSentEventWithJSONData + > { + asDecodedServerSentEventsWithJSONData(of: dataType, decoder: decoder, while: { _ in true }) + } } From de8aeca93d012f5d7390288f863bdc02d6640c4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 08:11:26 -0700 Subject: [PATCH 08/25] Update doc comments in Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift Co-authored-by: Honza Dvorsky --- .../EventStreams/ServerSentEventsDecoding.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 6762264a..6018cb4e 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -28,9 +28,9 @@ where Upstream.Element == ArraySlice { /// The upstream sequence. private let upstream: Upstream - /// A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// A closure that determines whether the given byte chunk should be forwarded to the consumer. /// - Parameter: A byte chunk. - /// - Returns: `True` until the terminating byte sequence is received. + /// - Returns: `true` if the byte chunk should be forwarded, `false` if this byte chunk is the terminating sequence. private let predicate: @Sendable (ArraySlice) -> Bool /// Creates a new sequence. From 0057abe27fd5236756d30e1bf63419729a836b79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 08:24:48 -0700 Subject: [PATCH 09/25] Deprecated: mark ServerSentEventsDeserializationSequence's `init(upstream:)` as deprecated --- Sources/OpenAPIRuntime/Deprecated/Deprecated.swift | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift index 533ff30b..e90aa1c6 100644 --- a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift +++ b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift @@ -88,3 +88,11 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { asDecodedServerSentEventsWithJSONData(of: dataType, decoder: decoder, while: { _ in true }) } } + +extension ServerSentEventsDeserializationSequence: Sendable +where Upstream.Element == ArraySlice { + /// Creates a new sequence. + /// - Parameter upstream: The upstream sequence of arbitrary byte chunks. + @available(*, deprecated, renamed: "init(upstream:while:)") + @_disfavoredOverload public init(upstream: Upstream) { init(upstream: upstream, while: { _ in true }) } +} From a346c61e40c13b9c4948e064d219f75ab2968d05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 08:28:13 -0700 Subject: [PATCH 10/25] Update doc comment in Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift Co-authored-by: Honza Dvorsky --- .../OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 6018cb4e..d3cadaab 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -36,7 +36,7 @@ where Upstream.Element == ArraySlice { /// Creates a new sequence. /// - Parameters: /// - upstream: The upstream sequence of arbitrary byte chunks. - /// - while: A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// - while: A closure that determines whether the given byte chunk should be forwarded to the consumer. public init(upstream: Upstream, while predicate: @escaping @Sendable (ArraySlice) -> Bool) { self.upstream = upstream self.predicate = predicate From 54b1251d3f2eaa677ff956290f6a0487da009028 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 08:28:36 -0700 Subject: [PATCH 11/25] Update doc comment in Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift Co-authored-by: Honza Dvorsky --- .../OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index d3cadaab..058eb852 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -58,7 +58,7 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// The state machine of the iterator. var stateMachine: StateMachine - /// A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// A closure that determines whether the given byte chunk should be forwarded to the consumer. /// - Parameter: A byte chunk. /// - Returns: `True` until the terminating byte sequence is received. let predicate: (ArraySlice) -> Bool From feaba23fca0b7ab7498ee408a74dbceae1ba2fbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 08:28:53 -0700 Subject: [PATCH 12/25] Update doc comment in Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift Co-authored-by: Honza Dvorsky --- .../OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 058eb852..f646098a 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -60,7 +60,7 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// A closure that determines whether the given byte chunk should be forwarded to the consumer. /// - Parameter: A byte chunk. - /// - Returns: `True` until the terminating byte sequence is received. + /// - Returns: `true` if the byte chunk should be forwarded, `false` if this byte chunk is the terminating sequence. let predicate: (ArraySlice) -> Bool init(upstream: any AsyncIteratorProtocol, while predicate: @escaping ((ArraySlice) -> Bool)) { From d286aec3062f4d03ee77e3fd0f92f749bb0eaa15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 08:35:01 -0700 Subject: [PATCH 13/25] Remove redundant forced type cast in Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift Co-authored-by: Honza Dvorsky --- .../EventStreams/ServerSentEventsDecoding.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index f646098a..05ca0bb1 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -63,8 +63,8 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// - Returns: `true` if the byte chunk should be forwarded, `false` if this byte chunk is the terminating sequence. let predicate: (ArraySlice) -> Bool - init(upstream: any AsyncIteratorProtocol, while predicate: @escaping ((ArraySlice) -> Bool)) { - self.upstream = upstream as! UpstreamIterator + init(upstream: UpstreamIterator, while predicate: @escaping ((ArraySlice) -> Bool)) { + self.upstream = upstream self.stateMachine = .init(while: predicate) self.predicate = predicate } From 36df44b8966c8d7f980d03e4f4722e10649ca64b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 08:42:22 -0700 Subject: [PATCH 14/25] EventStreams: formatting --- .../OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 05ca0bb1..38c8e764 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -166,7 +166,8 @@ extension ServerSentEventsDeserializationSequence.Iterator { /// Creates a new state machine. init(while predicate: @escaping (ArraySlice) -> Bool) { self.state = .accumulatingEvent(.init(), buffer: []) - self.predicate = predicate} + self.predicate = predicate + } /// An action returned by the `next` method. enum NextAction { From 6de6ec2c6bc0b85b8c67adae0978445d424b862f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 08:42:52 -0700 Subject: [PATCH 15/25] EventStreams: add doc comment for `ServerSentEventsDeserializationSequence` initialiser --- .../EventStreams/ServerSentEventsDecoding.swift | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 38c8e764..70532ca9 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -63,6 +63,10 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// - Returns: `true` if the byte chunk should be forwarded, `false` if this byte chunk is the terminating sequence. let predicate: (ArraySlice) -> Bool + /// Creates a new sequence. + /// - Parameters: + /// - upstream: The upstream sequence of arbitrary byte chunks. + /// - while: A closure that determines whether the given byte chunk should be forwarded to the consumer. init(upstream: UpstreamIterator, while predicate: @escaping ((ArraySlice) -> Bool)) { self.upstream = upstream self.stateMachine = .init(while: predicate) From 4e7bdb53f41e7c1062e25c878b296b25243932c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 08:56:39 -0700 Subject: [PATCH 16/25] EventStreams: check for terminating byte sequence after removing trailing `\n` Co-authored-by: Honza Dvorsky --- .../EventStreams/ServerSentEventsDecoding.swift | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 70532ca9..8ec93cfd 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -199,14 +199,14 @@ extension ServerSentEventsDeserializationSequence.Iterator { buffer.removeFirst() if line.isEmpty { // Dispatch the accumulated event. - state = .accumulatingEvent(.init(), buffer: buffer) // If the last character of data is a newline, strip it. if event.data?.hasSuffix("\n") ?? false { event.data?.removeLast() } - - if let data = event.data { - if !predicate(ArraySlice(Data(data.utf8))) { - return .returnNil - } + + state = .accumulatingEvent(.init(), buffer: buffer) + + if let data = event.data, !predicate(ArraySlice(data.utf8)) { + state = .finished + return .returnNil } return .emitEvent(event) } From ad8f0553891521509cbc29cdba3e922ff0d8aab9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 09:25:34 -0700 Subject: [PATCH 17/25] fixup! Deprecated: mark ServerSentEventsDeserializationSequence's `init(upstream:)` as deprecated --- Sources/OpenAPIRuntime/Deprecated/Deprecated.swift | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift index e90aa1c6..d1912195 100644 --- a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift +++ b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift @@ -89,10 +89,9 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { } } -extension ServerSentEventsDeserializationSequence: Sendable -where Upstream.Element == ArraySlice { +extension ServerSentEventsDeserializationSequence { /// Creates a new sequence. /// - Parameter upstream: The upstream sequence of arbitrary byte chunks. @available(*, deprecated, renamed: "init(upstream:while:)") - @_disfavoredOverload public init(upstream: Upstream) { init(upstream: upstream, while: { _ in true }) } + @_disfavoredOverload public init(upstream: Upstream) { self.init(upstream: upstream, while: { _ in true }) } } From 1552be844f26f66911f603a5fc5561e08060f0f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 10:24:05 -0700 Subject: [PATCH 18/25] EventStreams: store `predicate` closure as an associated value in `StateMachine` --- .../ServerSentEventsDecoding.swift | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 8ec93cfd..98416e88 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -146,10 +146,10 @@ extension ServerSentEventsDeserializationSequence.Iterator { struct StateMachine { /// The possible states of the state machine. - enum State: Hashable { + enum State { /// Accumulating an event, which hasn't been emitted yet. - case accumulatingEvent(ServerSentEvent, buffer: [ArraySlice]) + case accumulatingEvent(ServerSentEvent, buffer: [ArraySlice], predicate: (ArraySlice) -> Bool) /// Finished, the terminal state. case finished @@ -161,16 +161,9 @@ extension ServerSentEventsDeserializationSequence.Iterator { /// The current state of the state machine. private(set) var state: State - - /// A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. - /// - Parameter: A sequence of byte chunks. - /// - Returns: `True` until the terminating byte sequence is received. - let predicate: (ArraySlice) -> Bool - /// Creates a new state machine. init(while predicate: @escaping (ArraySlice) -> Bool) { - self.state = .accumulatingEvent(.init(), buffer: []) - self.predicate = predicate + self.state = .accumulatingEvent(.init(), buffer: [], predicate: predicate) } /// An action returned by the `next` method. @@ -193,7 +186,7 @@ extension ServerSentEventsDeserializationSequence.Iterator { /// - Returns: An action to perform. mutating func next() -> NextAction { switch state { - case .accumulatingEvent(var event, var buffer): + case .accumulatingEvent(var event, var buffer, let predicate): guard let line = buffer.first else { return .needsMore } state = .mutating buffer.removeFirst() @@ -202,7 +195,7 @@ extension ServerSentEventsDeserializationSequence.Iterator { // If the last character of data is a newline, strip it. if event.data?.hasSuffix("\n") ?? false { event.data?.removeLast() } - state = .accumulatingEvent(.init(), buffer: buffer) + state = .accumulatingEvent(.init(), buffer: buffer, predicate: predicate) if let data = event.data, !predicate(ArraySlice(data.utf8)) { state = .finished @@ -212,7 +205,7 @@ extension ServerSentEventsDeserializationSequence.Iterator { } if line.first! == ASCII.colon { // A comment, skip this line. - state = .accumulatingEvent(event, buffer: buffer) + state = .accumulatingEvent(event, buffer: buffer, predicate: predicate) return .noop } // Parse the field name and value. @@ -236,7 +229,7 @@ extension ServerSentEventsDeserializationSequence.Iterator { } guard let value else { // An unknown type of event, skip. - state = .accumulatingEvent(event, buffer: buffer) + state = .accumulatingEvent(event, buffer: buffer, predicate: predicate) return .noop } // Process the field. @@ -257,11 +250,11 @@ extension ServerSentEventsDeserializationSequence.Iterator { } default: // An unknown or invalid field, skip. - state = .accumulatingEvent(event, buffer: buffer) + state = .accumulatingEvent(event, buffer: buffer, predicate: predicate) return .noop } // Processed the field, continue. - state = .accumulatingEvent(event, buffer: buffer) + state = .accumulatingEvent(event, buffer: buffer, predicate: predicate) return .noop case .finished: return .returnNil case .mutating: preconditionFailure("Invalid state") @@ -283,11 +276,11 @@ extension ServerSentEventsDeserializationSequence.Iterator { /// - Returns: An action to perform. mutating func receivedValue(_ value: ArraySlice?) -> ReceivedValueAction { switch state { - case .accumulatingEvent(let event, var buffer): + case .accumulatingEvent(let event, var buffer, let predicate): if let value { state = .mutating buffer.append(value) - state = .accumulatingEvent(event, buffer: buffer) + state = .accumulatingEvent(event, buffer: buffer, predicate: predicate) return .noop } else { // If no value is received, drop the existing event on the floor. From 2c1b3cbb260c58b3d391bb48de7cd4521b9b4c6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Tue, 1 Oct 2024 10:35:22 -0700 Subject: [PATCH 19/25] EventStreams: remove `eventCountOffset` in `Test_ServerSentEventsDecoding` --- .../EventStreams/Test_ServerSentEventsDecoding.swift | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift index 72b55d27..c216fb5f 100644 --- a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift @@ -16,13 +16,12 @@ import XCTest import Foundation final class Test_ServerSentEventsDecoding: Test_Runtime { - func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, while predicate: (ArraySlice) -> Bool = { _ in true }, eventCountOffset: Int = 0) + func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, while predicate: @escaping @Sendable (ArraySlice) -> Bool = { _ in true }) async throws { - let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)).asDecodedServerSentEvents() + let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)).asDecodedServerSentEvents(while: predicate) let events = try await [ServerSentEvent](collecting: sequence) - let eventCount = events.count + eventCountOffset - XCTAssertEqual(eventCount, output.count, file: file, line: line) + XCTAssertEqual(events.count, output.count, file: file, line: line) for (index, linePair) in zip(events, output).enumerated() { let (actualEvent, expectedEvent) = linePair XCTAssertEqual(actualEvent, expectedEvent, "Event: \(index)", file: file, line: line) @@ -103,8 +102,7 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { ], while: { incomingData in incomingData != ArraySlice(Data("[DONE]".utf8)) - }, - eventCountOffset: -2 + } ) } func _testJSONData( From 97bda707e94bff751fee000494c6797a85515cc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Wed, 2 Oct 2024 13:31:53 -0700 Subject: [PATCH 20/25] Update doc comment in Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift Co-authored-by: Honza Dvorsky --- .../OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 98416e88..7c50a345 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -104,7 +104,7 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. /// /// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`. - /// - Parameter: A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// - Parameter: A closure that determines whether the given byte chunk should be forwarded to the consumer. /// - Returns: A sequence that provides the events. public func asDecodedServerSentEvents(while predicate: @escaping @Sendable (ArraySlice) -> Bool = { _ in true }) -> ServerSentEventsDeserializationSequence< ServerSentEventsLineDeserializationSequence From 0d4345ef22da80db778bfd98009f472456bfd966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Wed, 2 Oct 2024 21:33:00 -0700 Subject: [PATCH 21/25] EventStream: remove `predicate` property in `ServerSentEventsDeserializationSequence` iterator --- .../EventStreams/ServerSentEventsDecoding.swift | 6 ------ 1 file changed, 6 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 7c50a345..cb0edcdf 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -58,11 +58,6 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// The state machine of the iterator. var stateMachine: StateMachine - /// A closure that determines whether the given byte chunk should be forwarded to the consumer. - /// - Parameter: A byte chunk. - /// - Returns: `true` if the byte chunk should be forwarded, `false` if this byte chunk is the terminating sequence. - let predicate: (ArraySlice) -> Bool - /// Creates a new sequence. /// - Parameters: /// - upstream: The upstream sequence of arbitrary byte chunks. @@ -70,7 +65,6 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { init(upstream: UpstreamIterator, while predicate: @escaping ((ArraySlice) -> Bool)) { self.upstream = upstream self.stateMachine = .init(while: predicate) - self.predicate = predicate } /// Asynchronously advances to the next element and returns it, or ends the From 0b841db52c1c1d3dcb9bf94110a1d6de99773687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Thu, 3 Oct 2024 07:20:59 -0700 Subject: [PATCH 22/25] Update doc comment in Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift Co-authored-by: Honza Dvorsky --- .../OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index cb0edcdf..c57fb801 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -36,7 +36,7 @@ where Upstream.Element == ArraySlice { /// Creates a new sequence. /// - Parameters: /// - upstream: The upstream sequence of arbitrary byte chunks. - /// - while: A closure that determines whether the given byte chunk should be forwarded to the consumer. + /// - predicate: A closure that determines whether the given byte chunk should be forwarded to the consumer. public init(upstream: Upstream, while predicate: @escaping @Sendable (ArraySlice) -> Bool) { self.upstream = upstream self.predicate = predicate From 30c9b813d93658a9bd792bbf3b92435d0a356b43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Thu, 3 Oct 2024 07:24:24 -0700 Subject: [PATCH 23/25] EventStreams: address CI warnings re: doc comments --- .../EventStreams/ServerSentEventsDecoding.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index c57fb801..12dc532b 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -61,7 +61,7 @@ extension ServerSentEventsDeserializationSequence: AsyncSequence { /// Creates a new sequence. /// - Parameters: /// - upstream: The upstream sequence of arbitrary byte chunks. - /// - while: A closure that determines whether the given byte chunk should be forwarded to the consumer. + /// - predicate: A closure that determines whether the given byte chunk should be forwarded to the consumer. init(upstream: UpstreamIterator, while predicate: @escaping ((ArraySlice) -> Bool)) { self.upstream = upstream self.stateMachine = .init(while: predicate) @@ -110,7 +110,7 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// - Parameters: /// - dataType: The type to decode the JSON data into. /// - decoder: The JSON decoder to use. - /// - while: A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. + /// - predicate: A closure that determines whether the given byte sequence is the terminating byte sequence defined by the API. /// - Returns: A sequence that provides the events with the decoded JSON data. public func asDecodedServerSentEventsWithJSONData( of dataType: JSONDataType.Type = JSONDataType.self, From 31462e6cad80269f1481bc63b23dfd2625fa9d4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Thu, 3 Oct 2024 07:27:10 -0700 Subject: [PATCH 24/25] EventStream: only assign state once in each pass in `next()` Co-authored-by: Honza Dvorsky --- .../OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 12dc532b..5dd95e7b 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -189,12 +189,11 @@ extension ServerSentEventsDeserializationSequence.Iterator { // If the last character of data is a newline, strip it. if event.data?.hasSuffix("\n") ?? false { event.data?.removeLast() } - state = .accumulatingEvent(.init(), buffer: buffer, predicate: predicate) - if let data = event.data, !predicate(ArraySlice(data.utf8)) { state = .finished return .returnNil } + state = .accumulatingEvent(.init(), buffer: buffer, predicate: predicate) return .emitEvent(event) } if line.first! == ASCII.colon { From 78e19da9537ca60126d479baeeb1e3be79bd0e63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Paul=20Heidekr=C3=BCger?= Date: Thu, 3 Oct 2024 10:17:50 -0700 Subject: [PATCH 25/25] Format --- .../Deprecated/Deprecated.swift | 18 ++++++------- .../ServerSentEventsDecoding.swift | 10 +++---- .../Test_ServerSentEventsDecoding.swift | 26 +++++++++---------- 3 files changed, 25 insertions(+), 29 deletions(-) diff --git a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift index d1912195..2ce41750 100644 --- a/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift +++ b/Sources/OpenAPIRuntime/Deprecated/Deprecated.swift @@ -65,11 +65,10 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// /// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`. /// - Returns: A sequence that provides the events. - @available(*, deprecated, renamed: "asDecodedServerSentEvents(while:)") - @_disfavoredOverload public func asDecodedServerSentEvents() -> ServerSentEventsDeserializationSequence< + @available(*, deprecated, renamed: "asDecodedServerSentEvents(while:)") @_disfavoredOverload + public func asDecodedServerSentEvents() -> ServerSentEventsDeserializationSequence< ServerSentEventsLineDeserializationSequence > { asDecodedServerSentEvents(while: { _ in true }) } - /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. /// /// Use this method if the event's `data` field is JSON. @@ -77,21 +76,20 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// - dataType: The type to decode the JSON data into. /// - decoder: The JSON decoder to use. /// - Returns: A sequence that provides the events with the decoded JSON data. - @available(*, deprecated, renamed: "asDecodedServerSentEventsWithJSONData(of:decoder:while:)") - @_disfavoredOverload public func asDecodedServerSentEventsWithJSONData( + @available(*, deprecated, renamed: "asDecodedServerSentEventsWithJSONData(of:decoder:while:)") @_disfavoredOverload + public func asDecodedServerSentEventsWithJSONData( of dataType: JSONDataType.Type = JSONDataType.self, decoder: JSONDecoder = .init() ) -> AsyncThrowingMapSequence< ServerSentEventsDeserializationSequence>, ServerSentEventWithJSONData - > { - asDecodedServerSentEventsWithJSONData(of: dataType, decoder: decoder, while: { _ in true }) - } + > { asDecodedServerSentEventsWithJSONData(of: dataType, decoder: decoder, while: { _ in true }) } } extension ServerSentEventsDeserializationSequence { /// Creates a new sequence. /// - Parameter upstream: The upstream sequence of arbitrary byte chunks. - @available(*, deprecated, renamed: "init(upstream:while:)") - @_disfavoredOverload public init(upstream: Upstream) { self.init(upstream: upstream, while: { _ in true }) } + @available(*, deprecated, renamed: "init(upstream:while:)") @_disfavoredOverload public init(upstream: Upstream) { + self.init(upstream: upstream, while: { _ in true }) + } } diff --git a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift index 5dd95e7b..ff374b39 100644 --- a/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift +++ b/Sources/OpenAPIRuntime/EventStreams/ServerSentEventsDecoding.swift @@ -100,10 +100,11 @@ extension AsyncSequence where Element == ArraySlice, Self: Sendable { /// Use this method if the event's `data` field is not JSON, or if you don't want to parse it using `asDecodedServerSentEventsWithJSONData`. /// - Parameter: A closure that determines whether the given byte chunk should be forwarded to the consumer. /// - Returns: A sequence that provides the events. - public func asDecodedServerSentEvents(while predicate: @escaping @Sendable (ArraySlice) -> Bool = { _ in true }) -> ServerSentEventsDeserializationSequence< - ServerSentEventsLineDeserializationSequence - > { .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self), while: predicate) } - + public func asDecodedServerSentEvents( + while predicate: @escaping @Sendable (ArraySlice) -> Bool = { _ in true } + ) -> ServerSentEventsDeserializationSequence> { + .init(upstream: ServerSentEventsLineDeserializationSequence(upstream: self), while: predicate) + } /// Returns another sequence that decodes each event's data as the provided type using the provided decoder. /// /// Use this method if the event's `data` field is JSON. @@ -188,7 +189,6 @@ extension ServerSentEventsDeserializationSequence.Iterator { // Dispatch the accumulated event. // If the last character of data is a newline, strip it. if event.data?.hasSuffix("\n") ?? false { event.data?.removeLast() } - if let data = event.data, !predicate(ArraySlice(data.utf8)) { state = .finished return .returnNil diff --git a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift index c216fb5f..2a15b932 100644 --- a/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift +++ b/Tests/OpenAPIRuntimeTests/EventStreams/Test_ServerSentEventsDecoding.swift @@ -16,9 +16,13 @@ import XCTest import Foundation final class Test_ServerSentEventsDecoding: Test_Runtime { - func _test(input: String, output: [ServerSentEvent], file: StaticString = #filePath, line: UInt = #line, while predicate: @escaping @Sendable (ArraySlice) -> Bool = { _ in true }) - async throws - { + func _test( + input: String, + output: [ServerSentEvent], + file: StaticString = #filePath, + line: UInt = #line, + while predicate: @escaping @Sendable (ArraySlice) -> Bool = { _ in true } + ) async throws { let sequence = asOneBytePerElementSequence(ArraySlice(input.utf8)).asDecodedServerSentEvents(while: predicate) let events = try await [ServerSentEvent](collecting: sequence) XCTAssertEqual(events.count, output.count, file: file, line: line) @@ -97,12 +101,8 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { """#, - output: [ - .init(data: "hello\nworld") - ], - while: { incomingData in - incomingData != ArraySlice(Data("[DONE]".utf8)) - } + output: [.init(data: "hello\nworld")], + while: { incomingData in incomingData != ArraySlice(Data("[DONE]".utf8)) } ) } func _testJSONData( @@ -162,16 +162,14 @@ final class Test_ServerSentEventsDecoding: Test_Runtime { event: event3 id: 1 data: {"index":3} - - + + """#, output: [ .init(event: "event1", data: TestEvent(index: 1), id: "1"), .init(event: "event2", data: TestEvent(index: 2), id: "2"), ], - while: { incomingData in - incomingData != ArraySlice(Data("[DONE]".utf8)) - } + while: { incomingData in incomingData != ArraySlice(Data("[DONE]".utf8)) } ) } }