diff --git a/Firestore/Example/Tests/Local/FSTLocalStoreTests.mm b/Firestore/Example/Tests/Local/FSTLocalStoreTests.mm index 22172c8f959..026db12ad05 100644 --- a/Firestore/Example/Tests/Local/FSTLocalStoreTests.mm +++ b/Firestore/Example/Tests/Local/FSTLocalStoreTests.mm @@ -153,7 +153,7 @@ - (void)acknowledgeMutationWithVersion:(FSTTestSnapshotVersion)documentVersion { transformResults:nil]; FSTMutationBatchResult *result = [FSTMutationBatchResult resultWithBatch:batch commitVersion:version - mutationResults:@[ mutationResult ] + mutationResults:{mutationResult} streamToken:nil]; _lastChanges = [self.localStore acknowledgeBatchWithResult:result]; } diff --git a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.h b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.h index 59afab20fa1..c9555cd9455 100644 --- a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.h +++ b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.h @@ -18,6 +18,7 @@ #include #include +#include #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h" #include "Firestore/core/src/firebase/firestore/model/types.h" @@ -40,7 +41,7 @@ class MockDatastore : public Datastore { auth::CredentialsProvider* credentials); std::shared_ptr CreateWatchStream(WatchStreamCallback* callback) override; - std::shared_ptr CreateWriteStream(id delegate) override; + std::shared_ptr CreateWriteStream(WriteStreamCallback* callback) override; /** * A count of the total number of requests sent to the watch stream since the beginning of the @@ -82,7 +83,7 @@ class MockDatastore : public Datastore { int WritesSent() const; /** Injects a write ack as though it had come from the backend in response to a write. */ - void AckWrite(const model::SnapshotVersion& version, NSArray* results); + void AckWrite(const model::SnapshotVersion& version, std::vector results); /** Injects a stream failure as though it had come from the backend. */ void FailWrite(const util::Status& error); diff --git a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm index 3d7214c246e..49bf0b71375 100644 --- a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm +++ b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm @@ -25,7 +25,6 @@ #import "Firestore/Source/Local/FSTQueryData.h" #import "Firestore/Source/Model/FSTMutation.h" #import "Firestore/Source/Remote/FSTSerializerBeta.h" -#import "Firestore/Source/Remote/FSTStream.h" #include "Firestore/core/src/firebase/firestore/auth/credentials_provider.h" #include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h" @@ -161,18 +160,18 @@ void WriteWatchChange(const WatchChange& change, SnapshotVersion snap) { CredentialsProvider* credentials_provider, FSTSerializerBeta* serializer, GrpcConnection* grpc_connection, - id delegate, + WriteStreamCallback* callback, MockDatastore* datastore) - : WriteStream{worker_queue, credentials_provider, serializer, grpc_connection, delegate}, + : WriteStream{worker_queue, credentials_provider, serializer, grpc_connection, callback}, datastore_{datastore}, - delegate_{delegate} { + callback_{callback} { } void Start() override { HARD_ASSERT(!open_, "Trying to start already started write stream"); open_ = true; sent_mutations_ = {}; - [delegate_ writeStreamDidOpen]; + callback_->OnWriteStreamOpen(); } void Stop() override { @@ -194,7 +193,7 @@ bool IsOpen() const override { void WriteHandshake() override { datastore_->IncrementWriteStreamRequests(); SetHandshakeComplete(); - [delegate_ writeStreamDidCompleteHandshake]; + callback_->OnWriteStreamHandshakeComplete(); } void WriteMutations(NSArray* mutations) override { @@ -203,14 +202,14 @@ void WriteMutations(NSArray* mutations) override { } /** Injects a write ack as though it had come from the backend in response to a write. */ - void AckWrite(const SnapshotVersion& commitVersion, NSArray* results) { - [delegate_ writeStreamDidReceiveResponseWithVersion:commitVersion mutationResults:results]; + void AckWrite(const SnapshotVersion& commitVersion, std::vector results) { + callback_->OnWriteStreamMutationResult(commitVersion, std::move(results)); } /** Injects a failed write response as though it had come from the backend. */ void FailStream(const Status& error) { open_ = false; - [delegate_ writeStreamWasInterruptedWithError:error]; + callback_->OnWriteStreamClose(error); } /** @@ -236,7 +235,7 @@ int sent_mutations_count() const { bool open_ = false; std::queue*> sent_mutations_; MockDatastore* datastore_ = nullptr; - id delegate_ = nullptr; + WriteStreamCallback* callback_ = nullptr; }; MockDatastore::MockDatastore(const core::DatabaseInfo& database_info, @@ -257,11 +256,11 @@ int sent_mutations_count() const { return watch_stream_; } -std::shared_ptr MockDatastore::CreateWriteStream(id delegate) { +std::shared_ptr MockDatastore::CreateWriteStream(WriteStreamCallback* callback) { write_stream_ = std::make_shared( worker_queue_, credentials_, [[FSTSerializerBeta alloc] initWithDatabaseID:&database_info_->database_id()], - grpc_connection(), delegate, this); + grpc_connection(), callback, this); return write_stream_; } @@ -290,8 +289,9 @@ int sent_mutations_count() const { return write_stream_->sent_mutations_count(); } -void MockDatastore::AckWrite(const SnapshotVersion& version, NSArray* results) { - write_stream_->AckWrite(version, results); +void MockDatastore::AckWrite(const SnapshotVersion& version, + std::vector results) { + write_stream_->AckWrite(version, std::move(results)); } void MockDatastore::FailWrite(const Status& error) { diff --git a/Firestore/Example/Tests/SpecTests/FSTSpecTests.mm b/Firestore/Example/Tests/SpecTests/FSTSpecTests.mm index 266cda72610..24f0fd4a7ee 100644 --- a/Firestore/Example/Tests/SpecTests/FSTSpecTests.mm +++ b/Firestore/Example/Tests/SpecTests/FSTSpecTests.mm @@ -363,7 +363,7 @@ - (void)doWriteAck:(NSDictionary *)spec { FSTMutationResult *mutationResult = [[FSTMutationResult alloc] initWithVersion:version transformResults:nil]; - [self.driver receiveWriteAckWithVersion:version mutationResults:@[ mutationResult ]]; + [self.driver receiveWriteAckWithVersion:version mutationResults:{mutationResult}]; } - (void)doFailWrite:(NSDictionary *)spec { diff --git a/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h b/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h index b3b2a1d2a5c..780291a7888 100644 --- a/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h +++ b/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.h @@ -18,6 +18,7 @@ #include #include +#include #import "Firestore/Source/Remote/FSTRemoteStore.h" @@ -195,9 +196,9 @@ typedef std::unordered_map *)mutationResults; +- (FSTOutstandingWrite *) + receiveWriteAckWithVersion:(const firebase::firestore::model::SnapshotVersion &)commitVersion + mutationResults:(std::vector)mutationResults; /** * A count of the mutations written to the write stream by the FSTSyncEngine, but not yet diff --git a/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm b/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm index 299f15703db..4f196b15033 100644 --- a/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm +++ b/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm @@ -273,12 +273,13 @@ - (void)changeUser:(const User &)user { - (FSTOutstandingWrite *)receiveWriteAckWithVersion:(const SnapshotVersion &)commitVersion mutationResults: - (NSArray *)mutationResults { + (std::vector)mutationResults { FSTOutstandingWrite *write = [self currentOutstandingWrites].firstObject; [[self currentOutstandingWrites] removeObjectAtIndex:0]; [self validateNextWriteSent:write.write]; - _workerQueue->EnqueueBlocking([&] { _datastore->AckWrite(commitVersion, mutationResults); }); + _workerQueue->EnqueueBlocking( + [&] { _datastore->AckWrite(commitVersion, std::move(mutationResults)); }); return write; } diff --git a/Firestore/Source/Model/FSTMutationBatch.h b/Firestore/Source/Model/FSTMutationBatch.h index 0aace1d323f..ac8212db47a 100644 --- a/Firestore/Source/Model/FSTMutationBatch.h +++ b/Firestore/Source/Model/FSTMutationBatch.h @@ -17,6 +17,7 @@ #import #include +#include #include "Firestore/core/src/firebase/firestore/model/document_key.h" #include "Firestore/core/src/firebase/firestore/model/document_key_set.h" @@ -102,13 +103,13 @@ NS_ASSUME_NONNULL_BEGIN */ + (instancetype)resultWithBatch:(FSTMutationBatch *)batch commitVersion:(firebase::firestore::model::SnapshotVersion)commitVersion - mutationResults:(NSArray *)mutationResults + mutationResults:(std::vector)mutationResults streamToken:(nullable NSData *)streamToken; - (const firebase::firestore::model::SnapshotVersion &)commitVersion; +- (const std::vector &)mutationResults; @property(nonatomic, strong, readonly) FSTMutationBatch *batch; -@property(nonatomic, strong, readonly) NSArray *mutationResults; @property(nonatomic, strong, readonly, nullable) NSData *streamToken; - (const firebase::firestore::model::DocumentVersionMap &)docVersions; diff --git a/Firestore/Source/Model/FSTMutationBatch.mm b/Firestore/Source/Model/FSTMutationBatch.mm index aad425f3f62..c76c5e3fd2f 100644 --- a/Firestore/Source/Model/FSTMutationBatch.mm +++ b/Firestore/Source/Model/FSTMutationBatch.mm @@ -82,9 +82,9 @@ - (FSTMaybeDocument *_Nullable)applyToRemoteDocument:(FSTMaybeDocument *_Nullabl "applyTo: key %s doesn't match maybeDoc key %s", documentKey.ToString(), maybeDoc.key.ToString()); - HARD_ASSERT(mutationBatchResult.mutationResults.count == self.mutations.count, + HARD_ASSERT(mutationBatchResult.mutationResults.size() == self.mutations.count, "Mismatch between mutations length (%s) and results length (%s)", - self.mutations.count, mutationBatchResult.mutationResults.count); + self.mutations.count, mutationBatchResult.mutationResults.size()); for (NSUInteger i = 0; i < self.mutations.count; i++) { FSTMutation *mutation = self.mutations[i]; @@ -130,25 +130,26 @@ - (DocumentKeySet)keys { @interface FSTMutationBatchResult () - (instancetype)initWithBatch:(FSTMutationBatch *)batch commitVersion:(SnapshotVersion)commitVersion - mutationResults:(NSArray *)mutationResults + mutationResults:(std::vector)mutationResults streamToken:(nullable NSData *)streamToken docVersions:(DocumentVersionMap)docVersions NS_DESIGNATED_INITIALIZER; @end @implementation FSTMutationBatchResult { SnapshotVersion _commitVersion; + std::vector _mutationResults; DocumentVersionMap _docVersions; } - (instancetype)initWithBatch:(FSTMutationBatch *)batch commitVersion:(SnapshotVersion)commitVersion - mutationResults:(NSArray *)mutationResults + mutationResults:(std::vector)mutationResults streamToken:(nullable NSData *)streamToken docVersions:(DocumentVersionMap)docVersions { if (self = [super init]) { _batch = batch; _commitVersion = std::move(commitVersion); - _mutationResults = mutationResults; + _mutationResults = std::move(mutationResults); _streamToken = streamToken; _docVersions = std::move(docVersions); } @@ -159,17 +160,21 @@ - (instancetype)initWithBatch:(FSTMutationBatch *)batch return _commitVersion; } +- (const std::vector &)mutationResults { + return _mutationResults; +} + - (const DocumentVersionMap &)docVersions { return _docVersions; } + (instancetype)resultWithBatch:(FSTMutationBatch *)batch commitVersion:(SnapshotVersion)commitVersion - mutationResults:(NSArray *)mutationResults + mutationResults:(std::vector)mutationResults streamToken:(nullable NSData *)streamToken { - HARD_ASSERT(batch.mutations.count == mutationResults.count, + HARD_ASSERT(batch.mutations.count == mutationResults.size(), "Mutations sent %s must equal results received %s", batch.mutations.count, - mutationResults.count); + mutationResults.size()); DocumentVersionMap docVersions; NSArray *mutations = batch.mutations; @@ -186,7 +191,7 @@ + (instancetype)resultWithBatch:(FSTMutationBatch *)batch return [[FSTMutationBatchResult alloc] initWithBatch:batch commitVersion:std::move(commitVersion) - mutationResults:mutationResults + mutationResults:std::move(mutationResults) streamToken:streamToken docVersions:std::move(docVersions)]; } diff --git a/Firestore/Source/Remote/FSTRemoteStore.mm b/Firestore/Source/Remote/FSTRemoteStore.mm index 7b4f215c911..9d86bd9ee29 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.mm +++ b/Firestore/Source/Remote/FSTRemoteStore.mm @@ -28,7 +28,6 @@ #import "Firestore/Source/Model/FSTDocument.h" #import "Firestore/Source/Model/FSTMutation.h" #import "Firestore/Source/Model/FSTMutationBatch.h" -#import "Firestore/Source/Remote/FSTStream.h" #include "Firestore/core/src/firebase/firestore/auth/user.h" #include "Firestore/core/src/firebase/firestore/model/document_key.h" @@ -73,41 +72,13 @@ NS_ASSUME_NONNULL_BEGIN -/** - * The maximum number of pending writes to allow. - * TODO(bjornick): Negotiate this value with the backend. - */ -static const int kMaxPendingWrites = 10; - #pragma mark - FSTRemoteStore -@interface FSTRemoteStore () - -#pragma mark Watch Stream - -/** - * A list of up to kMaxPendingWrites writes that we have fetched from the LocalStore via - * fillWritePipeline and have or will send to the write stream. - * - * Whenever writePipeline is not empty, the RemoteStore will attempt to start or restart the write - * stream. When the stream is established, the writes in the pipeline will be sent in order. - * - * Writes remain in writePipeline until they are acknowledged by the backend and thus will - * automatically be re-sent if the stream is interrupted / restarted before they're acknowledged. - * - * Write responses from the backend are linked to their originating request purely based on - * order, and so we can just remove writes from the front of the writePipeline as we receive - * responses. - */ -@property(nonatomic, strong, readonly) NSMutableArray *writePipeline; -@end - @implementation FSTRemoteStore { /** The client-side proxy for interacting with the backend. */ std::shared_ptr _datastore; std::unique_ptr _remoteStore; - std::shared_ptr _writeStream; } - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore @@ -116,15 +87,10 @@ - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore onlineStateHandler:(std::function)onlineStateHandler { if (self = [super init]) { _datastore = std::move(datastore); - - _writePipeline = [NSMutableArray array]; - _datastore->Start(); _remoteStore = absl::make_unique(localStore, _datastore.get(), queue, std::move(onlineStateHandler)); - _writeStream = _datastore->CreateWriteStream(self); - _remoteStore->set_is_network_enabled(false); } return self; @@ -146,7 +112,7 @@ - (void)enableNetwork { if (_remoteStore->CanUseNetwork()) { // Load any saved stream token from persistent storage - _writeStream->SetLastStreamToken([_remoteStore->local_store() lastStreamToken]); + _remoteStore->write_stream().SetLastStreamToken([_remoteStore->local_store() lastStreamToken]); if (_remoteStore->ShouldStartWatchStream()) { _remoteStore->StartWatchStream(); @@ -170,12 +136,12 @@ - (void)disableNetwork { /** Disables the network, setting the OnlineState to the specified targetOnlineState. */ - (void)disableNetworkInternal { _remoteStore->watch_stream().Stop(); - _writeStream->Stop(); + _remoteStore->write_stream().Stop(); - if (self.writePipeline.count > 0) { + if (!_remoteStore->write_pipeline().empty()) { LOG_DEBUG("Stopping write stream with %s pending writes", - (unsigned long)self.writePipeline.count); - [self.writePipeline removeAllObjects]; + _remoteStore->write_pipeline().size()); + _remoteStore->write_pipeline().clear(); } _remoteStore->CleanUpWatchStreamState(); @@ -218,21 +184,6 @@ - (void)stopListeningToTargetID:(TargetId)targetID { #pragma mark Write Stream -/** - * Returns YES if the network is enabled, the write stream has not yet been started and there are - * pending writes. - */ -- (BOOL)shouldStartWriteStream { - return _remoteStore->CanUseNetwork() && !_writeStream->IsStarted() && - self.writePipeline.count > 0; -} - -- (void)startWriteStream { - HARD_ASSERT([self shouldStartWriteStream], - "startWriteStream: called when shouldStartWriteStream: is false."); - _writeStream->Start(); -} - /** * Attempts to fill our write pipeline with writes from the LocalStore. * @@ -242,154 +193,11 @@ - (void)startWriteStream { * Starts the write stream if necessary. */ - (void)fillWritePipeline { - BatchId lastBatchIDRetrieved = - self.writePipeline.count == 0 ? kBatchIdUnknown : self.writePipeline.lastObject.batchID; - while ([self canAddToWritePipeline]) { - FSTMutationBatch *batch = - [_remoteStore->local_store() nextMutationBatchAfterBatchID:lastBatchIDRetrieved]; - if (!batch) { - if (self.writePipeline.count == 0) { - _writeStream->MarkIdle(); - } - break; - } - [self addBatchToWritePipeline:batch]; - lastBatchIDRetrieved = batch.batchID; - } - - if ([self shouldStartWriteStream]) { - [self startWriteStream]; - } -} - -/** - * Returns YES if we can add to the write pipeline (i.e. it is not full and the network is enabled). - */ -- (BOOL)canAddToWritePipeline { - return _remoteStore->CanUseNetwork() && self.writePipeline.count < kMaxPendingWrites; + _remoteStore->FillWritePipeline(); } -/** - * Queues additional writes to be sent to the write stream, sending them immediately if the write - * stream is established. - */ - (void)addBatchToWritePipeline:(FSTMutationBatch *)batch { - HARD_ASSERT([self canAddToWritePipeline], "addBatchToWritePipeline called when pipeline is full"); - - [self.writePipeline addObject:batch]; - - if (_writeStream->IsOpen() && _writeStream->handshake_complete()) { - _writeStream->WriteMutations(batch.mutations); - } -} - -- (void)writeStreamDidOpen { - _writeStream->WriteHandshake(); -} - -/** - * Handles a successful handshake response from the server, which is our cue to send any pending - * writes. - */ -- (void)writeStreamDidCompleteHandshake { - // Record the stream token. - [_remoteStore->local_store() setLastStreamToken:_writeStream->GetLastStreamToken()]; - - // Send the write pipeline now that the stream is established. - for (FSTMutationBatch *write in self.writePipeline) { - _writeStream->WriteMutations(write.mutations); - } -} - -/** Handles a successful StreamingWriteResponse from the server that contains a mutation result. */ -- (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commitVersion - mutationResults:(NSArray *)results { - // This is a response to a write containing mutations and should be correlated to the first - // write in our write pipeline. - NSMutableArray *writePipeline = self.writePipeline; - FSTMutationBatch *batch = writePipeline[0]; - [writePipeline removeObjectAtIndex:0]; - - FSTMutationBatchResult *batchResult = - [FSTMutationBatchResult resultWithBatch:batch - commitVersion:commitVersion - mutationResults:results - streamToken:_writeStream->GetLastStreamToken()]; - [_remoteStore->sync_engine() applySuccessfulWriteWithResult:batchResult]; - - // It's possible that with the completion of this mutation another slot has freed up. - [self fillWritePipeline]; -} - -/** - * Handles the closing of the StreamingWrite RPC, either because of an error or because the RPC - * has been terminated by the client or the server. - */ -- (void)writeStreamWasInterruptedWithError:(const Status &)error { - if (error.ok()) { - // Graceful stop (due to Stop() or idle timeout). Make sure that's desirable. - HARD_ASSERT(![self shouldStartWriteStream], - "Write stream was stopped gracefully while still needed."); - } - - // If the write stream closed due to an error, invoke the error callbacks if there are pending - // writes. - if (!error.ok() && self.writePipeline.count > 0) { - if (_writeStream->handshake_complete()) { - // This error affects the actual writes. - [self handleWriteError:error]; - } else { - // If there was an error before the handshake finished, it's possible that the server is - // unable to process the stream token we're sending. (Perhaps it's too old?) - [self handleHandshakeError:error]; - } - } - - // The write stream might have been started by refilling the write pipeline for failed writes - if ([self shouldStartWriteStream]) { - [self startWriteStream]; - } -} - -- (void)handleHandshakeError:(const Status &)error { - HARD_ASSERT(!error.ok(), "Handling write error with status OK."); - // Reset the token if it's a permanent error, signaling the write stream is - // no longer valid. Note that the handshake does not count as a write: see - // comments on `Datastore::IsPermanentWriteError` for details. - if (Datastore::IsPermanentError(error)) { - NSString *token = [_writeStream->GetLastStreamToken() base64EncodedStringWithOptions:0]; - LOG_DEBUG("FSTRemoteStore %s error before completed handshake; resetting stream token %s: " - "error code: '%s', details: '%s'", - (__bridge void *)self, token, error.code(), error.error_message()); - _writeStream->SetLastStreamToken(nil); - [_remoteStore->local_store() setLastStreamToken:nil]; - } else { - // Some other error, don't reset stream token. Our stream logic will just retry with exponential - // backoff. - } -} - -- (void)handleWriteError:(const Status &)error { - HARD_ASSERT(!error.ok(), "Handling write error with status OK."); - // Only handle permanent errors here. If it's transient, just let the retry logic kick in. - if (!Datastore::IsPermanentWriteError(error)) { - return; - } - - // If this was a permanent error, the request itself was the problem so it's not going to - // succeed if we resend it. - FSTMutationBatch *batch = self.writePipeline[0]; - [self.writePipeline removeObjectAtIndex:0]; - - // In this case it's also unlikely that the server itself is melting down--this was just a - // bad request so inhibit backoff on the next restart. - _writeStream->InhibitBackoff(); - - [_remoteStore->sync_engine() rejectFailedWriteWithBatchID:batch.batchID - error:util::MakeNSError(error)]; - - // It's possible that with the completion of this mutation another slot has freed up. - [self fillWritePipeline]; + _remoteStore->AddToWritePipeline(batch); } - (FSTTransaction *)transaction { diff --git a/Firestore/Source/Remote/FSTStream.h b/Firestore/Source/Remote/FSTStream.h deleted file mode 100644 index 211a82154f5..00000000000 --- a/Firestore/Source/Remote/FSTStream.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2017 Google - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#import - -#include "Firestore/core/src/firebase/firestore/model/snapshot_version.h" -#include "Firestore/core/src/firebase/firestore/remote/watch_change.h" -#include "Firestore/core/src/firebase/firestore/util/status.h" - -@class FSTMutationResult; - -NS_ASSUME_NONNULL_BEGIN - -#pragma mark - FSTWriteStreamDelegate - -@protocol FSTWriteStreamDelegate - -/** Called by the FSTWriteStream when it is ready to accept outbound request messages. */ -- (void)writeStreamDidOpen; - -/** - * Called by the FSTWriteStream upon a successful handshake response from the server, which is the - * receiver's cue to send any pending writes. - */ -- (void)writeStreamDidCompleteHandshake; - -/** - * Called by the FSTWriteStream upon receiving a StreamingWriteResponse from the server that - * contains mutation results. - */ -- (void)writeStreamDidReceiveResponseWithVersion: - (const firebase::firestore::model::SnapshotVersion &)commitVersion - mutationResults:(NSArray *)results; - -/** - * Called when the FSTWriteStream's underlying RPC is interrupted for whatever reason, usually - * because of an error, but possibly due to an idle timeout. The error passed to this method may be - * nil, in which case the stream was closed without attributable fault. - * - * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" - * on FSTStream for details. - */ -- (void)writeStreamWasInterruptedWithError:(const firebase::firestore::util::Status &)error; - -@end - -NS_ASSUME_NONNULL_END diff --git a/Firestore/core/src/firebase/firestore/remote/datastore.h b/Firestore/core/src/firebase/firestore/remote/datastore.h index 9e2268ea571..1c9c592977a 100644 --- a/Firestore/core/src/firebase/firestore/remote/datastore.h +++ b/Firestore/core/src/firebase/firestore/remote/datastore.h @@ -45,7 +45,6 @@ #include "grpcpp/support/status.h" #import "Firestore/Source/Core/FSTTypes.h" -#import "Firestore/Source/Remote/FSTStream.h" namespace firebase { namespace firestore { @@ -91,7 +90,7 @@ class Datastore : public std::enable_shared_from_this { * shared channel. */ virtual std::shared_ptr CreateWriteStream( - id delegate); + WriteStreamCallback* callback); void CommitMutations(NSArray* mutations, FSTVoidErrorBlock completion); diff --git a/Firestore/core/src/firebase/firestore/remote/datastore.mm b/Firestore/core/src/firebase/firestore/remote/datastore.mm index 1bf2376d8f8..d7f955abed8 100644 --- a/Firestore/core/src/firebase/firestore/remote/datastore.mm +++ b/Firestore/core/src/firebase/firestore/remote/datastore.mm @@ -158,10 +158,10 @@ void LogGrpcCallFinished(absl::string_view rpc_name, } std::shared_ptr Datastore::CreateWriteStream( - id delegate) { + WriteStreamCallback* callback) { return std::make_shared(worker_queue_, credentials_, serializer_bridge_.GetSerializer(), - &grpc_connection_, delegate); + &grpc_connection_, callback); } void Datastore::CommitMutations(NSArray* mutations, diff --git a/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h b/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h index e5cefba467a..63f23e03781 100644 --- a/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h +++ b/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h @@ -39,7 +39,6 @@ #import "Firestore/Source/Local/FSTQueryData.h" #import "Firestore/Source/Model/FSTMutation.h" #import "Firestore/Source/Remote/FSTSerializerBeta.h" -#import "Firestore/Source/Remote/FSTStream.h" namespace firebase { namespace firestore { @@ -125,7 +124,7 @@ class WriteStreamSerializer { GCFSWriteResponse* ParseResponse(const grpc::ByteBuffer& message, util::Status* out_status) const; model::SnapshotVersion ToCommitVersion(GCFSWriteResponse* proto) const; - NSArray* ToMutationResults( + std::vector ToMutationResults( GCFSWriteResponse* proto) const; /** Creates a pretty-printed description of the proto for debugging. */ @@ -172,23 +171,6 @@ class DatastoreSerializer { FSTSerializerBeta* serializer_; }; -/** A C++ bridge that invokes methods on an `FSTWriteStreamDelegate`. */ -class WriteStreamDelegate { - public: - explicit WriteStreamDelegate(id delegate) - : delegate_{delegate} { - } - - void NotifyDelegateOnOpen(); - void NotifyDelegateOnHandshakeComplete(); - void NotifyDelegateOnCommit(const model::SnapshotVersion& commit_version, - NSArray* results); - void NotifyDelegateOnClose(const util::Status& status); - - private: - __weak id delegate_; -}; - } // namespace bridge } // namespace remote } // namespace firestore diff --git a/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.mm b/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.mm index 406f9199f6d..c2f0882b807 100644 --- a/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.mm +++ b/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.mm @@ -208,16 +208,16 @@ bool IsLoggingEnabled() { return [serializer_ decodedVersion:proto.commitTime]; } -NSArray* WriteStreamSerializer::ToMutationResults( +std::vector WriteStreamSerializer::ToMutationResults( GCFSWriteResponse* response) const { NSMutableArray* responses = response.writeResultsArray; - NSMutableArray* results = - [NSMutableArray arrayWithCapacity:responses.count]; + std::vector results; + results.reserve(responses.count); const model::SnapshotVersion commitVersion = ToCommitVersion(response); for (GCFSWriteResult* proto in responses) { - [results addObject:[serializer_ decodedMutationResult:proto - commitVersion:commitVersion]]; + results.push_back([serializer_ decodedMutationResult:proto + commitVersion:commitVersion]); }; return results; } @@ -300,27 +300,6 @@ bool IsLoggingEnabled() { return [serializer_ decodedMaybeDocumentFromBatch:response]; } -// WriteStreamDelegate - -void WriteStreamDelegate::NotifyDelegateOnOpen() { - [delegate_ writeStreamDidOpen]; -} - -void WriteStreamDelegate::NotifyDelegateOnHandshakeComplete() { - [delegate_ writeStreamDidCompleteHandshake]; -} - -void WriteStreamDelegate::NotifyDelegateOnCommit( - const SnapshotVersion& commit_version, - NSArray* results) { - [delegate_ writeStreamDidReceiveResponseWithVersion:commit_version - mutationResults:results]; -} - -void WriteStreamDelegate::NotifyDelegateOnClose(const Status& status) { - [delegate_ writeStreamWasInterruptedWithError:status]; -} - } // namespace bridge } // namespace remote } // namespace firestore diff --git a/Firestore/core/src/firebase/firestore/remote/remote_store.h b/Firestore/core/src/firebase/firestore/remote/remote_store.h index 4d9180fe365..6d0d0bfcf6f 100644 --- a/Firestore/core/src/firebase/firestore/remote/remote_store.h +++ b/Firestore/core/src/firebase/firestore/remote/remote_store.h @@ -25,6 +25,7 @@ #include #include +#include #include "Firestore/core/src/firebase/firestore/model/document_key_set.h" #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h" @@ -34,10 +35,12 @@ #include "Firestore/core/src/firebase/firestore/remote/remote_event.h" #include "Firestore/core/src/firebase/firestore/remote/watch_change.h" #include "Firestore/core/src/firebase/firestore/remote/watch_stream.h" +#include "Firestore/core/src/firebase/firestore/remote/write_stream.h" #include "Firestore/core/src/firebase/firestore/util/async_queue.h" #include "Firestore/core/src/firebase/firestore/util/status.h" @class FSTLocalStore; +@class FSTMutationBatch; @class FSTMutationBatchResult; @class FSTQueryData; @@ -104,7 +107,9 @@ namespace firebase { namespace firestore { namespace remote { -class RemoteStore : public TargetMetadataProvider, public WatchStreamCallback { +class RemoteStore : public TargetMetadataProvider, + public WatchStreamCallback, + public WriteStreamCallback { public: RemoteStore(FSTLocalStore* local_store, Datastore* datastore, @@ -112,6 +117,7 @@ class RemoteStore : public TargetMetadataProvider, public WatchStreamCallback { std::function online_state_handler); // TODO(varconst): remove the getters and setters + id sync_engine() { return sync_engine_; } @@ -134,6 +140,13 @@ class RemoteStore : public TargetMetadataProvider, public WatchStreamCallback { WatchStream& watch_stream() { return *watch_stream_; } + WriteStream& write_stream() { + return *write_stream_; + } + + std::vector& write_pipeline() { + return write_pipeline_; + } /** Listens to the target identified by the given `FSTQueryData`. */ void Listen(FSTQueryData* query_data); @@ -151,6 +164,13 @@ class RemoteStore : public TargetMetadataProvider, public WatchStreamCallback { const model::SnapshotVersion& snapshot_version) override; void OnWatchStreamClose(const util::Status& status) override; + void OnWriteStreamOpen() override; + void OnWriteStreamHandshakeComplete() override; + void OnWriteStreamClose(const util::Status& status) override; + void OnWriteStreamMutationResult( + model::SnapshotVersion commit_version, + std::vector mutation_results) override; + // TODO(varconst): make the following methods private. bool CanUseNetwork() const; @@ -165,6 +185,22 @@ class RemoteStore : public TargetMetadataProvider, public WatchStreamCallback { void CleanUpWatchStreamState(); + /** + * Attempts to fill our write pipeline with writes from the `FSTLocalStore`. + * + * Called internally to bootstrap or refill the write pipeline and by + * `FSTSyncEngine` whenever there are new mutations to process. + * + * Starts the write stream if necessary. + */ + void FillWritePipeline(); + + /** + * Queues additional writes to be sent to the write stream, sending them + * immediately if the write stream is established. + */ + void AddToWritePipeline(FSTMutationBatch* batch); + private: void SendWatchRequest(FSTQueryData* query_data); void SendUnwatchRequest(model::TargetId target_id); @@ -178,6 +214,23 @@ class RemoteStore : public TargetMetadataProvider, public WatchStreamCallback { /** Process a target error and passes the error along to `SyncEngine`. */ void ProcessTargetError(const WatchTargetChange& change); + /** + * Returns true if we can add to the write pipeline (i.e. it is not full and + * the network is enabled). + */ + bool CanAddToWritePipeline() const; + + void StartWriteStream(); + + /** + * Returns true if the network is enabled, the write stream has not yet been + * started and there are pending writes. + */ + bool ShouldStartWriteStream() const; + + void HandleHandshakeError(const util::Status& status); + void HandleWriteError(const util::Status& status); + id sync_engine_ = nil; /** @@ -206,7 +259,27 @@ class RemoteStore : public TargetMetadataProvider, public WatchStreamCallback { bool is_network_enabled_ = false; std::shared_ptr watch_stream_; + std::shared_ptr write_stream_; std::unique_ptr watch_change_aggregator_; + + /** + * A list of up to `kMaxPendingWrites` writes that we have fetched from the + * `LocalStore` via `FillWritePipeline` and have or will send to the write + * stream. + * + * Whenever `write_pipeline_` is not empty, the `RemoteStore` will attempt to + * start or restart the write stream. When the stream is established, the + * writes in the pipeline will be sent in order. + * + * Writes remain in `write_pipeline_` until they are acknowledged by the + * backend and thus will automatically be re-sent if the stream is interrupted + * / restarted before they're acknowledged. + * + * Write responses from the backend are linked to their originating request + * purely based on order, and so we can just remove writes from the front of + * the `write_pipeline_` as we receive responses. + */ + std::vector write_pipeline_; }; } // namespace remote diff --git a/Firestore/core/src/firebase/firestore/remote/remote_store.mm b/Firestore/core/src/firebase/firestore/remote/remote_store.mm index f40a5e7b31d..3779086aeba 100644 --- a/Firestore/core/src/firebase/firestore/remote/remote_store.mm +++ b/Firestore/core/src/firebase/firestore/remote/remote_store.mm @@ -20,16 +20,20 @@ #import "Firestore/Source/Local/FSTLocalStore.h" #import "Firestore/Source/Local/FSTQueryData.h" +#import "Firestore/Source/Model/FSTMutationBatch.h" +#include "Firestore/core/src/firebase/firestore/model/mutation_batch.h" #include "Firestore/core/src/firebase/firestore/util/error_apple.h" #include "Firestore/core/src/firebase/firestore/util/hard_assert.h" +#include "Firestore/core/src/firebase/firestore/util/log.h" #include "absl/memory/memory.h" +using firebase::firestore::model::BatchId; using firebase::firestore::model::DocumentKeySet; using firebase::firestore::model::OnlineState; using firebase::firestore::model::SnapshotVersion; -using firebase::firestore::model::DocumentKeySet; using firebase::firestore::model::TargetId; +using firebase::firestore::model::kBatchIdUnknown; using firebase::firestore::remote::Datastore; using firebase::firestore::remote::WatchStream; using firebase::firestore::remote::DocumentWatchChange; @@ -48,6 +52,12 @@ namespace firestore { namespace remote { +/** + * The maximum number of pending writes to allow. + * TODO(b/35853402): Negotiate this value with the backend. + */ +constexpr int kMaxPendingWrites = 10; + RemoteStore::RemoteStore( FSTLocalStore* local_store, Datastore* datastore, @@ -57,8 +67,11 @@ online_state_tracker_{worker_queue, std::move(online_state_handler)} { // Create streams (but note they're not started yet) watch_stream_ = datastore->CreateWatchStream(this); + write_stream_ = datastore->CreateWriteStream(this); } +// Watch Stream + void RemoteStore::Listen(FSTQueryData* query_data) { TargetId targetKey = query_data.targetID; HARD_ASSERT(listen_targets_.find(targetKey) == listen_targets_.end(), @@ -96,15 +109,6 @@ } } -FSTQueryData* RemoteStore::GetQueryDataForTarget(TargetId target_id) const { - auto found = listen_targets_.find(target_id); - return found != listen_targets_.end() ? found->second : nil; -} - -DocumentKeySet RemoteStore::GetRemoteKeysForTarget(TargetId target_id) const { - return [sync_engine_ remoteKeysForTarget:target_id]; -} - void RemoteStore::SendWatchRequest(FSTQueryData* query_data) { // We need to increment the the expected number of pending responses we're due // from watch so we wait for the ack to process any messages from this target. @@ -120,6 +124,11 @@ watch_stream_->UnwatchTargetId(target_id); } +bool RemoteStore::ShouldStartWatchStream() const { + return CanUseNetwork() && !watch_stream_->IsStarted() && + !listen_targets_.empty(); +} + void RemoteStore::StartWatchStream() { HARD_ASSERT(ShouldStartWatchStream(), "StartWatchStream called when ShouldStartWatchStream is false."); @@ -129,17 +138,6 @@ online_state_tracker_.HandleWatchStreamStart(); } -bool RemoteStore::ShouldStartWatchStream() const { - return CanUseNetwork() && !watch_stream_->IsStarted() && - !listen_targets_.empty(); -} - -bool RemoteStore::CanUseNetwork() const { - // PORTING NOTE: This method exists mostly because web also has to take into - // account primary vs. secondary state. - return is_network_enabled_; -} - void RemoteStore::CleanUpWatchStreamState() { watch_change_aggregator_.reset(); } @@ -291,6 +289,185 @@ } } +// Write Stream + +void RemoteStore::FillWritePipeline() { + BatchId last_batch_id_retrieved = write_pipeline_.empty() + ? kBatchIdUnknown + : write_pipeline_.back().batchID; + while (CanAddToWritePipeline()) { + FSTMutationBatch* batch = + [local_store_ nextMutationBatchAfterBatchID:last_batch_id_retrieved]; + if (!batch) { + if (write_pipeline_.empty()) { + write_stream_->MarkIdle(); + } + break; + } + AddToWritePipeline(batch); + last_batch_id_retrieved = batch.batchID; + } + + if (ShouldStartWriteStream()) { + StartWriteStream(); + } +} + +bool RemoteStore::CanAddToWritePipeline() const { + return CanUseNetwork() && write_pipeline_.size() < kMaxPendingWrites; +} + +void RemoteStore::AddToWritePipeline(FSTMutationBatch* batch) { + HARD_ASSERT(CanAddToWritePipeline(), + "AddToWritePipeline called when pipeline is full"); + + write_pipeline_.push_back(batch); + + if (write_stream_->IsOpen() && write_stream_->handshake_complete()) { + write_stream_->WriteMutations(batch.mutations); + } +} + +bool RemoteStore::ShouldStartWriteStream() const { + return CanUseNetwork() && !write_stream_->IsStarted() && + !write_pipeline_.empty(); +} + +void RemoteStore::StartWriteStream() { + HARD_ASSERT(ShouldStartWriteStream(), "StartWriteStream called when " + "ShouldStartWriteStream is false."); + write_stream_->Start(); +} + +void RemoteStore::OnWriteStreamOpen() { + write_stream_->WriteHandshake(); +} + +void RemoteStore::OnWriteStreamHandshakeComplete() { + // Record the stream token. + [local_store_ setLastStreamToken:write_stream_->GetLastStreamToken()]; + + // Send the write pipeline now that the stream is established. + for (FSTMutationBatch* write : write_pipeline_) { + write_stream_->WriteMutations(write.mutations); + } +} + +void RemoteStore::OnWriteStreamMutationResult( + SnapshotVersion commit_version, + std::vector mutation_results) { + // This is a response to a write containing mutations and should be correlated + // to the first write in our write pipeline. + HARD_ASSERT(!write_pipeline_.empty(), "Got result for empty write pipeline"); + + FSTMutationBatch* batch = write_pipeline_.front(); + write_pipeline_.erase(write_pipeline_.begin()); + + FSTMutationBatchResult* batchResult = [FSTMutationBatchResult + resultWithBatch:batch + commitVersion:commit_version + mutationResults:std::move(mutation_results) + streamToken:write_stream_->GetLastStreamToken()]; + [sync_engine_ applySuccessfulWriteWithResult:batchResult]; + + // It's possible that with the completion of this mutation another slot has + // freed up. + FillWritePipeline(); +} + +void RemoteStore::OnWriteStreamClose(const Status& status) { + if (status.ok()) { + // Graceful stop (due to Stop() or idle timeout). Make sure that's + // desirable. + HARD_ASSERT(!ShouldStartWriteStream(), + "Write stream was stopped gracefully while still needed."); + } + + // If the write stream closed due to an error, invoke the error callbacks if + // there are pending writes. + if (!status.ok() && !write_pipeline_.empty()) { + // TODO(varconst): handle UNAUTHENTICATED status, see + // go/firestore-client-errors + if (write_stream_->handshake_complete()) { + // This error affects the actual writes. + HandleWriteError(status); + } else { + // If there was an error before the handshake finished, it's possible that + // the server is unable to process the stream token we're sending. + // (Perhaps it's too old?) + HandleHandshakeError(status); + } + } + + // The write stream might have been started by refilling the write pipeline + // for failed writes + if (ShouldStartWriteStream()) { + StartWriteStream(); + } +} + +void RemoteStore::HandleHandshakeError(const Status& status) { + HARD_ASSERT(!status.ok(), "Handling write error with status OK."); + + // Reset the token if it's a permanent error, signaling the write stream is + // no longer valid. Note that the handshake does not count as a write: see + // comments on `Datastore::IsPermanentWriteError` for details. + if (Datastore::IsPermanentError(status)) { + NSString* token = + [write_stream_->GetLastStreamToken() base64EncodedStringWithOptions:0]; + LOG_DEBUG("RemoteStore %s error before completed handshake; resetting " + "stream token %s: " + "error code: '%s', details: '%s'", + this, token, status.code(), status.error_message()); + write_stream_->SetLastStreamToken(nil); + [local_store_ setLastStreamToken:nil]; + } else { + // Some other error, don't reset stream token. Our stream logic will just + // retry with exponential backoff. + } +} + +void RemoteStore::HandleWriteError(const Status& status) { + HARD_ASSERT(!status.ok(), "Handling write error with status OK."); + + // Only handle permanent errors here. If it's transient, just let the retry + // logic kick in. + if (!Datastore::IsPermanentWriteError(status)) { + return; + } + + // If this was a permanent error, the request itself was the problem so it's + // not going to succeed if we resend it. + FSTMutationBatch* batch = write_pipeline_.front(); + write_pipeline_.erase(write_pipeline_.begin()); + + // In this case it's also unlikely that the server itself is melting + // down--this was just a bad request so inhibit backoff on the next restart. + write_stream_->InhibitBackoff(); + + [sync_engine_ rejectFailedWriteWithBatchID:batch.batchID + error:util::MakeNSError(status)]; + + // It's possible that with the completion of this mutation another slot has + // freed up. + FillWritePipeline(); +} + +bool RemoteStore::CanUseNetwork() const { + // PORTING NOTE: This method exists mostly because web also has to take into + // account primary vs. secondary state. + return is_network_enabled_; +} + +DocumentKeySet RemoteStore::GetRemoteKeysForTarget(TargetId target_id) const { + return [sync_engine_ remoteKeysForTarget:target_id]; +} + +FSTQueryData* RemoteStore::GetQueryDataForTarget(TargetId target_id) const { + auto found = listen_targets_.find(target_id); + return found != listen_targets_.end() ? found->second : nil; +} + } // namespace remote } // namespace firestore } // namespace firebase diff --git a/Firestore/core/src/firebase/firestore/remote/write_stream.h b/Firestore/core/src/firebase/firestore/remote/write_stream.h index 8e34e3eac25..8e85bcf1693 100644 --- a/Firestore/core/src/firebase/firestore/remote/write_stream.h +++ b/Firestore/core/src/firebase/firestore/remote/write_stream.h @@ -24,7 +24,9 @@ #import #include #include +#include +#include "Firestore/core/src/firebase/firestore/model/snapshot_version.h" #include "Firestore/core/src/firebase/firestore/remote/grpc_connection.h" #include "Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h" #include "Firestore/core/src/firebase/firestore/remote/stream.h" @@ -37,10 +39,46 @@ #import "Firestore/Source/Model/FSTMutation.h" #import "Firestore/Source/Remote/FSTSerializerBeta.h" +@class FSTMutationResult; + namespace firebase { namespace firestore { namespace remote { +class WriteStreamCallback { + public: + /** + * Called by the `WriteStream` when it is ready to accept outbound request + * messages. + */ + virtual void OnWriteStreamOpen() = 0; + + /** + * Called by the `WriteStream` upon a successful handshake response from the + * server, which is the receiver's cue to send any pending writes. + */ + virtual void OnWriteStreamHandshakeComplete() = 0; + + /** + * Called by the `WriteStream` upon receiving a StreamingWriteResponse from + * the server that contains mutation results. + */ + virtual void OnWriteStreamMutationResult( + model::SnapshotVersion commit_version, + std::vector results) = 0; + + /** + * Called when the `WriteStream`'s underlying RPC is interrupted for whatever + * reason, usually because of an error, but possibly due to an idle timeout. + * The status passed to this method may be "ok", in which case the stream was + * closed without attributable fault. + * + * NOTE: This will not be called after `Stop` is called on the stream. See + * "Starting and Stopping" on `Stream` for details. + */ + virtual void OnWriteStreamClose(const util::Status& status) = 0; +}; + /** * A Stream that implements the Write RPC. * @@ -65,7 +103,7 @@ class WriteStream : public Stream { auth::CredentialsProvider* credentials_provider, FSTSerializerBeta* serializer, GrpcConnection* grpc_connection, - id delegate); + WriteStreamCallback* callback); void SetLastStreamToken(NSData* token); /** @@ -115,7 +153,7 @@ class WriteStream : public Stream { } bridge::WriteStreamSerializer serializer_bridge_; - bridge::WriteStreamDelegate delegate_bridge_; + WriteStreamCallback* callback_ = nullptr; bool handshake_complete_ = false; }; diff --git a/Firestore/core/src/firebase/firestore/remote/write_stream.mm b/Firestore/core/src/firebase/firestore/remote/write_stream.mm index 17f46a53e04..db778509fda 100644 --- a/Firestore/core/src/firebase/firestore/remote/write_stream.mm +++ b/Firestore/core/src/firebase/firestore/remote/write_stream.mm @@ -36,11 +36,11 @@ CredentialsProvider* credentials_provider, FSTSerializerBeta* serializer, GrpcConnection* grpc_connection, - id delegate) + WriteStreamCallback* callback) : Stream{async_queue, credentials_provider, grpc_connection, TimerId::WriteStreamConnectionBackoff, TimerId::WriteStreamIdle}, serializer_bridge_{serializer}, - delegate_bridge_{delegate} { + callback_{NOT_NULL(callback)} { } void WriteStream::SetLastStreamToken(NSData* token) { @@ -97,11 +97,11 @@ } void WriteStream::NotifyStreamOpen() { - delegate_bridge_.NotifyDelegateOnOpen(); + callback_->OnWriteStreamOpen(); } void WriteStream::NotifyStreamClose(const Status& status) { - delegate_bridge_.NotifyDelegateOnClose(status); + callback_->OnWriteStreamClose(status); // Delegate's logic might depend on whether handshake was completed, so only // reset it after notifying. handshake_complete_ = false; @@ -124,14 +124,14 @@ if (!handshake_complete()) { // The first response is the handshake response handshake_complete_ = true; - delegate_bridge_.NotifyDelegateOnHandshakeComplete(); + callback_->OnWriteStreamHandshakeComplete(); } else { // A successful first write response means the stream is healthy. // Note that we could consider a successful handshake healthy, however, the // write itself might be causing an error we want to back off from. backoff_.Reset(); - delegate_bridge_.NotifyDelegateOnCommit( + callback_->OnWriteStreamMutationResult( serializer_bridge_.ToCommitVersion(response), serializer_bridge_.ToMutationResults(response)); }