From f7f301dc5fd89a36de411eb38ae4de3364c596ba Mon Sep 17 00:00:00 2001 From: Andrew Stanton-Nurse Date: Fri, 12 May 2017 13:26:52 -0700 Subject: [PATCH 01/17] fix #324 by adding streaming support to the server --- .../HubConnection.ts | 69 ++++-- ...rosoft.AspNetCore.SignalR.Client.TS.csproj | 1 + .../Observable.ts | 43 ++++ .../gulpfile.js | 2 +- samples/SocketsSample/Hubs/Streaming.cs | 81 +++++++ samples/SocketsSample/Startup.cs | 3 +- samples/SocketsSample/wwwroot/hubs.html | 34 +-- samples/SocketsSample/wwwroot/index.html | 8 +- samples/SocketsSample/wwwroot/streaming.html | 102 +++++++++ samples/SocketsSample/wwwroot/utils.js | 16 ++ .../Internal/Protocol/CompletionMessage.cs | 2 + .../HubEndPoint.cs | 88 +++++++- .../Internal/AsyncEnumeratorAdapters.cs | 87 ++++++++ .../Internal/TypeBaseEnumerationExtensions.cs | 18 ++ .../StreamingAttribute.cs | 10 + .../CancellationDisposable.cs | 24 ++ .../HubEndpointTests.cs | 208 +++++++++++++++--- .../TestClient.cs | 41 +++- 18 files changed, 749 insertions(+), 88 deletions(-) create mode 100644 client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Observable.ts create mode 100644 samples/SocketsSample/Hubs/Streaming.cs create mode 100644 samples/SocketsSample/wwwroot/streaming.html create mode 100644 src/Microsoft.AspNetCore.SignalR/Internal/AsyncEnumeratorAdapters.cs create mode 100644 src/Microsoft.AspNetCore.SignalR/Internal/TypeBaseEnumerationExtensions.cs create mode 100644 src/Microsoft.AspNetCore.SignalR/StreamingAttribute.cs create mode 100644 test/Microsoft.AspNetCore.SignalR.Tests/CancellationDisposable.cs diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/HubConnection.ts b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/HubConnection.ts index 4967e0078d..2160695cec 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/HubConnection.ts +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/HubConnection.ts @@ -2,7 +2,7 @@ import { ConnectionClosed } from "./Common" import { IConnection } from "./IConnection" import { Connection } from "./Connection" import { TransportType } from "./Transports" - +import { Subject, Observable } from "./Observable" const enum MessageType { Invocation = 1, @@ -35,7 +35,7 @@ export { TransportType } from "./Transports" export class HubConnection { private connection: IConnection; - private callbacks: Map void>; + private callbacks: Map void>; private methods: Map void>; private id: number; private connectionClosedCallback: ConnectionClosed; @@ -55,7 +55,7 @@ export class HubConnection { this.onConnectionClosed(error); } - this.callbacks = new Map void>(); + this.callbacks = new Map void>(); this.methods = new Map void>(); this.id = 0; } @@ -73,12 +73,14 @@ export class HubConnection { this.InvokeClientMethod(message); break; case MessageType.Result: - // TODO: Streaming (MessageType.Result) currently not supported - callback will throw case MessageType.Completion: let callback = this.callbacks.get(message.invocationId); if (callback != null) { callback(message); - this.callbacks.delete(message.invocationId); + + if (message.type == MessageType.Completion) { + this.callbacks.delete(message.invocationId); + } } break; default: @@ -125,17 +127,39 @@ export class HubConnection { return this.connection.stop(); } - invoke(methodName: string, ...args: any[]): Promise { - let id = this.id; - this.id++; + stream(methodName: string, ...args: any[]): Observable { + let invocationDescriptor = this.createInvocation(methodName, args); - let invocationDescriptor: InvocationMessage = { - type: MessageType.Invocation, - invocationId: id.toString(), - target: methodName, - arguments: args, - nonblocking: false - }; + let subject = new Subject(); + + this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: CompletionMessage | ResultMessage) => { + if (invocationEvent.type === MessageType.Completion) { + let completionMessage = invocationEvent; + if (completionMessage.error) { + subject.error(new Error(completionMessage.error)); + } + else { + // TODO: Log a warning if there's a payload? + subject.complete(); + } + } + else { + subject.next((invocationEvent).result); + } + }); + + //TODO: separate conversion to enable different data formats + this.connection.send(JSON.stringify(invocationDescriptor)) + .catch(e => { + subject.error(e); + this.callbacks.delete(invocationDescriptor.invocationId); + }); + + return subject; + } + + invoke(methodName: string, ...args: any[]): Promise { + let invocationDescriptor = this.createInvocation(methodName, args); let p = new Promise((resolve, reject) => { this.callbacks.set(invocationDescriptor.invocationId, (invocationEvent: CompletionMessage | ResultMessage) => { @@ -149,7 +173,7 @@ export class HubConnection { } } else { - reject(new Error("Streaming is not supported.")) + reject(new Error("Streaming methods must be invoked using HubConnection.stream")) } }); @@ -171,4 +195,17 @@ export class HubConnection { set onClosed(callback: ConnectionClosed) { this.connectionClosedCallback = callback; } + + private createInvocation(methodName: string, args: any[]): InvocationMessage { + let id = this.id; + this.id++; + + return { + type: MessageType.Invocation, + invocationId: id.toString(), + target: methodName, + arguments: args, + nonblocking: false + }; + } } diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Microsoft.AspNetCore.SignalR.Client.TS.csproj b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Microsoft.AspNetCore.SignalR.Client.TS.csproj index bf32b28e1a..0b30b88f2d 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Microsoft.AspNetCore.SignalR.Client.TS.csproj +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Microsoft.AspNetCore.SignalR.Client.TS.csproj @@ -13,6 +13,7 @@ + diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Observable.ts b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Observable.ts new file mode 100644 index 0000000000..8c527f0dbf --- /dev/null +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/Observable.ts @@ -0,0 +1,43 @@ +// TODO: Seamless RxJs integration +// From RxJs: https://github.com/ReactiveX/rxjs/blob/master/src/Observer.ts +export interface Observer { + closed?: boolean; + next: (value: T) => void; + error: (err: any) => void; + complete: () => void; +} + +export interface Observable { + // TODO: Return a Subscription so the caller can unsubscribe? IDisposable in System.IObservable + subscribe(observer: Observer): void; +} + +export class Subject implements Observable { + observers: Observer[]; + + constructor() { + this.observers = []; + } + + public next(item: T): void { + for (let observer of this.observers) { + observer.next(item); + } + } + + public error(err: any): void { + for (let observer of this.observers) { + observer.error(err); + } + } + + public complete(): void { + for (let observer of this.observers) { + observer.complete(); + } + } + + public subscribe(observer: Observer): void { + this.observers.push(observer); + } +} diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/gulpfile.js b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/gulpfile.js index e77311aac7..a5b47fab36 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/gulpfile.js +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS/gulpfile.js @@ -26,4 +26,4 @@ gulp.task('browserify-client', ['compile-ts-client'], () => { gulp.task('build-ts-client', ['clean', 'compile-ts-client', 'browserify-client']); -gulp.task('default', ['build-ts-client']); \ No newline at end of file +gulp.task('default', ['build-ts-client']); diff --git a/samples/SocketsSample/Hubs/Streaming.cs b/samples/SocketsSample/Hubs/Streaming.cs new file mode 100644 index 0000000000..a82c0673aa --- /dev/null +++ b/samples/SocketsSample/Hubs/Streaming.cs @@ -0,0 +1,81 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Channels; +using Microsoft.AspNetCore.SignalR; + +namespace SocketsSample.Hubs +{ + public class Streaming : Hub + { + [return: Streaming] + public IObservable ObservableCounter(int count, int delay) + { + return new CounterObservable(count, delay); + } + + [return: Streaming] + public ReadableChannel ChannelCounter(int count, int delay) + { + var channel = Channel.CreateUnbounded(); + + Task.Run(async () => + { + for (var i = 0; i < count; i++) + { + await channel.Out.WriteAsync(i); + await Task.Delay(delay); + } + + channel.Out.TryComplete(); + }); + + return channel.In; + } + + private class CounterObservable : IObservable + { + private int _count; + private int _delay; + + public CounterObservable(int count, int delay) + { + _count = count; + _delay = delay; + } + + public IDisposable Subscribe(IObserver observer) + { + // Run in a thread-pool thread + var cts = new CancellationTokenSource(); + Task.Run(async () => + { + for (var i = 0; !cts.Token.IsCancellationRequested && i < _count; i++) + { + observer.OnNext(i); + await Task.Delay(_delay); + } + observer.OnCompleted(); + }); + + return new Disposable(() => cts.Cancel()); + } + } + + private class Disposable : IDisposable + { + private Action _action; + + public Disposable(Action action) + { + _action = action; + } + + public void Dispose() + { + _action(); + } + } + } +} diff --git a/samples/SocketsSample/Startup.cs b/samples/SocketsSample/Startup.cs index 1acae62ef4..3578b22557 100644 --- a/samples/SocketsSample/Startup.cs +++ b/samples/SocketsSample/Startup.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using Microsoft.AspNetCore.Builder; @@ -49,6 +49,7 @@ public void Configure(IApplicationBuilder app, IHostingEnvironment env) app.UseSignalR(routes => { routes.MapHub("hubs"); + routes.MapHub("streaming"); }); app.UseSockets(routes => diff --git a/samples/SocketsSample/wwwroot/hubs.html b/samples/SocketsSample/wwwroot/hubs.html index cb2331db55..2533f75dcc 100644 --- a/samples/SocketsSample/wwwroot/hubs.html +++ b/samples/SocketsSample/wwwroot/hubs.html @@ -1,4 +1,4 @@ - + @@ -55,13 +55,6 @@

