Skip to content

Port structural changes from Multi-Tab #84

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public List<DocumentChange> getDocumentChanges() {
@NonNull
@PublicApi
public List<DocumentChange> getDocumentChanges(MetadataChanges metadataChanges) {
if (MetadataChanges.INCLUDE.equals(metadataChanges) && snapshot.excludesMetadataChanges()) {
throw new IllegalArgumentException(
"To include metadata changes with your document changes, you must also pass MetadataChanges.INCLUDE to addSnapshotListener().");
}

if (cachedChanges == null || cachedChangesMetadataState != metadataChanges) {
cachedChanges =
Collections.unmodifiableList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ public void handleRejectedWrite(int batchId, Status error) {
@Override
public void handleOnlineStateChange(OnlineState onlineState) {
syncEngine.handleOnlineStateChange(onlineState);
eventManager.handleOnlineStateChange(onlineState);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import com.google.firebase.firestore.EventListener;
import com.google.firebase.firestore.FirebaseFirestoreException;
import com.google.firebase.firestore.core.DocumentViewChange.Type;
import com.google.firebase.firestore.model.Document;
import com.google.firebase.firestore.model.DocumentSet;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -82,7 +80,8 @@ public void onViewSnapshot(ViewSnapshot newSnapshot) {
documentChanges,
newSnapshot.isFromCache(),
newSnapshot.getMutatedKeys(),
newSnapshot.didSyncStateChange());
newSnapshot.didSyncStateChange(),
/* excludesMetadataChanges= */ true);
}

if (!raisedInitialEvent) {
Expand Down Expand Up @@ -154,23 +153,13 @@ private boolean shouldRaiseEvent(ViewSnapshot snapshot) {
private void raiseInitialEvent(ViewSnapshot snapshot) {
hardAssert(!raisedInitialEvent, "Trying to raise initial event for second time");
snapshot =
new ViewSnapshot(
ViewSnapshot.fromInitialDocuments(
snapshot.getQuery(),
snapshot.getDocuments(),
DocumentSet.emptySet(snapshot.getQuery().comparator()),
QueryListener.getInitialViewChanges(snapshot),
snapshot.isFromCache(),
snapshot.getMutatedKeys(),
/*didSyncStateChange=*/ true);
snapshot.isFromCache(),
snapshot.excludesMetadataChanges());
raisedInitialEvent = true;
listener.onEvent(snapshot, null);
}

private static List<DocumentViewChange> getInitialViewChanges(ViewSnapshot snapshot) {
List<DocumentViewChange> res = new ArrayList<>();
for (Document doc : snapshot.getDocuments()) {
res.add(DocumentViewChange.create(Type.ADDED, doc));
}
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,16 @@ private static class LimboResolution {

private static final String TAG = SyncEngine.class.getSimpleName();

/** A callback used to handle events from the SyncEngine */
/** Interface implemented by EventManager to handle notifications from SyncEngine. */
interface SyncEngineCallback {
/** Handles new view snapshots. */
void onViewSnapshots(List<ViewSnapshot> snapshotList);

/** Handles the failure of a query. */
void onError(Query query, Status error);

/** Handles a change in online state. */
void handleOnlineStateChange(OnlineState onlineState);
}

/** The local store, used to persist mutations and cached documents. */
Expand Down Expand Up @@ -133,7 +138,7 @@ interface SyncEngineCallback {

private User currentUser;

private SyncEngineCallback callback;
private SyncEngineCallback syncEngineListener;

public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUser) {
this.localStore = localStore;
Expand All @@ -147,16 +152,16 @@ public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUs
limboDocumentRefs = new ReferenceSet();

mutationUserCallbacks = new HashMap<>();
targetIdGenerator = TargetIdGenerator.getSyncEngineGenerator(0);
targetIdGenerator = TargetIdGenerator.forSyncEngine();
currentUser = initialUser;
}

public void setCallback(SyncEngineCallback callback) {
this.callback = callback;
this.syncEngineListener = callback;
}

private void assertCallback(String method) {
hardAssert(callback != null, "Trying to call %s before setting callback", method);
hardAssert(syncEngineListener != null, "Trying to call %s before setting callback", method);
}

/**
Expand All @@ -171,6 +176,16 @@ public int listen(Query query) {
hardAssert(!queryViewsByQuery.containsKey(query), "We already listen to query: %s", query);

QueryData queryData = localStore.allocateQuery(query);
ViewSnapshot viewSnapshot = initializeViewAndComputeSnapshot(queryData);
syncEngineListener.onViewSnapshots(Collections.singletonList(viewSnapshot));

remoteStore.listen(queryData);
return queryData.getTargetId();
}

private ViewSnapshot initializeViewAndComputeSnapshot(QueryData queryData) {
Query query = queryData.getQuery();

ImmutableSortedMap<DocumentKey, Document> docs = localStore.executeQuery(query);
ImmutableSortedSet<DocumentKey> remoteKeys =
localStore.getRemoteDocumentKeys(queryData.getTargetId());
Expand All @@ -185,10 +200,7 @@ public int listen(Query query) {
QueryView queryView = new QueryView(query, queryData.getTargetId(), view);
queryViewsByQuery.put(query, queryView);
queryViewsByTarget.put(queryData.getTargetId(), queryView);
callback.onViewSnapshots(Collections.singletonList(viewChange.getSnapshot()));

remoteStore.listen(queryData);
return queryData.getTargetId();
return viewChange.getSnapshot();
}

/** Stops listening to a query previously listened to via listen. */
Expand All @@ -200,7 +212,7 @@ void stopListening(Query query) {

localStore.releaseQuery(query);
remoteStore.stopListening(queryView.getTargetId());
removeAndCleanup(queryView);
removeAndCleanupQuery(queryView);
}

/**
Expand All @@ -215,7 +227,7 @@ public void writeMutations(List<Mutation> mutations, TaskCompletionSource<Void>
LocalWriteResult result = localStore.writeLocally(mutations);
addUserCallback(result.getBatchId(), userTask);

emitNewSnapshot(result.getChanges(), /*remoteEvent=*/ null);
emitNewSnapsAndNotifyLocalStore(result.getChanges(), /*remoteEvent=*/ null);
remoteStore.fillWritePipeline();
}

Expand Down Expand Up @@ -313,7 +325,7 @@ public void handleRemoteEvent(RemoteEvent event) {
}

ImmutableSortedMap<DocumentKey, MaybeDocument> changes = localStore.applyRemoteEvent(event);
emitNewSnapshot(changes, event);
emitNewSnapsAndNotifyLocalStore(changes, event);
}

/** Applies an OnlineState change to the sync engine and notifies any views of the change. */
Expand All @@ -329,7 +341,8 @@ public void handleOnlineStateChange(OnlineState onlineState) {
newViewSnapshots.add(viewChange.getSnapshot());
}
}
callback.onViewSnapshots(newViewSnapshots);
syncEngineListener.onViewSnapshots(newViewSnapshots);
syncEngineListener.handleOnlineStateChange(onlineState);
}

@Override
Expand Down Expand Up @@ -381,9 +394,9 @@ public void handleRejectedListen(int targetId, Status error) {
hardAssert(queryView != null, "Unknown target: %s", targetId);
Query query = queryView.getQuery();
localStore.releaseQuery(query);
removeAndCleanup(queryView);
removeAndCleanupQuery(queryView);
logErrorIfInteresting(error, "Listen for %s failed", query);
callback.onError(query, error);
syncEngineListener.onError(query, error);
}
}

Expand All @@ -399,7 +412,7 @@ public void handleSuccessfulWrite(MutationBatchResult mutationBatchResult) {
ImmutableSortedMap<DocumentKey, MaybeDocument> changes =
localStore.acknowledgeBatch(mutationBatchResult);

emitNewSnapshot(changes, /*remoteEvent=*/ null);
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null);
}

