Skip to content

Add cluster to ConfigureRouteAsync #1231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ csharp_using_directive_placement = outside_namespace:error

# New-line options
# https://docs.microsoft.com/en-us/visualstudio/ide/editorconfig-formatting-conventions?view=vs-2019#new-line-options
csharp_new_line_before_open_brace = methods, properties, control_blocks, types, anonymous_methods, lambdas, object_collection_array_initializers
csharp_new_line_before_open_brace = methods, properties, control_blocks, types, anonymous_methods, lambdas, object_collection_array_initializers, accessors
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was bugging me for a while.

csharp_new_line_before_else = true
csharp_new_line_before_catch = true
csharp_new_line_before_finally = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig origCluster,
return new ValueTask<ClusterConfig>(origCluster with { Destinations = newDests });
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
// Example: do not let config based routes take priority over code based routes.
// Lower numbers are higher priority. Code routes default to 0.
Expand Down
3 changes: 2 additions & 1 deletion src/ReverseProxy/Configuration/IProxyConfigFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface IProxyConfigFilter
/// Allows modification of a route configuration.
/// </summary>
/// <param name="route">The <see cref="RouteConfig"/> instance to configure.</param>
ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel);
/// <param name="cluster">The <see cref="ClusterConfig"/> instance related to <see cref="RouteConfig"/>.</param>
ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig? cluster, CancellationToken cancel);
}
}
41 changes: 25 additions & 16 deletions src/ReverseProxy/Management/ProxyConfigManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
Expand All @@ -15,9 +16,9 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Primitives;
using Yarp.ReverseProxy.Configuration;
using Yarp.ReverseProxy.Forwarder;
using Yarp.ReverseProxy.Health;
using Yarp.ReverseProxy.Model;
using Yarp.ReverseProxy.Forwarder;
using Yarp.ReverseProxy.Routing;
using Yarp.ReverseProxy.Transforms.Builder;

