Skip to content

Commit c5f1607

Browse files
authored
Add WebSockets telemetry (#1237)
* Add WebSockets telemetry * Simplify test * Add IClock, don't count control frames * Add WebSocketsParser unit tests * Only inspect WebSocket streams * Test control frames with payloads
1 parent dc4f552 commit c5f1607

21 files changed

+1182
-27
lines changed

samples/ReverseProxy.Metrics.Sample/Startup.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public void ConfigureServices(IServiceCollection services)
4444
// Registration of a consumer to events for HttpClient telemetry
4545
// Note: this depends on changes implemented in .NET 5
4646
services.AddTelemetryConsumer<HttpClientTelemetryConsumer>();
47+
48+
services.AddTelemetryConsumer<WebSocketsTelemetryConsumer>();
4749
}
4850

4951
/// <summary>
@@ -55,6 +57,9 @@ public void Configure(IApplicationBuilder app)
5557
// Placed at the beginning so it is the first and last thing run for each request
5658
app.UsePerRequestMetricCollection();
5759

60+
// Middleware used to intercept the WebSocket connection and collect telemetry exposed to WebSocketsTelemetryConsumer
61+
app.UseWebSocketsTelemetry();
62+
5863
app.UseRouting();
5964
app.UseEndpoints(endpoints =>
6065
{
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using System;
2+
using Microsoft.Extensions.Logging;
3+
using Yarp.Telemetry.Consumption;
4+
5+
namespace Yarp.Sample
6+
{
7+
public sealed class WebSocketsTelemetryConsumer : IWebSocketsTelemetryConsumer
8+
{
9+
private readonly ILogger<WebSocketsTelemetryConsumer> _logger;
10+
11+
public WebSocketsTelemetryConsumer(ILogger<WebSocketsTelemetryConsumer> logger)
12+
{
13+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
14+
}
15+
16+
public void OnWebSocketClosed(DateTime timestamp, DateTime establishedTime, WebSocketCloseReason closeReason, long messagesRead, long messagesWritten)
17+
{
18+
_logger.LogInformation($"WebSocket connection closed ({closeReason}) after reading {messagesRead} and writing {messagesWritten} messages over {(timestamp - establishedTime).TotalSeconds:N2} seconds.");
19+
}
20+
}
21+
}

src/ReverseProxy/Forwarder/HttpForwarder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ private async ValueTask<ForwarderError> HandleUpgradedResponse(HttpContext conte
585585
var (secondResult, secondException) = await secondTask;
586586
if (secondResult != StreamCopyResult.Success)
587587
{
588-
error = ReportResult(context, requestFinishedFirst, secondResult, secondException!);
588+
error = ReportResult(context, !requestFinishedFirst, secondResult, secondException!);
589589
}
590590
else
591591
{

test/ReverseProxy.Tests/Common/DelegatingStream.cs renamed to src/ReverseProxy/Utilities/DelegatingStream.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
// Licensed to the .NET Foundation under one or more agreements.
2-
// The .NET Foundation licenses this file to you under the MIT license.
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
33

44
using System;
55
using System.Diagnostics;
66
using System.IO;
77
using System.Threading;
88
using System.Threading.Tasks;
99

10-
namespace Yarp.Tests.Common
10+
namespace Yarp.ReverseProxy.Utilities
1111
{
12+
// Taken from https://github.com/dotnet/runtime/blob/00f37bc13b4edbba1afca9e98d74432a94f5192f/src/libraries/Common/src/System/IO/DelegatingStream.cs
1213
// Forwards all calls to an inner stream except where overridden in a derived class.
1314
internal abstract class DelegatingStream : Stream
1415
{
@@ -113,9 +114,9 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
113114
return _innerStream.ReadAsync(buffer, cancellationToken);
114115
}
115116

116-
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
117+
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
117118
{
118-
return _innerStream.BeginRead(buffer, offset, count, callback, state);
119+
return _innerStream.BeginRead(buffer, offset, count, callback!, state);
119120
}
120121

121122
public override int EndRead(IAsyncResult asyncResult)
@@ -167,9 +168,9 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
167168
return _innerStream.WriteAsync(buffer, cancellationToken);
168169
}
169170

170-
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
171+
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
171172
{
172-
return _innerStream.BeginWrite(buffer, offset, count, callback, state);
173+
return _innerStream.BeginWrite(buffer, offset, count, callback!, state);
173174
}
174175

175176
public override void EndWrite(IAsyncResult asyncResult)
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
namespace Yarp.ReverseProxy.WebSocketsTelemetry
5+
{
6+
internal enum WebSocketCloseReason : int
7+
{
8+
Unknown,
9+
ClientGracefulClose,
10+
ServerGracefulClose,
11+
ClientDisconnect,
12+
ServerDisconnect,
13+
}
14+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Diagnostics;
6+
using Yarp.ReverseProxy.Utilities;
7+
8+
namespace Yarp.ReverseProxy.WebSocketsTelemetry
9+
{
10+
internal unsafe struct WebSocketsParser
11+
{
12+
private const int MaskLength = 4;
13+
private const int MinHeaderSize = 2;
14+
private const int MaxHeaderSize = MinHeaderSize + MaskLength + sizeof(ulong);
15+
16+
private fixed byte _leftoverBuffer[MaxHeaderSize - 1];
17+
private readonly byte _minHeaderSize;
18+
private byte _leftover;
19+
private ulong _bytesToSkip;
20+
private long _closeTime;
21+
private readonly IClock _clock;
22+
23+
public long MessageCount { get; private set; }
24+
25+
public DateTime? CloseTime => _closeTime == 0 ? null : new DateTime(_closeTime, DateTimeKind.Utc);
26+
27+
public WebSocketsParser(IClock clock, bool isServer)
28+
{
29+
_minHeaderSize = (byte)(MinHeaderSize + (isServer ? MaskLength : 0));
30+
_leftover = 0;
31+
_bytesToSkip = 0;
32+
_closeTime = 0;
33+
_clock = clock;
34+
MessageCount = 0;
35+
}
36+
37+
// The WebSocket Protocol: https://datatracker.ietf.org/doc/html/rfc6455#section-5.2
38+
// 0 1 2 3
39+
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
40+
// +-+-+-+-+-------+-+-------------+-------------------------------+
41+
// |F|R|R|R| opcode|M| Payload len | Extended payload length |
42+
// |I|S|S|S| (4) |A| (7) | (16/64) |
43+
// |N|V|V|V| |S| | (if payload len==126/127) |
44+
// | |1|2|3| |K| | |
45+
// +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
46+
// | Extended payload length continued, if payload len == 127 |
47+
// + - - - - - - - - - - - - - - - +-------------------------------+
48+
// | |Masking-key, if MASK set to 1 |
49+
// +-------------------------------+-------------------------------+
50+
// | Masking-key (continued) | Payload Data |
51+
// +-------------------------------- - - - - - - - - - - - - - - - +
52+
// : Payload Data continued ... :
53+
// +---------------------------------------------------------------+
54+
//
55+
// The header can be 2-10 bytes long, followed by a 4 byte mask if the message was sent by the client.
56+
// We have to read the first 2 bytes to know how long the frame header will be.
57+
// Since the buffer may not contain the full frame, we make use of a leftoverBuffer
58+
// where we store leftover bytes that don't represent a complete frame header.
59+
// On the next call to Consume, we interpret the leftover bytes as the beginning of the frame.
60+
// As we are not interested in the actual payload data, we skip over (payload length + mask length) bytes after each header.
61+
public void Consume(ReadOnlySpan<byte> buffer)
62+
{
63+
int leftover = _leftover;
64+
var bytesToSkip = _bytesToSkip;
65+
66+
while (true)
67+
{
68+
var toSkip = Math.Min(bytesToSkip, (ulong)buffer.Length);
69+
buffer = buffer.Slice((int)toSkip);
70+
bytesToSkip -= toSkip;
71+
72+
var available = leftover + buffer.Length;
73+
int headerSize = _minHeaderSize;
74+
75+
if (available < headerSize)
76+
{
77+
break;
78+
}
79+
80+
var length = (leftover > 1 ? _leftoverBuffer[1] : buffer[1 - leftover]) & 0x7FUL;
81+
82+
if (length > 125)
83+
{
84+
// The actual length will be encoded in 2 or 8 bytes, based on whether the length was 126 or 127
85+
var lengthBytes = 2 << (((int)length & 1) << 1);
86+
headerSize += lengthBytes;
87+
Debug.Assert(leftover < headerSize);
88+
89+
if (available < headerSize)
90+
{
91+
break;
92+
}
93+
94+
lengthBytes += MinHeaderSize;
95+
96+
length = 0;
97+
for (var i = MinHeaderSize; i < lengthBytes; i++)
98+
{
99+
length <<= 8;
100+
length |= i < leftover ? _leftoverBuffer[i] : buffer[i - leftover];
101+
}
102+
}
103+
104+
Debug.Assert(leftover < headerSize);
105+
bytesToSkip = length;
106+
107+
const int NonReservedBitsMask = 0b_1000_1111;
108+
var header = (leftover > 0 ? _leftoverBuffer[0] : buffer[0]) & NonReservedBitsMask;
109+
110+
// Don't count control frames under MessageCount
111+
if ((uint)(header - 0x80) <= 0x02)
112+
{
113+
// Has FIN (0x80) and is a Continuation (0x00) / Text (0x01) / Binary (0x02) opcode
114+
MessageCount++;
115+
}
116+
else if ((header & 0xF) == 0x8) // CLOSE
117+
{
118+
if (_closeTime == 0)
119+
{
120+
_closeTime = _clock.GetUtcNow().Ticks;
121+
}
122+
}
123+
124+
// Advance the buffer by the number of bytes read for the header,
125+
// accounting for any bytes we may have read from the leftoverBuffer
126+
buffer = buffer.Slice(headerSize - leftover);
127+
leftover = 0;
128+
}
129+
130+
Debug.Assert(bytesToSkip == 0 || buffer.Length == 0);
131+
_bytesToSkip = bytesToSkip;
132+
133+
Debug.Assert(leftover + buffer.Length < MaxHeaderSize);
134+
for (var i = 0; i < buffer.Length; i++, leftover++)
135+
{
136+
_leftoverBuffer[leftover] = buffer[i];
137+
}
138+
139+
_leftover = (byte)leftover;
140+
}
141+
}
142+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System.Diagnostics.Tracing;
5+
6+
namespace Yarp.ReverseProxy.WebSocketsTelemetry
7+
{
8+
[EventSource(Name = "Yarp.ReverseProxy.WebSockets")]
9+
internal sealed class WebSocketsTelemetry : EventSource
10+
{
11+
public static readonly WebSocketsTelemetry Log = new();
12+
13+
[Event(1, Level = EventLevel.Informational)]
14+
public void WebSocketClosed(long establishedTime, WebSocketCloseReason closeReason, long messagesRead, long messagesWritten)
15+
{
16+
if (IsEnabled(EventLevel.Informational, EventKeywords.All))
17+
{
18+
WriteEvent(eventId: 1, establishedTime, closeReason, messagesRead, messagesWritten);
19+
}
20+
}
21+
}
22+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using System.Linq;
5+
using Microsoft.Extensions.DependencyInjection;
6+
using Yarp.ReverseProxy.Utilities;
7+
using Yarp.ReverseProxy.WebSocketsTelemetry;
8+
9+
namespace Microsoft.AspNetCore.Builder
10+
{
11+
/// <summary>
12+
/// <see cref="IApplicationBuilder"/> extension methods to add the <see cref="WebSocketsTelemetryMiddleware"/>.
13+
/// </summary>
14+
public static class WebSocketsTelemetryExtensions
15+
{
16+
/// <summary>
17+
/// Adds a <see cref="WebSocketsTelemetryMiddleware"/> to the request pipeline.
18+
/// Must be added before <see cref="WebSockets.WebSocketMiddleware"/>.
19+
/// </summary>
20+
public static IApplicationBuilder UseWebSocketsTelemetry(this IApplicationBuilder app)
21+
{
22+
return app.Use(next =>
23+
{
24+
// Avoid exposing another extension method (AddWebSocketsTelemetry) just because of IClock
25+
var clock = app.ApplicationServices.GetServices<IClock>().FirstOrDefault() ?? new Clock();
26+
return new WebSocketsTelemetryMiddleware(next, clock).InvokeAsync;
27+
});
28+
}
29+
}
30+
}

0 commit comments

Comments
 (0)