@Override
Expand All @@ -416,7 +429,7 @@ public void handleRejectedWrite(int batchId, Status status) {
// they consistently happen before listen events.
notifyUser(batchId, status);

emitNewSnapshot(changes, /*remoteEvent=*/ null);
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null);
}

/** Resolves the task corresponding to this write result. */
Expand All @@ -439,7 +452,7 @@ private void notifyUser(int batchId, @Nullable Status status) {
}
}

private void removeAndCleanup(QueryView view) {
private void removeAndCleanupQuery(QueryView view) {
queryViewsByQuery.remove(view.getQuery());
queryViewsByTarget.remove(view.getTargetId());

Expand Down Expand Up @@ -469,7 +482,7 @@ private void removeLimboTarget(DocumentKey key) {
* Computes a new snapshot from the changes and calls the registered callback with the new
* snapshot.
*/
private void emitNewSnapshot(
private void emitNewSnapsAndNotifyLocalStore(
ImmutableSortedMap<DocumentKey, MaybeDocument> changes, @Nullable RemoteEvent remoteEvent) {
List<ViewSnapshot> newSnapshots = new ArrayList<>();
List<LocalViewChanges> documentChangesInAllViews = new ArrayList<>();
Expand Down Expand Up @@ -498,7 +511,7 @@ private void emitNewSnapshot(
documentChangesInAllViews.add(docChanges);
}
}
callback.onViewSnapshots(newSnapshots);
syncEngineListener.onViewSnapshots(newSnapshots);
localStore.notifyLocalViewChanges(documentChangesInAllViews);
}

Expand Down Expand Up @@ -553,7 +566,7 @@ public void handleCredentialChange(User user) {
if (userChanged) {
// Notify local store and emit any resulting events from swapping out the mutation queue.
ImmutableSortedMap<DocumentKey, MaybeDocument> changes = localStore.handleUserChange(user);
emitNewSnapshot(changes, /*remoteEvent=*/ null);
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null);
}

// Notify remote store so it can restart its streams.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,68 +14,75 @@

package com.google.firebase.firestore.core;

import static com.google.firebase.firestore.util.Assert.hardAssert;

/**
* Generates monotonically increasing integer IDs. There are separate generators for different
* scopes. While these generators will operate independently of each other, they are scoped, such
* that no two generators will ever produce the same ID. This is useful, because sometimes the
* backend may group IDs from separate parts of the client into the same ID space.
* Generates monotonically increasing target IDs for sending targets to the watch stream.
*
* <p>The client constructs two generators, one for the query cache (via forQueryCache()), and one
* for limbo documents (via forSyncEngine()). These two generators produce non-overlapping IDs (by
* using even and odd IDs respectively).
*
* <p>By separating the target ID space, the query cache can generate target IDs that persist across
* client restarts, while sync engine can independently generate in-memory target IDs that are
* transient and can be reused after a restart.
*/
// TODO(mrschmidt): Explore removing this class in favor of generating these IDs directly in
// SyncEngine and LocalStore.
public class TargetIdGenerator {

/**
* Creates and returns the TargetIdGenerator for the local store.
*
* @param after An ID to start at. Every call to nextID will return an id > after.
* @return A shared instance of TargetIdGenerator.
*/
public static TargetIdGenerator getLocalStoreIdGenerator(int after) {
return new TargetIdGenerator(LOCAL_STATE_ID, after);
public static TargetIdGenerator forQueryCache(int after) {
TargetIdGenerator generator = new TargetIdGenerator(QUERY_CACHE_ID, after);
// Make sure that the next call to `nextId()` returns the first value after 'after'.
generator.nextId();
return generator;
}

/**
* Creates and returns the TargetIdGenerator for the sync engine.
*
* @param after An ID to start at. Every call to nextID will return an id > after.
* @return A shared instance of TargetIdGenerator.
*/
public static TargetIdGenerator getSyncEngineGenerator(int after) {
return new TargetIdGenerator(SYNC_ENGINE_ID, after);
public static TargetIdGenerator forSyncEngine() {
// Sync engine assigns target IDs for limbo document detection.
return new TargetIdGenerator(SYNC_ENGINE_ID, 1);
}

private static final int LOCAL_STATE_ID = 0;
private static final int QUERY_CACHE_ID = 0;
private static final int SYNC_ENGINE_ID = 1;

private static final int RESERVED_BITS = 1;

private int previousId;
private int nextId;
private int generatorId;

/** Instantiates a new TargetIdGenerator, using the seed as the first target ID to return. */
TargetIdGenerator(int generatorId, int seed) {
hardAssert(
(generatorId & RESERVED_BITS) == generatorId,
"Generator ID %d contains more than %d reserved bits",
generatorId,
RESERVED_BITS);
this.generatorId = generatorId;
seek(seed);
}

TargetIdGenerator(int generatorId, int after) {
int afterWithoutGenerator = (after >>> RESERVED_BITS) << RESERVED_BITS;
int afterGenerator = after - afterWithoutGenerator;
if (afterGenerator >= generatorId) {
// For example, if:
// self.generatorID = 0b0000
// after = 0b1011
// afterGenerator = 0b0001
// Then:
// previous = 0b1010
// next = 0b1100
previousId = afterWithoutGenerator | generatorId;
} else {
// For example, if:
// self.generatorID = 0b0001
// after = 0b1010
// afterGenerator = 0b0000
// Then:
// previous = 0b1001
// next = 0b1011
previousId = (afterWithoutGenerator | generatorId) - (1 << RESERVED_BITS);
}
private void seek(int targetId) {
hardAssert(
(targetId & RESERVED_BITS) == this.generatorId,
"Cannot supply target ID from different generator ID");
this.nextId = targetId;
}

/** @return the next id in the sequence */
public int nextId() {
previousId += 1 << RESERVED_BITS;
return previousId;
int nextId = this.nextId;
this.nextId += 1 << RESERVED_BITS;
return nextId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,8 @@ public ViewChange applyChanges(DocumentChanges docChanges, TargetChange targetCh
viewChanges,
fromCache,
docChanges.mutatedKeys,
syncStatedChanged);
syncStatedChanged,
/* excludesMetadataChanges= */ false);
}
return new ViewChange(snapshot, limboDocumentChanges);
}
Expand Down
Loading