Expand All @@ -34,6 +35,8 @@ namespace Yarp.ReverseProxy.Management
/// </remarks>
internal sealed class ProxyConfigManager : EndpointDataSource, IDisposable
{
private static readonly IReadOnlyDictionary<string, ClusterConfig> _emptyClusterDictionary = new ReadOnlyDictionary<string, ClusterConfig>(new Dictionary<string, ClusterConfig>());

private readonly object _syncRoot = new object();
private readonly ILogger<ProxyConfigManager> _logger;
private readonly IProxyConfigProvider _provider;
Expand Down Expand Up @@ -209,21 +212,21 @@ private async Task ReloadConfigAsync()
// Throws for validation failures
private async Task<bool> ApplyConfigAsync(IProxyConfig config)
{
var (configuredRoutes, routeErrors) = await VerifyRoutesAsync(config.Routes, cancellation: default);
var (configuredClusters, clusterErrors) = await VerifyClustersAsync(config.Clusters, cancellation: default);
var (configuredRoutes, routeErrors) = await VerifyRoutesAsync(config.Routes, configuredClusters, cancellation: default);

if (routeErrors.Count > 0 || clusterErrors.Count > 0)
{
throw new AggregateException("The proxy config is invalid.", routeErrors.Concat(clusterErrors));
}

// Update clusters first because routes need to reference them.
UpdateRuntimeClusters(configuredClusters);
UpdateRuntimeClusters(configuredClusters.Values);
var routesChanged = UpdateRuntimeRoutes(configuredRoutes);
return routesChanged;
}

private async Task<(IList<RouteConfig>, IList<Exception>)> VerifyRoutesAsync(IReadOnlyList<RouteConfig> routes, CancellationToken cancellation)
private async Task<(IList<RouteConfig>, IList<Exception>)> VerifyRoutesAsync(IReadOnlyList<RouteConfig> routes, IReadOnlyDictionary<string, ClusterConfig> clusters, CancellationToken cancellation)
{
if (routes == null)
{
Expand All @@ -246,9 +249,18 @@ private async Task<bool> ApplyConfigAsync(IProxyConfig config)

try
{
foreach (var filter in _filters)
if (_filters.Length != 0)
{
route = await filter.ConfigureRouteAsync(route, cancellation);
ClusterConfig? cluster = null;
if (route.ClusterId != null)
{
clusters.TryGetValue(route.ClusterId, out cluster);
}

foreach (var filter in _filters)
{
route = await filter.ConfigureRouteAsync(route, cluster, cancellation);
}
}
}
catch (Exception ex)
Expand All @@ -275,29 +287,26 @@ private async Task<bool> ApplyConfigAsync(IProxyConfig config)
return (configuredRoutes, errors);
}

private async Task<(IList<ClusterConfig>, IList<Exception>)> VerifyClustersAsync(IReadOnlyList<ClusterConfig> clusters, CancellationToken cancellation)
private async Task<(IReadOnlyDictionary<string, ClusterConfig>, IList<Exception>)> VerifyClustersAsync(IReadOnlyList<ClusterConfig> clusters, CancellationToken cancellation)
{
if (clusters == null)
{
return (Array.Empty<ClusterConfig>(), Array.Empty<Exception>());
return (_emptyClusterDictionary, Array.Empty<Exception>());
}

var seenClusterIds = new HashSet<string>(clusters.Count, StringComparer.OrdinalIgnoreCase);
var configuredClusters = new List<ClusterConfig>(clusters.Count);
var configuredClusters = new Dictionary<string, ClusterConfig>(clusters.Count, StringComparer.OrdinalIgnoreCase);
var errors = new List<Exception>();
// The IProxyConfigProvider provides a fresh snapshot that we need to reconfigure each time.
foreach (var c in clusters)
{
try
{
if (seenClusterIds.Contains(c.ClusterId))
if (configuredClusters.ContainsKey(c.ClusterId))
{
errors.Add(new ArgumentException($"Duplicate cluster '{c.ClusterId}'."));
continue;
}

seenClusterIds.Add(c.ClusterId);

// Don't modify the original
var cluster = c;

Expand All @@ -313,7 +322,7 @@ private async Task<bool> ApplyConfigAsync(IProxyConfig config)
continue;
}

configuredClusters.Add(cluster);
configuredClusters.Add(cluster.ClusterId, cluster);
}
catch (Exception ex)
{
Expand All @@ -323,13 +332,13 @@ private async Task<bool> ApplyConfigAsync(IProxyConfig config)

if (errors.Count > 0)
{
return (Array.Empty<ClusterConfig>(), errors);
return (_emptyClusterDictionary, errors);
}

return (configuredClusters, errors);
}

private void UpdateRuntimeClusters(IList<ClusterConfig> incomingClusters)
private void UpdateRuntimeClusters(IEnumerable<ClusterConfig> incomingClusters)
{
var desiredClusters = new HashSet<string>(StringComparer.OrdinalIgnoreCase);

Expand Down
53 changes: 40 additions & 13 deletions test/ReverseProxy.Tests/Management/ProxyConfigManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, Can
return new ValueTask<ClusterConfig>(cluster);
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
return new ValueTask<RouteConfig>(route with
{
Expand All @@ -387,45 +387,72 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, Can
});
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
return new ValueTask<RouteConfig>(route with { Order = 12 });
string order;
if (cluster != null)
{
order = cluster.Metadata["Order"];
}
else
{
order = "12";
}

return new ValueTask<RouteConfig>(route with { Order = int.Parse(order) });
}
}

[Fact]
public async Task LoadAsync_ConfigFilterConfiguresCluster_Works()
{
var route = new RouteConfig
var route1 = new RouteConfig
{
RouteId = "route1",
ClusterId = "cluster1",
Match = new RouteMatch { Path = "/" }
};
var route2 = new RouteConfig
{
RouteId = "route2",
ClusterId = "cluster2",
Match = new RouteMatch { Path = "/" }
};
var cluster = new ClusterConfig()
{
ClusterId = "cluster1",
Destinations = new Dictionary<string, DestinationConfig>(StringComparer.OrdinalIgnoreCase)
{
{ "d1", new DestinationConfig() { Address = "http://localhost" } }
},
Metadata = new Dictionary<string, string>
{
["Order"] = "47"
}
};
var services = CreateServices(new List<RouteConfig>() { route }, new List<ClusterConfig>() { cluster }, proxyBuilder =>
var services = CreateServices(new List<RouteConfig>() { route1, route2 }, new List<ClusterConfig>() { cluster }, proxyBuilder =>
{
proxyBuilder.AddConfigFilter<ClusterAndRouteFilter>();
});
var manager = services.GetRequiredService<ProxyConfigManager>();
var dataSource = await manager.InitialLoadAsync();

Assert.NotNull(dataSource);
var endpoint = Assert.Single(dataSource.Endpoints);
var routeConfig = endpoint.Metadata.GetMetadata<RouteModel>();
var clusterState = routeConfig.Cluster;
Assert.NotNull(clusterState);
Assert.True(clusterState.Model.Config.HealthCheck.Active.Enabled);
Assert.Equal(TimeSpan.FromSeconds(12), clusterState.Model.Config.HealthCheck.Active.Interval);
var destination = Assert.Single(clusterState.DestinationsState.AllDestinations);
Assert.Equal(2, dataSource.Endpoints.Count);

var endpoint1 = Assert.Single(dataSource.Endpoints.Where(x => x.DisplayName == "route1"));
var routeConfig1 = endpoint1.Metadata.GetMetadata<RouteModel>();
Assert.Equal(47, routeConfig1.Config.Order);
var clusterState1 = routeConfig1.Cluster;
Assert.NotNull(clusterState1);
Assert.True(clusterState1.Model.Config.HealthCheck.Active.Enabled);
Assert.Equal(TimeSpan.FromSeconds(12), clusterState1.Model.Config.HealthCheck.Active.Interval);
var destination = Assert.Single(clusterState1.DestinationsState.AllDestinations);
Assert.Equal("http://localhost", destination.Model.Config.Address);

var endpoint2 = Assert.Single(dataSource.Endpoints.Where(x => x.DisplayName == "route2"));
var routeConfig2 = endpoint2.Metadata.GetMetadata<RouteModel>();
Assert.Equal(12, routeConfig2.Config.Order);
}

private class ClusterAndRouteThrows : IProxyConfigFilter
Expand All @@ -435,7 +462,7 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, Can
throw new NotFiniteNumberException("Test exception");
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
throw new NotFiniteNumberException("Test exception");
}
Expand Down
2 changes: 1 addition & 1 deletion testassets/ReverseProxy.Config/CustomConfigFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, Can
return new ValueTask<ClusterConfig>(cluster);
}

public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, CancellationToken cancel)
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig cluster, CancellationToken cancel)
{
// Do not let config based routes take priority over code based routes.
// Lower numbers are higher priority. Code routes default to 0.
Expand Down