Private Message

\ No newline at end of file + diff --git a/samples/SocketsSample/wwwroot/index.html b/samples/SocketsSample/wwwroot/index.html index f8404615c5..2ee45dccbb 100644 --- a/samples/SocketsSample/wwwroot/index.html +++ b/samples/SocketsSample/wwwroot/index.html @@ -1,4 +1,4 @@ - + @@ -18,5 +18,11 @@

ASP.NET SignalR (Hubs)

  • Server Sent Events
  • Web Sockets
  • +

    ASP.NET SignalR (Streaming)

    + diff --git a/samples/SocketsSample/wwwroot/streaming.html b/samples/SocketsSample/wwwroot/streaming.html new file mode 100644 index 0000000000..51c203adb9 --- /dev/null +++ b/samples/SocketsSample/wwwroot/streaming.html @@ -0,0 +1,102 @@ + + + + + + + +

    Unknown Transport

    + +

    Controls

    +
    + + +
    + +
    + + + +
    + +

    Results

    +
      + +
        + + + + + diff --git a/samples/SocketsSample/wwwroot/utils.js b/samples/SocketsSample/wwwroot/utils.js index ef155b5a2d..8bf6b7c2b3 100644 --- a/samples/SocketsSample/wwwroot/utils.js +++ b/samples/SocketsSample/wwwroot/utils.js @@ -10,3 +10,19 @@ function getParameterByName(name, url) { return decodeURIComponent(results[2].replace(/\+/g, " ")); } +function click(id, callback) { + document.getElementById(id).addEventListener('click', event => { + callback(event); + event.preventDefault(); + }); +} + +function addLine(listId, line, color) { + var child = document.createElement('li'); + if (color) { + child.style.color = color; + } + child.innerText = line; + document.getElementById(listId).appendChild(child); +} + diff --git a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/CompletionMessage.cs b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/CompletionMessage.cs index 44b7206121..78c39c9a9f 100644 --- a/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/CompletionMessage.cs +++ b/src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/CompletionMessage.cs @@ -34,5 +34,7 @@ public override string ToString() public static CompletionMessage WithError(string invocationId, string error) => new CompletionMessage(invocationId, error, result: null, hasResult: false); public static CompletionMessage WithResult(string invocationId, object payload) => new CompletionMessage(invocationId, error: null, result: payload, hasResult: true); + + public static CompletionMessage Empty(string invocationId) => new CompletionMessage(invocationId, error: null, result: null, hasResult: false); } } diff --git a/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs b/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs index f16170ff78..feeaafbe6d 100644 --- a/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs +++ b/src/Microsoft.AspNetCore.SignalR/HubEndPoint.cs @@ -3,10 +3,12 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Channels; using Microsoft.AspNetCore.SignalR.Internal; using Microsoft.AspNetCore.SignalR.Internal.Protocol; using Microsoft.AspNetCore.Sockets; @@ -220,8 +222,7 @@ private async Task Execute(ConnectionContext connection, IHubProtocol protocol, } else { - var result = await Invoke(descriptor, connection, invocationMessage); - await SendMessageAsync(connection, protocol, result); + await Invoke(descriptor, connection, protocol, invocationMessage); } } @@ -243,7 +244,7 @@ private async Task SendMessageAsync(ConnectionContext connection, IHubProtocol p throw new OperationCanceledException("Outbound channel was closed while trying to write hub message"); } - private async Task Invoke(HubMethodDescriptor descriptor, ConnectionContext connection, InvocationMessage invocationMessage) + private async Task Invoke(HubMethodDescriptor descriptor, ConnectionContext connection, IHubProtocol protocol, InvocationMessage invocationMessage) { var methodExecutor = descriptor.MethodExecutor; @@ -257,10 +258,14 @@ private async Task Invoke(HubMethodDescriptor descriptor, Con InitializeHub(hub, connection); object result = null; - if (methodExecutor.IsMethodAsync) + var hasResult = true; // Need to distinguish betweeen 'null' and 'void' + + // ReadableChannel is awaitable but we don't want to await it. + if (methodExecutor.IsMethodAsync && !IsChannel(methodExecutor.MethodReturnType, out _)) { if (methodExecutor.MethodReturnType == typeof(Task)) { + hasResult = false; await (Task)methodExecutor.Execute(hub, invocationMessage.Arguments); } else @@ -273,17 +278,24 @@ private async Task Invoke(HubMethodDescriptor descriptor, Con result = methodExecutor.Execute(hub, invocationMessage.Arguments); } - return CompletionMessage.WithResult(invocationMessage.InvocationId, result); + if (hasResult && IsStreamed(methodExecutor, result, out var channel)) + { + await StreamResultsAsync(invocationMessage.InvocationId, connection, protocol, channel); + } + else + { + await SendMessageAsync(connection, protocol, CompletionMessage.WithResult(invocationMessage.InvocationId, result)); + } } catch (TargetInvocationException ex) { _logger.LogError(0, ex, "Failed to invoke hub method"); - return CompletionMessage.WithError(invocationMessage.InvocationId, ex.InnerException.Message); + await SendMessageAsync(connection, protocol, CompletionMessage.WithError(invocationMessage.InvocationId, ex.InnerException.Message)); } catch (Exception ex) { _logger.LogError(0, ex, "Failed to invoke hub method"); - return CompletionMessage.WithError(invocationMessage.InvocationId, ex.Message); + await SendMessageAsync(connection, protocol, CompletionMessage.WithError(invocationMessage.InvocationId, ex.Message)); } finally { @@ -299,6 +311,68 @@ private void InitializeHub(THub hub, ConnectionContext connection) hub.Groups = new GroupManager(connection, _lifetimeManager); } + private bool IsChannel(Type type, out Type payloadType) + { + var channelType = type.AllBaseTypes().FirstOrDefault(t => t.IsGenericType && t.GetGenericTypeDefinition() == typeof(ReadableChannel<>)); + if (channelType == null) + { + payloadType = null; + return false; + } + else + { + payloadType = channelType.GetGenericArguments()[0]; + return true; + } + } + + private async Task StreamResultsAsync(string invocationId, Connection connection, IHubProtocol protocol, IAsyncEnumerator enumerator) + { + // TODO: Cancellation? + try + { + while (await enumerator.MoveNextAsync()) + { + // Send the stream item + await SendMessageAsync(connection, protocol, new StreamItemMessage(invocationId, enumerator.Current)); + } + + await SendMessageAsync(connection, protocol, CompletionMessage.Empty(invocationId)); + } + catch (Exception ex) + { + await SendMessageAsync(connection, protocol, CompletionMessage.WithError(invocationId, ex.Message)); + } + } + + private bool IsStreamed(ObjectMethodExecutor methodExecutor, object result, out IAsyncEnumerator enumerator) + { + // TODO: Cache attributes the OME? + var streamingAttribute = methodExecutor.MethodInfo.ReturnParameter.GetCustomAttribute(); + if (streamingAttribute == null) + { + enumerator = null; + return false; + } + + var observableInterface = result.GetType().GetInterfaces().FirstOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IObservable<>)); + if (observableInterface != null) + { + enumerator = AsyncEnumeratorAdapters.FromObservable(result, observableInterface); + return true; + } + else if (IsChannel(result.GetType(), out var payloadType)) + { + enumerator = AsyncEnumeratorAdapters.FromChannel(result, payloadType); + return true; + } + else + { + // This type cannot be streamed + throw new NotSupportedException($"Cannot stream results of type: {result.GetType().FullName}"); + } + } + private void DiscoverHubMethods() { var hubType = typeof(THub); diff --git a/src/Microsoft.AspNetCore.SignalR/Internal/AsyncEnumeratorAdapters.cs b/src/Microsoft.AspNetCore.SignalR/Internal/AsyncEnumeratorAdapters.cs new file mode 100644 index 0000000000..7e8cde77d4 --- /dev/null +++ b/src/Microsoft.AspNetCore.SignalR/Internal/AsyncEnumeratorAdapters.cs @@ -0,0 +1,87 @@ +using System; +using System.Linq; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks.Channels; + +namespace Microsoft.AspNetCore.SignalR.Internal +{ + // True-internal because this is a weird and tricky class to use :) + internal static class AsyncEnumeratorAdapters + { + private static readonly MethodInfo _fromObservableMethod = typeof(AsyncEnumeratorAdapters) + .GetRuntimeMethods() + .Single(m => m.Name.Equals(nameof(FromObservable)) && m.IsGenericMethod); + private static readonly object[] _getAsyncEnumeratorArgs = new object[] { CancellationToken.None }; + + public static IAsyncEnumerator FromObservable(object observable, Type observableInterface) + { + // TODO: Cache expressions by observable.GetType()? + return (IAsyncEnumerator)_fromObservableMethod + .MakeGenericMethod(observableInterface.GetGenericArguments()) + .Invoke(null, new[] { observable }); + } + + public static IAsyncEnumerator FromObservable(IObservable observable) + { + // TODO: Allow bounding and optimizations? + var channel = Channel.CreateUnbounded(); + var cancellationTokenSource = new CancellationTokenSource(); + + var subscription = observable.Subscribe(new ChannelObserver(channel.Out, cancellationTokenSource.Token)); + + return channel.In.GetAsyncEnumerator(); + } + + public static IAsyncEnumerator FromChannel(object readableChannelOfT, Type payloadType) + { + return (IAsyncEnumerator)readableChannelOfT + .GetType() + .GetRuntimeMethod("GetAsyncEnumerator", new[] { typeof(CancellationToken) }) + .Invoke(readableChannelOfT, _getAsyncEnumeratorArgs); + } + + private class ChannelObserver : IObserver + { + private WritableChannel _output; + private CancellationToken _cancellationToken; + + public ChannelObserver(WritableChannel output, CancellationToken cancellationToken) + { + _output = output; + _cancellationToken = cancellationToken; + } + + public void OnCompleted() + { + _output.TryComplete(); + } + + public void OnError(Exception error) + { + _output.TryComplete(error); + } + + public void OnNext(T value) + { + _cancellationToken.ThrowIfCancellationRequested(); + + // This will block the thread emitting the object if the channel is bounded and full + // I think this is OK, since we want to push the backpressure up. However, we may need + // to find a way to force the subscription off to a dedicated thread in order to + // ensure we don't block other tasks + + // Right now however, we use unbounded channels, so all of the above is moot because TryWrite will always succeed + while (!_output.TryWrite(value)) + { + // Wait for a spot + if (!_output.WaitToWriteAsync(_cancellationToken).Result) + { + // Channel was closed. + throw new InvalidOperationException("Output channel was closed"); + } + } + } + } + } +} diff --git a/src/Microsoft.AspNetCore.SignalR/Internal/TypeBaseEnumerationExtensions.cs b/src/Microsoft.AspNetCore.SignalR/Internal/TypeBaseEnumerationExtensions.cs new file mode 100644 index 0000000000..d175b1280b --- /dev/null +++ b/src/Microsoft.AspNetCore.SignalR/Internal/TypeBaseEnumerationExtensions.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; + +namespace Microsoft.AspNetCore.SignalR.Internal +{ + public static class TypeBaseEnumerationExtensions + { + public static IEnumerable AllBaseTypes(this Type type) + { + var current = type; + while(current != null) + { + yield return current; + current = current.BaseType; + } + } + } +} diff --git a/src/Microsoft.AspNetCore.SignalR/StreamingAttribute.cs b/src/Microsoft.AspNetCore.SignalR/StreamingAttribute.cs new file mode 100644 index 0000000000..72ba8fae96 --- /dev/null +++ b/src/Microsoft.AspNetCore.SignalR/StreamingAttribute.cs @@ -0,0 +1,10 @@ +using System; + +namespace Microsoft.AspNetCore.SignalR +{ + [AttributeUsage(AttributeTargets.ReturnValue, AllowMultiple = false, Inherited = false)] + public class StreamingAttribute : Attribute + { + // TODO: Allow specifying channel bounds and optimizations here? + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/CancellationDisposable.cs b/test/Microsoft.AspNetCore.SignalR.Tests/CancellationDisposable.cs new file mode 100644 index 0000000000..50a02883ca --- /dev/null +++ b/test/Microsoft.AspNetCore.SignalR.Tests/CancellationDisposable.cs @@ -0,0 +1,24 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Threading; + +namespace Microsoft.AspNetCore.SignalR.Tests +{ + internal class CancellationDisposable : IDisposable + { + private CancellationTokenSource _cts; + + public CancellationDisposable(CancellationTokenSource cts) + { + _cts = cts; + } + + public void Dispose() + { + _cts.Cancel(); + _cts.Dispose(); + } + } +} diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs index bf8536a2e8..db5948fda7 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/HubEndpointTests.cs @@ -2,7 +2,9 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Channels; using Microsoft.AspNetCore.SignalR.Internal.Protocol; using Microsoft.AspNetCore.SignalR.Tests.Common; using Microsoft.AspNetCore.Sockets; @@ -21,7 +23,7 @@ public async Task HubsAreDisposed() var serviceProvider = CreateServiceProvider(s => s.AddSingleton(trackDispose)); var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -51,7 +53,7 @@ public async Task LifetimeManagerOnDisconnectedAsyncCalledIfLifetimeManagerOnCon var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var exception = await Assert.ThrowsAsync( @@ -79,7 +81,7 @@ public async Task HubOnDisconnectedAsyncCalledIfHubOnConnectedAsyncThrows() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); client.Dispose(); @@ -103,7 +105,7 @@ public async Task LifetimeManagerOnDisconnectedAsyncCalledIfHubOnDisconnectedAsy var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); client.Dispose(); @@ -123,7 +125,7 @@ public async Task HubMethodCanReturnValueFromTask() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -146,7 +148,7 @@ public async Task HubMethodsAreCaseInsensitive() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -170,7 +172,7 @@ public async Task HubMethodCanThrowOrYieldFailedTask(string methodName) var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -192,7 +194,7 @@ public async Task HubMethodCanReturnValue() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -215,7 +217,7 @@ public async Task HubMethodCanBeVoid() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -237,7 +239,7 @@ public async Task HubMethodWithMultiParam() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -259,7 +261,7 @@ public async Task CanCallInheritedHubMethodFromInheritingHub() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -281,7 +283,7 @@ public async Task CanCallOverridenVirtualHubMethod() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -303,7 +305,7 @@ public async Task CannotCallOverriddenBaseHubMethod() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -341,8 +343,8 @@ public async Task BroadcastHubMethod_SendsToAllClients() var endPoint = serviceProvider.GetService>(); - using (var firstClient = new TestClient(serviceProvider)) - using (var secondClient = new TestClient(serviceProvider)) + using (var firstClient = new TestClient()) + using (var secondClient = new TestClient()) { var firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection); var secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection); @@ -376,8 +378,8 @@ public async Task HubsCanAddAndSendToGroup() var endPoint = serviceProvider.GetService>(); - using (var firstClient = new TestClient(serviceProvider)) - using (var secondClient = new TestClient(serviceProvider)) + using (var firstClient = new TestClient()) + using (var secondClient = new TestClient()) { var firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection); var secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection); @@ -418,7 +420,7 @@ public async Task RemoveFromGroupWhenNotInGroupDoesNotFail() var endPoint = serviceProvider.GetService>(); - using (var client = new TestClient(serviceProvider)) + using (var client = new TestClient()) { var endPointTask = endPoint.OnConnectedAsync(client.Connection); @@ -438,8 +440,8 @@ public async Task HubsCanSendToUser() var endPoint = serviceProvider.GetService>(); - using (var firstClient = new TestClient(serviceProvider)) - using (var secondClient = new TestClient(serviceProvider)) + using (var firstClient = new TestClient()) + using (var secondClient = new TestClient()) { var firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection); var secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection); @@ -470,8 +472,8 @@ public async Task HubsCanSendToConnection() var endPoint = serviceProvider.GetService>(); - using (var firstClient = new TestClient(serviceProvider)) - using (var secondClient = new TestClient(serviceProvider)) + using (var firstClient = new TestClient()) + using (var secondClient = new TestClient()) { var firstEndPointTask = endPoint.OnConnectedAsync(firstClient.Connection); var secondEndPointTask = endPoint.OnConnectedAsync(secondClient.Connection); @@ -495,6 +497,86 @@ public async Task HubsCanSendToConnection() } } + [Theory] + [InlineData(nameof(StreamingHub.CounterChannel))] + [InlineData(nameof(StreamingHub.CounterObservable))] + public async Task HubsCanStreamResponses(string method) + { + var serviceProvider = CreateServiceProvider(); + + var endPoint = serviceProvider.GetService>(); + + using (var client = new TestClient()) + { + var endPointLifetime = endPoint.OnConnectedAsync(client.Connection); + + await client.Connected.OrTimeout(); + + var messages = await client.StreamAsync(method, 4).OrTimeout(); + + Assert.Equal(5, messages.Count); + AssertHubMessage(new StreamItemMessage(string.Empty, "0"), messages[0]); + AssertHubMessage(new StreamItemMessage(string.Empty, "1"), messages[1]); + AssertHubMessage(new StreamItemMessage(string.Empty, "2"), messages[2]); + AssertHubMessage(new StreamItemMessage(string.Empty, "3"), messages[3]); + AssertHubMessage(new CompletionMessage(string.Empty, error: null, result: null, hasResult: false), messages[4]); + + client.Dispose(); + + await endPointLifetime; + } + } + + [Fact] + public async Task HubEndPointYieldsErrorWhenAskedToStreamNonStreamableResultAsync() +{ + var serviceProvider = CreateServiceProvider(); + + var endPoint = serviceProvider.GetService>(); + + using (var client = new TestClient()) + { + var endPointLifetime = endPoint.OnConnectedAsync(client.Connection); + + await client.Connected.OrTimeout(); + + var messages = await client.StreamAsync(nameof(StreamingHub.NotStreamable)).OrTimeout(); + + Assert.Equal(1, messages.Count); + AssertHubMessage(new CompletionMessage(string.Empty, error: $"Cannot stream results of type: {typeof(int).FullName}", result: null, hasResult: false), messages[0]); + + client.Dispose(); + + await endPointLifetime; + } + } + + private static void AssertHubMessage(HubMessage expected, HubMessage actual) + { + // We aren't testing InvocationIds here + switch (expected) + { + case CompletionMessage expectedCompletion: + var actualCompletion = Assert.IsType(actual); + Assert.Equal(expectedCompletion.Error, actualCompletion.Error); + Assert.Equal(expectedCompletion.HasResult, actualCompletion.HasResult); + Assert.Equal(expectedCompletion.Result, actualCompletion.Result); + break; + case StreamItemMessage expectedStreamItem: + var actualStreamItem = Assert.IsType(actual); + Assert.Equal(expectedStreamItem.Item, actualStreamItem.Item); + break; + case InvocationMessage expectedInvocation: + var actualInvocation = Assert.IsType(actual); + Assert.Equal(expectedInvocation.NonBlocking, actualInvocation.NonBlocking); + Assert.Equal(expectedInvocation.Target, actualInvocation.Target); + Assert.Equal(expectedInvocation.Arguments, actualInvocation.Arguments); + break; + default: + throw new InvalidOperationException($"Unsupported Hub Message type {expected.GetType()}"); + } + } + private static Type GetEndPointType(Type hubType) { var endPointType = typeof(HubEndPoint<>); @@ -518,6 +600,63 @@ private IServiceProvider CreateServiceProvider(Action addServ return services.BuildServiceProvider(); } + public class StreamingHub : TestHub + { + [return: Streaming] + public IObservable CounterObservable(int count) + { + return new CountingObservable(count); + } + + [return: Streaming] + public ReadableChannel CounterChannel(int count) + { + var channel = Channel.CreateUnbounded(); + + var task = Task.Run(async () => + { + for (int i = 0; i < count; i++) + { + await channel.Out.WriteAsync(i.ToString()); + } + channel.Out.Complete(); + }); + + return channel.In; + } + + [return: Streaming] + public int NotStreamable() + { + return 42; + } + + private class CountingObservable : IObservable + { + private int _count; + + public CountingObservable(int count) + { + _count = count; + } + + public IDisposable Subscribe(IObserver observer) + { + var cts = new CancellationTokenSource(); + Task.Run(() => + { + for (int i = 0; !cts.Token.IsCancellationRequested && i < _count; i++) + { + observer.OnNext(i.ToString()); + } + observer.OnCompleted(); + }); + + return new CancellationDisposable(cts); + } + } + } + public class OnConnectedThrowsHub : Hub { public override Task OnConnectedAsync() @@ -528,7 +667,7 @@ public override Task OnConnectedAsync() } } - public class OnDisconnectedThrowsHub : Hub + public class OnDisconnectedThrowsHub : TestHub { public override Task OnDisconnectedAsync(Exception exception) { @@ -538,14 +677,8 @@ public override Task OnDisconnectedAsync(Exception exception) } } - private class MethodHub : Hub + private class MethodHub : TestHub { - public override Task OnConnectedAsync() - { - Context.Connection.Metadata.Get>("ConnectedTask").SetResult(true); - return base.OnConnectedAsync(); - } - public Task GroupRemoveMethod(string groupName) { return Groups.RemoveAsync(groupName); @@ -624,7 +757,7 @@ public override int VirtualMethod(int num) } } - private class BaseHub : Hub + private class BaseHub : TestHub { public string BaseMethod(string message) { @@ -637,7 +770,7 @@ public virtual int VirtualMethod(int num) } } - private class InvalidHub : Hub + private class InvalidHub : TestHub { public void OverloadedMethod(int num) { @@ -648,7 +781,7 @@ public void OverloadedMethod(string message) } } - private class DisposeTrackingHub : Hub + private class DisposeTrackingHub : TestHub { private TrackDispose _trackDispose; @@ -670,5 +803,14 @@ private class TrackDispose { public int DisposeCount = 0; } + + public abstract class TestHub : Hub + { + public override Task OnConnectedAsync() + { + Context.Connection.Metadata.Get>("ConnectedTask")?.TrySetResult(true); + return base.OnConnectedAsync(); + } + } } } diff --git a/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs b/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs index e9edcdc268..d7c70866cb 100644 --- a/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs +++ b/test/Microsoft.AspNetCore.SignalR.Tests/TestClient.cs @@ -1,4 +1,4 @@ -// Copyright (c) .NET Foundation. All rights reserved. +// Copyright (c) .NET Foundation. All rights reserved. // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; @@ -21,11 +21,11 @@ public class TestClient : IDisposable, IInvocationBinder private IHubProtocol _protocol; private CancellationTokenSource _cts; - public ConnectionContext Connection; + public ConnectionContext Connection { get; } public IChannelConnection Application { get; } public Task Connected => Connection.Metadata.Get>("ConnectedTask").Task; - public TestClient(IServiceProvider serviceProvider) + public TestClient() { var transportToApplication = Channel.CreateUnbounded(); var applicationToTransport = Channel.CreateUnbounded(); @@ -42,6 +42,39 @@ public TestClient(IServiceProvider serviceProvider) _cts = new CancellationTokenSource(); } + public async Task> StreamAsync(string methodName, params object[] args) + { + var invocationId = await SendInvocationAsync(methodName, args); + + var messages = new List(); + while (true) + { + var message = await Read(); + + if (!string.Equals(message.InvocationId, invocationId)) + { + throw new NotSupportedException("TestClient does not support multiple outgoing invocations!"); + } + + if (message == null) + { + throw new InvalidOperationException("Connection aborted!"); + } + + switch (message) + { + case StreamItemMessage _: + messages.Add(message); + break; + case CompletionMessage _: + messages.Add(message); + return messages; + default: + throw new NotSupportedException("TestClient does not support receiving invocations!"); + } + } + } + public async Task InvokeAsync(string methodName, params object[] args) { var invocationId = await SendInvocationAsync(methodName, args); @@ -63,7 +96,7 @@ public async Task InvokeAsync(string methodName, params objec switch (message) { case StreamItemMessage result: - throw new NotSupportedException("TestClient does not support streaming!"); + throw new NotSupportedException("Use 'StreamAsync' to call a streaming method"); case CompletionMessage completion: return completion; default: From ef795eeb64225c661618f73fcf3ea1816abb1a7b Mon Sep 17 00:00:00 2001 From: Andrew Stanton-Nurse Date: Thu, 18 May 2017 14:15:58 -0700 Subject: [PATCH 02/17] fix test --- .../HubConnection.spec.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts index 82b8d0a591..00682fecd7 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts @@ -143,9 +143,9 @@ describe("HubConnection", () => { fail(); } catch (e) { - expect(e.message).toBe("Streaming is not supported."); + expect(e.message).toBe("Streaming methods must be invoked using HubConnection.stream"); } done(); }); -}); \ No newline at end of file +}); From 053f0254b654dc2752312536c5cb3ac189e86236 Mon Sep 17 00:00:00 2001 From: Andrew Stanton-Nurse Date: Fri, 19 May 2017 11:50:10 -0700 Subject: [PATCH 03/17] fix Channel streaming and javascript sample --- .../HubConnection.spec.ts | 2 +- samples/SocketsSample/wwwroot/index.html | 6 +-- samples/SocketsSample/wwwroot/streaming.html | 51 ++++++++++--------- .../Internal/AsyncEnumeratorAdapters.cs | 42 ++++++++++++++- 4 files changed, 71 insertions(+), 30 deletions(-) diff --git a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts index 00682fecd7..aceafc7130 100644 --- a/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts +++ b/client-ts/Microsoft.AspNetCore.SignalR.Client.TS.Tests/HubConnection.spec.ts @@ -112,7 +112,7 @@ describe("HubConnection", () => { done(); }); - it("rejects streaming responses", async done => { + it("rejects streaming responses made using 'invoke'", async done => { let connection: IConnection = { start(transportType: TransportType): Promise { return Promise.resolve(); diff --git a/samples/SocketsSample/wwwroot/index.html b/samples/SocketsSample/wwwroot/index.html index 2ee45dccbb..5f0e8c06b2 100644 --- a/samples/SocketsSample/wwwroot/index.html +++ b/samples/SocketsSample/wwwroot/index.html @@ -5,20 +5,20 @@ -

        ASP.NET Sockets

        +

        ASP.NET Core Sockets

        -

        ASP.NET SignalR (Hubs)

        +

        ASP.NET Core SignalR (Hubs)

        -

        ASP.NET SignalR (Streaming)

        +

        ASP.NET Core SignalR (Streaming)

        • Long polling
        • Server Sent Events
        • diff --git a/samples/SocketsSample/wwwroot/streaming.html b/samples/SocketsSample/wwwroot/streaming.html index 51c203adb9..348b11e796 100644 --- a/samples/SocketsSample/wwwroot/streaming.html +++ b/samples/SocketsSample/wwwroot/streaming.html @@ -11,12 +11,12 @@

          Controls

          +
          -

          Results

          @@ -27,40 +27,45 @@

          Results