Skip to content

Commit ca0825b

Browse files
authored
Merge pull request #664 from cloudflare/milan/hibernation-lifetime
Move owned WebSocket back to api::WebSocket on final hibernation event
2 parents 665681d + 6a32783 commit ca0825b

7 files changed

+79
-26
lines changed

src/workerd/api/global-scope.c++

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -527,8 +527,12 @@ void ServiceWorkerGlobalScope::sendHibernatableWebSocketClose(
527527

528528
KJ_IF_MAYBE(h, exportedHandler) {
529529
KJ_IF_MAYBE(handler, h->webSocketClose) {
530-
auto websocket = event->getWebSocket(lock);
531-
websocket->initiateHibernatableRelease(lock, api::WebSocket::HibernatableReleaseState::CLOSE);
530+
// We won't be dispatching any further events because we've received a close, so we return the
531+
// owned websocket back to the api::WebSocket.
532+
auto releasePackage = event->prepareForRelease(lock);
533+
auto& websocket = releasePackage.webSocketRef;
534+
websocket->initiateHibernatableRelease(lock,
535+
kj::mv(releasePackage.ownedWebSocket), api::WebSocket::HibernatableReleaseState::CLOSE);
532536
auto promise = (*handler)(lock, kj::mv(websocket), close.code, kj::mv(close.reason),
533537
close.wasClean);
534538
event->waitUntil(kj::mv(promise));
@@ -545,8 +549,12 @@ void ServiceWorkerGlobalScope::sendHibernatableWebSocketError(
545549

546550
KJ_IF_MAYBE(h, exportedHandler) {
547551
KJ_IF_MAYBE(handler, h->webSocketError) {
548-
auto websocket = event->getWebSocket(lock);
549-
websocket->initiateHibernatableRelease(lock, WebSocket::HibernatableReleaseState::ERROR);
552+
// We won't be dispatching any further events because we've encountered an error, so we return
553+
// the owned websocket back to the api::WebSocket.
554+
auto releasePackage = event->prepareForRelease(lock);
555+
auto& websocket = releasePackage.webSocketRef;
556+
websocket->initiateHibernatableRelease(lock,
557+
kj::mv(releasePackage.ownedWebSocket), WebSocket::HibernatableReleaseState::ERROR);
550558
jsg::Lock& js(lock);
551559
auto promise = (*handler)(js, kj::mv(websocket), js.exceptionToJs(kj::mv(e)));
552560
event->waitUntil(kj::mv(promise));

src/workerd/api/hibernatable-web-socket.c++

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,22 @@ namespace workerd::api {
1212
HibernatableWebSocketEvent::HibernatableWebSocketEvent()
1313
: ExtendableEvent("webSocketMessage") {};
1414

15+
HibernatableWebSocketEvent::ItemsForRelease HibernatableWebSocketEvent::prepareForRelease(
16+
jsg::Lock &lock) {
17+
auto& actor = KJ_REQUIRE_NONNULL(IoContext::current().getActor());
18+
auto& manager = kj::downcast<HibernationManagerImpl>(
19+
KJ_REQUIRE_NONNULL(actor.getHibernationManager()));
20+
auto& hibernatableWebSocket = KJ_REQUIRE_NONNULL(manager.webSocketForEventHandler);
21+
22+
// `getWebSocket()` requires `HibernatableWebSocket::ws` be non-null, so it must be called first.
23+
// The explicit ctor makes it less likely we accidentally move the owned websocket first.
24+
return ItemsForRelease(getWebSocket(lock), kj::mv(KJ_REQUIRE_NONNULL(hibernatableWebSocket.ws)));
25+
}
26+
1527
jsg::Ref<WebSocket> HibernatableWebSocketEvent::getWebSocket(jsg::Lock& lock) {
16-
auto& manager = static_cast<HibernationManagerImpl&>(
17-
KJ_REQUIRE_NONNULL(
18-
KJ_REQUIRE_NONNULL(IoContext::current().getActor()).getHibernationManager()));
28+
auto& actor = KJ_REQUIRE_NONNULL(IoContext::current().getActor());
29+
auto& manager = kj::downcast<HibernationManagerImpl>(
30+
KJ_REQUIRE_NONNULL(actor.getHibernationManager()));
1931
auto& hibernatableWebSocket = KJ_REQUIRE_NONNULL(manager.webSocketForEventHandler);
2032
return hibernatableWebSocket.getActiveOrUnhibernate(lock);
2133
}

src/workerd/api/hibernatable-web-socket.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,23 @@ class HibernatableWebSocketEvent final: public ExtendableEvent {
2222

2323
static jsg::Ref<HibernatableWebSocketEvent> constructor(kj::String type) = delete;
2424

25+
struct ItemsForRelease {
26+
// When we call a close or error event, we need to move the owned websocket back into the
27+
// api::WebSocket to extend its lifetime. The way we obtain the websocket from the
28+
// HibernationManager is somewhat fragile, so it's better if we group the reference and owned
29+
// websocket together.
30+
jsg::Ref<WebSocket> webSocketRef;
31+
kj::Own<kj::WebSocket> ownedWebSocket;
32+
33+
explicit ItemsForRelease(jsg::Ref<WebSocket> ref, kj::Own<kj::WebSocket> owned)
34+
: webSocketRef(kj::mv(ref)), ownedWebSocket(kj::mv(owned)) {}
35+
};
36+
37+
ItemsForRelease prepareForRelease(jsg::Lock& lock);
38+
// Only call this once (when transferring ownership of the websocket back to the api::WebSocket).
39+
// Gets a reference to the api::WebSocket, and moves the owned kj::WebSocket out of the
40+
// HibernatableWebSocket whose event we are currently delivering.
41+
2542
jsg::Ref<WebSocket> getWebSocket(jsg::Lock& lock);
2643

2744
JSG_RESOURCE_TYPE(HibernatableWebSocketEvent) {

src/workerd/api/web-socket.c++

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ WebSocket::Accepted::Accepted(kj::Own<kj::WebSocket> wsParam, Native& native, Io
373373
}
374374

375375
WebSocket::Accepted::Accepted(Hibernatable wsParam, Native& native, IoContext& context)
376-
: ws(wsParam),
376+
: ws(kj::mv(wsParam)),
377377
whenAbortedTask(createAbortTask(native, context)) {
378378
KJ_IF_MAYBE(a, context.getActor()) {
379379
auto& metrics = a->getMetrics();
@@ -699,7 +699,7 @@ void WebSocket::ensurePumping(jsg::Lock& js) {
699699
KJ_FAIL_ASSERT("Unexpected native web socket state", native.state);
700700
}
701701
}
702-
}, [this](jsg::Lock& js, jsg::Value&& exception) mutable {
702+
}, [this, thisHandle = JSG_THIS](jsg::Lock& js, jsg::Value&& exception) mutable {
703703
if (awaitingHibernatableRelease()) {
704704
// We have a hibernatable websocket -- we don't want to dispatch a regular error event.
705705
tryReleaseNative(js);

src/workerd/api/web-socket.h

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,9 @@ class WebSocket: public EventTarget {
290290
ERROR
291291
};
292292

293-
void initiateHibernatableRelease(jsg::Lock& js, HibernatableReleaseState releaseState) {
293+
void initiateHibernatableRelease(jsg::Lock& js,
294+
kj::Own<kj::WebSocket> ws,
295+
HibernatableReleaseState releaseState) {
294296
// Called when a Hibernatable WebSocket wants to dispatch a close/error event, this modifies
295297
// our `Accepted` state to prepare the state to transition to `Released`.
296298
//
@@ -300,7 +302,7 @@ class WebSocket: public EventTarget {
300302
KJ_IF_MAYBE(state, farNative->state.tryGet<Accepted>()) {
301303
KJ_REQUIRE(state->isHibernatable(),
302304
"tried to initiate hibernatable release but websocket wasn't hibernatable");
303-
state->ws.initiateHibernatableRelease(js, releaseState);
305+
state->ws.initiateHibernatableRelease(js, kj::mv(ws), releaseState);
304306
farNative->closedIncoming = true;
305307
} else {
306308
KJ_LOG(WARNING, "Unexpected Hibernatable WebSocket state on release", farNative->state);
@@ -456,6 +458,11 @@ class WebSocket: public EventTarget {
456458
// A `Hibernatable` WebSocket shares a sub-set of behavior that's already implemented for an
457459
// `Accepted` WebSocket, so we can think of it a sub-state.
458460
kj::WebSocket& ws;
461+
kj::Maybe<kj::Own<void>> attachedForClose;
462+
// If we have initiated a hibernatable error/close event, we need to take back ownership of
463+
// the kj::WebSocket so any final queued messages will deliver. We store this owned websocket
464+
// in `attachedForClose`. Since the `ws` reference is still valid, we prevent usage of
465+
// `attachedForClose` directly in favor of continuing to use `ws` directly.
459466
HibernatableReleaseState releaseState = HibernatableReleaseState::NONE;
460467
// We can't move the state to Released after the Hibernatable Close/Error event runs, since
461468
// we don't have a request on the thread by the time the event completes.
@@ -514,10 +521,15 @@ class WebSocket: public EventTarget {
514521
return inner.tryGet<Hibernatable>();
515522
}
516523

517-
void initiateHibernatableRelease(jsg::Lock& js, HibernatableReleaseState state) {
524+
void initiateHibernatableRelease(jsg::Lock& js,
525+
kj::Own<kj::WebSocket> ws,
526+
HibernatableReleaseState state) {
518527
// Transitions our Hibernatable websocket to a "Releasing" state.
519528
// The websocket will transition to `Released` when convenient.
520-
KJ_REQUIRE_NONNULL(getIfHibernatable()).releaseState = state;
529+
auto& hibernatable = KJ_REQUIRE_NONNULL(getIfHibernatable());
530+
hibernatable.releaseState = state;
531+
// Note that we move the owned kj::WebSocket here.
532+
hibernatable.attachedForClose = kj::mv(ws);
521533
}
522534

523535
bool isAwaitingRelease() {
@@ -544,18 +556,18 @@ class WebSocket: public EventTarget {
544556
return ws.getIfNotHibernatable() == nullptr;
545557
}
546558

559+
kj::Promise<void> createAbortTask(Native& native, IoContext& context);
560+
kj::Promise<void> whenAbortedTask = nullptr;
561+
// Listens for ws->whenAborted() and possibly triggers a proactive shutdown.
562+
563+
kj::Maybe<kj::Own<ActorObserver>> actorMetrics;
564+
547565
kj::Canceler canceler;
548566
// This canceler wraps the pump loop as a precaution to make sure we can't exit the Accepted
549567
// state with a pump task still happening asynchronously. In practice the canceler should usually
550568
// be empty when destroyed because we do not leave the Accepted state if we're still pumping.
551569
// Even in the case of IoContext premature cancellation, the pump task should be canceled
552570
// by the IoContext before the Canceler is destroyed.
553-
554-
kj::Promise<void> createAbortTask(Native& native, IoContext& context);
555-
kj::Promise<void> whenAbortedTask = nullptr;
556-
// Listens for ws->whenAborted() and possibly triggers a proactive shutdown.
557-
558-
kj::Maybe<kj::Own<ActorObserver>> actorMetrics;
559571
};
560572

561573
struct Released {};

src/workerd/io/hibernation-manager.c++

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
namespace workerd {
99

10-
HibernationManagerImpl::~HibernationManagerImpl() {
10+
HibernationManagerImpl::~HibernationManagerImpl() noexcept(false) {
1111
// Note that the HibernatableWebSocket destructor handles removing any references to itself in
1212
// `tagToWs`, and even removes the hashmap entry if there are no more entries in the bucket.
1313
allWs.clear();
@@ -156,7 +156,7 @@ kj::Promise<void> HibernationManagerImpl::handleSocketTermination(
156156

157157
kj::Promise<void> HibernationManagerImpl::readLoop(HibernatableWebSocket& hib) {
158158
// Like the api::WebSocket readLoop(), but we dispatch different types of events.
159-
auto& ws = *hib.ws;
159+
auto& ws = *KJ_REQUIRE_NONNULL(hib.ws);
160160
while (true) {
161161
kj::WebSocket::Message message = co_await ws.receive();
162162
// Note that errors are handled by the caller of `readLoop`, since we throw from `receive()`.

src/workerd/io/hibernation-manager.h

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
2222
hibernationEventType(hibernationEventType),
2323
onDisconnect(DisconnectHandler{}),
2424
readLoopTasks(onDisconnect) {}
25-
~HibernationManagerImpl();
25+
~HibernationManagerImpl() noexcept(false);
2626

2727
void acceptWebSocket(jsg::Ref<api::WebSocket> ws, kj::ArrayPtr<kj::String> tags) override;
2828
// Tells the HibernationManager to create a new HibernatableWebSocket with the associated tags
@@ -39,7 +39,7 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
3939
// Hibernates all the websockets held by the HibernationManager.
4040
// This converts our activeOrPackage from an api::WebSocket to a HibernationPackage.
4141

42-
friend jsg::Ref<api::WebSocket> api::HibernatableWebSocketEvent::getWebSocket(jsg::Lock& lock);
42+
friend class api::HibernatableWebSocketEvent;
4343

4444
private:
4545
class HibernatableWebSocket;
@@ -73,7 +73,7 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
7373
ws(activeOrPackage.get<jsg::Ref<api::WebSocket>>()->acceptAsHibernatable()),
7474
manager(manager) {}
7575

76-
~HibernatableWebSocket() {
76+
~HibernatableWebSocket() noexcept(false) {
7777
// We expect this dtor to be called when we're removing a HibernatableWebSocket
7878
// from our `allWs` collection in the HibernationManager.
7979

@@ -100,7 +100,7 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
100100
// to the api::WebSocket.
101101
KJ_IF_MAYBE(package, activeOrPackage.tryGet<api::WebSocket::HibernationPackage>()) {
102102
activeOrPackage.init<jsg::Ref<api::WebSocket>>(
103-
api::WebSocket::hibernatableFromNative(js, *ws, kj::mv(*package)));
103+
api::WebSocket::hibernatableFromNative(js, *KJ_REQUIRE_NONNULL(ws), kj::mv(*package)));
104104
}
105105
return activeOrPackage.get<jsg::Ref<api::WebSocket>>().addRef();
106106
}
@@ -116,7 +116,11 @@ class HibernationManagerImpl final : public Worker::Actor::HibernationManager {
116116
kj::OneOf<jsg::Ref<api::WebSocket>, api::WebSocket::HibernationPackage> activeOrPackage;
117117
// If active, we have an api::WebSocket reference, otherwise, we're hibernating, so we retain
118118
// the websocket's properties in a HibernationPackage until it's time to wake up.
119-
kj::Own<kj::WebSocket> ws;
119+
kj::Maybe<kj::Own<kj::WebSocket>> ws;
120+
// This is an owned websocket that we extract from the api::WebSocket after accepting as
121+
// hibernatable. It becomes null once we dispatch a close or error event because we want its
122+
// lifetime to be managed by IoContext's DeleteQueue. This helps prevent a situation where the
123+
// HibernationManager drops the websocket before all queued messages have been sent.
120124
HibernationManagerImpl& manager;
121125
// TODO(someday): We (currently) only use the HibernationManagerImpl reference to refer to
122126
// `tagToWs` when running the dtor for `HibernatableWebSocket`. This feels a bit excessive,

0 commit comments

Comments
 (0)