Skip to content

[SignalR] [Java] Log 'WebSocket stopped' once #43532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,14 @@ private Completable stop(String errorMessage) {
this.state.unlock();
}

Completable stopTask = startTask.onErrorComplete().andThen(Completable.defer(() ->
CompletableSubject subject = CompletableSubject.create();
startTask.onErrorComplete().subscribe(() ->
{
Completable stop = connectionState.transport.stop();
stop.onErrorComplete().subscribe();
return stop;
}));
stopTask.onErrorComplete().subscribe();
stop.subscribe(() -> subject.onComplete(), e -> subject.onError(e));
});

return stopTask;
return subject;
}

private void ReceiveLoop(ByteBuffer payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ public void setOnClose(TransportOnClosedCallback onCloseCallback) {

@Override
public Completable stop() {
return webSocketClient.stop().doOnEvent(t -> logger.info("WebSocket connection stopped."));
Completable stop = webSocketClient.stop();
stop.onErrorComplete().subscribe(() -> logger.info("WebSocket connection stopped."));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to write a test that we're no longer writing this four times?

return stop;
}

void onClose(Integer code, String reason) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2797,14 +2797,11 @@ public void SkippingNegotiateDoesNotNegotiate() {
.create("http://example")
.withTransport(TransportEnum.WEBSOCKETS)
.shouldSkipNegotiate(true)
.withHandshakeResponseTimeout(1)
.withHttpClient(client)
.build();

try {
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
} catch (Exception e) {
assertEquals("WebSockets isn't supported in testing currently.", e.getMessage());
}
assertThrows(RuntimeException.class, () -> hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait());
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
assertFalse(negotiateCalled.get());

Expand Down Expand Up @@ -3986,4 +3983,43 @@ public void keepAliveIntervalIsSetThroughBuilder()

assertEquals(interval, hubConnection.getKeepAliveInterval());
}

@Test
public void WebsocketStopLoggedOnce() {
try (TestLogger logger = new TestLogger(WebSocketTransport.class.getName())) {
AtomicBoolean negotiateCalled = new AtomicBoolean(false);
TestHttpClient client = new TestHttpClient().on("POST", "http://example.com/negotiate?negotiateVersion=1",
(req) -> {
negotiateCalled.set(true);
return Single.just(new HttpResponse(200, "",
TestUtils.stringToByteBuffer("{\"connectionId\":\"bVOiRPG8-6YiJ6d7ZcTOVQ\",\""
+ "availableTransports\":[{\"transport\":\"WebSockets\",\"transferFormats\":[\"Text\",\"Binary\"]}]}")));
});

HubConnection hubConnection = HubConnectionBuilder
.create("http://example")
.withTransport(TransportEnum.WEBSOCKETS)
.shouldSkipNegotiate(true)
.withHandshakeResponseTimeout(100)
.withHttpClient(client)
.build();

Completable startTask = hubConnection.start().timeout(30, TimeUnit.SECONDS);
hubConnection.stop().timeout(30, TimeUnit.SECONDS).blockingAwait();

assertThrows(RuntimeException.class, () -> startTask.blockingAwait());
assertEquals(HubConnectionState.DISCONNECTED, hubConnection.getConnectionState());
assertFalse(negotiateCalled.get());

ILoggingEvent[] logs = logger.getLogs();
int count = 0;
for (ILoggingEvent iLoggingEvent : logs) {
if (iLoggingEvent.getFormattedMessage().startsWith("WebSocket connection stopped.")) {
count++;
}
}

assertEquals(1, count);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public TestHttpClient on(String method, String url, TestHttpRequestHandler handl

@Override
public WebSocketWrapper createWebSocket(String url, Map<String, String> headers) {
throw new RuntimeException("WebSockets isn't supported in testing currently.");
return new TestWebSocketWrapper(url, headers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ public void append(ILoggingEvent event) {
this.logger.addAppender(this.appender);
}

public ILoggingEvent[] getLogs() {
lock.lock();
try {
return list.toArray(new ILoggingEvent[0]);
} finally {
lock.unlock();
}
}

public ILoggingEvent assertLog(String logMessage) {
ILoggingEvent[] localList;
lock.lock();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package com.microsoft.signalr;

import java.nio.ByteBuffer;
import java.util.Map;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;

class WebSocketTestHttpClient extends HttpClient {
@Override
public Single<HttpResponse> send(HttpRequest request) {
return null;
}

@Override
public Single<HttpResponse> send(HttpRequest request, ByteBuffer body) {
return null;
}

@Override
public WebSocketWrapper createWebSocket(String url, Map<String, String> headers) {
return new TestWebSocketWrapper(url, headers);
}

@Override
public HttpClient cloneWithTimeOut(int timeoutInMilliseconds) {
return null;
}

@Override
public void close() {
}
}

class TestWebSocketWrapper extends WebSocketWrapper {
private WebSocketOnClosedCallback onClose;

public TestWebSocketWrapper(String url, Map<String, String> headers)
{
}

@Override
public Completable start() {
return Completable.complete();
}

@Override
public Completable stop() {
if (onClose != null) {
onClose.invoke(null, "");
}
return Completable.complete();
}

@Override
public Completable send(ByteBuffer message) {
return Completable.complete();
}

@Override
public void setOnReceive(OnReceiveCallBack onReceive) {
}

@Override
public void setOnClose(WebSocketOnClosedCallback onClose) {
this.onClose = onClose;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,11 @@

import static org.junit.jupiter.api.Assertions.*;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Test;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Single;

class WebSocketTransportTest {
@Test
public void CanPassNullExitCodeToOnClosed() {
Expand All @@ -28,61 +22,4 @@ public void CanPassNullExitCodeToOnClosed() {
transport.stop();
assertTrue(closed.get());
}

class WebSocketTestHttpClient extends HttpClient {
@Override
public Single<HttpResponse> send(HttpRequest request) {
return null;
}

@Override
public Single<HttpResponse> send(HttpRequest request, ByteBuffer body) {
return null;
}

@Override
public WebSocketWrapper createWebSocket(String url, Map<String, String> headers) {
return new TestWrapper();
}

@Override
public HttpClient cloneWithTimeOut(int timeoutInMilliseconds) {
return null;
}

@Override
public void close() {
}
}

class TestWrapper extends WebSocketWrapper {
private WebSocketOnClosedCallback onClose;

@Override
public Completable start() {
return Completable.complete();
}

@Override
public Completable stop() {
if (onClose != null) {
onClose.invoke(null, "");
}
return Completable.complete();
}

@Override
public Completable send(ByteBuffer message) {
return null;
}

@Override
public void setOnReceive(OnReceiveCallBack onReceive) {
}

@Override
public void setOnClose(WebSocketOnClosedCallback onClose) {
this.onClose = onClose;
}
}
}