diff --git a/packages/firestore/src/core/sync_engine.ts b/packages/firestore/src/core/sync_engine.ts index 37e63de676a..30f92ff5bea 100644 --- a/packages/firestore/src/core/sync_engine.ts +++ b/packages/firestore/src/core/sync_engine.ts @@ -46,7 +46,7 @@ import { Query } from './query'; import { SnapshotVersion } from './snapshot_version'; import { TargetIdGenerator } from './target_id_generator'; import { Transaction } from './transaction'; -import { BatchId, OnlineState, ProtoByteString, TargetId } from './types'; +import { BatchId, OnlineState, TargetId } from './types'; import { AddedLimboDocument, LimboDocumentChange, @@ -77,12 +77,6 @@ class QueryView { * stream to identify this query. */ public targetId: TargetId, - /** - * An identifier from the datastore backend that indicates the last state - * of the results that was received. This can be used to indicate where - * to continue receiving new doc changes for the query. - */ - public resumeToken: ProtoByteString, /** * The view is responsible for computing the final merged truth of what * docs are in the query. It gets notified of local and remote changes, @@ -195,12 +189,7 @@ export class SyncEngine implements RemoteSyncer { 'applyChanges for new view should always return a snapshot' ); - const data = new QueryView( - query, - queryData.targetId, - queryData.resumeToken, - view - ); + const data = new QueryView(query, queryData.targetId, view); this.queryViewsByQuery.set(query, data); this.queryViewsByTarget[queryData.targetId] = data; this.viewHandler!([viewChange.snapshot!]); diff --git a/packages/firestore/src/local/indexeddb_query_cache.ts b/packages/firestore/src/local/indexeddb_query_cache.ts index 253b07fd524..83a6907b5dc 100644 --- a/packages/firestore/src/local/indexeddb_query_cache.ts +++ b/packages/firestore/src/local/indexeddb_query_cache.ts @@ -44,7 +44,7 @@ export class IndexedDbQueryCache implements QueryCache { constructor(private serializer: LocalSerializer) {} /** - * The last received snapshot version. We store this seperately from the + * The last received snapshot version. We store this separately from the * metadata to avoid the extra conversion to/from DbTimestamp. */ private lastRemoteSnapshotVersion = SnapshotVersion.MIN; @@ -173,7 +173,7 @@ export class IndexedDbQueryCache implements QueryCache { ): PersistencePromise { // Iterating by the canonicalId may yield more than one result because // canonicalId values are not required to be unique per target. This query - // depends on the queryTargets index to be efficent. + // depends on the queryTargets index to be efficient. const canonicalId = query.canonicalId(); const range = IDBKeyRange.bound( [canonicalId, Number.NEGATIVE_INFINITY], @@ -202,7 +202,7 @@ export class IndexedDbQueryCache implements QueryCache { targetId: TargetId ): PersistencePromise { // PORTING NOTE: The reverse index (documentsTargets) is maintained by - // Indexeddb. + // IndexedDb. const promises: Array> = []; const store = documentTargetStore(txn); keys.forEach(key => { diff --git a/packages/firestore/src/local/local_store.ts b/packages/firestore/src/local/local_store.ts index 62a8158a2de..5b0b797bb5d 100644 --- a/packages/firestore/src/local/local_store.ts +++ b/packages/firestore/src/local/local_store.ts @@ -111,6 +111,15 @@ export interface LocalWriteResult { * unrecoverable error (should be caught / reported by the async_queue). */ export class LocalStore { + /** + * The maximum time to leave a resume token buffered without writing it out. + * This value is arbitrary: it's long enough to avoid several writes + * (possibly indefinitely if updates come more frequently than this) but + * short enough that restarting after crashing will still have a pretty + * recent resume token. + */ + private static readonly RESUME_TOKEN_MAX_AGE_MICROS = 5 * 60 * 1e6; + /** * The set of all mutations that have been sent but not yet been applied to * the backend. @@ -469,12 +478,18 @@ export class LocalStore { // any preexisting value. const resumeToken = change.resumeToken; if (resumeToken.length > 0) { + const oldQueryData = queryData; queryData = queryData.copy({ resumeToken, snapshotVersion: remoteEvent.snapshotVersion }); this.targetIds[targetId] = queryData; - promises.push(this.queryCache.updateQueryData(txn, queryData)); + + if ( + LocalStore.shouldPersistQueryData(oldQueryData, queryData, change) + ) { + promises.push(this.queryCache.updateQueryData(txn, queryData)); + } } } ); @@ -550,6 +565,50 @@ export class LocalStore { }); } + /** + * Returns true if the newQueryData should be persisted during an update of + * an active target. QueryData should always be persisted when a target is + * being released and should not call this function. + * + * While the target is active, QueryData updates can be omitted when nothing + * about the target has changed except metadata like the resume token or + * snapshot version. Occasionally it's worth the extra write to prevent these + * values from getting too stale after a crash, but this doesn't have to be + * too frequent. + */ + private static shouldPersistQueryData( + oldQueryData: QueryData, + newQueryData: QueryData, + change: TargetChange + ): boolean { + // Avoid clearing any existing value + if (newQueryData.resumeToken.length === 0) return false; + + // Any resume token is interesting if there isn't one already. + if (oldQueryData.resumeToken.length === 0) return true; + + // Don't allow resume token changes to be buffered indefinitely. This + // allows us to be reasonably up-to-date after a crash and avoids needing + // to loop over all active queries on shutdown. Especially in the browser + // we may not get time to do anything interesting while the current tab is + // closing. + const timeDelta = + newQueryData.snapshotVersion.toMicroseconds() - + oldQueryData.snapshotVersion.toMicroseconds(); + if (timeDelta >= this.RESUME_TOKEN_MAX_AGE_MICROS) return true; + + // Otherwise if the only thing that has changed about a target is its resume + // token it's not worth persisting. Note that the RemoteStore keeps an + // in-memory view of the currently active targets which includes the current + // resume token, so stream failure or user changes will still use an + // up-to-date resume token regardless of what we do here. + const changes = + change.addedDocuments.size + + change.modifiedDocuments.size + + change.removedDocuments.size; + return changes > 0; + } + /** * Notify local store of the changed views to locally pin documents. */ @@ -638,10 +697,22 @@ export class LocalStore { queryData != null, 'Tried to release nonexistent query: ' + query ); - this.localViewReferences.removeReferencesForId(queryData!.targetId); - delete this.targetIds[queryData!.targetId]; + + const targetId = queryData!.targetId; + const cachedQueryData = this.targetIds[targetId]; + + this.localViewReferences.removeReferencesForId(targetId); + delete this.targetIds[targetId]; if (this.garbageCollector.isEager) { return this.queryCache.removeQueryData(txn, queryData!); + } else if ( + cachedQueryData.snapshotVersion > queryData!.snapshotVersion + ) { + // If we've been avoiding persisting the resumeToken (see + // shouldPersistQueryData for conditions and rationale) we need to + // persist the token now because there will no longer be an + // in-memory version to fall back on. + return this.queryCache.updateQueryData(txn, cachedQueryData); } else { return PersistencePromise.resolve(); } diff --git a/packages/firestore/src/remote/watch_change.ts b/packages/firestore/src/remote/watch_change.ts index faeef82abe5..8ccebed2c69 100644 --- a/packages/firestore/src/remote/watch_change.ts +++ b/packages/firestore/src/remote/watch_change.ts @@ -297,7 +297,7 @@ export class WatchChangeAggregator { /** Processes and adds the WatchTargetChange to the current set of changes. */ handleTargetChange(targetChange: WatchTargetChange): void { - targetChange.targetIds.forEach(targetId => { + this.forEachTarget(targetChange, targetId => { const targetState = this.ensureTargetState(targetId); switch (targetChange.state) { case WatchTargetChangeState.NoChange: @@ -352,6 +352,22 @@ export class WatchChangeAggregator { }); } + /** + * Iterates over all targetIds that the watch change applies to: either the + * targetIds explicitly listed in the change or the targetIds of all currently + * active targets. + */ + forEachTarget( + targetChange: WatchTargetChange, + fn: (targetId: TargetId) => void + ): void { + if (targetChange.targetIds.length > 0) { + targetChange.targetIds.forEach(fn); + } else { + objUtils.forEachNumber(this.targetStates, fn); + } + } + /** * Handles existence filters and synthesizes deletes for filter mismatches. * Targets that are invalidated by filter mismatches are added to diff --git a/packages/firestore/test/unit/specs/listen_spec.test.ts b/packages/firestore/test/unit/specs/listen_spec.test.ts index 1b7f4f73134..1442cfedd04 100644 --- a/packages/firestore/test/unit/specs/listen_spec.test.ts +++ b/packages/firestore/test/unit/specs/listen_spec.test.ts @@ -509,4 +509,88 @@ describeSpec('Listens:', [], () => { .watchAcksFull(query, 3000) .expectEvents(query, {}); }); + + specTest('Persists global resume tokens on unlisten', [], () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, 1000, docA) + .expectEvents(query, { added: [docA] }) + + // Some time later, watch sends an updated resume token and the user stops + // listening. + .watchSnapshots(2000, [], 'resume-token-2000') + .userUnlistens(query) + .watchRemoves(query) + + .userListens(query, 'resume-token-2000') + .expectEvents(query, { added: [docA], fromCache: true }) + .watchAcks(query) + .watchCurrents(query, 'resume-token-3000') + .watchSnapshots(3000) + .expectEvents(query, { fromCache: false }) + ); + }); + + specTest('Omits global resume tokens for a short while', [], () => { + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, 1000, docA) + .expectEvents(query, { added: [docA] }) + + // One millisecond later, watch sends an updated resume token but the + // user doesn't manage to unlisten before restart. + .watchSnapshots(2000, [], 'resume-token-2000') + .restart() + + .userListens(query, 'resume-token-1000') + .expectEvents(query, { added: [docA], fromCache: true }) + .watchAcks(query) + .watchCurrents(query, 'resume-token-3000') + .watchSnapshots(3000) + .expectEvents(query, { fromCache: false }) + ); + }); + + specTest( + 'Persists global resume tokens if the snapshot is old enough', + [], + () => { + const initialVersion = 1000; + const minutesLater = 5 * 60 * 1e6 + initialVersion; + const evenLater = 1000 + minutesLater; + + const query = Query.atPath(path('collection')); + const docA = doc('collection/a', initialVersion, { key: 'a' }); + + return ( + spec() + .withGCEnabled(false) + .userListens(query) + .watchAcksFull(query, initialVersion, docA) + .expectEvents(query, { added: [docA] }) + + // 5 minutes later, watch sends an updated resume token but the user + // doesn't manage to unlisten before restart. + .watchSnapshots(minutesLater, [], 'resume-token-minutes-later') + .restart() + + .userListens(query, 'resume-token-minutes-later') + .expectEvents(query, { added: [docA], fromCache: true }) + .watchAcks(query) + .watchCurrents(query, 'resume-token-even-later') + .watchSnapshots(evenLater) + .expectEvents(query, { fromCache: false }) + ); + } + ); });