-
Notifications
You must be signed in to change notification settings - Fork 936
Implement global resume token #1052
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
338ac12
2051b1a
6bedde4
b4397fc
2cd9953
a4a7983
20728a5
6033958
f303c13
7d9d21d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -111,6 +111,15 @@ export interface LocalWriteResult { | |
* unrecoverable error (should be caught / reported by the async_queue). | ||
*/ | ||
export class LocalStore { | ||
/** | ||
* The maximum time to leave a resume token buffered without writing it out. | ||
* This value is arbitrary: it's long enough to avoid several writes | ||
* (possibly indefinitely if updates come more frequently than this) but | ||
* short enough that restarting after crashing will still have a pretty | ||
* recent resume token. | ||
*/ | ||
private static readonly RESUME_TOKEN_MAX_AGE_MICROS = 5 * 60 * 1e6; | ||
|
||
/** | ||
* The set of all mutations that have been sent but not yet been applied to | ||
* the backend. | ||
|
@@ -469,12 +478,18 @@ export class LocalStore { | |
// any preexisting value. | ||
const resumeToken = change.resumeToken; | ||
if (resumeToken.length > 0) { | ||
const oldQueryData = queryData; | ||
queryData = queryData.copy({ | ||
resumeToken, | ||
snapshotVersion: remoteEvent.snapshotVersion | ||
}); | ||
this.targetIds[targetId] = queryData; | ||
promises.push(this.queryCache.updateQueryData(txn, queryData)); | ||
|
||
if ( | ||
LocalStore.shouldPersistQueryData(oldQueryData, queryData, change) | ||
) { | ||
promises.push(this.queryCache.updateQueryData(txn, queryData)); | ||
} | ||
} | ||
} | ||
); | ||
|
@@ -550,6 +565,50 @@ export class LocalStore { | |
}); | ||
} | ||
|
||
/** | ||
* Returns true if the newQueryData should be persisted during an update of | ||
* an active target. QueryData should always be persisted when a target is | ||
* being released and should not call this function. | ||
* | ||
* While the target is active, QueryData updates can be omitted when nothing | ||
* about the target has changed except metadata like the resume token or | ||
* snapshot version. Occasionally it's worth the extra write to prevent these | ||
* values from getting too stale after a crash, but this doesn't have to be | ||
* too frequent. | ||
*/ | ||
private static shouldPersistQueryData( | ||
oldQueryData: QueryData, | ||
newQueryData: QueryData, | ||
change: TargetChange | ||
): boolean { | ||
// Avoid clearing any existing value | ||
if (newQueryData.resumeToken.length === 0) return false; | ||
|
||
// Any resume token is interesting if there isn't one already. | ||
if (oldQueryData.resumeToken.length === 0) return true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just learned something about our style guide: "Control flow statements spanning multiple lines always use blocks for the containing code. Good to know :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for looking this up so I wouldn't have to. :-) |
||
|
||
// Don't allow resume token changes to be buffered indefinitely. This | ||
// allows us to be reasonably up-to-date after a crash and avoids needing | ||
// to loop over all active queries on shutdown. Especially in the browser | ||
// we may not get time to do anything interesting while the current tab is | ||
// closing. | ||
const timeDelta = | ||
newQueryData.snapshotVersion.toMicroseconds() - | ||
oldQueryData.snapshotVersion.toMicroseconds(); | ||
if (timeDelta >= this.RESUME_TOKEN_MAX_AGE_MICROS) return true; | ||
|
||
// Otherwise if the only thing that has changed about a target is its resume | ||
// token it's not worth persisting. Note that the RemoteStore keeps an | ||
// in-memory view of the currently active targets which includes the current | ||
// resume token, so stream failure or user changes will still use an | ||
// up-to-date resume token regardless of what we do here. | ||
const changes = | ||
change.addedDocuments.size + | ||
change.modifiedDocuments.size + | ||
change.removedDocuments.size; | ||
return changes > 0; | ||
} | ||
|
||
/** | ||
* Notify local store of the changed views to locally pin documents. | ||
*/ | ||
|
@@ -638,10 +697,22 @@ export class LocalStore { | |
queryData != null, | ||
'Tried to release nonexistent query: ' + query | ||
); | ||
this.localViewReferences.removeReferencesForId(queryData!.targetId); | ||
delete this.targetIds[queryData!.targetId]; | ||
|
||
const targetId = queryData!.targetId; | ||
const cachedQueryData = this.targetIds[targetId]; | ||
|
||
this.localViewReferences.removeReferencesForId(targetId); | ||
delete this.targetIds[targetId]; | ||
if (this.garbageCollector.isEager) { | ||
return this.queryCache.removeQueryData(txn, queryData!); | ||
} else if ( | ||
cachedQueryData.snapshotVersion > queryData!.snapshotVersion | ||
) { | ||
// If we've been avoiding persisting the resumeToken (see | ||
// shouldPersistQueryData for conditions and rationale) we need to | ||
// persist the token now because there will no longer be an | ||
// in-memory version to fall back on. | ||
return this.queryCache.updateQueryData(txn, cachedQueryData); | ||
} else { | ||
return PersistencePromise.resolve(); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -509,4 +509,88 @@ describeSpec('Listens:', [], () => { | |
.watchAcksFull(query, 3000) | ||
.expectEvents(query, {}); | ||
}); | ||
|
||
specTest('Persists global resume tokens on unlisten', [], () => { | ||
const query = Query.atPath(path('collection')); | ||
const docA = doc('collection/a', 1000, { key: 'a' }); | ||
|
||
return ( | ||
spec() | ||
.withGCEnabled(false) | ||
.userListens(query) | ||
.watchAcksFull(query, 1000, docA) | ||
.expectEvents(query, { added: [docA] }) | ||
|
||
// Some time later, watch sends an updated resume token and the user stops | ||
// listening. | ||
.watchSnapshots(2000, [], 'resume-token-2000') | ||
.userUnlistens(query) | ||
.watchRemoves(query) | ||
|
||
.userListens(query, 'resume-token-2000') | ||
.expectEvents(query, { added: [docA], fromCache: true }) | ||
.watchAcks(query) | ||
.watchCurrents(query, 'resume-token-3000') | ||
.watchSnapshots(3000) | ||
.expectEvents(query, { fromCache: false }) | ||
); | ||
}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems pretty straightforward to test the MAX_RESUME_TOKEN_BUFFERING_MICROS logic too if we wanted to (might need to use .restart() to test re-listening without unlistening). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay fine, you caught my lazy bones out. Done. |
||
|
||
specTest('Omits global resume tokens for a short while', [], () => { | ||
const query = Query.atPath(path('collection')); | ||
const docA = doc('collection/a', 1000, { key: 'a' }); | ||
|
||
return ( | ||
spec() | ||
.withGCEnabled(false) | ||
.userListens(query) | ||
.watchAcksFull(query, 1000, docA) | ||
.expectEvents(query, { added: [docA] }) | ||
|
||
// One millisecond later, watch sends an updated resume token but the | ||
// user doesn't manage to unlisten before restart. | ||
.watchSnapshots(2000, [], 'resume-token-2000') | ||
.restart() | ||
|
||
.userListens(query, 'resume-token-1000') | ||
.expectEvents(query, { added: [docA], fromCache: true }) | ||
.watchAcks(query) | ||
.watchCurrents(query, 'resume-token-3000') | ||
.watchSnapshots(3000) | ||
.expectEvents(query, { fromCache: false }) | ||
); | ||
}); | ||
|
||
specTest( | ||
'Persists global resume tokens if the snapshot is old enough', | ||
[], | ||
() => { | ||
const initialVersion = 1000; | ||
const minutesLater = 5 * 60 * 1e6 + initialVersion; | ||
const evenLater = 1000 + minutesLater; | ||
|
||
const query = Query.atPath(path('collection')); | ||
const docA = doc('collection/a', initialVersion, { key: 'a' }); | ||
|
||
return ( | ||
spec() | ||
.withGCEnabled(false) | ||
.userListens(query) | ||
.watchAcksFull(query, initialVersion, docA) | ||
.expectEvents(query, { added: [docA] }) | ||
|
||
// 5 minutes later, watch sends an updated resume token but the user | ||
// doesn't manage to unlisten before restart. | ||
.watchSnapshots(minutesLater, [], 'resume-token-minutes-later') | ||
.restart() | ||
|
||
.userListens(query, 'resume-token-minutes-later') | ||
.expectEvents(query, { added: [docA], fromCache: true }) | ||
.watchAcks(query) | ||
.watchCurrents(query, 'resume-token-even-later') | ||
.watchSnapshots(evenLater) | ||
.expectEvents(query, { fromCache: false }) | ||
); | ||
} | ||
); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This functions seems more like a
shouldPersistQueryData
to me. To me, that would describe the way it is used and the internal logic better than its current name.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.