Skip to content

View changes for held write acks #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ static List<DocumentChange> changesFromSnapshot(
for (DocumentViewChange change : snapshot.getChanges()) {
Document document = change.getDocument();
QueryDocumentSnapshot documentSnapshot =
QueryDocumentSnapshot.fromDocument(firestore, document, snapshot.isFromCache());
QueryDocumentSnapshot.fromDocument(
firestore,
document,
snapshot.isFromCache(),
snapshot.getMutatedKeys().contains(document.getKey()));
hardAssert(
change.getType() == DocumentViewChange.Type.ADDED,
"Invalid added event for first snapshot");
Expand All @@ -163,7 +167,11 @@ static List<DocumentChange> changesFromSnapshot(
}
Document document = change.getDocument();
QueryDocumentSnapshot documentSnapshot =
QueryDocumentSnapshot.fromDocument(firestore, document, snapshot.isFromCache());
QueryDocumentSnapshot.fromDocument(
firestore,
document,
snapshot.isFromCache(),
snapshot.getMutatedKeys().contains(document.getKey()));
int oldIndex, newIndex;
Type type = getType(change);
if (type != Type.ADDED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,12 @@ public Task<DocumentSnapshot> get(Source source) {
.getDocumentFromLocalCache(key)
.continueWith(
Executors.DIRECT_EXECUTOR,
(Task<Document> doc) ->
new DocumentSnapshot(firestore, key, doc.getResult(), /*isFromCache=*/ true));
(Task<Document> task) -> {
Document doc = task.getResult();
boolean hasPendingWrites = doc != null && doc.hasLocalMutations();
return new DocumentSnapshot(
firestore, key, doc, /*isFromCache=*/ true, hasPendingWrites);
});
} else {
return getViaSnapshotListener(source);
}
Expand Down Expand Up @@ -523,11 +527,16 @@ private ListenerRegistration addSnapshotListenerInternal(
Document document = snapshot.getDocuments().getDocument(key);
DocumentSnapshot documentSnapshot;
if (document != null) {
boolean hasPendingWrites = snapshot.getMutatedKeys().contains(document.getKey());
documentSnapshot =
DocumentSnapshot.fromDocument(firestore, document, snapshot.isFromCache());
DocumentSnapshot.fromDocument(
firestore, document, snapshot.isFromCache(), hasPendingWrites);
} else {
// We don't raise `hasPendingWrites` for deleted documents.
boolean hasPendingWrites = false;
documentSnapshot =
DocumentSnapshot.fromNoDocument(firestore, key, snapshot.isFromCache());
DocumentSnapshot.fromNoDocument(
firestore, key, snapshot.isFromCache(), hasPendingWrites);
}
listener.onEvent(documentSnapshot, null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,25 @@ public enum ServerTimestampBehavior {
private final SnapshotMetadata metadata;

DocumentSnapshot(
FirebaseFirestore firestore, DocumentKey key, @Nullable Document doc, boolean isFromCache) {
FirebaseFirestore firestore,
DocumentKey key,
@Nullable Document doc,
boolean isFromCache,
boolean hasPendingWrites) {
this.firestore = checkNotNull(firestore);
this.key = checkNotNull(key);
this.doc = doc;
boolean hasPendingWrites = this.doc != null && this.doc.hasLocalMutations();
this.metadata = new SnapshotMetadata(hasPendingWrites, isFromCache);
}

static DocumentSnapshot fromDocument(
FirebaseFirestore firestore, Document doc, boolean fromCache) {
return new DocumentSnapshot(firestore, doc.getKey(), doc, fromCache);
FirebaseFirestore firestore, Document doc, boolean fromCache, boolean hasPendingWrites) {
return new DocumentSnapshot(firestore, doc.getKey(), doc, fromCache, hasPendingWrites);
}

static DocumentSnapshot fromNoDocument(
FirebaseFirestore firestore, DocumentKey key, boolean fromCache) {
return new DocumentSnapshot(firestore, key, null, fromCache);
FirebaseFirestore firestore, DocumentKey key, boolean fromCache, boolean hasPendingWrites) {
return new DocumentSnapshot(firestore, key, null, fromCache, hasPendingWrites);
}

/** @return The id of the document. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@
public class QueryDocumentSnapshot extends DocumentSnapshot {

private QueryDocumentSnapshot(
FirebaseFirestore firestore, DocumentKey key, @Nullable Document doc, boolean isFromCache) {
super(firestore, key, doc, isFromCache);
FirebaseFirestore firestore,
DocumentKey key,
@Nullable Document doc,
boolean isFromCache,
boolean hasPendingWrites) {
super(firestore, key, doc, isFromCache, hasPendingWrites);
}

static QueryDocumentSnapshot fromDocument(
FirebaseFirestore firestore, Document doc, boolean fromCache) {
return new QueryDocumentSnapshot(firestore, doc.getKey(), doc, fromCache);
FirebaseFirestore firestore, Document doc, boolean fromCache, boolean hasPendingWrites) {
return new QueryDocumentSnapshot(firestore, doc.getKey(), doc, fromCache, hasPendingWrites);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,11 @@ public <T> List<T> toObjects(
}

private QueryDocumentSnapshot convertDocument(Document document) {
return QueryDocumentSnapshot.fromDocument(firestore, document, snapshot.isFromCache());
return QueryDocumentSnapshot.fromDocument(
firestore,
document,
snapshot.isFromCache(),
snapshot.getMutatedKeys().contains(document.getKey()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ private Task<DocumentSnapshot> getAsync(DocumentReference documentRef) {
MaybeDocument doc = docs.get(0);
if (doc instanceof Document) {
return DocumentSnapshot.fromDocument(
firestore, (Document) doc, /*fromCache=*/ false);
firestore, (Document) doc, /*fromCache=*/ false, /*hasPendingWrites=*/ false);
} else if (doc instanceof NoDocument) {
return DocumentSnapshot.fromNoDocument(
firestore, doc.getKey(), /*fromCache=*/ false);
firestore, doc.getKey(), /*fromCache=*/ false, /*hasPendingWrites=*/ false);
} else {
throw fail(
"BatchGetDocumentsRequest returned unexpected document type: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void onViewSnapshot(ViewSnapshot newSnapshot) {
newSnapshot.getOldDocuments(),
documentChanges,
newSnapshot.isFromCache(),
newSnapshot.hasPendingWrites(),
newSnapshot.getMutatedKeys(),
newSnapshot.didSyncStateChange());
}

