Skip to content

Port waitForPendingWrites from android #2081

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/firestore/src/api/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,25 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
return this._firestoreClient!.clientShutdown;
}

/**
* Waits until all currently pending writes for the active user have been acknowledged by the
* backend.
*
* The returned Promise resolves immediately if there are no outstanding writes. Otherwise, the
* Promise waits for all previously issued writes (including those written in a previous app
* session), but it does not wait for writes that were added after the method is called. If you
* wish to wait for additional writes, you have to call `waitForPendingWrites()` again.
*
* Any outstanding `waitForPendingWrites()` Promises are rejected during user changes.
*
* @return A Promise which resolves when all currently pending writes have been
* acknowledged by the backend.
*/
_waitForPendingWrites(): Promise<void> {
this.ensureClientConfigured();
return this._firestoreClient!.waitForPendingWrites();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please call ensureClientConfigured() to make sure this works even when it is the first operation that is scheduled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

ensureClientConfigured(): FirestoreClient {
if (!this._firestoreClient) {
// Kick off starting the client but don't actually wait for it.
Expand Down
15 changes: 15 additions & 0 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,21 @@ export class FirestoreClient {
});
}

/**
* Returns a Promise that resolves when all writes that were pending at the time this
* method was called received server acknowledgement. An acknowledgement can be either acceptance
* or rejection.
*/
waitForPendingWrites(): Promise<void> {
this.verifyNotShutdown();

const deferred = new Deferred<void>();
this.asyncQueue.enqueueAndForget(() => {
return this.syncEngine.registerPendingWritesCallback(deferred);
});
return deferred.promise;
}

listen(
query: Query,
observer: Observer<ViewSnapshot>,
Expand Down
68 changes: 66 additions & 2 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ import {
import { MaybeDocument, NoDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation } from '../model/mutation';
import { MutationBatchResult } from '../model/mutation_batch';
import { MutationBatchResult, BATCHID_UNKNOWN } from '../model/mutation_batch';
import { RemoteEvent, TargetChange } from '../remote/remote_event';
import { RemoteStore } from '../remote/remote_store';
import { RemoteSyncer } from '../remote/remote_syncer';
import { assert, fail } from '../util/assert';
import { FirestoreError } from '../util/error';
import { Code, FirestoreError } from '../util/error';
import * as log from '../util/log';
import { primitiveComparator } from '../util/misc';
import { ObjectMap } from '../util/obj_map';
Expand Down Expand Up @@ -160,6 +160,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
private mutationUserCallbacks = {} as {
[uidKey: string]: SortedMap<BatchId, Deferred<void>>;
};
/** Stores user callbacks waiting for all pending writes to be acknowledged. */
private pendingWritesCallbacks = new Map<BatchId, Array<Deferred<void>>>();
private limboTargetIdGenerator = TargetIdGenerator.forSyncEngine();

// The primary state is set to `true` or `false` immediately after Firestore
Expand Down Expand Up @@ -450,6 +452,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
(this.isPrimary && source === OnlineStateSource.RemoteStore) ||
(!this.isPrimary && source === OnlineStateSource.SharedClientState)
) {
this.assertSubscribed('applyOnlineStateChange()');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intended?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This was missing but should be present. Otherwise this.syncEngineListener!.* might hit errors.

const newViewSnapshots = [] as ViewSnapshot[];
this.queryViewsByQuery.forEach((query, queryView) => {
const viewChange = queryView.view.applyOnlineStateChange(onlineState);
Expand Down Expand Up @@ -570,6 +573,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
// before listen events.
this.processUserCallback(batchId, /*error=*/ null);

this.triggerPendingWritesCallbacks(batchId);

try {
const changes = await this.localStore.acknowledgeBatch(
mutationBatchResult
Expand All @@ -593,6 +598,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
// listen events.
this.processUserCallback(batchId, error);

this.triggerPendingWritesCallbacks(batchId);

try {
const changes = await this.localStore.rejectBatch(batchId);
this.sharedClientState.updateMutationState(batchId, 'rejected', error);
Expand All @@ -602,6 +609,58 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
}
}

/**
* Registers a user callback that resolves when all pending mutations at the moment of calling
* are acknowledged .
*/
async registerPendingWritesCallback(callback: Deferred<void>): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method doesn't seem to have to be async. You can just return void.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is executed on async queue, the enqueue method demands it to return a promise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the pattern that we use here: https://github.com/firebase/firebase-js-sdk/pull/2063/files#diff-3bab365c04d7fc7ae24002035afdd01fR614

It makes the interactions in the client simpler if we limit async methods to their smallest possible scope. Right now, registerPendingWritesCallback only has one consumer, but once it has two or more this would really make a difference (and even with just one consumer, you can already remove two Promise.resolve() invocations that the transpiler adds for you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.localStore.getHighestUnacknowledgedBatchId() returns a promise, which demands it to be async.

if (!this.remoteStore.canUseNetwork()) {
log.debug(
LOG_TAG,
'The network is disabled. The task returned by ' +
"'awaitPendingWrites()' will not complete until the network is enabled."
);
}

const highestBatchId = await this.localStore.getHighestUnacknowledgedBatchId();
if (highestBatchId === BATCHID_UNKNOWN) {
// Trigger the callback right away if there is no pending writes at the moment.
callback.resolve();
return;
}

const callbacks = this.pendingWritesCallbacks.get(highestBatchId) || [];
callbacks.push(callback);
this.pendingWritesCallbacks.set(highestBatchId, callbacks);
}

/**
* Triggers the callbacks that are waiting for this batch id to get acknowledged by server,
* if there are any.
*/
private triggerPendingWritesCallbacks(batchId: BatchId): void {
(this.pendingWritesCallbacks.get(batchId) || []).forEach(callback => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use a for ... of loop then you don't need to create an empty array here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that does not work..compiler still unhappy about it could be potentially undefined.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I learned something today :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:)

callback.resolve();
});

this.pendingWritesCallbacks.delete(batchId);
}

/** Reject all outstanding callbacks waiting for pending writes to complete. */
private rejectOutstandingPendingWritesCallbacks(errorMessage: string): void {
this.pendingWritesCallbacks.forEach(callbacks => {
callbacks.forEach(callback => {
callback.reject(
new FirestoreError(
Code.CANCELLED, errorMessage
)
);
});
});

this.pendingWritesCallbacks.clear();
}

private addMutationCallback(
batchId: BatchId,
callback: Deferred<void>
Expand Down Expand Up @@ -799,6 +858,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
this.currentUser = user;

if (userChanged) {
// Fails tasks waiting for pending writes requested by previous user.
this.rejectOutstandingPendingWritesCallbacks(
"'waitForPendingWrites' promise is rejected due to a user change."
);

const result = await this.localStore.handleUserChange(user);
// TODO(b/114226417): Consider calling this only in the primary tab.
this.sharedClientState.handleUserChange(
Expand Down
19 changes: 19 additions & 0 deletions packages/firestore/src/local/indexeddb_mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,25 @@ export class IndexedDbMutationQueue implements MutationQueue {
.next(() => foundBatch);
}

getHighestUnacknowledgedBatchId(
transaction: PersistenceTransaction
): PersistencePromise<BatchId> {
const range = IDBKeyRange.upperBound(
[this.userId, Number.POSITIVE_INFINITY]
);

let batchId = BATCHID_UNKNOWN;
return mutationsStore(transaction)
.iterate(
{ index: DbMutationBatch.userMutationsIndex, range, reverse: true },
(key, dbBatch, control) => {
batchId = dbBatch.batchId;
control.done();
}
)
.next(() => batchId);
}

getAllMutationBatches(
transaction: PersistenceTransaction
): PersistencePromise<MutationBatch[]> {
Expand Down
14 changes: 14 additions & 0 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,20 @@ export class LocalStore {
);
}

/**
* Returns the largest (latest) batch id in mutation queue that is pending server response.
* Returns `BATCHID_UNKNOWN` if the queue is empty.
*/
getHighestUnacknowledgedBatchId(): Promise<BatchId> {
return this.persistence.runTransaction(
'Get highest unacknowledged batch id',
'readonly',
txn => {
return this.mutationQueue.getHighestUnacknowledgedBatchId(txn);
}
);
}

/** Returns the last recorded stream token for the current user. */
getLastStreamToken(): Promise<ProtoByteString> {
return this.persistence.runTransaction(
Expand Down
8 changes: 7 additions & 1 deletion packages/firestore/src/local/memory_mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { BatchId, ProtoByteString } from '../core/types';
import { DocumentKeySet } from '../model/collections';
import { DocumentKey } from '../model/document_key';
import { Mutation } from '../model/mutation';
import { MutationBatch } from '../model/mutation_batch';
import { MutationBatch, BATCHID_UNKNOWN } from '../model/mutation_batch';
import { emptyByteString } from '../platform/platform';
import { assert } from '../util/assert';
import { primitiveComparator } from '../util/misc';
Expand Down Expand Up @@ -177,6 +177,12 @@ export class MemoryMutationQueue implements MutationQueue {
);
}

getHighestUnacknowledgedBatchId(): PersistencePromise<BatchId> {
return PersistencePromise.resolve(
this.mutationQueue.length === 0 ? BATCHID_UNKNOWN : this.nextBatchId - 1
);
}

getAllMutationBatches(
transaction: PersistenceTransaction
): PersistencePromise<MutationBatch[]> {
Expand Down
10 changes: 10 additions & 0 deletions packages/firestore/src/local/mutation_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ export interface MutationQueue {
batchId: BatchId
): PersistencePromise<MutationBatch | null>;

/**
* Gets the largest (latest) batch id in mutation queue for the current user that is pending
* server response, returns `BATCHID_UNKNOWN` if the queue is empty.
*
* @return the largest batch id in the mutation queue that is not acknowledged.
*/
getHighestUnacknowledgedBatchId(
transaction: PersistenceTransaction
): PersistencePromise<BatchId>;

/** Gets all mutation batches in the mutation queue. */
// TODO(mikelehen): PERF: Current consumer only needs mutated keys; if we can
// provide that cheaply, we should replace this.
Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ export class RemoteStore implements TargetMetadataProvider {
);
}

private canUseNetwork(): boolean {
canUseNetwork(): boolean {
return this.isPrimary && this.networkEnabled;
}

Expand Down
57 changes: 52 additions & 5 deletions packages/firestore/test/integration/api/database.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ import {
withTestDbs,
withTestDoc,
withTestDocAndInitialData,
DEFAULT_SETTINGS
DEFAULT_SETTINGS,
waitForPendingWrites,
withMockCredentialProviderTestDb
} from '../util/helpers';
import { User } from '../../../src/auth/user';

// tslint:disable:no-floating-promises

Expand Down Expand Up @@ -743,7 +746,7 @@ apiDescribe('Database', (persistence: boolean) => {
});
});
});
return Promise.all([deferred1.promise, deferred2.promise]).then(() => {});
return Promise.all([deferred1.promise, deferred2.promise]).then(() => { });
});
});

Expand Down Expand Up @@ -782,7 +785,7 @@ apiDescribe('Database', (persistence: boolean) => {
it('will reject listens', () => {
const deferred = new Deferred();
queryForRejection.onSnapshot(
() => {},
() => { },
(err: Error) => {
expect(err.name).to.exist;
expect(err.message).to.exist;
Expand All @@ -795,12 +798,12 @@ apiDescribe('Database', (persistence: boolean) => {
it('will reject same listens twice in a row', () => {
const deferred = new Deferred();
queryForRejection.onSnapshot(
() => {},
() => { },
(err: Error) => {
expect(err.name).to.exist;
expect(err.message).to.exist;
queryForRejection.onSnapshot(
() => {},
() => { },
(err2: Error) => {
expect(err2.name).to.exist;
expect(err2.message).to.exist;
Expand Down Expand Up @@ -1120,4 +1123,48 @@ apiDescribe('Database', (persistence: boolean) => {
}).to.throw();
});
});

it('can wait for pending writes', async () => {
await withTestDoc(persistence, async docRef => {
const firestore = docRef.firestore;
// Prevent pending writes receiving acknowledgement.
await firestore.disableNetwork();

const pendingWrites = docRef.set({ foo: 'bar' });
const awaitPendingWrites = waitForPendingWrites(firestore);

// pending writes can receive acknowledgements now.
await firestore.enableNetwork();
await pendingWrites;
await awaitPendingWrites;
});
});

it('waiting for pending writes should fail when user changes', async () => {
await withMockCredentialProviderTestDb(persistence, async (db, mockCredentialsProvider) => {
// Prevent pending writes receiving acknowledgement.
await db.disableNetwork();
db.doc('abc/123').set({ foo: 'bar' });
const awaitPendingWrite = waitForPendingWrites(db);

mockCredentialsProvider.triggerUserChange(new User('user_1'));

await expect(awaitPendingWrite).to.be.eventually.rejectedWith(
"'waitForPendingWrites' promise is rejected due to a user change."
);
});
});

it('waiting for pending writes resolves immediately when offline and no pending writes',
async () => {
await withTestDoc(persistence, async docRef => {
const firestore = docRef.firestore;
// Prevent pending writes receiving acknowledgement.
await firestore.disableNetwork();

// `awaitsPendingWrites` is created when there is no pending writes, it will resolve
// immediately even if we are offline.
await waitForPendingWrites(firestore);
});
});
});
Loading