Skip to content

Removing held write batch handling from LocalStore #1997

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Firestore/Example/Tests/Core/FSTViewTests.mm
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,7 @@ - (void)testRemovesKeysFromMutatedKeysWhenNewDocHasNoLocalChanges {
[view applyChangesToDocuments:changes];
XCTAssertEqual(changes.mutatedKeys, (DocumentKeySet{doc2.key}));

FSTDocument *doc2Prime =
FSTTestDoc("rooms/eros/messages/1", 0, @{}, FSTDocumentStateSynced);
FSTDocument *doc2Prime = FSTTestDoc("rooms/eros/messages/1", 0, @{}, FSTDocumentStateSynced);
changes = [view computeChangesWithDocuments:FSTTestDocUpdates(@[ doc2Prime ])];
[view applyChangesToDocuments:changes];
XCTAssertEqual(changes.mutatedKeys, DocumentKeySet{});
Expand Down
132 changes: 10 additions & 122 deletions Firestore/Source/Local/FSTLocalStore.mm
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,6 @@ @interface FSTLocalStore ()
/** Maps a targetID to data about its query. */
@property(nonatomic, strong) NSMutableDictionary<NSNumber *, FSTQueryData *> *targetIDs;

/**
* A heldBatchResult is a mutation batch result (from a write acknowledgement) that arrived before
* the watch stream got notified of a snapshot that includes the write.  So we "hold" it until
* the watch stream catches up. It ensures that the local write remains visible (latency
* compensation) and doesn't temporarily appear reverted because the watch stream is slower than
* the write stream and so wasn't reflecting it.
*
* NOTE: Eventually we want to move this functionality into the remote store.
*/
@property(nonatomic, strong) NSMutableArray<FSTMutationBatchResult *> *heldBatchResults;

@end

@implementation FSTLocalStore {
Expand All @@ -117,7 +106,6 @@ - (instancetype)initWithPersistence:(id<FSTPersistence>)persistence
[_persistence.referenceDelegate addInMemoryPins:_localViewReferences];

_targetIDs = [NSMutableDictionary dictionary];
_heldBatchResults = [NSMutableArray array];

_targetIDGenerator = TargetIdGenerator::LocalStoreTargetIdGenerator(0);
}
Expand All @@ -131,27 +119,7 @@ - (void)start {
}

- (void)startMutationQueue {
self.persistence.run("Start MutationQueue", [&]() {
[self.mutationQueue start];

// If we have any leftover mutation batch results from a prior run, just drop them.
// TODO(http://b/33446471): We probably need to repopulate heldBatchResults or similar instead,
// but that is not straightforward since we're not persisting the write ack versions.
[self.heldBatchResults removeAllObjects];

// TODO(mikelehen): This is the only usage of getAllMutationBatchesThroughBatchId:. Consider
// removing it in favor of a getAcknowledgedBatches method.
BatchId highestAck = [self.mutationQueue highestAcknowledgedBatchID];
if (highestAck != kFSTBatchIDUnknown) {
NSArray<FSTMutationBatch *> *batches =
[self.mutationQueue allMutationBatchesThroughBatchID:highestAck];
if (batches.count > 0) {
// NOTE: This could be more efficient if we had a removeBatchesThroughBatchID, but this set
// should be very small and this code should go away eventually.
[self removeMutationBatches:batches];
}
}
});
self.persistence.run("Start MutationQueue", [&]() { [self.mutationQueue start]; });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the android port, you seem to have also dropped the persistence.runTransaction() call, but have preserved it here. Is that intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I talked to Gil about this (in the chat room). The decision was to add the transaction back to Android. By convention (and for our sanity later on), we should wrap all database calls in transactions. In LevelDb, that allows us to read back our changes right away. On Android, it prevents auto-commit.

}

- (FSTMaybeDocumentDictionary *)userDidChange:(const User &)user {
Expand Down Expand Up @@ -202,18 +170,12 @@ - (FSTMaybeDocumentDictionary *)acknowledgeBatchWithResult:(FSTMutationBatchResu
return self.persistence.run("Acknowledge batch", [&]() -> FSTMaybeDocumentDictionary * {
id<FSTMutationQueue> mutationQueue = self.mutationQueue;

[mutationQueue acknowledgeBatch:batchResult.batch streamToken:batchResult.streamToken];

DocumentKeySet affected;
if ([self shouldHoldBatchResultWithVersion:batchResult.commitVersion]) {
[self.heldBatchResults addObject:batchResult];
} else {
affected = [self releaseBatchResults:@[ batchResult ]];
}

FSTMutationBatch *batch = batchResult.batch;
[mutationQueue acknowledgeBatch:batch streamToken:batchResult.streamToken];
[self applyBatchResult:batchResult];
[self.mutationQueue performConsistencyCheck];

return [self.localDocuments documentsForKeys:affected];
return [self.localDocuments documentsForKeys:batch.keys];
});
}

Expand All @@ -225,11 +187,10 @@ - (FSTMaybeDocumentDictionary *)rejectBatchID:(BatchId)batchID {
BatchId lastAcked = [self.mutationQueue highestAcknowledgedBatchID];
HARD_ASSERT(batchID > lastAcked, "Acknowledged batches can't be rejected.");

DocumentKeySet affected = [self removeMutationBatch:toReject];

[self.mutationQueue removeMutationBatch:toReject];
[self.mutationQueue performConsistencyCheck];

return [self.localDocuments documentsForKeys:affected];
return [self.localDocuments documentsForKeys:toReject.keys];
});
}

