Skip to content

Use -[GRPCCall setResponseDispatchQueue] #811

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions Firestore/Example/Tests/Integration/FSTStreamTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#import <XCTest/XCTest.h>

#import <GRPCClient/GRPCCall.h>

#import <FirebaseFirestore/FIRFirestoreSettings.h>

#import "Firestore/Example/Tests/Util/FSTHelpers.h"
Expand All @@ -36,7 +38,7 @@

/** Exposes otherwise private methods for testing. */
@interface FSTStream (Testing)
- (void)writesFinishedWithError:(NSError *_Nullable)error;
@property(nonatomic, strong, readwrite) id<GRXWriteable> callbackFilter;
@end

/**
Expand Down Expand Up @@ -203,7 +205,9 @@ - (void)testWatchStreamStopBeforeHandshake {
}];

// Simulate a final callback from GRPC
[watchStream writesFinishedWithError:nil];
[_workerDispatchQueue dispatchAsync:^{
[watchStream.callbackFilter writesFinishedWithError:nil];
}];

[self verifyDelegateObservedStates:@[ @"watchStreamDidOpen" ]];
}
Expand All @@ -225,7 +229,9 @@ - (void)testWriteStreamStopBeforeHandshake {
}];

// Simulate a final callback from GRPC
[writeStream writesFinishedWithError:nil];
[_workerDispatchQueue dispatchAsync:^{
[writeStream.callbackFilter writesFinishedWithError:nil];
}];

[self verifyDelegateObservedStates:@[ @"writeStreamDidOpen" ]];
}
Expand Down
91 changes: 44 additions & 47 deletions Firestore/Source/Remote/FSTStream.mm
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ @interface FSTStream () <GRXWriteable>

#pragma mark - FSTCallbackFilter

/** Filter class that allows disabling of GRPC callbacks. */
/**
* Implements callbacks from gRPC via the GRXWriteable protocol. This is separate from the main
* FSTStream to allow the stream to be stopped externally (either by the user or via idle timer)
* and be able to completely prevent any subsequent events from gRPC from calling back into the
* FSTSTream.
*/
@interface FSTCallbackFilter : NSObject <GRXWriteable>

- (instancetype)initWithStream:(FSTStream *)stream NS_DESIGNATED_INITIALIZER;
Expand Down Expand Up @@ -263,12 +268,12 @@ - (void)startWithDelegate:(id)delegate {

/** Add an access token to our RPC, after obtaining one from the credentials provider. */
- (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error {
[self.workerDispatchQueue verifyIsCurrentQueue];

if (self.state == FSTStreamStateStopped) {
// Streams can be stopped while waiting for authorization.
return;
}

[self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert(self.state == FSTStreamStateAuth, @"State should still be auth (was %ld)",
(long)self.state);

Expand All @@ -282,6 +287,8 @@ - (void)resumeStartWithToken:(FSTGetTokenResult *)token error:(NSError *)error {

self.requestsWriter = [[FSTBufferedWriter alloc] init];
_rpc = [self createRPCWithRequestsWriter:self.requestsWriter];
[_rpc setResponseDispatchQueue:self.workerDispatchQueue.queue];

[FSTDatastore prepareHeadersForRPC:_rpc
databaseID:&self.databaseInfo->database_id()
token:token.token];
Expand Down Expand Up @@ -523,11 +530,7 @@ - (void)handleStreamMessage:(id)value {
*/
- (void)handleStreamClose:(nullable NSError *)error {
FSTLog(@"%@ %p close: %@", NSStringFromClass([self class]), (__bridge void *)self, error);

if (![self isStarted]) { // The stream could have already been closed by the idle close timer.
FSTLog(@"%@ Ignoring server close for already closed stream.", NSStringFromClass([self class]));
return;
}
FSTAssert([self isStarted], @"handleStreamClose: called for non-started stream.");

// In theory the stream could close cleanly, however, in our current model we never expect this
// to happen because if we stop a stream ourselves, this callback will never be called. To
Expand All @@ -540,56 +543,50 @@ - (void)handleStreamClose:(nullable NSError *)error {
// The GRXWriteable implementation defines the receive side of the RPC stream.

/**
* Called by GRPC when it publishes a value. It is called from GRPC's own queue so we immediately
* redispatch back onto our own worker queue.
* Called by GRPC when it publishes a value.
*
* GRPC must be configured to use our worker queue by calling
* `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
* the RPC.
*/
- (void)writeValue:(id)value __used {
// TODO(mcg): remove the double-dispatch once GRPCCall at head is released.
// Once released we can set the responseDispatchQueue property on the GRPCCall and then this
// method can call handleStreamMessage directly.
FSTWeakify(self);
[self.workerDispatchQueue dispatchAsync:^{
FSTStrongify(self);
if (![self isStarted]) {
FSTLog(@"%@ Ignoring stream message from inactive stream.", NSStringFromClass([self class]));
return;
}

if (!self.messageReceived) {
self.messageReceived = YES;
if ([FIRFirestore isLoggingEnabled]) {
FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
(__bridge void *)self,
[FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
}
}
NSError *error;
id proto = [self parseProto:self.responseMessageClass data:value error:&error];
if (proto) {
[self handleStreamMessage:proto];
} else {
[_rpc finishWithError:error];
- (void)writeValue:(id)value {
[self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert([self isStarted], @"writeValue: called for stopped stream.");

if (!self.messageReceived) {
self.messageReceived = YES;
if ([FIRFirestore isLoggingEnabled]) {
FSTLog(@"%@ %p headers (whitelisted): %@", NSStringFromClass([self class]),
(__bridge void *)self,
[FSTDatastore extractWhiteListedHeaders:self.rpc.responseHeaders]);
}
}];
}
NSError *error;
id proto = [self parseProto:self.responseMessageClass data:value error:&error];
if (proto) {
[self handleStreamMessage:proto];
} else {
[_rpc finishWithError:error];
}
}

/**
* Called by GRPC when it closed the stream with an error representing the final state of the
* stream.
*
* Do not call directly, since it dispatches via the worker queue. Call handleStreamClose to
* directly inform stream-specific logic, or call stop to tear down the stream.
* GRPC must be configured to use our worker queue by calling
* `[call setResponseDispatchQueue:self.workerDispatchQueue.queue]` on the GRPCCall before starting
* the RPC.
*
* Do not call directly. Call handleStreamClose to directly inform stream-specific logic, or call
* stop to tear down the stream.
*/
- (void)writesFinishedWithError:(nullable NSError *)error __used {
error = [FSTDatastore firestoreErrorForError:error];
FSTWeakify(self);
[self.workerDispatchQueue dispatchAsync:^{
FSTStrongify(self);
if (!self || self.state == FSTStreamStateStopped) {
return;
}
[self handleStreamClose:error];
}];
[self.workerDispatchQueue verifyIsCurrentQueue];
FSTAssert([self isStarted], @"writesFinishedWithError: called for stopped stream.");

[self handleStreamClose:error];
}

@end
Expand Down