diff --git a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClient.java b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClient.java index 4603c19..2ec846d 100644 --- a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClient.java +++ b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClient.java @@ -10,6 +10,8 @@ public interface ParseLiveQueryClient { void unsubscribe(final ParseQuery query, final SubscriptionHandling subscriptionHandling); + void connectIfNeeded(); + void reconnect(); void disconnect(); diff --git a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java index b007ad6..60d9eb9 100644 --- a/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java +++ b/ParseLiveQuery/src/main/java/com/parse/ParseLiveQueryClientImpl.java @@ -10,6 +10,7 @@ import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executor; @@ -86,18 +87,39 @@ public SubscriptionHandling subscribe(ParseQuery q int requestId = requestIdGenerator(); Subscription subscription = new Subscription<>(requestId, query); subscriptions.append(requestId, subscription); - if (webSocketClient == null || (webSocketClient.getState() != WebSocketClient.State.CONNECTING && webSocketClient.getState() != WebSocketClient.State.CONNECTED)) { - if (!userInitiatedDisconnect) { - reconnect(); - } else { - Log.w(LOG_TAG, "Warning: The client was explicitly disconnected! You must explicitly call .reconnect() in order to process your subscriptions."); - } - } else if (webSocketClient.getState() == WebSocketClient.State.CONNECTED) { + + if (inAnyState(WebSocketClient.State.CONNECTED)) { sendSubscription(subscription); + } else if (userInitiatedDisconnect) { + Log.w(LOG_TAG, "Warning: The client was explicitly disconnected! You must explicitly call .reconnect() in order to process your subscriptions."); + } else { + connectIfNeeded(); } + return subscription; } + public void connectIfNeeded() { + switch (getWebSocketState()) { + case CONNECTED: + // nothing to do + break; + case CONNECTING: + // just wait for it to finish connecting + break; + + case NONE: + case DISCONNECTING: + case DISCONNECTED: + reconnect(); + break; + + default: + + break; + } + } + @Override public void unsubscribe(final ParseQuery query) { if (query != null) { @@ -124,14 +146,12 @@ public void unsubscribe(final ParseQuery query, final @Override public void reconnect() { - disconnectAsync().continueWith(new Continuation() { - @Override - public Void then(Task task) throws Exception { - webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri); - webSocketClient.open(); - return null; - } - }); + if (webSocketClient != null) { + webSocketClient.close(); + } + + webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri); + webSocketClient.open(); userInitiatedDisconnect = false; } @@ -139,7 +159,8 @@ public Void then(Task task) throws Exception { public void disconnect() { if (webSocketClient != null) { userInitiatedDisconnect = true; - disconnectAsync(); + webSocketClient.close(); + webSocketClient = null; } } @@ -159,6 +180,15 @@ private synchronized int requestIdGenerator() { return requestIdCount++; } + private WebSocketClient.State getWebSocketState() { + WebSocketClient.State state = webSocketClient == null ? null : webSocketClient.getState(); + return state == null ? WebSocketClient.State.NONE : state; + } + + private boolean inAnyState(WebSocketClient.State... states) { + return Arrays.asList(states).contains(getWebSocketState()); + } + private Task handleOperationAsync(final String message) { return Task.call(new Callable() { public Void call() throws Exception { @@ -182,20 +212,6 @@ public Void call() throws Exception { }, taskExecutor); } - private Task disconnectAsync() { - return Task.call(new Callable() { - @Override - public Void call() throws Exception { - if (webSocketClient != null) { - webSocketClient.close(); - webSocketClient = null; - } - - return null; - } - }, taskExecutor); - } - private void parseMessage(String message) throws LiveQueryException { try { JSONObject jsonObject = new JSONObject(message); diff --git a/ParseLiveQuery/src/test/java/com/parse/ImmediateExecutor.java b/ParseLiveQuery/src/test/java/com/parse/ImmediateExecutor.java new file mode 100644 index 0000000..4fa556f --- /dev/null +++ b/ParseLiveQuery/src/test/java/com/parse/ImmediateExecutor.java @@ -0,0 +1,10 @@ +package com.parse; + +import java.util.concurrent.Executor; + +class ImmediateExecutor implements Executor { + @Override + public void execute(Runnable runnable) { + runnable.run(); + } +} diff --git a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java index cf73f3f..9b54d4e 100644 --- a/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java +++ b/ParseLiveQuery/src/test/java/com/parse/TestParseLiveQueryClient.java @@ -15,9 +15,6 @@ import java.io.IOException; import java.net.URI; -import java.util.LinkedList; -import java.util.Queue; -import java.util.concurrent.Executor; import bolts.Task; @@ -41,7 +38,6 @@ @Config(constants = BuildConfig.class, sdk = 21) public class TestParseLiveQueryClient { - private PauseableExecutor executor; private WebSocketClient webSocketClient; private WebSocketClient.WebSocketClientCallback webSocketClientCallback; private ParseLiveQueryClient parseLiveQueryClient; @@ -69,8 +65,6 @@ public Task answer(InvocationOnMock invocation) throws Throwable { }); ParseCorePlugins.getInstance().registerCurrentUserController(currentUserController); - executor = new PauseableExecutor(); - parseLiveQueryClient = ParseLiveQueryClient.Factory.getClient(new URI(""), new WebSocketClientFactory() { @Override public WebSocketClient createInstance(WebSocketClient.WebSocketClientCallback webSocketClientCallback, URI hostUrl) { @@ -78,7 +72,7 @@ public WebSocketClient createInstance(WebSocketClient.WebSocketClientCallback we webSocketClient = mock(WebSocketClient.class); return webSocketClient; } - }, executor); + }, new ImmediateExecutor()); reconnect(); } @@ -389,16 +383,6 @@ public void testEmptySessionTokenOnSubscribe() { contains("\"sessionToken\":\"the token\""))); } - @Test - public void testDisconnectOnBackgroundThread() throws Exception { - executor.pause(); - - parseLiveQueryClient.disconnect(); - verify(webSocketClient, never()).close(); - assertTrue(executor.advanceOne()); - verify(webSocketClient, times(1)).close(); - } - @Test public void testCallbackNotifiedOnUnexpectedDisconnect() throws Exception { LoggingCallbacks callbacks = new LoggingCallbacks(); @@ -580,41 +564,6 @@ public void onSocketError(ParseLiveQueryClient client, Throwable reason) { } } - private static class PauseableExecutor implements Executor { - private boolean isPaused = false; - private final Queue queue = new LinkedList<>(); - - void pause() { - isPaused = true; - } - - void unpause() { - if (isPaused) { - isPaused = false; - - //noinspection StatementWithEmptyBody - while (advanceOne()) { - // keep going - } - } - } - - boolean advanceOne() { - Runnable next = queue.poll(); - if (next != null) next.run(); - return next != null; - } - - @Override - public void execute(Runnable runnable) { - if (isPaused) { - queue.add(runnable); - } else { - runnable.run(); - } - } - } - @ParseClassName("MockA") static class MockClassA extends ParseObject { }