Expand Down Expand Up @@ -160,7 +160,7 @@ private void raiseInitialEvent(ViewSnapshot snapshot) {
DocumentSet.emptySet(snapshot.getQuery().comparator()),
QueryListener.getInitialViewChanges(snapshot),
snapshot.isFromCache(),
snapshot.hasPendingWrites(),
snapshot.getMutatedKeys(),
/*didSyncStateChange=*/ true);
raisedInitialEvent = true;
listener.onEvent(snapshot, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,9 @@ public void handleRejectedListen(int targetId, Status error) {
// Ideally, we would have a method in the local store to purge a document. However, it would
// be tricky to keep all of the local store's invariants with another method.
Map<DocumentKey, MaybeDocument> documentUpdates =
Collections.singletonMap(limboKey, new NoDocument(limboKey, SnapshotVersion.NONE));
Collections.singletonMap(
limboKey,
new NoDocument(limboKey, SnapshotVersion.NONE, /*hasCommittedMutations=*/ false));
Set<DocumentKey> limboDocuments = Collections.singleton(limboKey);
RemoteEvent event =
new RemoteEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,49 +169,66 @@ public <D extends MaybeDocument> DocumentChanges computeDocChanges(
}
}

if (newDoc != null) {
newDocumentSet = newDocumentSet.add(newDoc);
if (newDoc.hasLocalMutations()) {
newMutatedKeys = newMutatedKeys.insert(newDoc.getKey());
} else {
newMutatedKeys = newMutatedKeys.remove(newDoc.getKey());
}
} else {
newDocumentSet = newDocumentSet.remove(key);
newMutatedKeys = newMutatedKeys.remove(key);
}
boolean oldDocHadPendingMutations =
oldDoc != null && this.mutatedKeys.contains(oldDoc.getKey());

// We only consider committed mutations for documents that were mutated during the lifetime of
// the view.
boolean newDocHasPendingMutations =
newDoc != null
&& (newDoc.hasLocalMutations()
|| (this.mutatedKeys.contains(newDoc.getKey())
&& newDoc.hasCommittedMutations()));

boolean changeApplied = false;

// Calculate change
if (oldDoc != null && newDoc != null) {
boolean docsEqual = oldDoc.getData().equals(newDoc.getData());
if (!docsEqual || oldDoc.hasLocalMutations() != newDoc.hasLocalMutations()) {
// only report a change if document actually changed.
if (docsEqual) {
changeSet.addChange(DocumentViewChange.create(Type.METADATA, newDoc));
} else {
if (!docsEqual) {
if (!shouldWaitForSyncedDocument(oldDoc, newDoc)) {
changeSet.addChange(DocumentViewChange.create(Type.MODIFIED, newDoc));
changeApplied = true;
}

if (lastDocInLimit != null && query.comparator().compare(newDoc, lastDocInLimit) > 0) {
// This doc moved from inside the limit to after the limit. That means there may be some
// doc in the local cache that's actually less than this one.
needsRefill = true;
}
} else if (oldDocHadPendingMutations != newDocHasPendingMutations) {
changeSet.addChange(DocumentViewChange.create(Type.METADATA, newDoc));
changeApplied = true;
}
} else if (oldDoc == null && newDoc != null) {
changeSet.addChange(DocumentViewChange.create(Type.ADDED, newDoc));
changeApplied = true;
} else if (oldDoc != null && newDoc == null) {
changeSet.addChange(DocumentViewChange.create(Type.REMOVED, oldDoc));
changeApplied = true;
if (lastDocInLimit != null) {
// A doc was removed from a full limit query. We'll need to requery from the local cache
// to see if we know about some other doc that should be in the results.
needsRefill = true;
}
}

if (changeApplied) {
if (newDoc != null) {
newDocumentSet = newDocumentSet.add(newDoc);
if (newDoc.hasLocalMutations()) {
newMutatedKeys = newMutatedKeys.insert(newDoc.getKey());
} else {
newMutatedKeys = newMutatedKeys.remove(newDoc.getKey());
}
} else {
newDocumentSet = newDocumentSet.remove(key);
newMutatedKeys = newMutatedKeys.remove(key);
}
}
}

if (query.hasLimit()) {
// TODO: Make QuerySnapshot size be constant time.
while (newDocumentSet.size() > query.getLimit()) {
for (long i = newDocumentSet.size() - this.query.getLimit(); i > 0; --i) {
Document oldDoc = newDocumentSet.getLastDocument();
newDocumentSet = newDocumentSet.remove(oldDoc.getKey());
newMutatedKeys = newMutatedKeys.remove(oldDoc.getKey());
Expand All @@ -226,6 +243,18 @@ public <D extends MaybeDocument> DocumentChanges computeDocChanges(
return new DocumentChanges(newDocumentSet, changeSet, newMutatedKeys, needsRefill);
}

private boolean shouldWaitForSyncedDocument(Document oldDoc, Document newDoc) {
// We suppress the initial change event for documents that were modified as part of a write
// acknowledgment (e.g. when the value of a server transform is applied) as Watch will send us
// the same document again. By suppressing the event, we only raise two user visible events (one
// with `hasPendingWrites` and the final state of the document) instead of three (one with
// `hasPendingWrites`, the modified document with `hasPendingWrites` and the final state of the
// document).
return (oldDoc.hasLocalMutations()
&& newDoc.hasCommittedMutations()
&& !newDoc.hasLocalMutations());
}

/**
* Updates the view with the given ViewDocumentChanges and updates limbo docs and sync state from
* the given (optional) target change.
Expand Down Expand Up @@ -273,15 +302,14 @@ public ViewChange applyChanges(DocumentChanges docChanges, TargetChange targetCh
ViewSnapshot snapshot = null;
if (viewChanges.size() != 0 || syncStatedChanged) {
boolean fromCache = newSyncState == SyncState.LOCAL;
boolean hasPendingWrites = !docChanges.mutatedKeys.isEmpty();
snapshot =
new ViewSnapshot(
query,
docChanges.documentSet,
oldDocumentSet,
viewChanges,
fromCache,
hasPendingWrites,
docChanges.mutatedKeys,
syncStatedChanged);
}
return new ViewChange(snapshot, limboDocumentChanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.google.firebase.firestore.core;

import com.google.firebase.database.collection.ImmutableSortedSet;
import com.google.firebase.firestore.model.DocumentKey;
import com.google.firebase.firestore.model.DocumentSet;
import java.util.List;

Expand All @@ -31,7 +33,7 @@ public enum SyncState {
private final DocumentSet oldDocuments;
private final List<DocumentViewChange> changes;
private final boolean isFromCache;
private final boolean hasPendingWrites;
private final ImmutableSortedSet<DocumentKey> mutatedKeys;
private final boolean didSyncStateChange;

public ViewSnapshot(
Expand All @@ -40,14 +42,14 @@ public ViewSnapshot(
DocumentSet oldDocuments,
List<DocumentViewChange> changes,
boolean isFromCache,
boolean hasPendingWrites,
ImmutableSortedSet<DocumentKey> mutatedKeys,
boolean didSyncStateChange) {
this.query = query;
this.documents = documents;
this.oldDocuments = oldDocuments;
this.changes = changes;
this.isFromCache = isFromCache;
this.hasPendingWrites = hasPendingWrites;
this.mutatedKeys = mutatedKeys;
this.didSyncStateChange = didSyncStateChange;
}

Expand All @@ -72,7 +74,11 @@ public boolean isFromCache() {
}

public boolean hasPendingWrites() {
return hasPendingWrites;
return !mutatedKeys.isEmpty();
}

public ImmutableSortedSet<DocumentKey> getMutatedKeys() {
return mutatedKeys;
}

public boolean didSyncStateChange() {
Expand All @@ -93,15 +99,15 @@ public boolean equals(Object o) {
if (isFromCache != that.isFromCache) {
return false;
}
if (hasPendingWrites != that.hasPendingWrites) {
return false;
}
if (didSyncStateChange != that.didSyncStateChange) {
return false;
}
if (!query.equals(that.query)) {
return false;
}
if (!mutatedKeys.equals(that.mutatedKeys)) {
return false;
}
if (!documents.equals(that.documents)) {
return false;
}
Expand All @@ -117,8 +123,8 @@ public int hashCode() {
result = 31 * result + documents.hashCode();
result = 31 * result + oldDocuments.hashCode();
result = 31 * result + changes.hashCode();
result = 31 * result + mutatedKeys.hashCode();
result = 31 * result + (isFromCache ? 1 : 0);
result = 31 * result + (hasPendingWrites ? 1 : 0);
result = 31 * result + (didSyncStateChange ? 1 : 0);
return result;
}
Expand All @@ -135,8 +141,8 @@ public String toString() {
+ changes
+ ", isFromCache="
+ isFromCache
+ ", hasPendingWrites="
+ hasPendingWrites
+ ", mutatedKeys="
+ mutatedKeys.size()
+ ", didSyncStateChange="
+ didSyncStateChange
+ ")";
Expand Down
Loading