Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Commit 479e832

Browse files
committed
pr feedback
1 parent f66d4de commit 479e832

File tree

5 files changed

+171
-102
lines changed

5 files changed

+171
-102
lines changed

client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ describe("HubConnection", () => {
1212
let connection = new TestConnection();
1313

1414
let hubConnection = new HubConnection(connection);
15-
var invokePromise = hubConnection.invoke("testMethod", "foo", 42)
15+
var invokePromise = hubConnection.invoke("testMethod", "arg", 42)
1616
.catch((_) => { }); // Suppress exception and unhandled promise rejection warning.
1717

1818
// Verify the message is sent
@@ -23,7 +23,7 @@ describe("HubConnection", () => {
2323
target: "testMethod",
2424
nonblocking: false,
2525
arguments: [
26-
"foo",
26+
"arg",
2727
42
2828
]
2929
});
@@ -36,7 +36,7 @@ describe("HubConnection", () => {
3636
let connection = new TestConnection();
3737

3838
let hubConnection = new HubConnection(connection);
39-
var invokePromise = hubConnection.invoke("testMethod", "foo", 42);
39+
var invokePromise = hubConnection.invoke("testMethod", "arg", 42);
4040

4141
connection.receive({ type: 3, invocationId: connection.lastInvocationId, error: "foo" });
4242

@@ -48,7 +48,7 @@ describe("HubConnection", () => {
4848
let connection = new TestConnection();
4949

5050
let hubConnection = new HubConnection(connection);
51-
var invokePromise = hubConnection.invoke("testMethod", "foo", 42);
51+
var invokePromise = hubConnection.invoke("testMethod", "arg", 42);
5252

5353
connection.receive({ type: 3, invocationId: connection.lastInvocationId, result: "foo" });
5454

@@ -97,7 +97,7 @@ describe("HubConnection", () => {
9797
let connection = new TestConnection();
9898

9999
let hubConnection = new HubConnection(connection);
100-
var invokePromise = hubConnection.stream("testStream", "foo", 42);
100+
var invokePromise = hubConnection.stream("testStream", "arg", 42);
101101

102102
// Verify the message is sent
103103
expect(connection.sentData.length).toBe(1);
@@ -107,7 +107,7 @@ describe("HubConnection", () => {
107107
target: "testStream",
108108
nonblocking: false,
109109
arguments: [
110-
"foo",
110+
"arg",
111111
42
112112
]
113113
});
@@ -121,7 +121,7 @@ describe("HubConnection", () => {
121121

122122
let hubConnection = new HubConnection(connection);
123123
let observer = new TestObserver();
124-
hubConnection.stream<any>("testMethod", "foo", 42)
124+
hubConnection.stream<any>("testMethod", "arg", 42)
125125
.subscribe(observer);
126126

127127
connection.receive({ type: 3, invocationId: connection.lastInvocationId, error: "foo" });
@@ -136,7 +136,7 @@ describe("HubConnection", () => {
136136

137137
let hubConnection = new HubConnection(connection);
138138
let observer = new TestObserver();
139-
hubConnection.stream<any>("testMethod", "foo", 42)
139+
hubConnection.stream<any>("testMethod", "arg", 42)
140140
.subscribe(observer);
141141

142142
connection.receive({ type: 3, invocationId: connection.lastInvocationId });

client-ts/Microsoft.AspNetCore.SignalR.Test.Server/wwwroot/js/hubConnectionTests.js

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ describe('hubConnection', () => {
6161
});
6262
});
6363

64-
it(`rethrows an exception from the server`, done => {
64+
it(`rethrows an exception from the server when invoking`, done => {
6565
const errorMessage = "An error occurred.";
6666
let hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
6767

@@ -88,6 +88,33 @@ describe('hubConnection', () => {
8888
});
8989
});
9090

91+
it(`rethrows an exception from the server when streaming`, done => {
92+
const errorMessage = "An error occurred.";
93+
let hubConnection = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
94+
95+
hubConnection.start(transportType)
96+
.then(() => {
97+
hubConnection.stream('ThrowException', errorMessage)
98+
.subscribe({
99+
next: (item) => {
100+
fail();
101+
},
102+
error: (err) => {
103+
expect(err.message).toEqual("An error occurred.");
104+
done();
105+
},
106+
complete: () => {
107+
fail();
108+
}
109+
});
110+
111+
})
112+
.catch(e => {
113+
fail(e);
114+
done();
115+
});
116+
});
117+
91118
it(`can receive server calls`, done => {
92119
let client = new signalR.HubConnection(TESTHUBENDPOINT_URL, 'formatType=json&format=text');
93120
const message = "Hello SignalR";

src/Microsoft.AspNetCore.SignalR.Client/HubConnection.cs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -117,45 +117,43 @@ public void On(string methodName, Type[] parameterTypes, Action<object[]> handle
117117

118118
public IObservable<object> Stream(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args)
119119
{
120-
var irq = InvokeCore(methodName, returnType, cancellationToken, args, streaming: true);
121-
return irq.Observable;
120+
var irq = InvocationRequest.Stream(cancellationToken, returnType, GetNextId(), _loggerFactory, out var observable);
121+
InvokeCore(methodName, irq, args);
122+
return observable;
122123
}
123124

124125
public Task<object> Invoke(string methodName, Type returnType, CancellationToken cancellationToken, params object[] args)
125126
{
126-
var irq = InvokeCore(methodName, returnType, cancellationToken, args, streaming: false);
127-
return irq.Result;
127+
var irq = InvocationRequest.Invoke(cancellationToken, returnType, GetNextId(), _loggerFactory, out var task);
128+
InvokeCore(methodName, irq, args);
129+
return task;
128130
}
129131

130-
private InvocationRequest InvokeCore(string methodName, Type returnType, CancellationToken cancellationToken, object[] args, bool streaming)
132+
private void InvokeCore(string methodName, InvocationRequest irq, object[] args)
131133
{
132134
ThrowIfConnectionTerminated();
133-
_logger.LogTrace("Preparing invocation of '{target}', with return type '{returnType}' and {argumentCount} args", methodName, returnType.AssemblyQualifiedName, args.Length);
135+
_logger.LogTrace("Preparing invocation of '{target}', with return type '{returnType}' and {argumentCount} args", methodName, irq.ResultType.AssemblyQualifiedName, args.Length);
134136

135137
// Create an invocation descriptor. Client invocations are always blocking
136-
var invocationMessage = new InvocationMessage(GetNextId(), nonBlocking: false, target: methodName, arguments: args);
138+
var invocationMessage = new InvocationMessage(irq.InvocationId, nonBlocking: false, target: methodName, arguments: args);
137139

138140
// I just want an excuse to use 'irq' as a variable name...
139141
_logger.LogDebug("Registering Invocation ID '{invocationId}' for tracking", invocationMessage.InvocationId);
140-
var irq = new InvocationRequest(cancellationToken, returnType, invocationMessage.InvocationId, _loggerFactory, streaming);
141142

142143
AddInvocation(irq);
143144

144145
// Trace the full invocation, but only if that logging level is enabled (because building the args list is a bit slow)
145146
if (_logger.IsEnabled(LogLevel.Trace))
146147
{
147148
var argsList = string.Join(", ", args.Select(a => a.GetType().FullName));
148-
_logger.LogTrace("Issuing Invocation '{invocationId}': {returnType} {methodName}({args})", invocationMessage.InvocationId, returnType.FullName, methodName, argsList);
149+
_logger.LogTrace("Issuing Invocation '{invocationId}': {returnType} {methodName}({args})", invocationMessage.InvocationId, irq.ResultType.FullName, methodName, argsList);
149150
}
150151

151152
// We don't need to wait for this to complete. It will signal back to the invocation request.
152-
_ = SendInvocation(invocationMessage, irq, cancellationToken);
153-
154-
// Return the completion task. It will be completed by ReceiveMessages when the response is received.
155-
return irq;
153+
_ = SendInvocation(invocationMessage, irq);
156154
}
157155

158-
private async Task SendInvocation(InvocationMessage invocationMessage, InvocationRequest irq, CancellationToken cancellationToken)
156+
private async Task SendInvocation(InvocationMessage invocationMessage, InvocationRequest irq)
159157
{
160158
try
161159
{

src/Microsoft.AspNetCore.SignalR.Client/InvocationRequest.cs

Lines changed: 82 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,106 +8,135 @@
88

99
namespace Microsoft.AspNetCore.SignalR.Client
1010
{
11-
internal class InvocationRequest : IDisposable
11+
internal abstract class InvocationRequest : IDisposable
1212
{
13-
private readonly TaskCompletionSource<object> _completionSource;
1413
private readonly CancellationTokenRegistration _cancellationTokenRegistration;
1514
private readonly bool _streaming;
16-
private readonly ILogger _logger;
17-
private readonly InvocationSubject _subject;
15+
16+
protected ILogger Logger { get; }
1817

1918
public Type ResultType { get; }
2019
public CancellationToken CancellationToken { get; }
2120
public string InvocationId { get; }
2221

23-
public Task<object> Result => _completionSource?.Task ?? Task.FromResult<object>(null);
24-
public IObservable<object> Observable => _subject;
25-
26-
public InvocationRequest(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, bool streaming)
22+
protected InvocationRequest(CancellationToken cancellationToken, Type resultType, string invocationId, ILogger logger)
2723
{
28-
_logger = loggerFactory.CreateLogger<InvocationRequest>();
29-
_cancellationTokenRegistration = cancellationToken.Register(state => (state as TaskCompletionSource<object>)?.TrySetCanceled(), _completionSource);
30-
_streaming = streaming;
31-
32-
if (_streaming)
33-
{
34-
_subject = new InvocationSubject();
35-
}
36-
else
37-
{
38-
_completionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
39-
}
24+
_cancellationTokenRegistration = cancellationToken.Register(self => ((InvocationRequest)self).Cancel(), this);
4025

4126
InvocationId = invocationId;
4227
CancellationToken = cancellationToken;
4328
ResultType = resultType;
4429

45-
_logger.LogTrace("Invocation {invocationId} created", InvocationId);
30+
Logger.LogTrace("Invocation {invocationId} created", InvocationId);
4631
}
4732

48-
public void Fail(Exception exception)
33+
public static InvocationRequest Invoke(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, out Task<object> result)
4934
{
50-
_logger.LogTrace("Invocation {invocationId} marked as failed.", InvocationId);
51-
if (_streaming)
52-
{
53-
_subject.TryOnError(exception);
54-
}
55-
else
56-
{
57-
_completionSource.TrySetException(exception);
58-
}
35+
var req = new NonStreaming(cancellationToken, resultType, invocationId, loggerFactory);
36+
result = req.Result;
37+
return req;
5938
}
6039

61-
public void Complete(object result)
40+
41+
public static InvocationRequest Stream(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory, out IObservable<object> result)
6242
{
63-
_logger.LogTrace("Invocation {invocationId} marked as completed.", InvocationId);
64-
if (_streaming)
43+
var req = new Streaming(cancellationToken, resultType, invocationId, loggerFactory);
44+
result = req.Result;
45+
return req;
46+
}
47+
48+
public abstract void Fail(Exception exception);
49+
public abstract void Complete(object result);
50+
public abstract void StreamItem(object item);
51+
52+
protected abstract void Cancel();
53+
54+
public virtual void Dispose()
55+
{
56+
Logger.LogTrace("Invocation {invocationId} disposed", InvocationId);
57+
58+
// Just in case it hasn't already been completed
59+
Cancel();
60+
61+
_cancellationTokenRegistration.Dispose();
62+
}
63+
64+
private class Streaming : InvocationRequest
65+
{
66+
private readonly InvocationSubject _subject;
67+
68+
public Streaming(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory)
69+
: base(cancellationToken, resultType, invocationId, loggerFactory.CreateLogger<Streaming>())
6570
{
71+
}
72+
73+
public IObservable<object> Result => _subject;
74+
75+
public override void Complete(object result)
76+
{
77+
Logger.LogTrace("Invocation {invocationId} marked as completed.", InvocationId);
6678
if (result != null)
6779
{
68-
_logger.LogError("Invocation {invocationId} received a completion result, but was invoked as a streaming invocation.", InvocationId);
80+
Logger.LogError("Invocation {invocationId} received a completion result, but was invoked as a streaming invocation.", InvocationId);
6981
_subject.TryOnError(new InvalidOperationException("Server provided a result in a completion response to a streamed invocation."));
7082
}
7183
else
7284
{
7385
_subject.TryOnCompleted();
7486
}
7587
}
76-
else
88+
89+
public override void Fail(Exception exception)
7790
{
78-
_completionSource.TrySetResult(result);
91+
Logger.LogTrace("Invocation {invocationId} marked as failed.", InvocationId);
92+
_subject.TryOnError(exception);
7993
}
80-
}
8194

82-
public void StreamItem(object item)
83-
{
84-
if (_streaming)
95+
public override void StreamItem(object item)
8596
{
86-
_logger.LogTrace("Invocation {invocationId} received stream item.", InvocationId);
97+
Logger.LogTrace("Invocation {invocationId} received stream item.", InvocationId);
8798
_subject.TryOnNext(item);
8899
}
89-
else
100+
101+
protected override void Cancel()
90102
{
91-
_logger.LogError("Invocation {invocationId} received stream item but was invoked as a non-streamed invocation.", InvocationId);
92-
_completionSource.TrySetException(new InvalidOperationException("Streaming methods must be invoked using HubConnection.Stream"));
103+
_subject.TryOnError(new OperationCanceledException("Connection terminated"));
93104
}
94105
}
95106

96-
public void Dispose()
107+
private class NonStreaming : InvocationRequest
97108
{
98-
_logger.LogTrace("Invocation {invocationId} disposed", InvocationId);
109+
private readonly TaskCompletionSource<object> _completionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
99110

100-
// Just in case it hasn't already been completed
101-
if (_streaming)
111+
public NonStreaming(CancellationToken cancellationToken, Type resultType, string invocationId, ILoggerFactory loggerFactory)
112+
: base(cancellationToken, resultType, invocationId, loggerFactory.CreateLogger<NonStreaming>())
102113
{
103-
_subject.TryOnError(new OperationCanceledException("Connection terminated"));
104114
}
105-
else
115+
116+
public Task<object> Result => _completionSource.Task;
117+
118+
public override void Complete(object result)
119+
{
120+
Logger.LogTrace("Invocation {invocationId} marked as completed.", InvocationId);
121+
_completionSource.TrySetResult(result);
122+
}
123+
124+
public override void Fail(Exception exception)
106125
{
107-
_completionSource.TrySetCanceled();
126+
Logger.LogTrace("Invocation {invocationId} marked as failed.", InvocationId);
127+
_completionSource.TrySetException(exception);
108128
}
109129

110-
_cancellationTokenRegistration.Dispose();
130+
public override void StreamItem(object item)
131+
{
132+
Logger.LogError("Invocation {invocationId} received stream item but was invoked as a non-streamed invocation.", InvocationId);
133+
_completionSource.TrySetException(new InvalidOperationException("Streaming methods must be invoked using HubConnection.Stream"));
134+
}
135+
136+
protected override void Cancel()
137+
{
138+
_completionSource.TrySetCanceled();
139+
}
111140
}
112141
}
113142
}

0 commit comments

Comments
 (0)