|
8 | 8 | using System.Diagnostics.CodeAnalysis;
|
9 | 9 | using System.Globalization;
|
10 | 10 | using System.IO;
|
| 11 | +using System.IO.Pipelines; |
11 | 12 | using System.Linq;
|
12 | 13 | using System.Net;
|
13 | 14 | using System.Reflection;
|
|
16 | 17 | using System.Threading.Channels;
|
17 | 18 | using System.Threading.Tasks;
|
18 | 19 | using Microsoft.AspNetCore.Connections;
|
| 20 | +using Microsoft.AspNetCore.Connections.Abstractions; |
19 | 21 | using Microsoft.AspNetCore.Connections.Features;
|
20 | 22 | using Microsoft.AspNetCore.Internal;
|
21 | 23 | using Microsoft.AspNetCore.Shared;
|
@@ -946,11 +948,19 @@ private async Task InvokeStreamCore(ConnectionState connectionState, string meth
|
946 | 948 | private async Task SendHubMessage(ConnectionState connectionState, HubMessage hubMessage, CancellationToken cancellationToken = default)
|
947 | 949 | {
|
948 | 950 | _state.AssertConnectionValid();
|
949 |
| - _protocol.WriteMessage(hubMessage, connectionState.Connection.Transport.Output); |
950 | 951 |
|
951 | 952 | Log.SendingMessage(_logger, hubMessage);
|
952 | 953 |
|
953 |
| - await connectionState.Connection.Transport.Output.FlushAsync(cancellationToken).ConfigureAwait(false); |
| 954 | + if (connectionState.UsingAcks()) |
| 955 | + { |
| 956 | + await connectionState.WriteAsync(new SerializedHubMessage(hubMessage), cancellationToken).ConfigureAwait(false); |
| 957 | + } |
| 958 | + else |
| 959 | + { |
| 960 | + _protocol.WriteMessage(hubMessage, connectionState.Connection.Transport.Output); |
| 961 | + |
| 962 | + await connectionState.Connection.Transport.Output.FlushAsync(cancellationToken).ConfigureAwait(false); |
| 963 | + } |
954 | 964 | Log.MessageSent(_logger, hubMessage);
|
955 | 965 |
|
956 | 966 | // We've sent a message, so don't ping for a while
|
@@ -1004,6 +1014,11 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
|
1004 | 1014 | Log.ResettingKeepAliveTimer(_logger);
|
1005 | 1015 | connectionState.ResetTimeout();
|
1006 | 1016 |
|
| 1017 | + if (!connectionState.ShouldProcessMessage(message)) |
| 1018 | + { |
| 1019 | + return null; |
| 1020 | + } |
| 1021 | + |
1007 | 1022 | InvocationRequest? irq;
|
1008 | 1023 | switch (message)
|
1009 | 1024 | {
|
@@ -1055,6 +1070,14 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
|
1055 | 1070 | Log.ReceivedPing(_logger);
|
1056 | 1071 | // timeout is reset above, on receiving any message
|
1057 | 1072 | break;
|
| 1073 | + case AckMessage ackMessage: |
| 1074 | + Log.ReceivedAckMessage(_logger, ackMessage.SequenceId); |
| 1075 | + connectionState.Ack(ackMessage); |
| 1076 | + break; |
| 1077 | + case SequenceMessage sequenceMessage: |
| 1078 | + Log.ReceivedSequenceMessage(_logger, sequenceMessage.SequenceId); |
| 1079 | + connectionState.ResetSequence(sequenceMessage); |
| 1080 | + break; |
1058 | 1081 | default:
|
1059 | 1082 | throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
|
1060 | 1083 | }
|
@@ -1235,6 +1258,7 @@ private async Task HandshakeAsync(ConnectionState startingConnectionState, Cance
|
1235 | 1258 | }
|
1236 | 1259 |
|
1237 | 1260 | Log.HandshakeComplete(_logger);
|
| 1261 | + |
1238 | 1262 | break;
|
1239 | 1263 | }
|
1240 | 1264 | }
|
@@ -1813,6 +1837,7 @@ private sealed class ConnectionState : IInvocationBinder
|
1813 | 1837 | private readonly HubConnection _hubConnection;
|
1814 | 1838 | private readonly ILogger _logger;
|
1815 | 1839 | private readonly bool _hasInherentKeepAlive;
|
| 1840 | + private readonly MessageBuffer? _messageBuffer; |
1816 | 1841 |
|
1817 | 1842 | private readonly object _lock = new object();
|
1818 | 1843 | private readonly Dictionary<string, InvocationRequest> _pendingCalls = new Dictionary<string, InvocationRequest>(StringComparer.Ordinal);
|
@@ -1850,6 +1875,13 @@ public ConnectionState(ConnectionContext connection, HubConnection hubConnection
|
1850 | 1875 |
|
1851 | 1876 | _logger = _hubConnection._logger;
|
1852 | 1877 | _hasInherentKeepAlive = connection.Features.Get<IConnectionInherentKeepAliveFeature>()?.HasInherentKeepAlive ?? false;
|
| 1878 | + |
| 1879 | + if (Connection.Features.Get<IReconnectFeature>() is IReconnectFeature feature) |
| 1880 | + { |
| 1881 | + _messageBuffer = new MessageBuffer(connection, hubConnection._protocol); |
| 1882 | + |
| 1883 | + feature.NotifyOnReconnect = _messageBuffer.Resend; |
| 1884 | + } |
1853 | 1885 | }
|
1854 | 1886 |
|
1855 | 1887 | public string GetNextId() => (++_nextInvocationId).ToString(CultureInfo.InvariantCulture);
|
@@ -1935,6 +1967,8 @@ private async Task StopAsyncCore()
|
1935 | 1967 | {
|
1936 | 1968 | Log.Stopping(_logger);
|
1937 | 1969 |
|
| 1970 | + _messageBuffer?.Dispose(); |
| 1971 | + |
1938 | 1972 | // Complete our write pipe, which should cause everything to shut down
|
1939 | 1973 | Log.TerminatingReceiveLoop(_logger);
|
1940 | 1974 | Connection.Transport.Input.CancelPendingRead();
|
@@ -1966,6 +2000,44 @@ public async Task TimerLoop(TimerAwaitable timer)
|
1966 | 2000 | }
|
1967 | 2001 | }
|
1968 | 2002 |
|
| 2003 | + public ValueTask<FlushResult> WriteAsync(SerializedHubMessage message, CancellationToken cancellationToken) |
| 2004 | + { |
| 2005 | + Debug.Assert(_messageBuffer is not null); |
| 2006 | + return _messageBuffer.WriteAsync(message, cancellationToken); |
| 2007 | + } |
| 2008 | + |
| 2009 | + public bool ShouldProcessMessage(HubMessage message) |
| 2010 | + { |
| 2011 | + if (UsingAcks()) |
| 2012 | + { |
| 2013 | + if (!_messageBuffer.ShouldProcessMessage(message)) |
| 2014 | + { |
| 2015 | + Log.DroppingMessage(_logger, ((HubInvocationMessage)message).GetType().Name, ((HubInvocationMessage)message).InvocationId); |
| 2016 | + return false; |
| 2017 | + } |
| 2018 | + } |
| 2019 | + return true; |
| 2020 | + } |
| 2021 | + |
| 2022 | + public void Ack(AckMessage ackMessage) |
| 2023 | + { |
| 2024 | + if (UsingAcks()) |
| 2025 | + { |
| 2026 | + _messageBuffer.Ack(ackMessage); |
| 2027 | + } |
| 2028 | + } |
| 2029 | + |
| 2030 | + public void ResetSequence(SequenceMessage sequenceMessage) |
| 2031 | + { |
| 2032 | + if (UsingAcks()) |
| 2033 | + { |
| 2034 | + _messageBuffer.ResetSequence(sequenceMessage); |
| 2035 | + } |
| 2036 | + } |
| 2037 | + |
| 2038 | + [MemberNotNullWhen(true, nameof(_messageBuffer))] |
| 2039 | + public bool UsingAcks() => _messageBuffer is not null; |
| 2040 | + |
1969 | 2041 | public void ResetSendPing()
|
1970 | 2042 | {
|
1971 | 2043 | Volatile.Write(ref _nextActivationSendPing, (DateTime.UtcNow + _hubConnection.KeepAliveInterval).Ticks);
|
|
0 commit comments