Skip to content

Commit ea3b589

Browse files
authored
Port waitForPendingWrites (#3619)
* port waitForPendingWrites * addressing comments #1 * remove commented line * another change to disable network
1 parent 450acf9 commit ea3b589

21 files changed

+269
-9
lines changed

Firestore/Example/Tests/Integration/API/FIRDatabaseTests.mm

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,4 +1402,50 @@ - (void)testCanRemoveListenerAfterShutdown {
14021402
[listenerRegistration remove];
14031403
}
14041404

1405+
- (void)testWaitForPendingWritesCompletes {
1406+
FIRDocumentReference *doc = [self documentRef];
1407+
FIRFirestore *firestore = doc.firestore;
1408+
1409+
[self disableNetwork];
1410+
1411+
[doc setData:@{@"foo" : @"bar"}];
1412+
[firestore waitForPendingWritesWithCompletion:
1413+
[self completionForExpectationWithName:@"Wait for pending writes"]];
1414+
1415+
[firestore enableNetworkWithCompletion:[self completionForExpectationWithName:@"Enable network"]];
1416+
[self awaitExpectations];
1417+
}
1418+
1419+
- (void)testWaitForPendingWritesFailsWhenUserChanges {
1420+
FIRFirestore *firestore = self.db;
1421+
1422+
[self disableNetwork];
1423+
1424+
// Writes to local to prevent immediate call to the completion of waitForPendingWrites.
1425+
NSDictionary<NSString *, id> *data =
1426+
@{@"owner" : @{@"name" : @"Andy", @"email" : @"[email protected]"}};
1427+
[[self documentRef] setData:data];
1428+
1429+
XCTestExpectation *expectation = [self expectationWithDescription:@"waitForPendingWrites"];
1430+
[firestore waitForPendingWritesWithCompletion:^(NSError *_Nullable error) {
1431+
XCTAssertNotNil(error);
1432+
XCTAssertEqualObjects(error.domain, FIRFirestoreErrorDomain);
1433+
XCTAssertEqual(error.code, FIRFirestoreErrorCodeCancelled);
1434+
[expectation fulfill];
1435+
}];
1436+
1437+
[self triggerUserChangeWithUid:@"user-to-fail-pending-writes"];
1438+
[self awaitExpectations];
1439+
}
1440+
1441+
- (void)testWaitForPendingWritesCompletesWhenOfflineIfNoPending {
1442+
FIRFirestore *firestore = self.db;
1443+
1444+
[self disableNetwork];
1445+
1446+
[firestore waitForPendingWritesWithCompletion:
1447+
[self completionForExpectationWithName:@"Wait for pending writes"]];
1448+
[self awaitExpectations];
1449+
}
1450+
14051451
@end

Firestore/Example/Tests/Local/FSTLevelDBMutationQueueTests.mm

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,14 +87,14 @@ - (void)setUp {
8787

8888
- (void)testLoadNextBatchID_zeroWhenTotallyEmpty {
8989
// Initial seek is invalid
90-
XCTAssertEqual(LoadNextBatchIdFromDb(_db.ptr), 0);
90+
XCTAssertEqual(LoadNextBatchIdFromDb(_db.ptr), 1);
9191
}
9292

9393
- (void)testLoadNextBatchID_zeroWhenNoMutations {
9494
// Initial seek finds no mutations
9595
[self setDummyValueForKey:MutationLikeKey("mutationr", "foo", 20)];
9696
[self setDummyValueForKey:MutationLikeKey("mutationsa", "foo", 10)];
97-
XCTAssertEqual(LoadNextBatchIdFromDb(_db.ptr), 0);
97+
XCTAssertEqual(LoadNextBatchIdFromDb(_db.ptr), 1);
9898
}
9999

100100
- (void)testLoadNextBatchID_findsSingleRow {

Firestore/Example/Tests/Local/FSTLocalStoreTests.mm

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,6 +1105,24 @@ - (void)testHandlesPatchMutationWithTransformThenRemoteEvent {
11051105
FSTAssertChanged(Doc("foo/bar", 1, Map("sum", 1), DocumentState::kLocalMutations));
11061106
}
11071107

1108+
- (void)testGetHighestUnacknowledgeBatchId {
1109+
if ([self isTestBaseClass]) return;
1110+
1111+
XCTAssertEqual(-1, [self.localStore getHighestUnacknowledgedBatchId]);
1112+
1113+
[self writeMutation:FSTTestSetMutation(@"foo/bar", @{@"abc" : @123})];
1114+
XCTAssertEqual(1, [self.localStore getHighestUnacknowledgedBatchId]);
1115+
1116+
[self writeMutation:FSTTestPatchMutation("foo/bar", @{@"abc" : @321}, {})];
1117+
XCTAssertEqual(2, [self.localStore getHighestUnacknowledgedBatchId]);
1118+
1119+
[self acknowledgeMutationWithVersion:1];
1120+
XCTAssertEqual(2, [self.localStore getHighestUnacknowledgedBatchId]);
1121+
1122+
[self rejectMutation];
1123+
XCTAssertEqual(-1, [self.localStore getHighestUnacknowledgedBatchId]);
1124+
}
1125+
11081126
@end
11091127

11101128
NS_ASSUME_NONNULL_END

Firestore/Example/Tests/Util/FSTIntegrationTestCase.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,12 @@ extern "C" {
5555
/** Returns a new Firestore connected to the project with the given projectID. */
5656
- (FIRFirestore *)firestoreWithProjectID:(NSString *)projectID;
5757

58-
/** Returns a new Firestore connected to the project with the given app. */
58+
/** Triggers a user change with given user id. */
59+
- (void)triggerUserChangeWithUid:(NSString *)uid;
60+
61+
/**
62+
* Returns a new Firestore connected to the project with the given app.
63+
*/
5964
- (FIRFirestore *)firestoreWithApp:(FIRApp *)app;
6065

6166
/** Synchronously shuts down the given firestore. */

Firestore/Example/Tests/Util/FSTIntegrationTestCase.mm

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
#include <string>
3434
#include <utility>
3535

36+
#include "Firestore/core/src/firebase/firestore/auth/credentials_provider.h"
3637
#include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h"
38+
#include "Firestore/core/src/firebase/firestore/auth/user.h"
3739
#include "Firestore/core/src/firebase/firestore/model/database_id.h"
3840
#include "Firestore/core/src/firebase/firestore/remote/grpc_connection.h"
3941
#include "Firestore/core/src/firebase/firestore/util/autoid.h"
@@ -55,8 +57,10 @@
5557
#include "Firestore/core/src/firebase/firestore/util/executor_libdispatch.h"
5658

5759
namespace util = firebase::firestore::util;
60+
using firebase::firestore::auth::CredentialChangeListener;
5861
using firebase::firestore::auth::CredentialsProvider;
5962
using firebase::firestore::auth::EmptyCredentialsProvider;
63+
using firebase::firestore::auth::User;
6064
using firebase::firestore::model::DatabaseId;
6165
using firebase::firestore::testutil::AppForUnitTesting;
6266
using firebase::firestore::remote::GrpcConnection;
@@ -79,13 +83,37 @@
7983

8084
static bool runningAgainstEmulator = false;
8185

86+
// Behaves the same as `EmptyCredentialsProvider` except it can also trigger a user
87+
// change.
88+
class FakeCredentialsProvider : public EmptyCredentialsProvider {
89+
public:
90+
void SetCredentialChangeListener(CredentialChangeListener changeListener) override {
91+
if (changeListener) {
92+
listener_ = std::move(changeListener);
93+
listener_(User::Unauthenticated());
94+
}
95+
}
96+
97+
void ChangeUser(NSString *new_id) {
98+
if (listener_) {
99+
listener_(firebase::firestore::auth::User::FromUid(new_id));
100+
}
101+
}
102+
103+
private:
104+
CredentialChangeListener listener_;
105+
};
106+
82107
@implementation FSTIntegrationTestCase {
83108
NSMutableArray<FIRFirestore *> *_firestores;
109+
std::shared_ptr<FakeCredentialsProvider> _fakeCredentialsProvider;
84110
}
85111

86112
- (void)setUp {
87113
[super setUp];
88114

115+
_fakeCredentialsProvider = std::make_shared<FakeCredentialsProvider>();
116+
89117
[self clearPersistenceOnce];
90118
[self primeBackend];
91119

@@ -239,13 +267,11 @@ - (FIRFirestore *)firestoreWithApp:(FIRApp *)app {
239267

240268
FIRSetLoggerLevel(FIRLoggerLevelDebug);
241269

242-
std::unique_ptr<CredentialsProvider> credentials_provider =
243-
absl::make_unique<firebase::firestore::auth::EmptyCredentialsProvider>();
244270
std::string projectID = util::MakeString(app.options.projectID);
245271
FIRFirestore *firestore =
246272
[[FIRFirestore alloc] initWithDatabaseID:DatabaseId(projectID)
247273
persistenceKey:util::MakeString(persistenceKey)
248-
credentialsProvider:std::move(credentials_provider)
274+
credentialsProvider:_fakeCredentialsProvider
249275
workerQueue:std::move(workerQueue)
250276
firebaseApp:app
251277
instanceRegistry:nil];
@@ -255,6 +281,10 @@ - (FIRFirestore *)firestoreWithApp:(FIRApp *)app {
255281
return firestore;
256282
}
257283

284+
- (void)triggerUserChangeWithUid:(NSString *)uid {
285+
_fakeCredentialsProvider->ChangeUser(uid);
286+
}
287+
258288
- (void)primeBackend {
259289
static dispatch_once_t onceToken;
260290
dispatch_once(&onceToken, ^{

Firestore/Source/API/FIRFirestore+Internal.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,22 @@ NS_ASSUME_NONNULL_BEGIN
9090

9191
- (void)shutdownInternalWithCompletion:(nullable void (^)(NSError *_Nullable error))completion;
9292

93+
/**
94+
* Waits until all currently pending writes for the active user have been acknowledged by the
95+
* backend.
96+
*
97+
* The completion block is called immediately without error if there are no outstanding writes.
98+
* Otherwise, the completion block is called when all previously issued writes (including those
99+
* written in a previous app session) have been acknowledged by the backend. The completion
100+
* block does not wait for writes that were added after the method is called. If you
101+
* wish to wait for additional writes, you have to call `waitForPendingWritesWithCompletion`
102+
* again.
103+
*
104+
* Any outstanding `waitForPendingWritesWithCompletion` completion blocks are called with an
105+
* error during user change.
106+
*/
107+
- (void)waitForPendingWritesWithCompletion:(void (^)(NSError *_Nullable error))completion;
108+
93109
- (const std::shared_ptr<util::AsyncQueue> &)workerQueue;
94110

95111
@property(nonatomic, assign, readonly) std::shared_ptr<api::Firestore> wrapped;

Firestore/Source/API/FIRFirestore.mm

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ - (FIRWriteBatch *)batch {
194194
writeBatch:_firestore->GetBatch()];
195195
}
196196

197+
- (void)waitForPendingWritesWithCompletion:(void (^)(NSError *_Nullable error))completion {
198+
_firestore->WaitForPendingWrites(util::MakeCallback(completion));
199+
}
200+
197201
- (void)runTransactionWithBlock:(id _Nullable (^)(FIRTransaction *, NSError **))updateBlock
198202
dispatchQueue:(dispatch_queue_t)queue
199203
completion:

Firestore/Source/Core/FSTFirestoreClient.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,13 @@ NS_ASSUME_NONNULL_BEGIN
111111
- (void)writeMutations:(std::vector<model::Mutation> &&)mutations
112112
callback:(util::StatusCallback)callback;
113113

114+
/**
115+
* Passes a callback that is triggered when all the pending writes at the
116+
* time when this method is called received server acknowledgement.
117+
* An acknowledgement can be either acceptance or rejections.
118+
*/
119+
- (void)waitForPendingWritesWithCallback:(util::StatusCallback)callback;
120+
114121
/** Tries to execute the transaction in updateCallback up to retries times. */
115122
- (void)transactionWithRetries:(int)retries
116123
updateCallback:(core::TransactionUpdateCallback)updateCallback

Firestore/Source/Core/FSTFirestoreClient.mm

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,20 @@ - (void)writeMutations:(std::vector<Mutation> &&)mutations callback:(util::Statu
442442
});
443443
};
444444

445+
- (void)waitForPendingWritesWithCallback:(util::StatusCallback)callback {
446+
[self verifyNotShutdown];
447+
// Dispatch the result back onto the user dispatch queue.
448+
auto async_callback = [self, callback](util::Status status) {
449+
if (callback) {
450+
self->_userExecutor->Execute([=] { callback(std::move(status)); });
451+
}
452+
};
453+
454+
_workerQueue->Enqueue([self, async_callback]() {
455+
[self.syncEngine registerPendingWritesCallback:std::move(async_callback)];
456+
});
457+
}
458+
445459
- (void)transactionWithRetries:(int)retries
446460
updateCallback:(core::TransactionUpdateCallback)update_callback
447461
resultCallback:(core::TransactionResultCallback)resultCallback {

Firestore/Source/Core/FSTSyncEngine.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ NS_ASSUME_NONNULL_BEGIN
8686
- (void)writeMutations:(std::vector<model::Mutation> &&)mutations
8787
completion:(FSTVoidErrorBlock)completion;
8888

89+
/**
90+
* Registers a user callback that is called when all pending mutations at the moment of calling
91+
* are acknowledged .
92+
*/
93+
- (void)registerPendingWritesCallback:(util::StatusCallback)callback;
94+
8995
/**
9096
* Runs the given transaction block up to retries times and then calls completion.
9197
*

Firestore/Source/Core/FSTSyncEngine.mm

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "Firestore/core/src/firebase/firestore/model/document_key.h"
4141
#include "Firestore/core/src/firebase/firestore/model/document_map.h"
4242
#include "Firestore/core/src/firebase/firestore/model/document_set.h"
43+
#include "Firestore/core/src/firebase/firestore/model/mutation_batch.h"
4344
#include "Firestore/core/src/firebase/firestore/model/no_document.h"
4445
#include "Firestore/core/src/firebase/firestore/model/snapshot_version.h"
4546
#include "Firestore/core/src/firebase/firestore/remote/remote_event.h"
@@ -64,6 +65,7 @@
6465
using firebase::firestore::model::DocumentKey;
6566
using firebase::firestore::model::DocumentKeySet;
6667
using firebase::firestore::model::DocumentMap;
68+
using firebase::firestore::model::kBatchIdUnknown;
6769
using firebase::firestore::model::ListenSequenceNumber;
6870
using firebase::firestore::model::MaybeDocumentMap;
6971
using firebase::firestore::model::Mutation;
@@ -79,6 +81,7 @@
7981
using firebase::firestore::util::AsyncQueue;
8082
using firebase::firestore::util::MakeNSError;
8183
using firebase::firestore::util::Status;
84+
using firebase::firestore::util::StatusCallback;
8285

8386
NS_ASSUME_NONNULL_BEGIN
8487

@@ -197,6 +200,9 @@ @implementation FSTSyncEngine {
197200
std::unordered_map<User, NSMutableDictionary<NSNumber *, FSTVoidErrorBlock> *, HashUser>
198201
_mutationCompletionBlocks;
199202

203+
/** Stores user callbacks waiting for pending writes to be acknowledged. */
204+
std::unordered_map<model::BatchId, std::vector<StatusCallback>> _pendingWritesCallbacks;
205+
200206
/** FSTQueryViews for all active queries, indexed by query. */
201207
std::unordered_map<Query, FSTQueryView *> _queryViewsByQuery;
202208

@@ -296,6 +302,51 @@ - (void)writeMutations:(std::vector<Mutation> &&)mutations
296302
_remoteStore->FillWritePipeline();
297303
}
298304

305+
- (void)registerPendingWritesCallback:(StatusCallback)callback {
306+
if (!_remoteStore->CanUseNetwork()) {
307+
LOG_DEBUG("The network is disabled. The task returned by 'awaitPendingWrites()' will not "
308+
"complete until the network is enabled.");
309+
}
310+
311+
int largestPendingBatchId = [self.localStore getHighestUnacknowledgedBatchId];
312+
313+
if (largestPendingBatchId == kBatchIdUnknown) {
314+
// Trigger the callback right away if there is no pending writes at the moment.
315+
callback(Status::OK());
316+
return;
317+
}
318+
319+
auto it = _pendingWritesCallbacks.find(largestPendingBatchId);
320+
if (it != _pendingWritesCallbacks.end()) {
321+
it->second.push_back(std::move(callback));
322+
} else {
323+
_pendingWritesCallbacks.emplace(largestPendingBatchId,
324+
std::vector<StatusCallback>({std::move(callback)}));
325+
}
326+
}
327+
328+
/** Triggers callbacks waiting for this batch id to get acknowledged by server, if there are any. */
329+
- (void)triggerPendingWriteCallbacksWithBatchId:(int)batchId {
330+
auto it = _pendingWritesCallbacks.find(batchId);
331+
if (it != _pendingWritesCallbacks.end()) {
332+
for (const auto &callback : it->second) {
333+
callback(Status::OK());
334+
}
335+
336+
_pendingWritesCallbacks.erase(it);
337+
}
338+
}
339+
340+
- (void)failOutstandingPendingWritesAwaitingCallbacks:(absl::string_view)errorMessage {
341+
for (const auto &entry : _pendingWritesCallbacks) {
342+
for (const auto &callback : entry.second) {
343+
callback(Status(Error::Cancelled, errorMessage));
344+
}
345+
}
346+
347+
_pendingWritesCallbacks.clear();
348+
}
349+
299350
- (void)addMutationCompletionBlock:(FSTVoidErrorBlock)completion batchID:(BatchId)batchID {
300351
NSMutableDictionary<NSNumber *, FSTVoidErrorBlock> *completionBlocks =
301352
_mutationCompletionBlocks[_currentUser];
@@ -460,6 +511,8 @@ - (void)applySuccessfulWriteWithResult:(FSTMutationBatchResult *)batchResult {
460511
// consistently happen before listen events.
461512
[self processUserCallbacksForBatchID:batchResult.batch.batchID error:nil];
462513

514+
[self triggerPendingWriteCallbacksWithBatchId:batchResult.batch.batchID];
515+
463516
MaybeDocumentMap changes = [self.localStore acknowledgeBatchWithResult:batchResult];
464517
[self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:absl::nullopt];
465518
}
@@ -478,6 +531,8 @@ - (void)rejectFailedWriteWithBatchID:(BatchId)batchID error:(NSError *)error {
478531
// consistently happen before listen events.
479532
[self processUserCallbacksForBatchID:batchID error:error];
480533

534+
[self triggerPendingWriteCallbacksWithBatchId:batchID];
535+
481536
[self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:absl::nullopt];
482537
}
483538

@@ -629,6 +684,9 @@ - (void)credentialDidChangeWithUser:(const firebase::firestore::auth::User &)use
629684
_currentUser = user;
630685

631686
if (userChanged) {
687+
// Fails callbacks waiting for pending writes requested by previous user.
688+
[self failOutstandingPendingWritesAwaitingCallbacks:
689+
"'waitForPendingWrites' callback is cancelled due to a user change."];
632690
// Notify local store and emit any resulting events from swapping out the mutation queue.
633691
MaybeDocumentMap changes = [self.localStore userDidChange:user];
634692
[self emitNewSnapshotsAndNotifyLocalStoreWithChanges:changes remoteEvent:absl::nullopt];

0 commit comments

Comments
 (0)