diff --git a/Firestore/Example/Tests/Integration/FSTDatastoreTests.mm b/Firestore/Example/Tests/Integration/FSTDatastoreTests.mm index f057cb5deab..ec43e8ba7eb 100644 --- a/Firestore/Example/Tests/Integration/FSTDatastoreTests.mm +++ b/Firestore/Example/Tests/Integration/FSTDatastoreTests.mm @@ -219,7 +219,7 @@ - (void)testStreamingWrite { FSTRemoteStoreEventCapture *capture = [[FSTRemoteStoreEventCapture alloc] initWithTestCase:self]; [capture expectWriteEventWithDescription:@"write mutations"]; - _remoteStore.syncEngine = capture; + [_remoteStore setSyncEngine:capture]; FSTSetMutation *mutation = [self setMutation]; FSTMutationBatch *batch = [[FSTMutationBatch alloc] initWithBatchID:23 diff --git a/Firestore/Example/Tests/Local/FSTLocalStoreTests.mm b/Firestore/Example/Tests/Local/FSTLocalStoreTests.mm index 6363c652be8..22172c8f959 100644 --- a/Firestore/Example/Tests/Local/FSTLocalStoreTests.mm +++ b/Firestore/Example/Tests/Local/FSTLocalStoreTests.mm @@ -19,6 +19,8 @@ #import #import +#include + #import "Firestore/Source/Core/FSTQuery.h" #import "Firestore/Source/Local/FSTLocalWriteResult.h" #import "Firestore/Source/Local/FSTPersistence.h" @@ -51,6 +53,7 @@ using firebase::firestore::model::SnapshotVersion; using firebase::firestore::model::TargetId; using firebase::firestore::remote::RemoteEvent; +using firebase::firestore::remote::TestTargetMetadataProvider; using firebase::firestore::remote::WatchChangeAggregator; using firebase::firestore::remote::WatchTargetChange; using firebase::firestore::remote::WatchTargetChangeState; @@ -906,9 +909,9 @@ - (void)testPersistsResumeTokens { NSData *resumeToken = FSTTestResumeTokenFromSnapshotVersion(1000); WatchTargetChange watchChange{WatchTargetChangeState::Current, {targetID}, resumeToken}; - WatchChangeAggregator aggregator{[FSTTestTargetMetadataProvider - providerWithSingleResultForKey:testutil::Key("foo/bar") - targets:{targetID}]}; + auto metadataProvider = TestTargetMetadataProvider::CreateSingleResultProvider( + testutil::Key("foo/bar"), std::vector{targetID}); + WatchChangeAggregator aggregator{&metadataProvider}; aggregator.HandleTargetChange(watchChange); RemoteEvent remoteEvent = aggregator.CreateRemoteEvent(testutil::Version(1000)); [self applyRemoteEvent:remoteEvent]; diff --git a/Firestore/Example/Tests/Remote/FSTRemoteEventTests.mm b/Firestore/Example/Tests/Remote/FSTRemoteEventTests.mm index baaa627b63d..3172d63e812 100644 --- a/Firestore/Example/Tests/Remote/FSTRemoteEventTests.mm +++ b/Firestore/Example/Tests/Remote/FSTRemoteEventTests.mm @@ -46,6 +46,7 @@ using firebase::firestore::remote::ExistenceFilterWatchChange; using firebase::firestore::remote::RemoteEvent; using firebase::firestore::remote::TargetChange; +using firebase::firestore::remote::TestTargetMetadataProvider; using firebase::firestore::remote::WatchChange; using firebase::firestore::remote::WatchChangeAggregator; using firebase::firestore::remote::WatchTargetChange; @@ -92,13 +93,12 @@ @interface FSTRemoteEventTests : XCTestCase @implementation FSTRemoteEventTests { NSData *_resumeToken1; - FSTTestTargetMetadataProvider *_targetMetadataProvider; + TestTargetMetadataProvider _targetMetadataProvider; std::unordered_map _noOutstandingResponses; } - (void)setUp { _resumeToken1 = [@"resume1" dataUsingEncoding:NSUTF8StringEncoding]; - _targetMetadataProvider = [FSTTestTargetMetadataProvider new]; } /** @@ -145,7 +145,7 @@ - (void)setUp { * considered active, or `_noOutstandingResponses` if all targets are already active. * @param existingKeys The set of documents that are considered synced with the test targets as * part of a previous listen. To modify this set during test execution, invoke - * `[_targetMetadataProvider setSyncedKeys:forQueryData:]`. + * `_targetMetadataProvider.SetSyncedKeys()`. * @param watchChanges The watch changes to apply before returning the aggregator. Supported * changes are `DocumentWatchChange` and `WatchTargetChange`. */ @@ -154,7 +154,7 @@ - (void)setUp { outstandingResponses:(const std::unordered_map &)outstandingResponses existingKeys:(DocumentKeySet)existingKeys changes:(const std::vector> &)watchChanges { - WatchChangeAggregator aggregator{_targetMetadataProvider}; + WatchChangeAggregator aggregator{&_targetMetadataProvider}; std::vector targetIDs; for (const auto &kv : targetMap) { @@ -162,7 +162,7 @@ - (void)setUp { FSTQueryData *queryData = kv.second; targetIDs.push_back(targetID); - [_targetMetadataProvider setSyncedKeys:existingKeys forQueryData:queryData]; + _targetMetadataProvider.SetSyncedKeys(existingKeys, queryData); }; for (const auto &kv : outstandingResponses) { @@ -223,7 +223,7 @@ - (void)setUp { - (void)testWillAccumulateDocumentAddedAndRemovedEvents { // The target map that contains an entry for every target in this test. If a target ID is - // omitted, the target is considered inactive and FSTTestTargetMetadataProvider will fail on + // omitted, the target is considered inactive and `TestTargetMetadataProvider` will fail on // access. std::unordered_map targetMap{ [self queryDataForTargets:{1, 2, 3, 4, 5, 6}]}; @@ -614,8 +614,7 @@ - (void)testDocumentUpdate { XCTAssertEqualObjects(event.document_updates().at(doc1.key), doc1); XCTAssertEqualObjects(event.document_updates().at(doc2.key), doc2); - [_targetMetadataProvider setSyncedKeys:DocumentKeySet{doc1.key, doc2.key} - forQueryData:targetMap[1]]; + _targetMetadataProvider.SetSyncedKeys(DocumentKeySet{doc1.key, doc2.key}, targetMap[1]); FSTDeletedDocument *deletedDoc1 = [FSTDeletedDocument documentWithKey:doc1.key version:testutil::Version(3) diff --git a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.h b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.h index 9d58e3416bb..59afab20fa1 100644 --- a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.h +++ b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.h @@ -39,7 +39,7 @@ class MockDatastore : public Datastore { util::AsyncQueue* worker_queue, auth::CredentialsProvider* credentials); - std::shared_ptr CreateWatchStream(id delegate) override; + std::shared_ptr CreateWatchStream(WatchStreamCallback* callback) override; std::shared_ptr CreateWriteStream(id delegate) override; /** diff --git a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm index f8b6a2d6f87..3d7214c246e 100644 --- a/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm +++ b/Firestore/Example/Tests/SpecTests/FSTMockDatastore.mm @@ -70,11 +70,11 @@ CredentialsProvider* credentials_provider, FSTSerializerBeta* serializer, GrpcConnection* grpc_connection, - id delegate, + WatchStreamCallback* callback, MockDatastore* datastore) - : WatchStream{worker_queue, credentials_provider, serializer, grpc_connection, delegate}, + : WatchStream{worker_queue, credentials_provider, serializer, grpc_connection, callback}, datastore_{datastore}, - delegate_{delegate} { + callback_{callback} { } const std::unordered_map& ActiveTargets() const { @@ -84,7 +84,7 @@ void Start() override { HARD_ASSERT(!open_, "Trying to start already started watch stream"); open_ = true; - [delegate_ watchStreamDidOpen]; + callback_->OnWatchStreamOpen(); } void Stop() override { @@ -118,7 +118,7 @@ void UnwatchTargetId(model::TargetId target_id) override { void FailStream(const Status& error) { open_ = false; - [delegate_ watchStreamWasInterruptedWithError:error]; + callback_->OnWatchStreamClose(error); } void WriteWatchChange(const WatchChange& change, SnapshotVersion snap) { @@ -145,14 +145,14 @@ void WriteWatchChange(const WatchChange& change, SnapshotVersion snap) { } } - [delegate_ watchStreamDidChange:change snapshotVersion:snap]; + callback_->OnWatchStreamChange(change, snap); } private: bool open_ = false; std::unordered_map active_targets_; MockDatastore* datastore_ = nullptr; - id delegate_ = nullptr; + WatchStreamCallback* callback_ = nullptr; }; class MockWriteStream : public WriteStream { @@ -248,11 +248,11 @@ int sent_mutations_count() const { credentials_{credentials} { } -std::shared_ptr MockDatastore::CreateWatchStream(id delegate) { +std::shared_ptr MockDatastore::CreateWatchStream(WatchStreamCallback* callback) { watch_stream_ = std::make_shared( worker_queue_, credentials_, [[FSTSerializerBeta alloc] initWithDatabaseID:&database_info_->database_id()], - grpc_connection(), delegate, this); + grpc_connection(), callback, this); return watch_stream_; } diff --git a/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm b/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm index 8d7b7a236da..299f15703db 100644 --- a/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm +++ b/Firestore/Example/Tests/SpecTests/FSTSyncEngineTestDriver.mm @@ -165,7 +165,7 @@ - (instancetype)initWithPersistence:(id)persistence _syncEngine = [[FSTSyncEngine alloc] initWithLocalStore:_localStore remoteStore:_remoteStore initialUser:initialUser]; - _remoteStore.syncEngine = _syncEngine; + [_remoteStore setSyncEngine:_syncEngine]; _eventManager = [FSTEventManager eventManagerWithSyncEngine:_syncEngine]; // Set up internal event tracking for the spec tests. diff --git a/Firestore/Example/Tests/Util/FSTHelpers.h b/Firestore/Example/Tests/Util/FSTHelpers.h index c876f2071f4..6e6f57158ab 100644 --- a/Firestore/Example/Tests/Util/FSTHelpers.h +++ b/Firestore/Example/Tests/Util/FSTHelpers.h @@ -17,6 +17,7 @@ #import #include +#include #include #import "Firestore/Source/Model/FSTDocument.h" @@ -141,51 +142,57 @@ inline NSString *FSTRemoveExceptionPrefix(NSString *exception) { } while (0) /** - * An implementation of FSTTargetMetadataProvider that provides controlled access to the - * `FSTTargetMetadataProvider` callbacks. Any target accessed via these callbacks must be + * An implementation of `TargetMetadataProvider` that provides controlled access to the + * `TargetMetadataProvider` callbacks. Any target accessed via these callbacks must be * registered beforehand via the factory methods or via `setSyncedKeys:forQueryData:`. */ -@interface FSTTestTargetMetadataProvider : NSObject - -/** - * Creates an FSTTestTargetMetadataProvider that behaves as if there's an established listen for - * each of the given targets, where each target has previously seen query results containing just - * the given documentKey. - * - * Internally this means that the `remoteKeysForTarget` callback for these targets will return just - * the documentKey and that the provided targets will be returned as active from the - * `queryDataForTarget` target. - */ -+ (instancetype) - providerWithSingleResultForKey:(firebase::firestore::model::DocumentKey)documentKey - targets: - (const std::vector &)targets; - -+ (instancetype) - providerWithSingleResultForKey:(firebase::firestore::model::DocumentKey)documentKey - listenTargets: - (const std::vector &)listenTargets - limboTargets: - (const std::vector &)limboTargets; - -/** - * Creates an FSTTestTargetMetadataProvider that behaves as if there's an established listen for - * each of the given targets, where each target has not seen any previous document. - * - * Internally this means that the `remoteKeysForTarget` callback for these targets will return an - * empty set of document keys and that the provided targets will be returned as active from the - * `queryDataForTarget` target. - */ -+ (instancetype) - providerWithEmptyResultForKey:(firebase::firestore::model::DocumentKey)documentKey - targets: - (const std::vector &)targets; +namespace firebase { +namespace firestore { +namespace remote { -/** Sets or replaces the local state for the provided query data. */ -- (void)setSyncedKeys:(firebase::firestore::model::DocumentKeySet)keys - forQueryData:(FSTQueryData *)queryData; +class TestTargetMetadataProvider : public TargetMetadataProvider { + public: + /** + * Creates a `TestTargetMetadataProvider` that behaves as if there's an established listen for + * each of the given targets, where each target has previously seen query results containing just + * the given `document_key`. + * + * Internally this means that the `GetRemoteKeysForTarget` callback for these targets will return + * just the `document_key` and that the provided targets will be returned as active from the + * `GetQueryDataForTarget` target. + */ + static TestTargetMetadataProvider CreateSingleResultProvider( + model::DocumentKey document_key, const std::vector &targets); + static TestTargetMetadataProvider CreateSingleResultProvider( + model::DocumentKey document_key, + const std::vector &targets, + const std::vector &limbo_targets); + + /** + * Creates an `TestTargetMetadataProvider` that behaves as if there's an established listen for + * each of the given targets, where each target has not seen any previous document. + * + * Internally this means that the `GetRemoteKeysForTarget` callback for these targets will return + * an empty set of document keys and that the provided targets will be returned as active from the + * `GetQueryDataForTarget` target. + */ + static TestTargetMetadataProvider CreateEmptyResultProvider( + const model::DocumentKey &document_key, const std::vector &targets); + + /** Sets or replaces the local state for the provided query data. */ + void SetSyncedKeys(model::DocumentKeySet keys, FSTQueryData *query_data); + + model::DocumentKeySet GetRemoteKeysForTarget(model::TargetId target_id) const override; + FSTQueryData *GetQueryDataForTarget(model::TargetId target_id) const override; + + private: + std::unordered_map synced_keys_; + std::unordered_map query_data_; +}; -@end +} // namespace remote +} // namespace firestore +} // namespace firebase /** Creates a new FIRTimestamp from components. Note that year, month, and day are all one-based. */ FIRTimestamp *FSTTestTimestamp(int year, int month, int day, int hour, int minute, int second); diff --git a/Firestore/Example/Tests/Util/FSTHelpers.mm b/Firestore/Example/Tests/Util/FSTHelpers.mm index e5182240d19..6ee7720c4ac 100644 --- a/Firestore/Example/Tests/Util/FSTHelpers.mm +++ b/Firestore/Example/Tests/Util/FSTHelpers.mm @@ -23,9 +23,7 @@ #include #include #include -#include #include -#include #import "Firestore/Source/API/FIRFieldPath+Internal.h" #import "Firestore/Source/API/FSTUserDataConverter.h" @@ -306,82 +304,87 @@ MaybeDocumentMap FSTTestDocUpdates(NSArray *docs) { .snapshot; } -@implementation FSTTestTargetMetadataProvider { - std::unordered_map _syncedKeys; - std::unordered_map _queryData; -} +namespace firebase { +namespace firestore { +namespace remote { -+ (instancetype)providerWithSingleResultForKey:(DocumentKey)documentKey - listenTargets:(const std::vector &)listenTargets - limboTargets:(const std::vector &)limboTargets { - FSTTestTargetMetadataProvider *metadataProvider = [FSTTestTargetMetadataProvider new]; - FSTQuery *query = [FSTQuery queryWithPath:documentKey.path()]; +TestTargetMetadataProvider TestTargetMetadataProvider::CreateSingleResultProvider( + DocumentKey document_key, + const std::vector &listen_targets, + const std::vector &limbo_targets) { + TestTargetMetadataProvider metadata_provider; + FSTQuery *query = [FSTQuery queryWithPath:document_key.path()]; - for (TargetId targetID : listenTargets) { - FSTQueryData *queryData = [[FSTQueryData alloc] initWithQuery:query - targetID:targetID - listenSequenceNumber:0 - purpose:FSTQueryPurposeListen]; - [metadataProvider setSyncedKeys:DocumentKeySet{documentKey} forQueryData:queryData]; + for (TargetId target_id : listen_targets) { + FSTQueryData *query_data = [[FSTQueryData alloc] initWithQuery:query + targetID:target_id + listenSequenceNumber:0 + purpose:FSTQueryPurposeListen]; + metadata_provider.SetSyncedKeys(DocumentKeySet{document_key}, query_data); } - for (TargetId targetID : limboTargets) { - FSTQueryData *queryData = [[FSTQueryData alloc] initWithQuery:query - targetID:targetID - listenSequenceNumber:0 - purpose:FSTQueryPurposeLimboResolution]; - [metadataProvider setSyncedKeys:DocumentKeySet{documentKey} forQueryData:queryData]; + for (TargetId target_id : limbo_targets) { + FSTQueryData *query_data = [[FSTQueryData alloc] initWithQuery:query + targetID:target_id + listenSequenceNumber:0 + purpose:FSTQueryPurposeLimboResolution]; + metadata_provider.SetSyncedKeys(DocumentKeySet{document_key}, query_data); } - return metadataProvider; + return metadata_provider; } -+ (instancetype)providerWithSingleResultForKey:(DocumentKey)documentKey - targets:(const std::vector &)targets { - return [self providerWithSingleResultForKey:documentKey listenTargets:targets limboTargets:{}]; +TestTargetMetadataProvider TestTargetMetadataProvider::CreateSingleResultProvider( + DocumentKey document_key, const std::vector &targets) { + return CreateSingleResultProvider(document_key, targets, /*limbo_targets=*/{}); } -+ (instancetype)providerWithEmptyResultForKey:(DocumentKey)documentKey - targets:(const std::vector &)targets { - FSTTestTargetMetadataProvider *metadataProvider = [FSTTestTargetMetadataProvider new]; - FSTQuery *query = [FSTQuery queryWithPath:documentKey.path()]; +TestTargetMetadataProvider TestTargetMetadataProvider::CreateEmptyResultProvider( + const DocumentKey &document_key, const std::vector &targets) { + TestTargetMetadataProvider metadata_provider; + FSTQuery *query = [FSTQuery queryWithPath:document_key.path()]; - for (TargetId targetID : targets) { - FSTQueryData *queryData = [[FSTQueryData alloc] initWithQuery:query - targetID:targetID - listenSequenceNumber:0 - purpose:FSTQueryPurposeListen]; - [metadataProvider setSyncedKeys:DocumentKeySet {} forQueryData:queryData]; + for (TargetId target_id : targets) { + FSTQueryData *query_data = [[FSTQueryData alloc] initWithQuery:query + targetID:target_id + listenSequenceNumber:0 + purpose:FSTQueryPurposeListen]; + metadata_provider.SetSyncedKeys(DocumentKeySet{}, query_data); } - return metadataProvider; + return metadata_provider; } -- (void)setSyncedKeys:(DocumentKeySet)keys forQueryData:(FSTQueryData *)queryData { - _syncedKeys[queryData.targetID] = keys; - _queryData[queryData.targetID] = queryData; +void TestTargetMetadataProvider::SetSyncedKeys(DocumentKeySet keys, FSTQueryData *query_data) { + synced_keys_[query_data.targetID] = keys; + query_data_[query_data.targetID] = query_data; } -- (DocumentKeySet)remoteKeysForTarget:(TargetId)targetID { - auto it = _syncedKeys.find(targetID); - HARD_ASSERT(it != _syncedKeys.end(), "Cannot process unknown target %s", targetID); +DocumentKeySet TestTargetMetadataProvider::GetRemoteKeysForTarget(TargetId target_id) const { + auto it = synced_keys_.find(target_id); + HARD_ASSERT(it != synced_keys_.end(), "Cannot process unknown target %s", target_id); return it->second; } -- (nullable FSTQueryData *)queryDataForTarget:(TargetId)targetID { - auto it = _queryData.find(targetID); - HARD_ASSERT(it != _queryData.end(), "Cannot process unknown target %s", targetID); +FSTQueryData *TestTargetMetadataProvider::GetQueryDataForTarget(TargetId target_id) const { + auto it = query_data_.find(target_id); + HARD_ASSERT(it != query_data_.end(), "Cannot process unknown target %s", target_id); return it->second; } -@end +} // namespace remote +} // namespace firestore +} // namespace firebase + +using firebase::firestore::remote::TestTargetMetadataProvider; RemoteEvent FSTTestAddedRemoteEvent(FSTMaybeDocument *doc, const std::vector &addedToTargets) { HARD_ASSERT(![doc isKindOfClass:[FSTDocument class]] || ![(FSTDocument *)doc hasLocalMutations], "Docs from remote updates shouldn't have local changes."); DocumentWatchChange change{addedToTargets, {}, doc.key, doc}; - WatchChangeAggregator aggregator{ - [FSTTestTargetMetadataProvider providerWithEmptyResultForKey:doc.key targets:addedToTargets]}; + auto metadataProvider = + TestTargetMetadataProvider::CreateEmptyResultProvider(doc.key, addedToTargets); + WatchChangeAggregator aggregator{&metadataProvider}; aggregator.HandleDocumentChange(change); return aggregator.CreateRemoteEvent(doc.version); } @@ -414,10 +417,9 @@ RemoteEvent FSTTestUpdateRemoteEventWithLimboTargets( std::vector listens = updatedInTargets; listens.insert(listens.end(), removedFromTargets.begin(), removedFromTargets.end()); - WatchChangeAggregator aggregator{[FSTTestTargetMetadataProvider - providerWithSingleResultForKey:doc.key - listenTargets:listens - limboTargets:limboTargets]}; + auto metadataProvider = + TestTargetMetadataProvider::CreateSingleResultProvider(doc.key, listens, limboTargets); + WatchChangeAggregator aggregator{&metadataProvider}; aggregator.HandleDocumentChange(change); return aggregator.CreateRemoteEvent(doc.version); } diff --git a/Firestore/Source/Core/FSTFirestoreClient.mm b/Firestore/Source/Core/FSTFirestoreClient.mm index 6f9161f1706..4de9a6a39f2 100644 --- a/Firestore/Source/Core/FSTFirestoreClient.mm +++ b/Firestore/Source/Core/FSTFirestoreClient.mm @@ -240,7 +240,7 @@ - (void)initializeWithUser:(const User &)user settings:(FIRFirestoreSettings *)s _eventManager = [FSTEventManager eventManagerWithSyncEngine:_syncEngine]; // Setup wiring for remote store. - _remoteStore.syncEngine = _syncEngine; + [_remoteStore setSyncEngine:_syncEngine]; // NOTE: RemoteStore depends on LocalStore (for persisting stream tokens, refilling mutation // queue, etc.) so must be started after LocalStore. diff --git a/Firestore/Source/Remote/FSTRemoteStore.h b/Firestore/Source/Remote/FSTRemoteStore.h index c52868bf788..6b1aa4824af 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.h +++ b/Firestore/Source/Remote/FSTRemoteStore.h @@ -23,6 +23,7 @@ #include "Firestore/core/src/firebase/firestore/model/types.h" #include "Firestore/core/src/firebase/firestore/remote/datastore.h" #include "Firestore/core/src/firebase/firestore/remote/remote_event.h" +#include "Firestore/core/src/firebase/firestore/remote/remote_store.h" #include "Firestore/core/src/firebase/firestore/util/async_queue.h" @class FSTLocalStore; @@ -33,64 +34,13 @@ NS_ASSUME_NONNULL_BEGIN -#pragma mark - FSTRemoteSyncer - -/** - * A protocol that describes the actions the FSTRemoteStore needs to perform on a cooperating - * synchronization engine. - */ -@protocol FSTRemoteSyncer - -/** - * Applies one remote event to the sync engine, notifying any views of the changes, and releasing - * any pending mutation batches that would become visible because of the snapshot version the - * remote event contains. - */ -- (void)applyRemoteEvent:(const firebase::firestore::remote::RemoteEvent &)remoteEvent; - -/** - * Rejects the listen for the given targetID. This can be triggered by the backend for any active - * target. - * - * @param targetID The targetID corresponding to a listen initiated via - * -listenToTargetWithQueryData: on FSTRemoteStore. - * @param error A description of the condition that has forced the rejection. Nearly always this - * will be an indication that the user is no longer authorized to see the data matching the - * target. - */ -- (void)rejectListenWithTargetID:(const firebase::firestore::model::TargetId)targetID - error:(NSError *)error; - -/** - * Applies the result of a successful write of a mutation batch to the sync engine, emitting - * snapshots in any views that the mutation applies to, and removing the batch from the mutation - * queue. - */ -- (void)applySuccessfulWriteWithResult:(FSTMutationBatchResult *)batchResult; - -/** - * Rejects the batch, removing the batch from the mutation queue, recomputing the local view of - * any documents affected by the batch and then, emitting snapshots with the reverted value. - */ -- (void)rejectFailedWriteWithBatchID:(firebase::firestore::model::BatchId)batchID - error:(NSError *)error; - -/** - * Returns the set of remote document keys for the given target ID. This list includes the - * documents that were assigned to the target when we received the last snapshot. - */ -- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget: - (firebase::firestore::model::TargetId)targetId; - -@end - #pragma mark - FSTRemoteStore /** * FSTRemoteStore handles all interaction with the backend through a simple, clean interface. This * class is not thread safe and should be only called from the worker dispatch queue. */ -@interface FSTRemoteStore : NSObject +@interface FSTRemoteStore : NSObject - (instancetype) initWithLocalStore:(FSTLocalStore *)localStore @@ -101,7 +51,7 @@ NS_ASSUME_NONNULL_BEGIN - (instancetype)init NS_UNAVAILABLE; -@property(nonatomic, weak) id syncEngine; +- (void)setSyncEngine:(id)syncEngine; /** Starts up the remote store, creating streams, restoring state from LocalStore, etc. */ - (void)start; diff --git a/Firestore/Source/Remote/FSTRemoteStore.mm b/Firestore/Source/Remote/FSTRemoteStore.mm index c765e5c46a7..7b4f215c911 100644 --- a/Firestore/Source/Remote/FSTRemoteStore.mm +++ b/Firestore/Source/Remote/FSTRemoteStore.mm @@ -36,6 +36,7 @@ #include "Firestore/core/src/firebase/firestore/model/snapshot_version.h" #include "Firestore/core/src/firebase/firestore/remote/online_state_tracker.h" #include "Firestore/core/src/firebase/firestore/remote/remote_event.h" +#include "Firestore/core/src/firebase/firestore/remote/remote_store.h" #include "Firestore/core/src/firebase/firestore/remote/stream.h" #include "Firestore/core/src/firebase/firestore/util/error_apple.h" #include "Firestore/core/src/firebase/firestore/util/hard_assert.h" @@ -61,6 +62,7 @@ using firebase::firestore::remote::ExistenceFilterWatchChange; using firebase::firestore::remote::OnlineStateTracker; using firebase::firestore::remote::RemoteEvent; +using firebase::firestore::remote::RemoteStore; using firebase::firestore::remote::TargetChange; using firebase::firestore::remote::WatchChange; using firebase::firestore::remote::WatchChangeAggregator; @@ -79,13 +81,7 @@ #pragma mark - FSTRemoteStore -@interface FSTRemoteStore () - -/** - * The local store, used to fill the write pipeline with outbound mutations and resolve existence - * filter mismatches. Immutable after initialization. - */ -@property(nonatomic, strong, readonly) FSTLocalStore *localStore; +@interface FSTRemoteStore () #pragma mark Watch Stream @@ -107,29 +103,11 @@ @interface FSTRemoteStore () @end @implementation FSTRemoteStore { - OnlineStateTracker _onlineStateTracker; - - std::unique_ptr _watchChangeAggregator; - /** The client-side proxy for interacting with the backend. */ std::shared_ptr _datastore; - /** - * A mapping of watched targets that the client cares about tracking and the - * user has explicitly called a 'listen' for this target. - * - * These targets may or may not have been sent to or acknowledged by the - * server. On re-establishing the listen stream, these targets should be sent - * to the server. The targets removed with unlistens are removed eagerly - * without waiting for confirmation from the listen stream. */ - std::unordered_map _listenTargets; - - std::shared_ptr _watchStream; + + std::unique_ptr _remoteStore; std::shared_ptr _writeStream; - /** - * Set to YES by 'enableNetwork:' and NO by 'disableNetworkInternal:' and - * indicates the user-preferred network state. - */ - BOOL _isNetworkEnabled; } - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore @@ -137,22 +115,25 @@ - (instancetype)initWithLocalStore:(FSTLocalStore *)localStore workerQueue:(AsyncQueue *)queue onlineStateHandler:(std::function)onlineStateHandler { if (self = [super init]) { - _localStore = localStore; _datastore = std::move(datastore); _writePipeline = [NSMutableArray array]; - _onlineStateTracker = OnlineStateTracker{queue, std::move(onlineStateHandler)}; _datastore->Start(); - // Create streams (but note they're not started yet) - _watchStream = _datastore->CreateWatchStream(self); + + _remoteStore = absl::make_unique(localStore, _datastore.get(), queue, + std::move(onlineStateHandler)); _writeStream = _datastore->CreateWriteStream(self); - _isNetworkEnabled = NO; + _remoteStore->set_is_network_enabled(false); } return self; } +- (void)setSyncEngine:(id)syncEngine { + _remoteStore->set_sync_engine(syncEngine); +} + - (void)start { // For now, all setup is handled by enableNetwork(). We might expand on this in the future. [self enableNetwork]; @@ -160,23 +141,17 @@ - (void)start { #pragma mark Online/Offline state -- (BOOL)canUseNetwork { - // PORTING NOTE: This method exists mostly because web also has to take into - // account primary vs. secondary state. - return _isNetworkEnabled; -} - - (void)enableNetwork { - _isNetworkEnabled = YES; + _remoteStore->set_is_network_enabled(true); - if ([self canUseNetwork]) { + if (_remoteStore->CanUseNetwork()) { // Load any saved stream token from persistent storage - _writeStream->SetLastStreamToken([self.localStore lastStreamToken]); + _writeStream->SetLastStreamToken([_remoteStore->local_store() lastStreamToken]); - if ([self shouldStartWatchStream]) { - [self startWatchStream]; + if (_remoteStore->ShouldStartWatchStream()) { + _remoteStore->StartWatchStream(); } else { - _onlineStateTracker.UpdateState(OnlineState::Unknown); + _remoteStore->online_state_tracker().UpdateState(OnlineState::Unknown); } // This will start the write stream if necessary. @@ -185,16 +160,16 @@ - (void)enableNetwork { } - (void)disableNetwork { - _isNetworkEnabled = NO; + _remoteStore->set_is_network_enabled(false); [self disableNetworkInternal]; // Set the OnlineState to Offline so get()s return from cache, etc. - _onlineStateTracker.UpdateState(OnlineState::Offline); + _remoteStore->online_state_tracker().UpdateState(OnlineState::Offline); } /** Disables the network, setting the OnlineState to the specified targetOnlineState. */ - (void)disableNetworkInternal { - _watchStream->Stop(); + _remoteStore->watch_stream().Stop(); _writeStream->Stop(); if (self.writePipeline.count > 0) { @@ -203,246 +178,42 @@ - (void)disableNetworkInternal { [self.writePipeline removeAllObjects]; } - [self cleanUpWatchStreamState]; + _remoteStore->CleanUpWatchStreamState(); } #pragma mark Shutdown - (void)shutdown { LOG_DEBUG("FSTRemoteStore %s shutting down", (__bridge void *)self); - _isNetworkEnabled = NO; + _remoteStore->set_is_network_enabled(false); [self disableNetworkInternal]; // Set the OnlineState to Unknown (rather than Offline) to avoid potentially triggering // spurious listener events with cached data, etc. - _onlineStateTracker.UpdateState(OnlineState::Unknown); + _remoteStore->online_state_tracker().UpdateState(OnlineState::Unknown); _datastore->Shutdown(); } - (void)credentialDidChange { - if ([self canUseNetwork]) { + if (_remoteStore->CanUseNetwork()) { // Tear down and re-create our network streams. This will ensure we get a fresh auth token // for the new user and re-fill the write pipeline with new mutations from the LocalStore // (since mutations are per-user). LOG_DEBUG("FSTRemoteStore %s restarting streams for new credential", (__bridge void *)self); - _isNetworkEnabled = NO; + _remoteStore->set_is_network_enabled(false); [self disableNetworkInternal]; - _onlineStateTracker.UpdateState(OnlineState::Unknown); + _remoteStore->online_state_tracker().UpdateState(OnlineState::Unknown); [self enableNetwork]; } } #pragma mark Watch Stream -- (void)startWatchStream { - HARD_ASSERT([self shouldStartWatchStream], - "startWatchStream: called when shouldStartWatchStream: is false."); - _watchChangeAggregator = absl::make_unique(self); - _watchStream->Start(); - - _onlineStateTracker.HandleWatchStreamStart(); -} - - (void)listenToTargetWithQueryData:(FSTQueryData *)queryData { - TargetId targetKey = queryData.targetID; - HARD_ASSERT(_listenTargets.find(targetKey) == _listenTargets.end(), - "listenToQuery called with duplicate target id: %s", targetKey); - - _listenTargets[targetKey] = queryData; - - if ([self shouldStartWatchStream]) { - [self startWatchStream]; - } else if (_watchStream->IsOpen()) { - [self sendWatchRequestWithQueryData:queryData]; - } -} - -- (void)sendWatchRequestWithQueryData:(FSTQueryData *)queryData { - _watchChangeAggregator->RecordPendingTargetRequest(queryData.targetID); - _watchStream->WatchQuery(queryData); + _remoteStore->Listen(queryData); } - (void)stopListeningToTargetID:(TargetId)targetID { - size_t num_erased = _listenTargets.erase(targetID); - HARD_ASSERT(num_erased == 1, "stopListeningToTargetID: target not currently watched: %s", - targetID); - - if (_watchStream->IsOpen()) { - [self sendUnwatchRequestForTargetID:targetID]; - } - if (_listenTargets.empty()) { - if (_watchStream->IsOpen()) { - _watchStream->MarkIdle(); - } else if ([self canUseNetwork]) { - // Revert to OnlineState::Unknown if the watch stream is not open and we have no listeners, - // since without any listens to send we cannot confirm if the stream is healthy and upgrade - // to OnlineState::Online. - _onlineStateTracker.UpdateState(OnlineState::Unknown); - } - } -} - -- (void)sendUnwatchRequestForTargetID:(TargetId)targetID { - _watchChangeAggregator->RecordPendingTargetRequest(targetID); - _watchStream->UnwatchTargetId(targetID); -} - -/** - * Returns YES if the network is enabled, the watch stream has not yet been started and there are - * active watch targets. - */ -- (BOOL)shouldStartWatchStream { - return [self canUseNetwork] && !_watchStream->IsStarted() && !_listenTargets.empty(); -} - -- (void)cleanUpWatchStreamState { - _watchChangeAggregator.reset(); -} - -- (void)watchStreamDidOpen { - // Restore any existing watches. - for (const auto &kv : _listenTargets) { - [self sendWatchRequestWithQueryData:kv.second]; - } -} - -- (void)watchStreamDidChange:(const WatchChange &)change - snapshotVersion:(const SnapshotVersion &)snapshotVersion { - // Mark the connection as Online because we got a message from the server. - _onlineStateTracker.UpdateState(OnlineState::Online); - - if (change.type() == WatchChange::Type::TargetChange) { - const WatchTargetChange &watchTargetChange = static_cast(change); - if (watchTargetChange.state() == WatchTargetChangeState::Removed && - !watchTargetChange.cause().ok()) { - // There was an error on a target, don't wait for a consistent snapshot to raise events - return [self processTargetErrorForWatchChange:watchTargetChange]; - } else { - _watchChangeAggregator->HandleTargetChange(watchTargetChange); - } - } else if (change.type() == WatchChange::Type::Document) { - _watchChangeAggregator->HandleDocumentChange(static_cast(change)); - } else { - HARD_ASSERT(change.type() == WatchChange::Type::ExistenceFilter, - "Expected watchChange to be an instance of ExistenceFilterWatchChange"); - _watchChangeAggregator->HandleExistenceFilter( - static_cast(change)); - } - - if (snapshotVersion != SnapshotVersion::None() && - snapshotVersion >= [self.localStore lastRemoteSnapshotVersion]) { - // We have received a target change with a global snapshot if the snapshot version is not - // equal to SnapshotVersion.None(). - [self raiseWatchSnapshotWithSnapshotVersion:snapshotVersion]; - } -} - -- (void)watchStreamWasInterruptedWithError:(const Status &)error { - if (error.ok()) { - // Graceful stop (due to Stop() or idle timeout). Make sure that's desirable. - HARD_ASSERT(![self shouldStartWatchStream], - "Watch stream was stopped gracefully while still needed."); - } - - [self cleanUpWatchStreamState]; - - // If we still need the watch stream, retry the connection. - if ([self shouldStartWatchStream]) { - _onlineStateTracker.HandleWatchStreamFailure(error); - - [self startWatchStream]; - } else { - // We don't need to restart the watch stream because there are no active targets. The online - // state is set to unknown because there is no active attempt at establishing a connection. - _onlineStateTracker.UpdateState(OnlineState::Unknown); - } -} - -/** - * Takes a batch of changes from the Datastore, repackages them as a `RemoteEvent`, and passes that - * on to the SyncEngine. - */ -- (void)raiseWatchSnapshotWithSnapshotVersion:(const SnapshotVersion &)snapshotVersion { - HARD_ASSERT(snapshotVersion != SnapshotVersion::None(), - "Can't raise event for unknown SnapshotVersion"); - - RemoteEvent remoteEvent = _watchChangeAggregator->CreateRemoteEvent(snapshotVersion); - - // Update in-memory resume tokens. `FSTLocalStore` will update the persistent view of these when - // applying the completed `RemoteEvent`. - for (const auto &entry : remoteEvent.target_changes()) { - const TargetChange &target_change = entry.second; - NSData *resumeToken = target_change.resume_token(); - if (resumeToken.length > 0) { - TargetId targetID = entry.first; - auto found = _listenTargets.find(targetID); - FSTQueryData *queryData = found != _listenTargets.end() ? found->second : nil; - // A watched target might have been removed already. - if (queryData) { - _listenTargets[targetID] = - [queryData queryDataByReplacingSnapshotVersion:snapshotVersion - resumeToken:resumeToken - sequenceNumber:queryData.sequenceNumber]; - } - } - } - - // Re-establish listens for the targets that have been invalidated by existence filter - // mismatches. - for (TargetId targetID : remoteEvent.target_mismatches()) { - auto found = _listenTargets.find(targetID); - if (found == _listenTargets.end()) { - // A watched target might have been removed already. - continue; - } - FSTQueryData *queryData = found->second; - - // Clear the resume token for the query, since we're in a known mismatch state. - queryData = [[FSTQueryData alloc] initWithQuery:queryData.query - targetID:targetID - listenSequenceNumber:queryData.sequenceNumber - purpose:queryData.purpose]; - _listenTargets[targetID] = queryData; - - // Cause a hard reset by unwatching and rewatching immediately, but deliberately don't send a - // resume token so that we get a full update. - [self sendUnwatchRequestForTargetID:targetID]; - - // Mark the query we send as being on behalf of an existence filter mismatch, but don't - // actually retain that in _listenTargets. This ensures that we flag the first re-listen this - // way without impacting future listens of this target (that might happen e.g. on reconnect). - FSTQueryData *requestQueryData = - [[FSTQueryData alloc] initWithQuery:queryData.query - targetID:targetID - listenSequenceNumber:queryData.sequenceNumber - purpose:FSTQueryPurposeExistenceFilterMismatch]; - [self sendWatchRequestWithQueryData:requestQueryData]; - } - - // Finally handle remote event - [self.syncEngine applyRemoteEvent:remoteEvent]; -} - -/** Process a target error and passes the error along to SyncEngine. */ -- (void)processTargetErrorForWatchChange:(const WatchTargetChange &)change { - HARD_ASSERT(!change.cause().ok(), "Handling target error without a cause"); - // Ignore targets that have been removed already. - for (TargetId targetID : change.target_ids()) { - auto found = _listenTargets.find(targetID); - if (found != _listenTargets.end()) { - _listenTargets.erase(found); - _watchChangeAggregator->RemoveTarget(targetID); - [self.syncEngine rejectListenWithTargetID:targetID error:util::MakeNSError(change.cause())]; - } - } -} - -- (DocumentKeySet)remoteKeysForTarget:(TargetId)targetID { - return [self.syncEngine remoteKeysForTarget:targetID]; -} - -- (nullable FSTQueryData *)queryDataForTarget:(TargetId)targetID { - auto found = _listenTargets.find(targetID); - return found != _listenTargets.end() ? found->second : nil; + _remoteStore->StopListening(targetID); } #pragma mark Write Stream @@ -452,7 +223,8 @@ - (nullable FSTQueryData *)queryDataForTarget:(TargetId)targetID { * pending writes. */ - (BOOL)shouldStartWriteStream { - return [self canUseNetwork] && !_writeStream->IsStarted() && self.writePipeline.count > 0; + return _remoteStore->CanUseNetwork() && !_writeStream->IsStarted() && + self.writePipeline.count > 0; } - (void)startWriteStream { @@ -473,7 +245,8 @@ - (void)fillWritePipeline { BatchId lastBatchIDRetrieved = self.writePipeline.count == 0 ? kBatchIdUnknown : self.writePipeline.lastObject.batchID; while ([self canAddToWritePipeline]) { - FSTMutationBatch *batch = [self.localStore nextMutationBatchAfterBatchID:lastBatchIDRetrieved]; + FSTMutationBatch *batch = + [_remoteStore->local_store() nextMutationBatchAfterBatchID:lastBatchIDRetrieved]; if (!batch) { if (self.writePipeline.count == 0) { _writeStream->MarkIdle(); @@ -493,7 +266,7 @@ - (void)fillWritePipeline { * Returns YES if we can add to the write pipeline (i.e. it is not full and the network is enabled). */ - (BOOL)canAddToWritePipeline { - return [self canUseNetwork] && self.writePipeline.count < kMaxPendingWrites; + return _remoteStore->CanUseNetwork() && self.writePipeline.count < kMaxPendingWrites; } /** @@ -520,7 +293,7 @@ - (void)writeStreamDidOpen { */ - (void)writeStreamDidCompleteHandshake { // Record the stream token. - [self.localStore setLastStreamToken:_writeStream->GetLastStreamToken()]; + [_remoteStore->local_store() setLastStreamToken:_writeStream->GetLastStreamToken()]; // Send the write pipeline now that the stream is established. for (FSTMutationBatch *write in self.writePipeline) { @@ -542,7 +315,7 @@ - (void)writeStreamDidReceiveResponseWithVersion:(const SnapshotVersion &)commit commitVersion:commitVersion mutationResults:results streamToken:_writeStream->GetLastStreamToken()]; - [self.syncEngine applySuccessfulWriteWithResult:batchResult]; + [_remoteStore->sync_engine() applySuccessfulWriteWithResult:batchResult]; // It's possible that with the completion of this mutation another slot has freed up. [self fillWritePipeline]; @@ -589,7 +362,7 @@ - (void)handleHandshakeError:(const Status &)error { "error code: '%s', details: '%s'", (__bridge void *)self, token, error.code(), error.error_message()); _writeStream->SetLastStreamToken(nil); - [self.localStore setLastStreamToken:nil]; + [_remoteStore->local_store() setLastStreamToken:nil]; } else { // Some other error, don't reset stream token. Our stream logic will just retry with exponential // backoff. @@ -612,7 +385,8 @@ - (void)handleWriteError:(const Status &)error { // bad request so inhibit backoff on the next restart. _writeStream->InhibitBackoff(); - [self.syncEngine rejectFailedWriteWithBatchID:batch.batchID error:util::MakeNSError(error)]; + [_remoteStore->sync_engine() rejectFailedWriteWithBatchID:batch.batchID + error:util::MakeNSError(error)]; // It's possible that with the completion of this mutation another slot has freed up. [self fillWritePipeline]; diff --git a/Firestore/Source/Remote/FSTStream.h b/Firestore/Source/Remote/FSTStream.h index d4183379329..211a82154f5 100644 --- a/Firestore/Source/Remote/FSTStream.h +++ b/Firestore/Source/Remote/FSTStream.h @@ -24,33 +24,6 @@ NS_ASSUME_NONNULL_BEGIN -#pragma mark - FSTWatchStreamDelegate - -/** A protocol defining the events that can be emitted by the FSTWatchStream. */ -@protocol FSTWatchStreamDelegate - -/** Called by the FSTWatchStream when it is ready to accept outbound request messages. */ -- (void)watchStreamDidOpen; - -/** - * Called by the FSTWatchStream with changes and the snapshot versions included in in the - * WatchChange responses sent back by the server. - */ -- (void)watchStreamDidChange:(const firebase::firestore::remote::WatchChange &)change - snapshotVersion:(const firebase::firestore::model::SnapshotVersion &)snapshotVersion; - -/** - * Called by the FSTWatchStream when the underlying streaming RPC is interrupted for whatever - * reason, usually because of an error, but possibly due to an idle timeout. The error passed to - * this method may be nil, in which case the stream was closed without attributable fault. - * - * NOTE: This will not be called after `stop` is called on the stream. See "Starting and Stopping" - * on FSTStream for details. - */ -- (void)watchStreamWasInterruptedWithError:(const firebase::firestore::util::Status &)error; - -@end - #pragma mark - FSTWriteStreamDelegate @protocol FSTWriteStreamDelegate diff --git a/Firestore/core/src/firebase/firestore/remote/datastore.h b/Firestore/core/src/firebase/firestore/remote/datastore.h index d6d8a358619..9e2268ea571 100644 --- a/Firestore/core/src/firebase/firestore/remote/datastore.h +++ b/Firestore/core/src/firebase/firestore/remote/datastore.h @@ -85,7 +85,7 @@ class Datastore : public std::enable_shared_from_this { * shared channel. */ virtual std::shared_ptr CreateWatchStream( - id delegate); + WatchStreamCallback* callback); /** * Creates a new `WriteStream` that is still unstarted but uses a common * shared channel. diff --git a/Firestore/core/src/firebase/firestore/remote/datastore.mm b/Firestore/core/src/firebase/firestore/remote/datastore.mm index 9cf387fdbec..1bf2376d8f8 100644 --- a/Firestore/core/src/firebase/firestore/remote/datastore.mm +++ b/Firestore/core/src/firebase/firestore/remote/datastore.mm @@ -151,10 +151,10 @@ void LogGrpcCallFinished(absl::string_view rpc_name, } std::shared_ptr Datastore::CreateWatchStream( - id delegate) { + WatchStreamCallback* callback) { return std::make_shared(worker_queue_, credentials_, serializer_bridge_.GetSerializer(), - &grpc_connection_, delegate); + &grpc_connection_, callback); } std::shared_ptr Datastore::CreateWriteStream( diff --git a/Firestore/core/src/firebase/firestore/remote/remote_event.h b/Firestore/core/src/firebase/firestore/remote/remote_event.h index e154b9ee623..8a1fc4ce066 100644 --- a/Firestore/core/src/firebase/firestore/remote/remote_event.h +++ b/Firestore/core/src/firebase/firestore/remote/remote_event.h @@ -43,31 +43,33 @@ NS_ASSUME_NONNULL_BEGIN -/** - * Interface implemented by RemoteStore to expose target metadata to the - * `WatchChangeAggregator`. - */ -@protocol FSTTargetMetadataProvider - -/** - * Returns the set of remote document keys for the given target ID as of the - * last raised snapshot. - */ -- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget: - (firebase::firestore::model::TargetId)targetID; +namespace firebase { +namespace firestore { +namespace remote { /** - * Returns the FSTQueryData for an active target ID or 'null' if this query has - * become inactive + * Interface implemented by `RemoteStore` to expose target metadata to the + * `WatchChangeAggregator`. */ -- (nullable FSTQueryData*)queryDataForTarget: - (firebase::firestore::model::TargetId)targetID; +class TargetMetadataProvider { + public: + virtual ~TargetMetadataProvider() { + } -@end + /** + * Returns the set of remote document keys for the given target ID as of the + * last raised snapshot. + */ + virtual model::DocumentKeySet GetRemoteKeysForTarget( + model::TargetId target_id) const = 0; -namespace firebase { -namespace firestore { -namespace remote { + /** + * Returns the FSTQueryData for an active target ID or 'null' if this query + * has become inactive + */ + virtual FSTQueryData* GetQueryDataForTarget( + model::TargetId target_id) const = 0; +}; /** * A `TargetChange` specifies the set of changes for a specific target as part @@ -309,9 +311,7 @@ class RemoteEvent { class WatchChangeAggregator { public: explicit WatchChangeAggregator( - id target_metadata_provider) - : target_metadata_provider_{target_metadata_provider} { - } + TargetMetadataProvider* target_metadata_provider); /** * Processes and adds the `DocumentWatchChange` to the current set of changes. @@ -437,7 +437,7 @@ class WatchChangeAggregator { */ std::unordered_set pending_target_resets_; - id target_metadata_provider_; + TargetMetadataProvider* target_metadata_provider_ = nullptr; }; } // namespace remote diff --git a/Firestore/core/src/firebase/firestore/remote/remote_event.mm b/Firestore/core/src/firebase/firestore/remote/remote_event.mm index fd4355ddab2..65bb82b0339 100644 --- a/Firestore/core/src/firebase/firestore/remote/remote_event.mm +++ b/Firestore/core/src/firebase/firestore/remote/remote_event.mm @@ -114,6 +114,11 @@ // WatchChangeAggregator +WatchChangeAggregator::WatchChangeAggregator( + TargetMetadataProvider* target_metadata_provider) + : target_metadata_provider_{NOT_NULL(target_metadata_provider)} { +} + void WatchChangeAggregator::HandleDocumentChange( const DocumentWatchChange& document_change) { for (TargetId target_id : document_change.updated_target_ids()) { @@ -360,9 +365,9 @@ TargetId target_id) { TargetState& target_state = EnsureTargetState(target_id); TargetChange target_change = target_state.ToTargetChange(); - return ([target_metadata_provider_ remoteKeysForTarget:target_id].size() + - target_change.added_documents().size() - - target_change.removed_documents().size()); + return target_metadata_provider_->GetRemoteKeysForTarget(target_id).size() + + target_change.added_documents().size() - + target_change.removed_documents().size(); } void WatchChangeAggregator::RecordPendingTargetRequest(TargetId target_id) { @@ -385,7 +390,7 @@ return target_state != target_states_.end() && target_state->second.IsPending() ? nil - : [target_metadata_provider_ queryDataForTarget:target_id]; + : target_metadata_provider_->GetQueryDataForTarget(target_id); } void WatchChangeAggregator::ResetTarget(TargetId target_id) { @@ -400,7 +405,7 @@ // removals will be part of the initial snapshot if Watch does not resend // these documents. DocumentKeySet existingKeys = - [target_metadata_provider_ remoteKeysForTarget:target_id]; + target_metadata_provider_->GetRemoteKeysForTarget(target_id); for (const DocumentKey& key : existingKeys) { RemoveDocumentFromTarget(target_id, key, nil); @@ -410,7 +415,7 @@ bool WatchChangeAggregator::TargetContainsDocument(TargetId target_id, const DocumentKey& key) { const DocumentKeySet& existing_keys = - [target_metadata_provider_ remoteKeysForTarget:target_id]; + target_metadata_provider_->GetRemoteKeysForTarget(target_id); return existing_keys.contains(key); } diff --git a/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h b/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h index c655063c4b0..e5cefba467a 100644 --- a/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h +++ b/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h @@ -172,22 +172,6 @@ class DatastoreSerializer { FSTSerializerBeta* serializer_; }; -/** A C++ bridge that invokes methods on an `FSTWatchStreamDelegate`. */ -class WatchStreamDelegate { - public: - explicit WatchStreamDelegate(id delegate) - : delegate_{delegate} { - } - - void NotifyDelegateOnOpen(); - void NotifyDelegateOnChange(const WatchChange& change, - const model::SnapshotVersion& snapshot_version); - void NotifyDelegateOnClose(const util::Status& status); - - private: - __weak id delegate_; -}; - /** A C++ bridge that invokes methods on an `FSTWriteStreamDelegate`. */ class WriteStreamDelegate { public: diff --git a/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.mm b/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.mm index 8159fc96806..406f9199f6d 100644 --- a/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.mm +++ b/Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.mm @@ -300,21 +300,6 @@ bool IsLoggingEnabled() { return [serializer_ decodedMaybeDocumentFromBatch:response]; } -// WatchStreamDelegate - -void WatchStreamDelegate::NotifyDelegateOnOpen() { - [delegate_ watchStreamDidOpen]; -} - -void WatchStreamDelegate::NotifyDelegateOnChange( - const WatchChange& change, const SnapshotVersion& snapshot_version) { - [delegate_ watchStreamDidChange:change snapshotVersion:snapshot_version]; -} - -void WatchStreamDelegate::NotifyDelegateOnClose(const Status& status) { - [delegate_ watchStreamWasInterruptedWithError:status]; -} - // WriteStreamDelegate void WriteStreamDelegate::NotifyDelegateOnOpen() { diff --git a/Firestore/core/src/firebase/firestore/remote/remote_store.h b/Firestore/core/src/firebase/firestore/remote/remote_store.h new file mode 100644 index 00000000000..4d9180fe365 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/remote/remote_store.h @@ -0,0 +1,218 @@ +/* + * Copyright 2019 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_REMOTE_STORE_H_ +#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_REMOTE_STORE_H_ + +#if !defined(__OBJC__) +#error "This header only supports Objective-C++" +#endif // !defined(__OBJC__) + +#import + +#include +#include + +#include "Firestore/core/src/firebase/firestore/model/document_key_set.h" +#include "Firestore/core/src/firebase/firestore/model/snapshot_version.h" +#include "Firestore/core/src/firebase/firestore/model/types.h" +#include "Firestore/core/src/firebase/firestore/remote/datastore.h" +#include "Firestore/core/src/firebase/firestore/remote/online_state_tracker.h" +#include "Firestore/core/src/firebase/firestore/remote/remote_event.h" +#include "Firestore/core/src/firebase/firestore/remote/watch_change.h" +#include "Firestore/core/src/firebase/firestore/remote/watch_stream.h" +#include "Firestore/core/src/firebase/firestore/util/async_queue.h" +#include "Firestore/core/src/firebase/firestore/util/status.h" + +@class FSTLocalStore; +@class FSTMutationBatchResult; +@class FSTQueryData; + +NS_ASSUME_NONNULL_BEGIN + +/** + * A protocol that describes the actions the FSTRemoteStore needs to perform on + * a cooperating synchronization engine. + */ +@protocol FSTRemoteSyncer + +/** + * Applies one remote event to the sync engine, notifying any views of the + * changes, and releasing any pending mutation batches that would become visible + * because of the snapshot version the remote event contains. + */ +- (void)applyRemoteEvent: + (const firebase::firestore::remote::RemoteEvent&)remoteEvent; + +/** + * Rejects the listen for the given targetID. This can be triggered by the + * backend for any active target. + * + * @param targetID The targetID corresponding to a listen initiated via + * -listenToTargetWithQueryData: on FSTRemoteStore. + * @param error A description of the condition that has forced the rejection. + * Nearly always this will be an indication that the user is no longer + * authorized to see the data matching the target. + */ +- (void)rejectListenWithTargetID: + (const firebase::firestore::model::TargetId)targetID + error: + (NSError*)error; // NOLINT(readability/casting) + +/** + * Applies the result of a successful write of a mutation batch to the sync + * engine, emitting snapshots in any views that the mutation applies to, and + * removing the batch from the mutation queue. + */ +- (void)applySuccessfulWriteWithResult: + (FSTMutationBatchResult*)batchResult; // NOLINT(readability/casting) + +/** + * Rejects the batch, removing the batch from the mutation queue, recomputing + * the local view of any documents affected by the batch and then, emitting + * snapshots with the reverted value. + */ +- (void) + rejectFailedWriteWithBatchID:(firebase::firestore::model::BatchId)batchID + error: + (NSError*)error; // NOLINT(readability/casting) + +/** + * Returns the set of remote document keys for the given target ID. This list + * includes the documents that were assigned to the target when we received the + * last snapshot. + */ +- (firebase::firestore::model::DocumentKeySet)remoteKeysForTarget: + (firebase::firestore::model::TargetId)targetId; + +@end + +namespace firebase { +namespace firestore { +namespace remote { + +class RemoteStore : public TargetMetadataProvider, public WatchStreamCallback { + public: + RemoteStore(FSTLocalStore* local_store, + Datastore* datastore, + util::AsyncQueue* worker_queue, + std::function online_state_handler); + + // TODO(varconst): remove the getters and setters + id sync_engine() { + return sync_engine_; + } + void set_sync_engine(id sync_engine) { + sync_engine_ = sync_engine; + } + + FSTLocalStore* local_store() { + return local_store_; + } + + OnlineStateTracker& online_state_tracker() { + return online_state_tracker_; + } + + void set_is_network_enabled(bool value) { + is_network_enabled_ = value; + } + + WatchStream& watch_stream() { + return *watch_stream_; + } + + /** Listens to the target identified by the given `FSTQueryData`. */ + void Listen(FSTQueryData* query_data); + + /** Stops listening to the target with the given target ID. */ + void StopListening(model::TargetId target_id); + + model::DocumentKeySet GetRemoteKeysForTarget( + model::TargetId target_id) const override; + FSTQueryData* GetQueryDataForTarget(model::TargetId target_id) const override; + + void OnWatchStreamOpen() override; + void OnWatchStreamChange( + const WatchChange& change, + const model::SnapshotVersion& snapshot_version) override; + void OnWatchStreamClose(const util::Status& status) override; + + // TODO(varconst): make the following methods private. + + bool CanUseNetwork() const; + + void StartWatchStream(); + + /** + * Returns true if the network is enabled, the watch stream has not yet been + * started and there are active watch targets. + */ + bool ShouldStartWatchStream() const; + + void CleanUpWatchStreamState(); + + private: + void SendWatchRequest(FSTQueryData* query_data); + void SendUnwatchRequest(model::TargetId target_id); + + /** + * Takes a batch of changes from the `Datastore`, repackages them as a + * `RemoteEvent`, and passes that on to the `SyncEngine`. + */ + void RaiseWatchSnapshot(const model::SnapshotVersion& snapshot_version); + + /** Process a target error and passes the error along to `SyncEngine`. */ + void ProcessTargetError(const WatchTargetChange& change); + + id sync_engine_ = nil; + + /** + * The local store, used to fill the write pipeline with outbound mutations + * and resolve existence filter mismatches. Immutable after initialization. + */ + FSTLocalStore* local_store_ = nil; + + /** + * A mapping of watched targets that the client cares about tracking and the + * user has explicitly called a 'listen' for this target. + * + * These targets may or may not have been sent to or acknowledged by the + * server. On re-establishing the listen stream, these targets should be sent + * to the server. The targets removed with unlistens are removed eagerly + * without waiting for confirmation from the listen stream. + */ + std::unordered_map listen_targets_; + + OnlineStateTracker online_state_tracker_; + + /** + * Set to true by `EnableNetwork` and false by `DisableNetworkInternal` and + * indicates the user-preferred network state. + */ + bool is_network_enabled_ = false; + + std::shared_ptr watch_stream_; + std::unique_ptr watch_change_aggregator_; +}; + +} // namespace remote +} // namespace firestore +} // namespace firebase + +NS_ASSUME_NONNULL_END + +#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_REMOTE_STORE_H_ diff --git a/Firestore/core/src/firebase/firestore/remote/remote_store.mm b/Firestore/core/src/firebase/firestore/remote/remote_store.mm new file mode 100644 index 00000000000..f40a5e7b31d --- /dev/null +++ b/Firestore/core/src/firebase/firestore/remote/remote_store.mm @@ -0,0 +1,296 @@ +/* + * Copyright 2019 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Firestore/core/src/firebase/firestore/remote/remote_store.h" + +#include + +#import "Firestore/Source/Local/FSTLocalStore.h" +#import "Firestore/Source/Local/FSTQueryData.h" + +#include "Firestore/core/src/firebase/firestore/util/error_apple.h" +#include "Firestore/core/src/firebase/firestore/util/hard_assert.h" +#include "absl/memory/memory.h" + +using firebase::firestore::model::DocumentKeySet; +using firebase::firestore::model::OnlineState; +using firebase::firestore::model::SnapshotVersion; +using firebase::firestore::model::DocumentKeySet; +using firebase::firestore::model::TargetId; +using firebase::firestore::remote::Datastore; +using firebase::firestore::remote::WatchStream; +using firebase::firestore::remote::DocumentWatchChange; +using firebase::firestore::remote::ExistenceFilterWatchChange; +using firebase::firestore::remote::OnlineStateTracker; +using firebase::firestore::remote::RemoteEvent; +using firebase::firestore::remote::TargetChange; +using firebase::firestore::remote::WatchChange; +using firebase::firestore::remote::WatchChangeAggregator; +using firebase::firestore::remote::WatchTargetChange; +using firebase::firestore::remote::WatchTargetChangeState; +using firebase::firestore::util::AsyncQueue; +using firebase::firestore::util::Status; + +namespace firebase { +namespace firestore { +namespace remote { + +RemoteStore::RemoteStore( + FSTLocalStore* local_store, + Datastore* datastore, + AsyncQueue* worker_queue, + std::function online_state_handler) + : local_store_{local_store}, + online_state_tracker_{worker_queue, std::move(online_state_handler)} { + // Create streams (but note they're not started yet) + watch_stream_ = datastore->CreateWatchStream(this); +} + +void RemoteStore::Listen(FSTQueryData* query_data) { + TargetId targetKey = query_data.targetID; + HARD_ASSERT(listen_targets_.find(targetKey) == listen_targets_.end(), + "Listen called with duplicate target id: %s", targetKey); + + // Mark this as something the client is currently listening for. + listen_targets_[targetKey] = query_data; + + if (ShouldStartWatchStream()) { + // The listen will be sent in `OnWatchStreamOpen` + StartWatchStream(); + } else if (watch_stream_->IsOpen()) { + SendWatchRequest(query_data); + } +} + +void RemoteStore::StopListening(TargetId target_id) { + size_t num_erased = listen_targets_.erase(target_id); + HARD_ASSERT(num_erased == 1, + "StopListening: target not currently watched: %s", target_id); + + // The watch stream might not be started if we're in a disconnected state + if (watch_stream_->IsOpen()) { + SendUnwatchRequest(target_id); + } + if (listen_targets_.empty()) { + if (watch_stream_->IsOpen()) { + watch_stream_->MarkIdle(); + } else if (CanUseNetwork()) { + // Revert to `OnlineState::Unknown` if the watch stream is not open and we + // have no listeners, since without any listens to send we cannot confirm + // if the stream is healthy and upgrade to `OnlineState::Online`. + online_state_tracker_.UpdateState(OnlineState::Unknown); + } + } +} + +FSTQueryData* RemoteStore::GetQueryDataForTarget(TargetId target_id) const { + auto found = listen_targets_.find(target_id); + return found != listen_targets_.end() ? found->second : nil; +} + +DocumentKeySet RemoteStore::GetRemoteKeysForTarget(TargetId target_id) const { + return [sync_engine_ remoteKeysForTarget:target_id]; +} + +void RemoteStore::SendWatchRequest(FSTQueryData* query_data) { + // We need to increment the the expected number of pending responses we're due + // from watch so we wait for the ack to process any messages from this target. + watch_change_aggregator_->RecordPendingTargetRequest(query_data.targetID); + watch_stream_->WatchQuery(query_data); +} + +void RemoteStore::SendUnwatchRequest(TargetId target_id) { + // We need to increment the expected number of pending responses we're due + // from watch so we wait for the removal on the server before we process any + // messages from this target. + watch_change_aggregator_->RecordPendingTargetRequest(target_id); + watch_stream_->UnwatchTargetId(target_id); +} + +void RemoteStore::StartWatchStream() { + HARD_ASSERT(ShouldStartWatchStream(), + "StartWatchStream called when ShouldStartWatchStream is false."); + watch_change_aggregator_ = absl::make_unique(this); + watch_stream_->Start(); + + online_state_tracker_.HandleWatchStreamStart(); +} + +bool RemoteStore::ShouldStartWatchStream() const { + return CanUseNetwork() && !watch_stream_->IsStarted() && + !listen_targets_.empty(); +} + +bool RemoteStore::CanUseNetwork() const { + // PORTING NOTE: This method exists mostly because web also has to take into + // account primary vs. secondary state. + return is_network_enabled_; +} + +void RemoteStore::CleanUpWatchStreamState() { + watch_change_aggregator_.reset(); +} + +void RemoteStore::OnWatchStreamOpen() { + // Restore any existing watches. + for (const auto& kv : listen_targets_) { + SendWatchRequest(kv.second); + } +} + +void RemoteStore::OnWatchStreamClose(const Status& status) { + if (status.ok()) { + // Graceful stop (due to Stop() or idle timeout). Make sure that's + // desirable. + HARD_ASSERT(!ShouldStartWatchStream(), + "Watch stream was stopped gracefully while still needed."); + } + + CleanUpWatchStreamState(); + + // If we still need the watch stream, retry the connection. + if (ShouldStartWatchStream()) { + online_state_tracker_.HandleWatchStreamFailure(status); + + StartWatchStream(); + } else { + // We don't need to restart the watch stream because there are no active + // targets. The online state is set to unknown because there is no active + // attempt at establishing a connection. + online_state_tracker_.UpdateState(OnlineState::Unknown); + } +} + +void RemoteStore::OnWatchStreamChange(const WatchChange& change, + const SnapshotVersion& snapshot_version) { + // Mark the connection as Online because we got a message from the server. + online_state_tracker_.UpdateState(OnlineState::Online); + + if (change.type() == WatchChange::Type::TargetChange) { + const WatchTargetChange& watch_target_change = + static_cast(change); + if (watch_target_change.state() == WatchTargetChangeState::Removed && + !watch_target_change.cause().ok()) { + // There was an error on a target, don't wait for a consistent snapshot to + // raise events + return ProcessTargetError(watch_target_change); + } else { + watch_change_aggregator_->HandleTargetChange(watch_target_change); + } + } else if (change.type() == WatchChange::Type::Document) { + watch_change_aggregator_->HandleDocumentChange( + static_cast(change)); + } else { + HARD_ASSERT( + change.type() == WatchChange::Type::ExistenceFilter, + "Expected watchChange to be an instance of ExistenceFilterWatchChange"); + watch_change_aggregator_->HandleExistenceFilter( + static_cast(change)); + } + + if (snapshot_version != SnapshotVersion::None() && + snapshot_version >= [local_store_ lastRemoteSnapshotVersion]) { + // We have received a target change with a global snapshot if the snapshot + // version is not equal to `SnapshotVersion::None()`. + RaiseWatchSnapshot(snapshot_version); + } +} + +void RemoteStore::RaiseWatchSnapshot(const SnapshotVersion& snapshot_version) { + HARD_ASSERT(snapshot_version != SnapshotVersion::None(), + "Can't raise event for unknown SnapshotVersion"); + + RemoteEvent remote_event = + watch_change_aggregator_->CreateRemoteEvent(snapshot_version); + + // Update in-memory resume tokens. `FSTLocalStore` will update the persistent + // view of these when applying the completed `RemoteEvent`. + for (const auto& entry : remote_event.target_changes()) { + const TargetChange& target_change = entry.second; + NSData* resumeToken = target_change.resume_token(); + + if (resumeToken.length > 0) { + TargetId target_id = entry.first; + auto found = listen_targets_.find(target_id); + FSTQueryData* query_data = + found != listen_targets_.end() ? found->second : nil; + + // A watched target might have been removed already. + if (query_data) { + listen_targets_[target_id] = [query_data + queryDataByReplacingSnapshotVersion:snapshot_version + resumeToken:resumeToken + sequenceNumber:query_data.sequenceNumber]; + } + } + } + + // Re-establish listens for the targets that have been invalidated by + // existence filter mismatches. + for (TargetId target_id : remote_event.target_mismatches()) { + auto found = listen_targets_.find(target_id); + if (found == listen_targets_.end()) { + // A watched target might have been removed already. + continue; + } + FSTQueryData* query_data = found->second; + + // Clear the resume token for the query, since we're in a known mismatch + // state. + query_data = [[FSTQueryData alloc] initWithQuery:query_data.query + targetID:target_id + listenSequenceNumber:query_data.sequenceNumber + purpose:query_data.purpose]; + listen_targets_[target_id] = query_data; + + // Cause a hard reset by unwatching and rewatching immediately, but + // deliberately don't send a resume token so that we get a full update. + SendUnwatchRequest(target_id); + + // Mark the query we send as being on behalf of an existence filter + // mismatch, but don't actually retain that in listen_targets_. This ensures + // that we flag the first re-listen this way without impacting future + // listens of this target (that might happen e.g. on reconnect). + FSTQueryData* request_query_data = [[FSTQueryData alloc] + initWithQuery:query_data.query + targetID:target_id + listenSequenceNumber:query_data.sequenceNumber + purpose:FSTQueryPurposeExistenceFilterMismatch]; + SendWatchRequest(request_query_data); + } + + // Finally handle remote event + [sync_engine_ applyRemoteEvent:remote_event]; +} + +void RemoteStore::ProcessTargetError(const WatchTargetChange& change) { + HARD_ASSERT(!change.cause().ok(), "Handling target error without a cause"); + + // Ignore targets that have been removed already. + for (TargetId target_id : change.target_ids()) { + auto found = listen_targets_.find(target_id); + if (found != listen_targets_.end()) { + listen_targets_.erase(found); + watch_change_aggregator_->RemoveTarget(target_id); + [sync_engine_ rejectListenWithTargetID:target_id + error:util::MakeNSError(change.cause())]; + } + } +} + +} // namespace remote +} // namespace firestore +} // namespace firebase diff --git a/Firestore/core/src/firebase/firestore/remote/watch_stream.h b/Firestore/core/src/firebase/firestore/remote/watch_stream.h index 43b1ef32f7f..b6136cfbb4d 100644 --- a/Firestore/core/src/firebase/firestore/remote/watch_stream.h +++ b/Firestore/core/src/firebase/firestore/remote/watch_stream.h @@ -24,10 +24,12 @@ #include #include +#include "Firestore/core/src/firebase/firestore/model/snapshot_version.h" #include "Firestore/core/src/firebase/firestore/model/types.h" #include "Firestore/core/src/firebase/firestore/remote/grpc_connection.h" #include "Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h" #include "Firestore/core/src/firebase/firestore/remote/stream.h" +#include "Firestore/core/src/firebase/firestore/remote/watch_change.h" #include "Firestore/core/src/firebase/firestore/util/async_queue.h" #include "Firestore/core/src/firebase/firestore/util/status.h" #include "absl/strings/string_view.h" @@ -41,12 +43,41 @@ namespace firebase { namespace firestore { namespace remote { +/** + * An interface defining the events that can be emitted by the `WatchStream`. + */ +class WatchStreamCallback { + public: + /** Called by the `WatchStream` when it is ready to accept outbound request + * messages. */ + virtual void OnWatchStreamOpen() = 0; + + /** + * Called by the `WatchStream` with changes and the snapshot versions + * included in in the `WatchChange` responses sent back by the server. + */ + virtual void OnWatchStreamChange( + const WatchChange& change, + const model::SnapshotVersion& snapshot_version) = 0; + + /** + * Called by the `WatchStream` when the underlying streaming RPC is + * interrupted for whatever reason, usually because of an error, but possibly + * due to an idle timeout. The status passed to this method may be ok, in + * which case the stream was closed without attributable fault. + * + * NOTE: This will not be called after `Stop` is called on the stream. See + * "Starting and Stopping" on `Stream` for details. + */ + virtual void OnWatchStreamClose(const util::Status& status) = 0; +}; + /** * A `Stream` that implements the StreamingWatch RPC. * - * Once the `WatchStream` has called the `streamDidOpen` method on the delegate, - * any number of `WatchQuery` and `UnwatchTargetId` calls can be sent to control - * what changes will be sent from the server for WatchChanges. + * Once the `WatchStream` has called the `OnWatchStreamOpen` method on the + * callback, any number of `WatchQuery` and `UnwatchTargetId` calls can be sent + * to control what changes will be sent from the server for WatchChanges. */ class WatchStream : public Stream { public: @@ -54,7 +85,7 @@ class WatchStream : public Stream { auth::CredentialsProvider* credentials_provider, FSTSerializerBeta* serializer, GrpcConnection* grpc_connection, - id delegate); + WatchStreamCallback* callback); /** * Registers interest in the results of the given query. If the query includes @@ -85,7 +116,7 @@ class WatchStream : public Stream { } bridge::WatchStreamSerializer serializer_bridge_; - bridge::WatchStreamDelegate delegate_bridge_; + WatchStreamCallback* callback_; }; } // namespace remote diff --git a/Firestore/core/src/firebase/firestore/remote/watch_stream.mm b/Firestore/core/src/firebase/firestore/remote/watch_stream.mm index 7a487d75664..0066809fae3 100644 --- a/Firestore/core/src/firebase/firestore/remote/watch_stream.mm +++ b/Firestore/core/src/firebase/firestore/remote/watch_stream.mm @@ -16,6 +16,7 @@ #include "Firestore/core/src/firebase/firestore/remote/watch_stream.h" +#include "Firestore/core/src/firebase/firestore/util/hard_assert.h" #include "Firestore/core/src/firebase/firestore/util/log.h" #include "Firestore/core/src/firebase/firestore/util/status.h" @@ -36,11 +37,11 @@ CredentialsProvider* credentials_provider, FSTSerializerBeta* serializer, GrpcConnection* grpc_connection, - id delegate) + WatchStreamCallback* callback) : Stream{async_queue, credentials_provider, grpc_connection, TimerId::ListenStreamConnectionBackoff, TimerId::ListenStreamIdle}, serializer_bridge_{serializer}, - delegate_bridge_{delegate} { + callback_{NOT_NULL(callback)} { } void WatchStream::WatchQuery(FSTQueryData* query) { @@ -73,7 +74,7 @@ } void WatchStream::NotifyStreamOpen() { - delegate_bridge_.NotifyDelegateOnOpen(); + callback_->OnWatchStreamOpen(); } Status WatchStream::NotifyStreamResponse(const grpc::ByteBuffer& message) { @@ -92,14 +93,14 @@ // A successful response means the stream is healthy. backoff_.Reset(); - delegate_bridge_.NotifyDelegateOnChange( + callback_->OnWatchStreamChange( *serializer_bridge_.ToWatchChange(response), serializer_bridge_.ToSnapshotVersion(response)); return Status::OK(); } void WatchStream::NotifyStreamClose(const Status& status) { - delegate_bridge_.NotifyDelegateOnClose(status); + callback_->OnWatchStreamClose(status); } } // namespace remote