Skip to content

Revert the async disconnect change #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public interface ParseLiveQueryClient {

<T extends ParseObject> void unsubscribe(final ParseQuery<T> query, final SubscriptionHandling<T> subscriptionHandling);

void connectIfNeeded();

void reconnect();

void disconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -86,18 +87,39 @@ public <T extends ParseObject> SubscriptionHandling<T> subscribe(ParseQuery<T> q
int requestId = requestIdGenerator();
Subscription<T> subscription = new Subscription<>(requestId, query);
subscriptions.append(requestId, subscription);
if (webSocketClient == null || (webSocketClient.getState() != WebSocketClient.State.CONNECTING && webSocketClient.getState() != WebSocketClient.State.CONNECTED)) {
if (!userInitiatedDisconnect) {
reconnect();
} else {
Log.w(LOG_TAG, "Warning: The client was explicitly disconnected! You must explicitly call .reconnect() in order to process your subscriptions.");
}
} else if (webSocketClient.getState() == WebSocketClient.State.CONNECTED) {

if (inAnyState(WebSocketClient.State.CONNECTED)) {
sendSubscription(subscription);
} else if (userInitiatedDisconnect) {
Log.w(LOG_TAG, "Warning: The client was explicitly disconnected! You must explicitly call .reconnect() in order to process your subscriptions.");
} else {
connectIfNeeded();
}

return subscription;
}

public void connectIfNeeded() {
switch (getWebSocketState()) {
case CONNECTED:
// nothing to do
break;
case CONNECTING:
// just wait for it to finish connecting
break;

case NONE:
case DISCONNECTING:
case DISCONNECTED:
reconnect();
break;

default:

break;
}
}

@Override
public <T extends ParseObject> void unsubscribe(final ParseQuery<T> query) {
if (query != null) {
Expand All @@ -124,22 +146,21 @@ public <T extends ParseObject> void unsubscribe(final ParseQuery<T> query, final

@Override
public void reconnect() {
disconnectAsync().continueWith(new Continuation<Void, Void>() {
@Override
public Void then(Task<Void> task) throws Exception {
webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri);
webSocketClient.open();
return null;
}
});
if (webSocketClient != null) {
webSocketClient.close();
}

webSocketClient = webSocketClientFactory.createInstance(webSocketClientCallback, uri);
webSocketClient.open();
userInitiatedDisconnect = false;
}

@Override
public void disconnect() {
if (webSocketClient != null) {
userInitiatedDisconnect = true;
disconnectAsync();
webSocketClient.close();
webSocketClient = null;
}
}

Expand All @@ -159,6 +180,15 @@ private synchronized int requestIdGenerator() {
return requestIdCount++;
}

private WebSocketClient.State getWebSocketState() {
WebSocketClient.State state = webSocketClient == null ? null : webSocketClient.getState();
return state == null ? WebSocketClient.State.NONE : state;
}

private boolean inAnyState(WebSocketClient.State... states) {
return Arrays.asList(states).contains(getWebSocketState());
}

private Task<Void> handleOperationAsync(final String message) {
return Task.call(new Callable<Void>() {
public Void call() throws Exception {
Expand All @@ -182,20 +212,6 @@ public Void call() throws Exception {
}, taskExecutor);
}

private Task<Void> disconnectAsync() {
return Task.call(new Callable<Void>() {
@Override
public Void call() throws Exception {
if (webSocketClient != null) {
webSocketClient.close();
webSocketClient = null;
}

return null;
}
}, taskExecutor);
}

private void parseMessage(String message) throws LiveQueryException {
try {
JSONObject jsonObject = new JSONObject(message);
Expand Down
10 changes: 10 additions & 0 deletions ParseLiveQuery/src/test/java/com/parse/ImmediateExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.parse;

import java.util.concurrent.Executor;

class ImmediateExecutor implements Executor {
@Override
public void execute(Runnable runnable) {
runnable.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@

import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;

import bolts.Task;

Expand All @@ -41,7 +38,6 @@
@Config(constants = BuildConfig.class, sdk = 21)
public class TestParseLiveQueryClient {

private PauseableExecutor executor;
private WebSocketClient webSocketClient;
private WebSocketClient.WebSocketClientCallback webSocketClientCallback;
private ParseLiveQueryClient parseLiveQueryClient;
Expand Down Expand Up @@ -69,16 +65,14 @@ public Task<String> answer(InvocationOnMock invocation) throws Throwable {
});
ParseCorePlugins.getInstance().registerCurrentUserController(currentUserController);

executor = new PauseableExecutor();

parseLiveQueryClient = ParseLiveQueryClient.Factory.getClient(new URI(""), new WebSocketClientFactory() {
@Override
public WebSocketClient createInstance(WebSocketClient.WebSocketClientCallback webSocketClientCallback, URI hostUrl) {
TestParseLiveQueryClient.this.webSocketClientCallback = webSocketClientCallback;
webSocketClient = mock(WebSocketClient.class);
return webSocketClient;
}
}, executor);
}, new ImmediateExecutor());
reconnect();
}

Expand Down Expand Up @@ -389,16 +383,6 @@ public void testEmptySessionTokenOnSubscribe() {
contains("\"sessionToken\":\"the token\"")));
}

@Test
public void testDisconnectOnBackgroundThread() throws Exception {
executor.pause();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also get rid of the fancy executor too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be useful in the future if we need to test anything that happens asynchronously (like websocket connect/disconnect events, or something like that).

It will likely need to go away if/when #36 is done, because it won't be using an Executor anymore.

I can remove it for now if you think it's better not to have it if we're not using it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like we should just nuke...can't wait for #36 as well


parseLiveQueryClient.disconnect();
verify(webSocketClient, never()).close();
assertTrue(executor.advanceOne());
verify(webSocketClient, times(1)).close();
}

@Test
public void testCallbackNotifiedOnUnexpectedDisconnect() throws Exception {
LoggingCallbacks callbacks = new LoggingCallbacks();
Expand Down Expand Up @@ -580,41 +564,6 @@ public void onSocketError(ParseLiveQueryClient client, Throwable reason) {
}
}

private static class PauseableExecutor implements Executor {
private boolean isPaused = false;
private final Queue<Runnable> queue = new LinkedList<>();

void pause() {
isPaused = true;
}

void unpause() {
if (isPaused) {
isPaused = false;

//noinspection StatementWithEmptyBody
while (advanceOne()) {
// keep going
}
}
}

boolean advanceOne() {
Runnable next = queue.poll();
if (next != null) next.run();
return next != null;
}

@Override
public void execute(Runnable runnable) {
if (isPaused) {
queue.add(runnable);
} else {
runnable.run();
}
}
}

@ParseClassName("MockA")
static class MockClassA extends ParseObject {
}
Expand Down