Skip to content

Add stricter assertions regarding stream state. #812

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Firestore/Example/Tests/Integration/FSTStreamTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#import <XCTest/XCTest.h>

#import <GRPCClient/GRPCCall.h>

#import <FirebaseFirestore/FIRFirestoreSettings.h>

#import "Firestore/Example/Tests/Util/FSTHelpers.h"
Expand All @@ -36,7 +38,7 @@

/** Exposes otherwise private methods for testing. */
@interface FSTStream (Testing)
- (void)writesFinishedWithError:(NSError *_Nullable)error;
@property(nonatomic, strong, readwrite) id<GRXWriteable> callbackFilter;
@end

/**
Expand Down Expand Up @@ -204,7 +206,7 @@ - (void)testWatchStreamStopBeforeHandshake {

// Simulate a final callback from GRPC
[_workerDispatchQueue dispatchAsync:^{
[watchStream writesFinishedWithError:nil];
[watchStream.callbackFilter writesFinishedWithError:nil];
}];

[self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
Expand All @@ -228,7 +230,7 @@ - (void)testWriteStreamStopBeforeHandshake {

// Simulate a final callback from GRPC
[_workerDispatchQueue dispatchAsync:^{
[writeStream writesFinishedWithError:nil];
[writeStream.callbackFilter writesFinishedWithError:nil];
}];

[self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
Expand Down
18 changes: 6 additions & 12 deletions Firestore/Source/Remote/FSTStream.mm
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,12 @@ - (void)startWithDelegate:(id)delegate {

/** Add an access token to our RPC, after obtaining one from the credentials provider. */
- (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error {
[self.workerDispatchQueue verifyIsCurrentQueue];

if (self.state == FSTStreamStateStopped) {
// Streams can be stopped while waiting for authorization.
return;
}

[self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
(long)self.state);

Expand Down Expand Up @@ -525,11 +525,7 @@ - (void)handleStreamMessage:(id)value {
*/
- (void)handleStreamClose:(nullable NSError *)error {
FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);

if (![self isStarted]) { // The stream could have already been closed by the idle close timer.
FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class]));
return;
}
FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is safe at all.

PR #811 was defending against gRPC calling us and enqueueing an error while (or just before) we were already processing a close for a different reason. This pushed the check in the callback filter out onto the dispatch queue making the callback filter effective again and prevents callbacks from gRPC from getting us after the stream is closed.

However, it's still the case that an idle close timer firing, a user disableNetwork, and a gRPC error can be enqueued at the same time. The first one to succeed may now prevent gRPC from kicking out any further events, but the subsequent close actions are still going to pass through here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree to disagree.

Whichever of those 3 are executed first will call closeWithFinalState: which will call suppressCallbacks in conjunction with setting state to a non-started value, which will prevent subsequent calls to handleStreamClose:.

The only other caller of handleStreamClose: is resumeStartWithToken: when handling auth failures, but it asserts that self.state == FSTStreamStateAuth immediately prior to calling handleStreamClose: so this assert is safe in that case too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Said differently, we always suppress callbacks when setting the stream state to a non started state... And with your change we can now rely on that to prevent any subsequent handleStreamClose calls.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I've convinced myself you're right by tracing it through. All callers check the state prior to calling so this is safe.


// In theory the stream could close cleanly, however, in our current model we never expect this
// to happen because if we stop a stream ourselves, this callback will never be called. To
Expand All @@ -550,11 +546,7 @@ - (void)handleStreamClose:(nullable NSError *)error {
*/
- (void)writeValue:(id)value __used {
[self.workerDispatchQueue verifyIsCurrentQueue];

if (![self isStarted]) {
FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class]));
return;
}
FSTAssert([self isStarted], @"writeValue: called for stopped stream.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In contrast to the above, this and below should be safe again because they're only called by gRPC.


if (!self.messageReceived) {
self.messageReceived = YES;
Expand Down Expand Up @@ -587,6 +579,8 @@ - (void)writeValue:(id)value __used {
- (void)writesFinishedWithError:(nullable NSError *)error __used {
error = [FSTDatastore firestoreErrorForError:error];
[self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream.");

[self handleStreamClose:error];
}

Expand Down