diff --git a/packages/firestore/src/api/database.ts b/packages/firestore/src/api/database.ts index fde7d73e6cc..7b8afd28c02 100644 --- a/packages/firestore/src/api/database.ts +++ b/packages/firestore/src/api/database.ts @@ -478,6 +478,25 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService { return this._firestoreClient!.clientShutdown; } + /** + * Waits until all currently pending writes for the active user have been acknowledged by the + * backend. + * + * The returned Promise resolves immediately if there are no outstanding writes. Otherwise, the + * Promise waits for all previously issued writes (including those written in a previous app + * session), but it does not wait for writes that were added after the method is called. If you + * wish to wait for additional writes, you have to call `waitForPendingWrites()` again. + * + * Any outstanding `waitForPendingWrites()` Promises are rejected during user changes. + * + * @return A Promise which resolves when all currently pending writes have been + * acknowledged by the backend. + */ + _waitForPendingWrites(): Promise { + this.ensureClientConfigured(); + return this._firestoreClient!.waitForPendingWrites(); + } + ensureClientConfigured(): FirestoreClient { if (!this._firestoreClient) { // Kick off starting the client but don't actually wait for it. diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index 57df940ab1f..de2eced49dc 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -522,6 +522,21 @@ export class FirestoreClient { }); } + /** + * Returns a Promise that resolves when all writes that were pending at the time this + * method was called received server acknowledgement. An acknowledgement can be either acceptance + * or rejection. + */ + waitForPendingWrites(): Promise { + this.verifyNotShutdown(); + + const deferred = new Deferred(); + this.asyncQueue.enqueueAndForget(() => { + return this.syncEngine.registerPendingWritesCallback(deferred); + }); + return deferred.promise; + } + listen( query: Query, observer: Observer, diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index fbc0317c4f5..353e9b12680 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -28,12 +28,12 @@ import { import { MaybeDocument, NoDocument } from '../model/document'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; -import { MutationBatchResult } from '../model/mutation_batch'; +import { MutationBatchResult, BATCHID_UNKNOWN } from '../model/mutation_batch'; import { RemoteEvent, TargetChange } from '../remote/remote_event'; import { RemoteStore } from '../remote/remote_store'; import { RemoteSyncer } from '../remote/remote_syncer'; import { assert, fail } from '../util/assert'; -import { FirestoreError } from '../util/error'; +import { Code, FirestoreError } from '../util/error'; import * as log from '../util/log'; import { primitiveComparator } from '../util/misc'; import { ObjectMap } from '../util/obj_map'; @@ -160,6 +160,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { private mutationUserCallbacks = {} as { [uidKey: string]: SortedMap>; }; + /** Stores user callbacks waiting for all pending writes to be acknowledged. */ + private pendingWritesCallbacks = new Map>>(); private limboTargetIdGenerator = TargetIdGenerator.forSyncEngine(); // The primary state is set to `true` or `false` immediately after Firestore @@ -450,6 +452,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { (this.isPrimary && source === OnlineStateSource.RemoteStore) || (!this.isPrimary && source === OnlineStateSource.SharedClientState) ) { + this.assertSubscribed('applyOnlineStateChange()'); const newViewSnapshots = [] as ViewSnapshot[]; this.queryViewsByQuery.forEach((query, queryView) => { const viewChange = queryView.view.applyOnlineStateChange(onlineState); @@ -570,6 +573,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { // before listen events. this.processUserCallback(batchId, /*error=*/ null); + this.triggerPendingWritesCallbacks(batchId); + try { const changes = await this.localStore.acknowledgeBatch( mutationBatchResult @@ -593,6 +598,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { // listen events. this.processUserCallback(batchId, error); + this.triggerPendingWritesCallbacks(batchId); + try { const changes = await this.localStore.rejectBatch(batchId); this.sharedClientState.updateMutationState(batchId, 'rejected', error); @@ -602,6 +609,58 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { } } + /** + * Registers a user callback that resolves when all pending mutations at the moment of calling + * are acknowledged . + */ + async registerPendingWritesCallback(callback: Deferred): Promise { + if (!this.remoteStore.canUseNetwork()) { + log.debug( + LOG_TAG, + 'The network is disabled. The task returned by ' + + "'awaitPendingWrites()' will not complete until the network is enabled." + ); + } + + const highestBatchId = await this.localStore.getHighestUnacknowledgedBatchId(); + if (highestBatchId === BATCHID_UNKNOWN) { + // Trigger the callback right away if there is no pending writes at the moment. + callback.resolve(); + return; + } + + const callbacks = this.pendingWritesCallbacks.get(highestBatchId) || []; + callbacks.push(callback); + this.pendingWritesCallbacks.set(highestBatchId, callbacks); + } + + /** + * Triggers the callbacks that are waiting for this batch id to get acknowledged by server, + * if there are any. + */ + private triggerPendingWritesCallbacks(batchId: BatchId): void { + (this.pendingWritesCallbacks.get(batchId) || []).forEach(callback => { + callback.resolve(); + }); + + this.pendingWritesCallbacks.delete(batchId); + } + + /** Reject all outstanding callbacks waiting for pending writes to complete. */ + private rejectOutstandingPendingWritesCallbacks(errorMessage: string): void { + this.pendingWritesCallbacks.forEach(callbacks => { + callbacks.forEach(callback => { + callback.reject( + new FirestoreError( + Code.CANCELLED, errorMessage + ) + ); + }); + }); + + this.pendingWritesCallbacks.clear(); + } + private addMutationCallback( batchId: BatchId, callback: Deferred @@ -799,6 +858,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer { this.currentUser = user; if (userChanged) { + // Fails tasks waiting for pending writes requested by previous user. + this.rejectOutstandingPendingWritesCallbacks( + "'waitForPendingWrites' promise is rejected due to a user change." + ); + const result = await this.localStore.handleUserChange(user); // TODO(b/114226417): Consider calling this only in the primary tab. this.sharedClientState.handleUserChange( diff --git a/packages/firestore/src/local/indexeddb_mutation_queue.ts b/packages/firestore/src/local/indexeddb_mutation_queue.ts index 0a931364534..ec3d2df6152 100644 --- a/packages/firestore/src/local/indexeddb_mutation_queue.ts +++ b/packages/firestore/src/local/indexeddb_mutation_queue.ts @@ -265,6 +265,25 @@ export class IndexedDbMutationQueue implements MutationQueue { .next(() => foundBatch); } + getHighestUnacknowledgedBatchId( + transaction: PersistenceTransaction + ): PersistencePromise { + const range = IDBKeyRange.upperBound( + [this.userId, Number.POSITIVE_INFINITY] + ); + + let batchId = BATCHID_UNKNOWN; + return mutationsStore(transaction) + .iterate( + { index: DbMutationBatch.userMutationsIndex, range, reverse: true }, + (key, dbBatch, control) => { + batchId = dbBatch.batchId; + control.done(); + } + ) + .next(() => batchId); + } + getAllMutationBatches( transaction: PersistenceTransaction ): PersistencePromise { diff --git a/packages/firestore/src/local/local_store.ts b/packages/firestore/src/local/local_store.ts index 19927eb4adb..4fe71997eb5 100644 --- a/packages/firestore/src/local/local_store.ts +++ b/packages/firestore/src/local/local_store.ts @@ -387,6 +387,20 @@ export class LocalStore { ); } + /** + * Returns the largest (latest) batch id in mutation queue that is pending server response. + * Returns `BATCHID_UNKNOWN` if the queue is empty. + */ + getHighestUnacknowledgedBatchId(): Promise { + return this.persistence.runTransaction( + 'Get highest unacknowledged batch id', + 'readonly', + txn => { + return this.mutationQueue.getHighestUnacknowledgedBatchId(txn); + } + ); + } + /** Returns the last recorded stream token for the current user. */ getLastStreamToken(): Promise { return this.persistence.runTransaction( diff --git a/packages/firestore/src/local/memory_mutation_queue.ts b/packages/firestore/src/local/memory_mutation_queue.ts index e384d329371..c114d8e3d95 100644 --- a/packages/firestore/src/local/memory_mutation_queue.ts +++ b/packages/firestore/src/local/memory_mutation_queue.ts @@ -21,7 +21,7 @@ import { BatchId, ProtoByteString } from '../core/types'; import { DocumentKeySet } from '../model/collections'; import { DocumentKey } from '../model/document_key'; import { Mutation } from '../model/mutation'; -import { MutationBatch } from '../model/mutation_batch'; +import { MutationBatch, BATCHID_UNKNOWN } from '../model/mutation_batch'; import { emptyByteString } from '../platform/platform'; import { assert } from '../util/assert'; import { primitiveComparator } from '../util/misc'; @@ -177,6 +177,12 @@ export class MemoryMutationQueue implements MutationQueue { ); } + getHighestUnacknowledgedBatchId(): PersistencePromise { + return PersistencePromise.resolve( + this.mutationQueue.length === 0 ? BATCHID_UNKNOWN : this.nextBatchId - 1 + ); + } + getAllMutationBatches( transaction: PersistenceTransaction ): PersistencePromise { diff --git a/packages/firestore/src/local/mutation_queue.ts b/packages/firestore/src/local/mutation_queue.ts index 1518fd52ae3..a396ded8c3f 100644 --- a/packages/firestore/src/local/mutation_queue.ts +++ b/packages/firestore/src/local/mutation_queue.ts @@ -103,6 +103,16 @@ export interface MutationQueue { batchId: BatchId ): PersistencePromise; + /** + * Gets the largest (latest) batch id in mutation queue for the current user that is pending + * server response, returns `BATCHID_UNKNOWN` if the queue is empty. + * + * @return the largest batch id in the mutation queue that is not acknowledged. + */ + getHighestUnacknowledgedBatchId( + transaction: PersistenceTransaction + ): PersistencePromise; + /** Gets all mutation batches in the mutation queue. */ // TODO(mikelehen): PERF: Current consumer only needs mutated keys; if we can // provide that cheaply, we should replace this. diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index 21a4134ebec..d0dae70ca4b 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -327,7 +327,7 @@ export class RemoteStore implements TargetMetadataProvider { ); } - private canUseNetwork(): boolean { + canUseNetwork(): boolean { return this.isPrimary && this.networkEnabled; } diff --git a/packages/firestore/test/integration/api/database.test.ts b/packages/firestore/test/integration/api/database.test.ts index 651514d7cdb..38df1451802 100644 --- a/packages/firestore/test/integration/api/database.test.ts +++ b/packages/firestore/test/integration/api/database.test.ts @@ -37,8 +37,11 @@ import { withTestDbs, withTestDoc, withTestDocAndInitialData, - DEFAULT_SETTINGS + DEFAULT_SETTINGS, + waitForPendingWrites, + withMockCredentialProviderTestDb } from '../util/helpers'; +import { User } from '../../../src/auth/user'; // tslint:disable:no-floating-promises @@ -743,7 +746,7 @@ apiDescribe('Database', (persistence: boolean) => { }); }); }); - return Promise.all([deferred1.promise, deferred2.promise]).then(() => {}); + return Promise.all([deferred1.promise, deferred2.promise]).then(() => { }); }); }); @@ -782,7 +785,7 @@ apiDescribe('Database', (persistence: boolean) => { it('will reject listens', () => { const deferred = new Deferred(); queryForRejection.onSnapshot( - () => {}, + () => { }, (err: Error) => { expect(err.name).to.exist; expect(err.message).to.exist; @@ -795,12 +798,12 @@ apiDescribe('Database', (persistence: boolean) => { it('will reject same listens twice in a row', () => { const deferred = new Deferred(); queryForRejection.onSnapshot( - () => {}, + () => { }, (err: Error) => { expect(err.name).to.exist; expect(err.message).to.exist; queryForRejection.onSnapshot( - () => {}, + () => { }, (err2: Error) => { expect(err2.name).to.exist; expect(err2.message).to.exist; @@ -1120,4 +1123,48 @@ apiDescribe('Database', (persistence: boolean) => { }).to.throw(); }); }); + + it('can wait for pending writes', async () => { + await withTestDoc(persistence, async docRef => { + const firestore = docRef.firestore; + // Prevent pending writes receiving acknowledgement. + await firestore.disableNetwork(); + + const pendingWrites = docRef.set({ foo: 'bar' }); + const awaitPendingWrites = waitForPendingWrites(firestore); + + // pending writes can receive acknowledgements now. + await firestore.enableNetwork(); + await pendingWrites; + await awaitPendingWrites; + }); + }); + + it('waiting for pending writes should fail when user changes', async () => { + await withMockCredentialProviderTestDb(persistence, async (db, mockCredentialsProvider) => { + // Prevent pending writes receiving acknowledgement. + await db.disableNetwork(); + db.doc('abc/123').set({ foo: 'bar' }); + const awaitPendingWrite = waitForPendingWrites(db); + + mockCredentialsProvider.triggerUserChange(new User('user_1')); + + await expect(awaitPendingWrite).to.be.eventually.rejectedWith( + "'waitForPendingWrites' promise is rejected due to a user change." + ); + }); + }); + + it('waiting for pending writes resolves immediately when offline and no pending writes', + async () => { + await withTestDoc(persistence, async docRef => { + const firestore = docRef.firestore; + // Prevent pending writes receiving acknowledgement. + await firestore.disableNetwork(); + + // `awaitsPendingWrites` is created when there is no pending writes, it will resolve + // immediately even if we are offline. + await waitForPendingWrites(firestore); + }); + }); }); diff --git a/packages/firestore/test/integration/util/helpers.ts b/packages/firestore/test/integration/util/helpers.ts index e9df9e0c537..8ec10eb8122 100644 --- a/packages/firestore/test/integration/util/helpers.ts +++ b/packages/firestore/test/integration/util/helpers.ts @@ -18,6 +18,8 @@ import * as firestore from '@firebase/firestore-types'; import { clearTestPersistence } from './../../unit/local/persistence_test_helpers'; import firebase from './firebase_export'; +import { EmptyCredentialsProvider, CredentialChangeListener } from '../../../src/api/credentials'; +import { User } from '../../../src/auth/user'; /** * NOTE: These helpers are used by api/ tests and therefore may not have any @@ -124,6 +126,20 @@ export const apiDescribe = apiDescribeInternal.bind( apiDescribe.skip = apiDescribeInternal.bind(null, describe.skip); apiDescribe.only = apiDescribeInternal.bind(null, describe.only); +export class MockCredentialsProvider extends EmptyCredentialsProvider { + + private listener: CredentialChangeListener | null = null; + + triggerUserChange(newUser: User): void { + this.listener!(newUser); + } + + setChangeListener(listener: CredentialChangeListener): void { + super.setChangeListener(listener); + this.listener = listener; + } +} + /** Converts the documents in a QuerySnapshot to an array with the data of each document. */ export function toDataArray( docSet: firestore.QuerySnapshot @@ -163,6 +179,20 @@ export function withTestDb( }); } +export function withMockCredentialProviderTestDb( + persistence: boolean, + fn: (db: firestore.FirebaseFirestore, mockCredential: MockCredentialsProvider) => Promise +): Promise { + const mockCredentialsProvider = new MockCredentialsProvider(); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const settings = {... DEFAULT_SETTINGS, + credentials: { client: mockCredentialsProvider, type: 'provider' }}; + return withTestDbsSettings(persistence, DEFAULT_PROJECT_ID, settings, 1, + ([db]) => { + return fn(db, mockCredentialsProvider); + }); +} + /** Runs provided fn with a db for an alternate project id. */ export function withAlternateTestDb( persistence: boolean, @@ -321,6 +351,11 @@ export function shutdownDb(db: firestore.FirebaseFirestore): Promise { return (db as any)._shutdown(); } +export function waitForPendingWrites(db: firestore.FirebaseFirestore): Promise { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return (db as any)._waitForPendingWrites(); +} + // TODO(in-queries): This exists just so we don't have to do the cast // repeatedly. Once we expose 'array-contains-any' publicly we can remove it and // just use 'array-contains-any' in all the tests. diff --git a/packages/firestore/test/unit/local/local_store.test.ts b/packages/firestore/test/unit/local/local_store.test.ts index df48fc0cbdb..c7870398c16 100644 --- a/packages/firestore/test/unit/local/local_store.test.ts +++ b/packages/firestore/test/unit/local/local_store.test.ts @@ -20,7 +20,7 @@ import { PublicFieldValue } from '../../../src/api/field_value'; import { Timestamp } from '../../../src/api/timestamp'; import { User } from '../../../src/auth/user'; import { Query } from '../../../src/core/query'; -import { TargetId } from '../../../src/core/types'; +import { TargetId, BatchId } from '../../../src/core/types'; import { IndexedDbPersistence } from '../../../src/local/indexeddb_persistence'; import { LocalStore, LocalWriteResult } from '../../../src/local/local_store'; import { LocalViewChanges } from '../../../src/local/local_view_changes'; @@ -38,7 +38,8 @@ import { } from '../../../src/model/mutation'; import { MutationBatch, - MutationBatchResult + MutationBatchResult, + BATCHID_UNKNOWN } from '../../../src/model/mutation_batch'; import { emptyByteString } from '../../../src/platform/platform'; import { RemoteEvent } from '../../../src/remote/remote_event'; @@ -261,6 +262,15 @@ class LocalStoreTester { } } + toReturnHighestUnacknowledgeBatchId(expectedId: BatchId): LocalStoreTester { + this.promiseChain = this.promiseChain.then(() => { + return this.localStore.getHighestUnacknowledgedBatchId().then(actual => { + expect(actual).to.equal(expectedId); + }); + }); + return this; + } + finish(): Promise { return this.promiseChain; } @@ -1237,4 +1247,18 @@ function genericLocalStoreTests( .toContain(doc('foo/bar', 1, { sum: 1 }, { hasLocalMutations: true })) .finish(); }); + + it('computes highest unacknowledged batch id correctly', () => { + return expectLocalStore() + .toReturnHighestUnacknowledgeBatchId(BATCHID_UNKNOWN) + .afterMutations([setMutation('foo/bar', {})]) + .toReturnHighestUnacknowledgeBatchId(1) + .afterMutations([patchMutation('foo/bar', { abc: 123 })]) + .toReturnHighestUnacknowledgeBatchId(2) + .afterAcknowledgingMutation({ documentVersion: 1 }) + .toReturnHighestUnacknowledgeBatchId(2) + .afterRejectingMutation() + .toReturnHighestUnacknowledgeBatchId(BATCHID_UNKNOWN) + .finish(); + }); }