Skip to content

Adding onSnapshotsInSync (not public) #778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@
import com.google.firebase.firestore.testutil.IntegrationTestUtil;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
import com.google.firebase.firestore.util.Logger.Level;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -509,6 +511,50 @@ public void testAddingToACollectionYieldsTheCorrectDocumentReference() {
assertEquals(data, document.getData());
}

@Test
public void testSnapshotsInSyncListenerFiresAfterListenersInSync() {
Map<String, Object> data = map("foo", 1.0);
CollectionReference collection = testCollection();
DocumentReference documentReference = waitFor(collection.add(data));
List<String> events = new ArrayList<>();

Semaphore gotInitialSnapshot = new Semaphore(0);
Semaphore done = new Semaphore(0);

ListenerRegistration listenerRegistration = null;

documentReference.addSnapshotListener(
(value, error) -> {
events.add("doc");
gotInitialSnapshot.release();
});
waitFor(gotInitialSnapshot);
events.clear();

try {
listenerRegistration =
documentReference
.getFirestore()
.addSnapshotsInSyncListener(
() -> {
events.add("snapshots-in-sync");
if (events.size() == 3) {
// We should have an initial snapshots-in-sync event, then a snapshot event
// for set(), then another event to indicate we're in sync again.
assertEquals(
Arrays.asList("snapshots-in-sync", "doc", "snapshots-in-sync"), events);
done.release();
}
});
waitFor(documentReference.set(map("foo", 3.0)));
waitFor(done);
} finally {
if (listenerRegistration != null) {
listenerRegistration.remove();
}
}
}