Expand Down Expand Up @@ -341,15 +302,7 @@ - (FSTMaybeDocumentDictionary *)applyRemoteEvent:(FSTRemoteEvent *)remoteEvent {
[self.queryCache setLastRemoteSnapshotVersion:remoteVersion];
}

DocumentKeySet releasedWriteKeys = [self releaseHeldBatchResults];

// Union the two key sets.
DocumentKeySet keysToRecalc = changedDocKeys;
for (const DocumentKey &key : releasedWriteKeys) {
keysToRecalc = keysToRecalc.insert(key);
}

return [self.localDocuments documentsForKeys:keysToRecalc];
return [self.localDocuments documentsForKeys:changedDocKeys];
});
}

Expand Down Expand Up @@ -458,12 +411,6 @@ - (void)releaseQuery:(FSTQuery *)query {
[self.localViewReferences removeReferencesForID:targetID];
[self.targetIDs removeObjectForKey:boxedTargetID];
[self.persistence.referenceDelegate removeTarget:queryData];

// If this was the last watch target, then we won't get any more watch snapshots, so we should
// release any held batch results.
if ([self.targetIDs count] == 0) {
[self releaseHeldBatchResults];
}
});
}

Expand All @@ -479,67 +426,6 @@ - (DocumentKeySet)remoteDocumentKeysForTarget:(TargetId)targetID {
});
}

/**
* Releases all the held mutation batches up to the current remote version received, and
* applies their mutations to the docs in the remote documents cache.
*
* @return the set of keys of docs that were modified by those writes.
*/
- (DocumentKeySet)releaseHeldBatchResults {
NSMutableArray<FSTMutationBatchResult *> *toRelease = [NSMutableArray array];
for (FSTMutationBatchResult *batchResult in self.heldBatchResults) {
if (![self isRemoteUpToVersion:batchResult.commitVersion]) {
break;
}
[toRelease addObject:batchResult];
}

if (toRelease.count == 0) {
return DocumentKeySet{};
} else {
[self.heldBatchResults removeObjectsInRange:NSMakeRange(0, toRelease.count)];
return [self releaseBatchResults:toRelease];
}
}

- (BOOL)isRemoteUpToVersion:(const SnapshotVersion &)version {
// If there are no watch targets, then we won't get remote snapshots, and are always "up-to-date."
return version <= self.queryCache.lastRemoteSnapshotVersion || self.targetIDs.count == 0;
}

- (BOOL)shouldHoldBatchResultWithVersion:(const SnapshotVersion &)version {
// Check if watcher isn't up to date or prior results are already held.
return ![self isRemoteUpToVersion:version] || self.heldBatchResults.count > 0;
}

- (DocumentKeySet)releaseBatchResults:(NSArray<FSTMutationBatchResult *> *)batchResults {
NSMutableArray<FSTMutationBatch *> *batches = [NSMutableArray array];
for (FSTMutationBatchResult *batchResult in batchResults) {
[self applyBatchResult:batchResult];
[batches addObject:batchResult.batch];
}

return [self removeMutationBatches:batches];
}

- (DocumentKeySet)removeMutationBatch:(FSTMutationBatch *)batch {
return [self removeMutationBatches:@[ batch ]];
}

/** Removes all the mutation batches named in the given array. */
- (DocumentKeySet)removeMutationBatches:(NSArray<FSTMutationBatch *> *)batches {
DocumentKeySet affectedDocs;
for (FSTMutationBatch *batch in batches) {
for (FSTMutation *mutation in batch.mutations) {
const DocumentKey &key = mutation.key;
affectedDocs = affectedDocs.insert(key);
}
[self.mutationQueue removeMutationBatch:batch];
}

return affectedDocs;
}

- (void)applyBatchResult:(FSTMutationBatchResult *)batchResult {
FSTMutationBatch *batch = batchResult.batch;
DocumentKeySet docKeys = batch.keys;
Expand All @@ -562,6 +448,8 @@ - (void)applyBatchResult:(FSTMutationBatchResult *)batchResult {
}
}
}

[self.mutationQueue removeMutationBatch:batch];
}

@end
Expand Down
1 change: 0 additions & 1 deletion Firestore/Source/Model/FSTDocument.mm
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ + (instancetype)documentWithKey:(DocumentKey)key
return deletedDocument;
}


- (BOOL)hasCommittedMutations {
return _hasCommittedMutations;
}
Expand Down