diff --git a/Firestore/Example/Tests/Integration/FSTStreamTests.mm b/Firestore/Example/Tests/Integration/FSTStreamTests.mm index 6259aff0e18..cec35c10e8f 100644 --- a/Firestore/Example/Tests/Integration/FSTStreamTests.mm +++ b/Firestore/Example/Tests/Integration/FSTStreamTests.mm @@ -16,6 +16,8 @@ #import +#import + #import #import "Firestore/Example/Tests/Util/FSTHelpers.h" @@ -36,7 +38,7 @@ /** Exposes otherwise private methods for testing. */ @interface FSTStream (Testing) -- (void)writesFinishedWithError:(NSError *_Nullable)error; +@property(nonatomic, strong, readwrite) id callbackFilter; @end /** @@ -203,7 +205,9 @@ - (void)testWatchStreamStopBeforeHandshake { }]; // Simulate a final callback from GRPC - [watchStream writesFinishedWithError:nil]; + [_workerDispatchQueue dispatchAsync:^{ + [watchStream.callbackFilter writesFinishedWithError:nil]; + }]; [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]]; } @@ -225,7 +229,9 @@ - (void)testWriteStreamStopBeforeHandshake { }]; // Simulate a final callback from GRPC - [writeStream writesFinishedWithError:nil]; + [_workerDispatchQueue dispatchAsync:^{ + [writeStream.callbackFilter writesFinishedWithError:nil]; + }]; [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]]; } diff --git a/Firestore/Source/Remote/FSTStream.mm b/Firestore/Source/Remote/FSTStream.mm index a1d80add40d..884ed9ac830 100644 --- a/Firestore/Source/Remote/FSTStream.mm +++ b/Firestore/Source/Remote/FSTStream.mm @@ -149,7 +149,12 @@ @interface FSTStream () #pragma mark - FSTCallbackFilter -/** Filter class that allows disabling of GRPC callbacks. */ +/** + * Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main + * FSTStream to allow the stream to be stopped externally (either by the user or via idle timer) + * and be able to completely prevent any subsequent events from gRPC from calling back into the + * FSTSTream. + */ @interface FSTCallbackFilter : NSObject - (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER; @@ -263,12 +268,12 @@ - (void)startWithDelegate:(id)delegate { /** Add an access token to our RPC, after obtaining one from the credentials provider. */ - (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error { + [self.workerDispatchQueue verifyIsCurrentQueue]; + if (self.state == FSTStreamStateStopped) { // Streams can be stopped while waiting for authorization. return; } - - [self.workerDispatchQueue verifyIsCurrentQueue]; FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)", (long)self.state); @@ -282,6 +287,8 @@ - (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error { self.requestsWriter = [[FSTBufferedWriter alloc] init]; _rpc = [self createRPCWithRequestsWriter:self.requestsWriter]; + [_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue]; + [FSTDatastore prepareHeadersForRPC:_rpc databaseID:&self.databaseInfo->database_id() token:token.token]; @@ -523,11 +530,7 @@ - (void)handleStreamMessage:(id)value { */ - (void)handleStreamClose:(nullable NSError *)error { FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error); - - if (![self isStarted]) { // The stream could have already been closed by the idle close timer. - FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class])); - return; - } + FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream."); // In theory the stream could close cleanly, however, in our current model we never expect this // to happen because if we stop a stream ourselves, this callback will never be called. To @@ -540,56 +543,50 @@ - (void)handleStreamClose:(nullable NSError *)error { // The GRXWriteable implementation defines the receive side of the RPC stream. /** - * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately - * redispatch back onto our own worker queue. + * Called by GRPC when it publishes a value. + * + * GRPC must be configured to use our worker queue by calling + * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting + * the RPC. */ -- (void)writeValue:(id)value __used { - // TODO(mcg): remove the double-dispatch once GRPCCall at head is released. - // Once released we can set the responseDispatchQueue property on the GRPCCall and then this - // method can call handleStreamMessage directly. - FSTWeakify(self); - [self.workerDispatchQueue dispatchAsync:^{ - FSTStrongify(self); - if (![self isStarted]) { - FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class])); - return; - } - - if (!self.messageReceived) { - self.messageReceived = YES; - if ([FIRFirestore isLoggingEnabled]) { - FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]), - (__bridge void *)self, - [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]); - } - } - NSError *error; - id proto = [self parseProto:self.responseMessageClass data:value error:&error]; - if (proto) { - [self handleStreamMessage:proto]; - } else { - [_rpc finishWithError:error]; +- (void)writeValue:(id)value { + [self.workerDispatchQueue verifyIsCurrentQueue]; + FSTAssert([self isStarted], @"writeValue: called for stopped stream."); + + if (!self.messageReceived) { + self.messageReceived = YES; + if ([FIRFirestore isLoggingEnabled]) { + FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]), + (__bridge void *)self, + [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]); } - }]; + } + NSError *error; + id proto = [self parseProto:self.responseMessageClass data:value error:&error]; + if (proto) { + [self handleStreamMessage:proto]; + } else { + [_rpc finishWithError:error]; + } } /** * Called by GRPC when it closed the stream with an error representing the final state of the * stream. * - * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to - * directly inform stream-specific logic, or call stop to tear down the stream. + * GRPC must be configured to use our worker queue by calling + * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting + * the RPC. + * + * Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call + * stop to tear down the stream. */ - (void)writesFinishedWithError:(nullable NSError *)error __used { error = [FSTDatastore firestoreErrorForError:error]; - FSTWeakify(self); - [self.workerDispatchQueue dispatchAsync:^{ - FSTStrongify(self); - if (!self || self.state == FSTStreamStateStopped) { - return; - } - [self handleStreamClose:error]; - }]; + [self.workerDispatchQueue verifyIsCurrentQueue]; + FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream."); + + [self handleStreamClose:error]; } @end