Skip to content

feat(realtime): send broadcast events through HTTP #476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions .swiftpm/configuration/Package.resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"pins" : [
{
"identity" : "swift-concurrency-extras",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-concurrency-extras",
"state" : {
"revision" : "bb5059bde9022d69ac516803f4f227d8ac967f71",
"version" : "1.1.0"
}
},
{
"identity" : "swift-crypto",
"kind" : "remoteSourceControl",
"location" : "https://github.com/apple/swift-crypto.git",
"state" : {
"revision" : "bc1c29221f6dfeb0ebbfbc98eb95cd3d4967868e",
"version" : "3.4.0"
}
},
{
"identity" : "swift-custom-dump",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-custom-dump",
"state" : {
"revision" : "aec6a73f5c1dc1f1be4f61888094b95cf995d973",
"version" : "1.3.2"
}
},
{
"identity" : "swift-snapshot-testing",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-snapshot-testing",
"state" : {
"revision" : "c097f955b4e724690f0fc8ffb7a6d4b881c9c4e3",
"version" : "1.17.2"
}
},
{
"identity" : "swift-syntax",
"kind" : "remoteSourceControl",
"location" : "https://github.com/swiftlang/swift-syntax",
"state" : {
"revision" : "303e5c5c36d6a558407d364878df131c3546fad8",
"version" : "510.0.2"
}
},
{
"identity" : "xctest-dynamic-overlay",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/xctest-dynamic-overlay",
"state" : {
"revision" : "357ca1e5dd31f613a1d43320870ebc219386a495",
"version" : "1.2.2"
}
}
],
"version" : 2
}
3 changes: 3 additions & 0 deletions .swiftpm/xcode/xcshareddata/xcschemes/Supabase.xcscheme
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
<TestPlanReference
reference = "container:TestPlans/Integration.xctestplan">
</TestPlanReference>
<TestPlanReference
reference = "container:TestPlans/AllTests.xctestplan">
</TestPlanReference>
</TestPlans>
</TestAction>
<LaunchAction
Expand Down
21 changes: 15 additions & 6 deletions Package.resolved
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-custom-dump",
"state" : {
"revision" : "d237304f42af07f22563aa4cc2d7e2cfb25da82e",
"version" : "1.3.1"
"revision" : "aec6a73f5c1dc1f1be4f61888094b95cf995d973",
"version" : "1.3.2"
}
},
{
"identity" : "swift-issue-reporting",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/swift-issue-reporting",
"state" : {
"revision" : "c85092304cda8cb38d2d68454b29609a8013620b",
"version" : "1.2.1"
"revision" : "357ca1e5dd31f613a1d43320870ebc219386a495",
"version" : "1.2.2"
}
},
{
Expand All @@ -50,8 +50,17 @@
"kind" : "remoteSourceControl",
"location" : "https://github.com/swiftlang/swift-syntax",
"state" : {
"revision" : "4c6cc0a3b9e8f14b3ae2307c5ccae4de6167ac2c",
"version" : "600.0.0-prerelease-2024-06-12"
"revision" : "82a453c2dfa335c7e778695762438dfe72b328d2",
"version" : "600.0.0-prerelease-2024-07-24"
}
},
{
"identity" : "xctest-dynamic-overlay",
"kind" : "remoteSourceControl",
"location" : "https://github.com/pointfreeco/xctest-dynamic-overlay",
"state" : {
"revision" : "357ca1e5dd31f613a1d43320870ebc219386a495",
"version" : "1.2.2"
}
}
],
Expand Down
18 changes: 9 additions & 9 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ let package = Package(
],
dependencies: [
.package(url: "https://github.com/apple/swift-crypto.git", "1.0.0" ..< "4.0.0"),
.package(url: "https://github.com/pointfreeco/swift-concurrency-extras", from: "1.0.0"),
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.3.0"),
.package(url: "https://github.com/pointfreeco/swift-snapshot-testing", from: "1.8.1"),
.package(url: "https://github.com/pointfreeco/swift-issue-reporting", from: "1.2.0"),
.package(url: "https://github.com/pointfreeco/swift-concurrency-extras", from: "1.1.0"),
.package(url: "https://github.com/pointfreeco/swift-custom-dump", from: "1.3.2"),
.package(url: "https://github.com/pointfreeco/swift-snapshot-testing", from: "1.17.2"),
.package(url: "https://github.com/pointfreeco/xctest-dynamic-overlay", from: "1.2.2"),
],
targets: [
.target(
Expand Down Expand Up @@ -55,7 +55,7 @@ let package = Package(
dependencies: [
.product(name: "CustomDump", package: "swift-custom-dump"),
.product(name: "SnapshotTesting", package: "swift-snapshot-testing"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Helpers",
"Auth",
"TestHelpers",
Expand All @@ -71,7 +71,7 @@ let package = Package(
dependencies: [
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
.product(name: "SnapshotTesting", package: "swift-snapshot-testing"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Functions",
"TestHelpers",
],
Expand All @@ -82,7 +82,7 @@ let package = Package(
dependencies: [
.product(name: "CustomDump", package: "swift-custom-dump"),
.product(name: "InlineSnapshotTesting", package: "swift-snapshot-testing"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Helpers",
"Auth",
"PostgREST",
Expand Down Expand Up @@ -129,7 +129,7 @@ let package = Package(
name: "StorageTests",
dependencies: [
.product(name: "CustomDump", package: "swift-custom-dump"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Storage",
]
),
Expand All @@ -155,7 +155,7 @@ let package = Package(
name: "TestHelpers",
dependencies: [
.product(name: "ConcurrencyExtras", package: "swift-concurrency-extras"),
.product(name: "IssueReporting", package: "swift-issue-reporting"),
.product(name: "XCTestDynamicOverlay", package: "xctest-dynamic-overlay"),
"Auth",
]
),
Expand Down
7 changes: 0 additions & 7 deletions Sources/Realtime/RealtimeChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,6 @@ public struct RealtimeChannelOptions {
}
}

/// Represents the different status of a push
public enum PushStatus: String, Sendable {
case ok
case error
case timeout
}

public enum RealtimeSubscribeStates {
case subscribed
case timedOut
Expand Down
7 changes: 7 additions & 0 deletions Sources/Realtime/V2/PushV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
import Foundation
import Helpers

/// Represents the different status of a push
public enum PushStatus: String, Sendable {
case ok
case error
case timeout
}

actor PushV2 {
private weak var channel: RealtimeChannelV2?
let message: RealtimeMessageV2
Expand Down
95 changes: 78 additions & 17 deletions Sources/Realtime/V2/RealtimeChannelV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,56 @@ import ConcurrencyExtras
import Foundation
import Helpers

#if canImport(FoundationNetworking)
import FoundationNetworking

extension HTTPURLResponse {
convenience init() {
self.init(
url: URL(string: "http://127.0.0.1")!,
statusCode: 200,
httpVersion: nil,
headerFields: nil
)!
}
}
#endif

public struct RealtimeChannelConfig: Sendable {
public var broadcast: BroadcastJoinConfig
public var presence: PresenceJoinConfig
public var isPrivate: Bool
}

struct Socket: Sendable {
var broadcastURL: @Sendable () -> URL
var status: @Sendable () -> RealtimeClientV2.Status
var options: @Sendable () -> RealtimeClientOptions
var accessToken: @Sendable () -> String?
var apiKey: @Sendable () -> String?
var makeRef: @Sendable () -> Int

var connect: @Sendable () async -> Void
var addChannel: @Sendable (_ channel: RealtimeChannelV2) -> Void
var removeChannel: @Sendable (_ channel: RealtimeChannelV2) async -> Void
var push: @Sendable (_ message: RealtimeMessageV2) async -> Void
var httpSend: @Sendable (_ request: HTTPRequest) async throws -> HTTPResponse
}

extension Socket {
init(client: RealtimeClientV2) {
self.init(
broadcastURL: { [weak client] in client?.broadcastURL ?? URL(string: "http://localhost")! },
status: { [weak client] in client?.status ?? .disconnected },
options: { [weak client] in client?.options ?? .init() },
accessToken: { [weak client] in client?.mutableState.accessToken },
apiKey: { [weak client] in client?.apikey },
makeRef: { [weak client] in client?.makeRef() ?? 0 },
connect: { [weak client] in await client?.connect() },
addChannel: { [weak client] in client?.addChannel($0) },
removeChannel: { [weak client] in await client?.removeChannel($0) },
push: { [weak client] in await client?.push($0) }
push: { [weak client] in await client?.push($0) },
httpSend: { [weak client] in try await client?.http.send($0) ?? .init(data: Data(), response: HTTPURLResponse()) }
)
}
}
Expand Down Expand Up @@ -202,24 +223,64 @@ public final class RealtimeChannelV2: Sendable {
/// - event: Broadcast message event.
/// - message: Message payload.
public func broadcast(event: String, message: JSONObject) async {
assert(
status == .subscribed,
"You can only broadcast after subscribing to the channel. Did you forget to call `channel.subscribe()`?"
)
if status != .subscribed {
struct Message: Encodable {
let topic: String
let event: String
let payload: JSONObject
let `private`: Bool
}

await push(
RealtimeMessageV2(
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.broadcast,
payload: [
"type": "broadcast",
"event": .string(event),
"payload": .object(message),
]
var headers = HTTPHeaders(["content-type": "application/json"])
if let apiKey = socket.apiKey() {
headers["apikey"] = apiKey
}
if let accessToken = socket.accessToken() {
headers["authorization"] = "Bearer \(accessToken)"
}

let task = Task { [headers] in
_ = try? await socket.httpSend(
HTTPRequest(
url: socket.broadcastURL(),
method: .post,
headers: headers,
body: JSONEncoder().encode(
[
"messages": [
Message(
topic: topic,
event: event,
payload: message,
private: config.isPrivate
),
],
]
)
)
)
}

if config.broadcast.acknowledgeBroadcasts {
try? await withTimeout(interval: socket.options().timeoutInterval) {
await task.value
}
}
} else {
await push(
RealtimeMessageV2(
joinRef: mutableState.joinRef,
ref: socket.makeRef().description,
topic: topic,
event: ChannelEvent.broadcast,
payload: [
"type": "broadcast",
"event": .string(event),
"payload": .object(message),
]
)
)
)
}
}

public func track(_ state: some Codable) async throws {
Expand Down
21 changes: 19 additions & 2 deletions Sources/Realtime/V2/RealtimeClientV2.swift
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public final class RealtimeClientV2: Sendable {
let options: RealtimeClientOptions
let ws: any WebSocketClient
let mutableState = LockIsolated(MutableState())
let http: any HTTPClientType
let apikey: String?

public var subscriptions: [String: RealtimeChannelV2] {
Expand Down Expand Up @@ -128,6 +129,12 @@ public final class RealtimeClientV2: Sendable {
}

public convenience init(url: URL, options: RealtimeClientOptions) {
var interceptors: [any HTTPClientInterceptor] = []

if let logger = options.logger {
interceptors.append(LoggerInterceptor(logger: logger))
}

self.init(
url: url,
options: options,
Expand All @@ -137,14 +144,24 @@ public final class RealtimeClientV2: Sendable {
apikey: options.apikey
),
options: options
),
http: HTTPClient(
fetch: options.fetch ?? { try await URLSession.shared.data(for: $0) },
interceptors: interceptors
)
)
}

init(url: URL, options: RealtimeClientOptions, ws: any WebSocketClient) {
init(
url: URL,
options: RealtimeClientOptions,
ws: any WebSocketClient,
http: any HTTPClientType
) {
self.url = url
self.options = options
self.ws = ws
self.http = http
apikey = options.apikey

mutableState.withValue {
Expand Down Expand Up @@ -471,7 +488,7 @@ public final class RealtimeClientV2: Sendable {
return url
}

private var broadcastURL: URL {
var broadcastURL: URL {
url.appendingPathComponent("api/broadcast")
}
}
Loading
Loading