Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Yarp.Sample
{
public sealed class PrometheusDnsMetrics : INameResolutionMetricsConsumer
public sealed class PrometheusDnsMetrics : IMetricsConsumer<NameResolutionMetrics>
{
private static readonly Counter _dnsLookupsRequested = Metrics.CreateCounter(
"yarp_dns_lookups_requested",
Expand All @@ -20,10 +20,10 @@ public sealed class PrometheusDnsMetrics : INameResolutionMetricsConsumer
"Average DNS lookup duration"
);

public void OnNameResolutionMetrics(NameResolutionMetrics oldMetrics, NameResolutionMetrics newMetrics)
public void OnMetrics(NameResolutionMetrics previous, NameResolutionMetrics current)
{
_dnsLookupsRequested.IncTo(newMetrics.DnsLookupsRequested);
_averageLookupDuration.Set(newMetrics.AverageLookupDuration.TotalMilliseconds);
_dnsLookupsRequested.IncTo(current.DnsLookupsRequested);
_averageLookupDuration.Set(current.AverageLookupDuration.TotalMilliseconds);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Yarp.Sample
{
public sealed class PrometheusForwarderMetrics : IForwarderMetricsConsumer
public sealed class PrometheusForwarderMetrics : IMetricsConsumer<ForwarderMetrics>
{
private static readonly Counter _requestsStarted = Metrics.CreateCounter(
"yarp_proxy_requests_started",
Expand All @@ -23,11 +23,11 @@ public sealed class PrometheusForwarderMetrics : IForwarderMetricsConsumer
"Number of active proxy requests that have started but not yet completed or failed"
);

public void OnForwarderMetrics(ForwarderMetrics oldMetrics, ForwarderMetrics newMetrics)
public void OnMetrics(ForwarderMetrics previous, ForwarderMetrics current)
{
_requestsStarted.IncTo(newMetrics.RequestsStarted);
_requestsFailed.IncTo(newMetrics.RequestsFailed);
_CurrentRequests.Set(newMetrics.CurrentRequests);
_requestsStarted.IncTo(current.RequestsStarted);
_requestsFailed.IncTo(current.RequestsFailed);
_CurrentRequests.Set(current.CurrentRequests);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@

#if NET

using System;
using Yarp.Telemetry.Consumption;
using Prometheus;

namespace Yarp.Sample
{
public sealed class PrometheusKestrelMetrics : IKestrelMetricsConsumer
public sealed class PrometheusKestrelMetrics : IMetricsConsumer<KestrelMetrics>
{
private static readonly Counter _totalConnections = Metrics.CreateCounter(
"yarp_kestrel_total_connections",
Expand Down Expand Up @@ -47,15 +46,15 @@ public sealed class PrometheusKestrelMetrics : IKestrelMetricsConsumer
"Number of requests on the queue"
);

public void OnKestrelMetrics(KestrelMetrics oldMetrics, KestrelMetrics newMetrics)
public void OnMetrics(KestrelMetrics previous, KestrelMetrics current)
{
_totalConnections.IncTo(newMetrics.TotalConnections);
_totalTlsHandshakes.IncTo(newMetrics.TotalTlsHandshakes);
_currentTlsHandshakes.Set(newMetrics.CurrentTlsHandshakes);
_failedTlsHandshakes.IncTo(newMetrics.FailedTlsHandshakes);
_currentConnections.Set(newMetrics.CurrentConnections);
_connectionQueueLength.Set(newMetrics.ConnectionQueueLength);
_requestQueueLength.Set(newMetrics.RequestQueueLength);
_totalConnections.IncTo(current.TotalConnections);
_totalTlsHandshakes.IncTo(current.TotalTlsHandshakes);
_currentTlsHandshakes.Set(current.CurrentTlsHandshakes);
_failedTlsHandshakes.IncTo(current.FailedTlsHandshakes);
_currentConnections.Set(current.CurrentConnections);
_connectionQueueLength.Set(current.ConnectionQueueLength);
_requestQueueLength.Set(current.RequestQueueLength);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Yarp.Sample
/// <summary>
/// Collects outbound http metrics and exposes them using prometheus-net
/// </summary>
public sealed class PrometheusOutboundHttpMetrics: IHttpMetricsConsumer
public sealed class PrometheusOutboundHttpMetrics : IMetricsConsumer<HttpMetrics>
{
private static readonly double CUBE_ROOT_10 = Math.Pow(10, (1.0 / 3));

Expand Down Expand Up @@ -57,15 +57,15 @@ public sealed class PrometheusOutboundHttpMetrics: IHttpMetricsConsumer
Buckets = Histogram.ExponentialBuckets(10, CUBE_ROOT_10, 10)
});

public void OnHttpMetrics(HttpMetrics oldMetrics, HttpMetrics newMetrics)
public void OnMetrics(HttpMetrics previous, HttpMetrics current)
{
_outboundRequestsStarted.IncTo(newMetrics.RequestsStarted);
_outboundRequestsFailed.IncTo(newMetrics.RequestsFailed);
_outboundCurrentRequests.Set(newMetrics.CurrentRequests);
_outboundCurrentHttp11Connections.Set(newMetrics.CurrentHttp11Connections);
_outboundCurrentHttp20Connections.Set(newMetrics.CurrentHttp20Connections);
_outboundHttp11RequestQueueDuration.Observe(newMetrics.Http11RequestsQueueDuration.TotalMilliseconds);
_outboundHttp20RequestQueueDuration.Observe(newMetrics.Http20RequestsQueueDuration.TotalMilliseconds);
_outboundRequestsStarted.IncTo(current.RequestsStarted);
_outboundRequestsFailed.IncTo(current.RequestsFailed);
_outboundCurrentRequests.Set(current.CurrentRequests);
_outboundCurrentHttp11Connections.Set(current.CurrentHttp11Connections);
_outboundCurrentHttp20Connections.Set(current.CurrentHttp20Connections);
_outboundHttp11RequestQueueDuration.Observe(current.Http11RequestsQueueDuration.TotalMilliseconds);
_outboundHttp20RequestQueueDuration.Observe(current.Http20RequestsQueueDuration.TotalMilliseconds);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,36 @@ public static class PrometheusServiceExtensions
public static IServiceCollection AddPrometheusForwarderMetrics(this IServiceCollection services)
{
services.AddTelemetryListeners();
services.AddSingleton<IForwarderMetricsConsumer, PrometheusForwarderMetrics>();
services.AddSingleton<IMetricsConsumer<ForwarderMetrics>, PrometheusForwarderMetrics>();
return services;
}

#if NET
public static IServiceCollection AddPrometheusDnsMetrics(this IServiceCollection services)
{
services.AddTelemetryListeners();
services.AddSingleton<INameResolutionMetricsConsumer, PrometheusDnsMetrics>();
services.AddSingleton<IMetricsConsumer<NameResolutionMetrics>, PrometheusDnsMetrics>();
return services;
}

public static IServiceCollection AddPrometheusKestrelMetrics(this IServiceCollection services)
{
services.AddTelemetryListeners();
services.AddSingleton<IKestrelMetricsConsumer, PrometheusKestrelMetrics>();
services.AddSingleton<IMetricsConsumer<KestrelMetrics>, PrometheusKestrelMetrics>();
return services;
}

public static IServiceCollection AddPrometheusOutboundHttpMetrics(this IServiceCollection services)
{
services.AddTelemetryListeners();
services.AddSingleton<IHttpMetricsConsumer, PrometheusOutboundHttpMetrics>();
services.AddSingleton<IMetricsConsumer<HttpMetrics>, PrometheusOutboundHttpMetrics>();
return services;
}

public static IServiceCollection AddPrometheusSocketsMetrics(this IServiceCollection services)
{
services.AddTelemetryListeners();
services.AddSingleton<ISocketsMetricsConsumer, PrometheusSocketMetrics>();
services.AddSingleton<IMetricsConsumer<SocketsMetrics>, PrometheusSocketMetrics>();
return services;
}
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

namespace Yarp.Sample
{

public sealed class PrometheusSocketMetrics : ISocketsMetricsConsumer
public sealed class PrometheusSocketMetrics : IMetricsConsumer<SocketsMetrics>
{
private static readonly Counter _outgoingConnectionsEstablished = Metrics.CreateCounter(
"yarp_sockets_outgoing_connections_established",
Expand Down Expand Up @@ -42,14 +41,14 @@ public sealed class PrometheusSocketMetrics : ISocketsMetricsConsumer
"Number of datagrams Sent"
);

public void OnSocketsMetrics(SocketsMetrics oldMetrics, SocketsMetrics newMetrics)
public void OnMetrics(SocketsMetrics previous, SocketsMetrics current)
{
_outgoingConnectionsEstablished.IncTo(newMetrics.OutgoingConnectionsEstablished);
_incomingConnectionsEstablished.IncTo(newMetrics.IncomingConnectionsEstablished);
_bytesReceived.IncTo(newMetrics.BytesReceived);
_bytesSent.IncTo(newMetrics.BytesSent);
_datagramsReceived.IncTo(newMetrics.DatagramsReceived);
_datagramsSent.IncTo(newMetrics.DatagramsSent);
_outgoingConnectionsEstablished.IncTo(current.OutgoingConnectionsEstablished);
_incomingConnectionsEstablished.IncTo(current.IncomingConnectionsEstablished);
_bytesReceived.IncTo(current.BytesReceived);
_bytesSent.IncTo(current.BytesSent);
_datagramsReceived.IncTo(current.DatagramsReceived);
_datagramsSent.IncTo(current.DatagramsSent);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

namespace Yarp.Sample
{
public sealed class ForwarderMetricsConsumer : IForwarderMetricsConsumer
public sealed class ForwarderMetricsConsumer : IMetricsConsumer<ForwarderMetrics>
{
public void OnForwarderMetrics(ForwarderMetrics oldMetrics, ForwarderMetrics newMetrics)
public void OnMetrics(ForwarderMetrics previous, ForwarderMetrics current)
{
var elapsed = newMetrics.Timestamp - oldMetrics.Timestamp;
var newRequests = newMetrics.RequestsStarted - oldMetrics.RequestsStarted;
Console.Title = $"Forwarded {newMetrics.RequestsStarted} requests ({newRequests} in the last {(int)elapsed.TotalMilliseconds} ms)";
var elapsed = current.Timestamp - previous.Timestamp;
var newRequests = current.RequestsStarted - previous.RequestsStarted;
Console.Title = $"Forwarded {current.RequestsStarted} requests ({newRequests} in the last {(int)elapsed.TotalMilliseconds} ms)";
}
}
}
2 changes: 1 addition & 1 deletion samples/ReverseProxy.Metrics.Sample/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void ConfigureServices(IServiceCollection services)
services.AddHttpContextAccessor();

// Interface that collects general metrics about the proxy forwarder
services.AddSingleton<IForwarderMetricsConsumer, ForwarderMetricsConsumer>();
services.AddSingleton<IMetricsConsumer<ForwarderMetrics>, ForwarderMetricsConsumer>();

// Registration of a consumer to events for proxy forwarder telemetry
services.AddTelemetryConsumer<ForwarderTelemetryConsumer>();
Expand Down
115 changes: 99 additions & 16 deletions src/TelemetryConsumption/EventListenerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,21 @@

namespace Yarp.Telemetry.Consumption
{
internal abstract class EventListenerService<TService, TTelemetryConsumer, TMetricsConsumer> : EventListener, IHostedService
internal abstract class EventListenerService<TService, TTelemetryConsumer, TMetrics> : EventListener, IHostedService
where TMetrics : class, new()
{
protected abstract string EventSourceName { get; }
protected abstract int NumberOfMetrics { get; }
protected abstract void OnEvent(TTelemetryConsumer[] consumers, EventWrittenEventArgs eventData);
protected abstract bool TrySaveMetric(TMetrics metrics, string name, double value);

protected readonly ILogger<TService> Logger;
protected readonly TMetricsConsumer[]? MetricsConsumers;
protected readonly TTelemetryConsumer[]? TelemetryConsumers;
private readonly ILogger<TService> _logger;
private readonly TTelemetryConsumer[]? _telemetryConsumers;
private readonly IMetricsConsumer<TMetrics>[]? _metricsConsumers;

private int _metricsCount;
private TMetrics? _previousMetrics;
private TMetrics? _currentMetrics;

private EventSource? _eventSource;
private readonly object _syncObject = new();
Expand All @@ -27,29 +35,29 @@ internal abstract class EventListenerService<TService, TTelemetryConsumer, TMetr
public EventListenerService(
ILogger<TService> logger,
IEnumerable<TTelemetryConsumer> telemetryConsumers,
IEnumerable<TMetricsConsumer> metricsConsumers)
IEnumerable<IMetricsConsumer<TMetrics>> metricsConsumers)
{
Logger = logger ?? throw new ArgumentNullException(nameof(logger));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_ = telemetryConsumers ?? throw new ArgumentNullException(nameof(telemetryConsumers));
_ = metricsConsumers ?? throw new ArgumentNullException(nameof(metricsConsumers));

TelemetryConsumers = telemetryConsumers.ToArray();
MetricsConsumers = metricsConsumers.ToArray();
_telemetryConsumers = telemetryConsumers.ToArray();
_metricsConsumers = metricsConsumers.ToArray();

if (TelemetryConsumers.Any(s => s is null) || metricsConsumers.Any(c => c is null))
if (_telemetryConsumers.Any(s => s is null) || metricsConsumers.Any(c => c is null))
{
throw new ArgumentException("A consumer may not be null",
TelemetryConsumers.Any(s => s is null) ? nameof(telemetryConsumers) : nameof(metricsConsumers));
_telemetryConsumers.Any(s => s is null) ? nameof(telemetryConsumers) : nameof(metricsConsumers));
}

if (TelemetryConsumers.Length == 0)
if (_telemetryConsumers.Length == 0)
{
TelemetryConsumers = null;
_telemetryConsumers = null;
}

if (MetricsConsumers.Length == 0)
if (_metricsConsumers.Length == 0)
{
MetricsConsumers = null;
_metricsConsumers = null;
}

lock (_syncObject)
Expand Down Expand Up @@ -82,8 +90,8 @@ protected override void OnEventSourceCreated(EventSource eventSource)

private void EnableEventSource(EventSource eventSource)
{
var enableEvents = TelemetryConsumers is not null;
var enableMetrics = MetricsConsumers is not null;
var enableEvents = _telemetryConsumers is not null;
var enableMetrics = _metricsConsumers is not null;

if (!enableEvents && !enableMetrics)
{
Expand All @@ -96,6 +104,81 @@ private void EnableEventSource(EventSource eventSource)
EnableEvents(eventSource, eventLevel, EventKeywords.None, arguments);
}

protected sealed override void OnEventWritten(EventWrittenEventArgs eventData)
{
if (eventData.EventId <= 0)
{
OnNonUserEvent(eventData);
}
else if (_telemetryConsumers is TTelemetryConsumer[] consumers)
{
OnEvent(consumers, eventData);
}
}

private void OnNonUserEvent(EventWrittenEventArgs eventData)
{
if (eventData.EventId == -1)
{
// Throwing an exception here would crash the process
if (eventData.EventName != "EventCounters" ||
eventData.Payload?.Count != 1 ||
eventData.Payload[0] is not IDictionary<string, object> counters ||
!counters.TryGetValue("Name", out var nameObject) ||
nameObject is not string name ||
!(counters.TryGetValue("Mean", out var valueObj) || counters.TryGetValue("Increment", out valueObj)) ||
valueObj is not double value)
{
_logger?.LogDebug("Failed to parse EventCounters event from {EventSourceName}", EventSourceName);
return;
}

var metrics = _currentMetrics ??= new();

if (!TrySaveMetric(metrics, name, value))
{
return;
}

if (++_metricsCount == NumberOfMetrics)
{
_metricsCount = 0;

var previous = _previousMetrics;
_previousMetrics = metrics;
_currentMetrics = null;

if (previous is null)
{
return;
}

if (_metricsConsumers is IMetricsConsumer<TMetrics>[] consumers)
{
foreach (var consumer in consumers)
{
try
{
consumer.OnMetrics(previous, metrics);
}
catch (Exception ex)
{
_logger?.LogError(ex, "Uncaught exception occured while processing metrics for EventSource {EventSourceName}", EventSourceName);
}
}
}
}
}
else if (eventData.EventId == 0)
{
_logger?.LogError("Received an error message from EventSource {EventSourceName}: {Message}", EventSourceName, eventData.Message);
}
else
{
_logger?.LogDebug("Received an unknown event from EventSource {EventSourceName}: {EventId}", EventSourceName, eventData.EventId);
}
}

public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
Expand Down
Loading