From 32c6c00dbf3391b3e684f25767d756d9c4457e57 Mon Sep 17 00:00:00 2001 From: Marek Gilbert Date: Sat, 17 Feb 2018 15:09:21 -0800 Subject: [PATCH 1/3] Use -[GRPCCall setResponseDispatchQueue] ... to dispatch GRPC callbacks directly onto the Firestore worker queue. This saves a double-dispatch and simplifies our logic. --- .../Tests/Integration/FSTStreamTests.mm | 8 ++- Firestore/Source/Remote/FSTStream.mm | 72 +++++++++---------- 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/Firestore/Example/Tests/Integration/FSTStreamTests.mm b/Firestore/Example/Tests/Integration/FSTStreamTests.mm index 6259aff0e18..3faf67859a4 100644 --- a/Firestore/Example/Tests/Integration/FSTStreamTests.mm +++ b/Firestore/Example/Tests/Integration/FSTStreamTests.mm @@ -203,7 +203,9 @@ - (void)testWatchStreamStopBeforeHandshake { }]; // Simulate a final callback from GRPC - [watchStream writesFinishedWithError:nil]; + [_workerDispatchQueue dispatchAsync:^{ + [watchStream writesFinishedWithError:nil]; + }]; [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]]; } @@ -225,7 +227,9 @@ - (void)testWriteStreamStopBeforeHandshake { }]; // Simulate a final callback from GRPC - [writeStream writesFinishedWithError:nil]; + [_workerDispatchQueue dispatchAsync:^{ + [writeStream writesFinishedWithError:nil]; + }]; [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]]; } diff --git a/Firestore/Source/Remote/FSTStream.mm b/Firestore/Source/Remote/FSTStream.mm index a1d80add40d..e07718236ae 100644 --- a/Firestore/Source/Remote/FSTStream.mm +++ b/Firestore/Source/Remote/FSTStream.mm @@ -282,6 +282,8 @@ - (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error { self.requestsWriter = [[FSTBufferedWriter alloc] init]; _rpc = [self createRPCWithRequestsWriter:self.requestsWriter]; + [_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue]; + [FSTDatastore prepareHeadersForRPC:_rpc databaseID:&self.databaseInfo->database_id() token:token.token]; @@ -540,56 +542,52 @@ - (void)handleStreamClose:(nullable NSError *)error { // The GRXWriteable implementation defines the receive side of the RPC stream. /** - * Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately - * redispatch back onto our own worker queue. + * Called by GRPC when it publishes a value. + * + * GRPC must be configured to use our worker queue by calling + * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting + * the RPC. */ - (void)writeValue:(id)value __used { - // TODO(mcg): remove the double-dispatch once GRPCCall at head is released. - // Once released we can set the responseDispatchQueue property on the GRPCCall and then this - // method can call handleStreamMessage directly. - FSTWeakify(self); - [self.workerDispatchQueue dispatchAsync:^{ - FSTStrongify(self); - if (![self isStarted]) { - FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class])); - return; - } + [self.workerDispatchQueue verifyIsCurrentQueue]; - if (!self.messageReceived) { - self.messageReceived = YES; - if ([FIRFirestore isLoggingEnabled]) { - FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]), - (__bridge void *)self, - [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]); - } - } - NSError *error; - id proto = [self parseProto:self.responseMessageClass data:value error:&error]; - if (proto) { - [self handleStreamMessage:proto]; - } else { - [_rpc finishWithError:error]; + if (![self isStarted]) { + FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class])); + return; + } + + if (!self.messageReceived) { + self.messageReceived = YES; + if ([FIRFirestore isLoggingEnabled]) { + FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]), + (__bridge void *)self, + [FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]); } - }]; + } + NSError *error; + id proto = [self parseProto:self.responseMessageClass data:value error:&error]; + if (proto) { + [self handleStreamMessage:proto]; + } else { + [_rpc finishWithError:error]; + } } /** * Called by GRPC when it closed the stream with an error representing the final state of the * stream. * - * Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to - * directly inform stream-specific logic, or call stop to tear down the stream. + * GRPC must be configured to use our worker queue by calling + * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting + * the RPC. + * + * Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call + * stop to tear down the stream. */ - (void)writesFinishedWithError:(nullable NSError *)error __used { error = [FSTDatastore firestoreErrorForError:error]; - FSTWeakify(self); - [self.workerDispatchQueue dispatchAsync:^{ - FSTStrongify(self); - if (!self || self.state == FSTStreamStateStopped) { - return; - } - [self handleStreamClose:error]; - }]; + [self.workerDispatchQueue verifyIsCurrentQueue]; + [self handleStreamClose:error]; } @end From b78f9064f19900522641c490bea76c9ce60251ca Mon Sep 17 00:00:00 2001 From: Michael Lehenbauer Date: Mon, 19 Feb 2018 11:36:03 -0800 Subject: [PATCH 2/3] Add stricter assertions regarding stream state. (#812) --- .../Tests/Integration/FSTStreamTests.mm | 8 +++++--- Firestore/Source/Remote/FSTStream.mm | 18 ++++++------------ 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/Firestore/Example/Tests/Integration/FSTStreamTests.mm b/Firestore/Example/Tests/Integration/FSTStreamTests.mm index 3faf67859a4..cec35c10e8f 100644 --- a/Firestore/Example/Tests/Integration/FSTStreamTests.mm +++ b/Firestore/Example/Tests/Integration/FSTStreamTests.mm @@ -16,6 +16,8 @@ #import +#import + #import #import "Firestore/Example/Tests/Util/FSTHelpers.h" @@ -36,7 +38,7 @@ /** Exposes otherwise private methods for testing. */ @interface FSTStream (Testing) -- (void)writesFinishedWithError:(NSError *_Nullable)error; +@property(nonatomic, strong, readwrite) id callbackFilter; @end /** @@ -204,7 +206,7 @@ - (void)testWatchStreamStopBeforeHandshake { // Simulate a final callback from GRPC [_workerDispatchQueue dispatchAsync:^{ - [watchStream writesFinishedWithError:nil]; + [watchStream.callbackFilter writesFinishedWithError:nil]; }]; [self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]]; @@ -228,7 +230,7 @@ - (void)testWriteStreamStopBeforeHandshake { // Simulate a final callback from GRPC [_workerDispatchQueue dispatchAsync:^{ - [writeStream writesFinishedWithError:nil]; + [writeStream.callbackFilter writesFinishedWithError:nil]; }]; [self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]]; diff --git a/Firestore/Source/Remote/FSTStream.mm b/Firestore/Source/Remote/FSTStream.mm index e07718236ae..4f96d5407df 100644 --- a/Firestore/Source/Remote/FSTStream.mm +++ b/Firestore/Source/Remote/FSTStream.mm @@ -263,12 +263,12 @@ - (void)startWithDelegate:(id)delegate { /** Add an access token to our RPC, after obtaining one from the credentials provider. */ - (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error { + [self.workerDispatchQueue verifyIsCurrentQueue]; + if (self.state == FSTStreamStateStopped) { // Streams can be stopped while waiting for authorization. return; } - - [self.workerDispatchQueue verifyIsCurrentQueue]; FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)", (long)self.state); @@ -525,11 +525,7 @@ - (void)handleStreamMessage:(id)value { */ - (void)handleStreamClose:(nullable NSError *)error { FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error); - - if (![self isStarted]) { // The stream could have already been closed by the idle close timer. - FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class])); - return; - } + FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream."); // In theory the stream could close cleanly, however, in our current model we never expect this // to happen because if we stop a stream ourselves, this callback will never be called. To @@ -550,11 +546,7 @@ - (void)handleStreamClose:(nullable NSError *)error { */ - (void)writeValue:(id)value __used { [self.workerDispatchQueue verifyIsCurrentQueue]; - - if (![self isStarted]) { - FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class])); - return; - } + FSTAssert([self isStarted], @"writeValue: called for stopped stream."); if (!self.messageReceived) { self.messageReceived = YES; @@ -587,6 +579,8 @@ - (void)writeValue:(id)value __used { - (void)writesFinishedWithError:(nullable NSError *)error __used { error = [FSTDatastore firestoreErrorForError:error]; [self.workerDispatchQueue verifyIsCurrentQueue]; + FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream."); + [self handleStreamClose:error]; } From 14816987fc4eabbb68fa8382573ac99b9613d3ae Mon Sep 17 00:00:00 2001 From: Marek Gilbert Date: Mon, 19 Feb 2018 11:43:57 -0800 Subject: [PATCH 3/3] Update comments --- Firestore/Source/Remote/FSTStream.mm | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Firestore/Source/Remote/FSTStream.mm b/Firestore/Source/Remote/FSTStream.mm index 4f96d5407df..884ed9ac830 100644 --- a/Firestore/Source/Remote/FSTStream.mm +++ b/Firestore/Source/Remote/FSTStream.mm @@ -149,7 +149,12 @@ @interface FSTStream () #pragma mark - FSTCallbackFilter -/** Filter class that allows disabling of GRPC callbacks. */ +/** + * Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main + * FSTStream to allow the stream to be stopped externally (either by the user or via idle timer) + * and be able to completely prevent any subsequent events from gRPC from calling back into the + * FSTSTream. + */ @interface FSTCallbackFilter : NSObject - (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER; @@ -544,7 +549,7 @@ - (void)handleStreamClose:(nullable NSError *)error { * `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting * the RPC. */ -- (void)writeValue:(id)value __used { +- (void)writeValue:(id)value { [self.workerDispatchQueue verifyIsCurrentQueue]; FSTAssert([self isStarted], @"writeValue: called for stopped stream.");