@Test
public void testQueriesAreValidatedOnClient() {
// NOTE: Failure cases are validated in ValidationTest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package com.google.firebase.firestore;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.firebase.firestore.util.Assert.hardAssert;

import android.app.Activity;
import android.content.Context;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
Expand All @@ -30,12 +32,15 @@
import com.google.firebase.firestore.auth.CredentialsProvider;
import com.google.firebase.firestore.auth.EmptyCredentialsProvider;
import com.google.firebase.firestore.auth.FirebaseAuthCredentialsProvider;
import com.google.firebase.firestore.core.ActivityScope;
import com.google.firebase.firestore.core.AsyncEventListener;
import com.google.firebase.firestore.core.DatabaseInfo;
import com.google.firebase.firestore.core.FirestoreClient;
import com.google.firebase.firestore.local.SQLitePersistence;
import com.google.firebase.firestore.model.DatabaseId;
import com.google.firebase.firestore.model.ResourcePath;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.Executors;
import com.google.firebase.firestore.util.Logger;
import com.google.firebase.firestore.util.Logger.Level;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -460,6 +465,94 @@ public Task<Void> clearPersistence() {
return source.getTask();
}

/**
* Attaches a listener for a snapshots-in-sync event. The snapshots-in-sync event indicates that
* all listeners affected by a given change have fired, even if a single server-generated change
* affects multiple listeners.
*
* <p>NOTE: The snapshots-in-sync event only indicates that listeners are in sync with each other,
* but does not relate to whether those snapshots are in sync with the server. Use
* SnapshotMetadata in the individual listeners to determine if a snapshot is from the cache or
* the server.
*
* @param runnable A callback to be called every time all snapshot listeners are in sync with each
* other.
* @return A registration object that can be used to remove the listener.
*/
ListenerRegistration addSnapshotsInSyncListener(@NonNull Runnable runnable) {
return addSnapshotsInSyncListener(Executors.DEFAULT_CALLBACK_EXECUTOR, runnable);
}

/**
* Attaches a listener for a snapshots-in-sync event. The snapshots-in-sync event indicates that
* all listeners affected by a given change have fired, even if a single server-generated change
* affects multiple listeners.
*
* <p>NOTE: The snapshots-in-sync event only indicates that listeners are in sync with each other,
* but does not relate to whether those snapshots are in sync with the server. Use
* SnapshotMetadata in the individual listeners to determine if a snapshot is from the cache or
* the server.
*
* @param activity The activity to scope the listener to.
* @param runnable A callback to be called every time all snapshot listeners are in sync with each
* other.
* @return A registration object that can be used to remove the listener.
*/
@NonNull
ListenerRegistration addSnapshotsInSyncListener(Activity activity, @NonNull Runnable runnable) {
return addSnapshotsInSyncListener(Executors.DEFAULT_CALLBACK_EXECUTOR, activity, runnable);
}

/**
* Attaches a listener for a snapshots-in-sync event. The snapshots-in-sync event indicates that
* all listeners affected by a given change have fired, even if a single server-generated change
* affects multiple listeners.
*
* <p>NOTE: The snapshots-in-sync event only indicates that listeners are in sync with each other,
* but does not relate to whether those snapshots are in sync with the server. Use
* SnapshotMetadata in the individual listeners to determine if a snapshot is from the cache or
* the server.
*
* @param executor The executor to use to call the listener.
* @param runnable A callback to be called every time all snapshot listeners are in sync with each
* other.
* @return A registration object that can be used to remove the listener.
*/
@NonNull
ListenerRegistration addSnapshotsInSyncListener(Executor executor, @NonNull Runnable runnable) {
return addSnapshotsInSyncListener(executor, null, runnable);
}

/**
* Internal helper method to add a snapshotsInSync listener.
*
* <p>Will be Activity scoped if the activity parameter is non-{@code null}.
*
* @param userExecutor The executor to use to call the listener.
* @param activity Optional activity this listener is scoped to.
* @param runnable A callback to be called every time all snapshot listeners are in sync with each
* other.
* @return A registration object that can be used to remove the listener.
*/
private ListenerRegistration addSnapshotsInSyncListener(
Executor userExecutor, @Nullable Activity activity, @NonNull Runnable runnable) {
ensureClientConfigured();
EventListener<Void> eventListener =
(Void v, FirebaseFirestoreException error) -> {
hardAssert(error == null, "snapshots-in-sync listeners should never get errors.");
runnable.run();
};
AsyncEventListener<Void> asyncListener =
new AsyncEventListener<Void>(userExecutor, eventListener);
client.addSnapshotsInSyncListener(asyncListener);
return ActivityScope.bind(
activity,
() -> {
asyncListener.mute();
client.removeSnapshotsInSyncListener(asyncListener);
});
}

FirestoreClient getClient() {
return client;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@

package com.google.firebase.firestore.core;

import static com.google.firebase.firestore.util.Assert.hardAssert;

import com.google.firebase.firestore.EventListener;
import com.google.firebase.firestore.core.SyncEngine.SyncEngineCallback;
import com.google.firebase.firestore.util.Util;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* EventManager is responsible for mapping queries to query event listeners. It handles "fan-out."
Expand Down Expand Up @@ -54,6 +59,8 @@ public static class ListenOptions {

private final Map<Query, QueryListenersInfo> queries;

private final Set<EventListener<Void>> snapshotsInSyncListeners = new HashSet<>();

private OnlineState onlineState = OnlineState.UNKNOWN;

public EventManager(SyncEngine syncEngine) {
Expand Down Expand Up @@ -81,10 +88,16 @@ public int addQueryListener(QueryListener queryListener) {

queryInfo.listeners.add(queryListener);

queryListener.onOnlineStateChanged(onlineState);
// Run global snapshot listeners if a consistent snapshot has been emitted.
boolean raisedEvent = queryListener.onOnlineStateChanged(onlineState);
hardAssert(
!raisedEvent, "onOnlineStateChanged() shouldn't raise an event for brand-new listeners.");

if (queryInfo.viewSnapshot != null) {
queryListener.onViewSnapshot(queryInfo.viewSnapshot);
raisedEvent = queryListener.onViewSnapshot(queryInfo.viewSnapshot);
if (raisedEvent) {
raiseSnapshotsInSyncEvent();
}
}

if (firstListen) {
Expand All @@ -109,18 +122,40 @@ public void removeQueryListener(QueryListener listener) {
}
}

public void addSnapshotsInSyncListener(EventListener<Void> listener) {
snapshotsInSyncListeners.add(listener);
listener.onEvent(null, null);
}

public void removeSnapshotsInSyncListener(EventListener<Void> listener) {
snapshotsInSyncListeners.remove(listener);
}

/** Call all global snapshot listeners that have been set. */
private void raiseSnapshotsInSyncEvent() {
for (EventListener<Void> listener : snapshotsInSyncListeners) {
listener.onEvent(null, null);
}
}

@Override
public void onViewSnapshots(List<ViewSnapshot> snapshotList) {
boolean raisedEvent = false;
for (ViewSnapshot viewSnapshot : snapshotList) {
Query query = viewSnapshot.getQuery();
QueryListenersInfo info = queries.get(query);
if (info != null) {
for (QueryListener listener : info.listeners) {
listener.onViewSnapshot(viewSnapshot);
if (listener.onViewSnapshot(viewSnapshot)) {
raisedEvent = true;
}
}
info.viewSnapshot = viewSnapshot;
}
}
if (raisedEvent) {
raiseSnapshotsInSyncEvent();
}
}

@Override
Expand All @@ -136,11 +171,17 @@ public void onError(Query query, Status error) {

@Override
public void handleOnlineStateChange(OnlineState onlineState) {
boolean raisedEvent = false;
this.onlineState = onlineState;
for (QueryListenersInfo info : queries.values()) {
for (QueryListener listener : info.listeners) {
listener.onOnlineStateChanged(onlineState);
if (listener.onOnlineStateChanged(onlineState)) {
raisedEvent = true;
}
}
}
if (raisedEvent) {
raiseSnapshotsInSyncEvent();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,18 @@ private void initialize(Context context, User user, boolean usePersistence, long
remoteStore.start();
}

public void addSnapshotsInSyncListener(EventListener<Void> listener) {
verifyNotTerminated();
asyncQueue.enqueueAndForget(() -> eventManager.addSnapshotsInSyncListener(listener));
}

public void removeSnapshotsInSyncListener(EventListener<Void> listener) {
if (isTerminated()) {
return;
}
eventManager.removeSnapshotsInSyncListener(listener);
}

private void verifyNotTerminated() {
if (this.isTerminated()) {
throw new IllegalStateException("The client has already been terminated");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,17 @@ public Query getQuery() {
return query;
}

public void onViewSnapshot(ViewSnapshot newSnapshot) {
/**
* Applies the new ViewSnapshot to this listener, raising a user-facing event if applicable
* (depending on what changed, whether the user has opted into metadata-only changes, etc.).
* Returns true if a user-facing event was indeed raised.
*/
public boolean onViewSnapshot(ViewSnapshot newSnapshot) {
hardAssert(
!newSnapshot.getChanges().isEmpty() || newSnapshot.didSyncStateChange(),
"We got a new snapshot with no changes?");

boolean raisedEvent = false;
if (!options.includeDocumentMetadataChanges) {
// Remove the metadata only changes
List<DocumentViewChange> documentChanges = new ArrayList<>();
Expand All @@ -87,23 +93,30 @@ public void onViewSnapshot(ViewSnapshot newSnapshot) {
if (!raisedInitialEvent) {
if (shouldRaiseInitialEvent(newSnapshot, onlineState)) {
raiseInitialEvent(newSnapshot);
raisedEvent = true;
}
} else if (shouldRaiseEvent(newSnapshot)) {
listener.onEvent(newSnapshot, null);
raisedEvent = true;
}

this.snapshot = newSnapshot;
return raisedEvent;
}

public void onError(FirebaseFirestoreException error) {
listener.onEvent(null, error);
}

public void onOnlineStateChanged(OnlineState onlineState) {
/** Returns whether a snapshot was raised. */
public boolean onOnlineStateChanged(OnlineState onlineState) {
this.onlineState = onlineState;
boolean raisedEvent = false;
if (snapshot != null && !raisedInitialEvent && shouldRaiseInitialEvent(snapshot, onlineState)) {
raiseInitialEvent(snapshot);
raisedEvent = true;
}
return raisedEvent;
}

private boolean shouldRaiseInitialEvent(ViewSnapshot snapshot, OnlineState onlineState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ public void testWillForwardOnOnlineStateChangedCalls() {

QueryListener spy = mock(QueryListener.class);
when(spy.getQuery()).thenReturn(query1);
doAnswer(invocation -> events.add(invocation.getArguments()[0]))
doAnswer(
invocation -> {
events.add(invocation.getArguments()[0]);
return false;
})
.when(spy)
.onOnlineStateChanged(any());

Expand Down
Loading