-
Notifications
You must be signed in to change notification settings - Fork 39
Very rough outline of plugins #495
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
using System.Threading.Tasks; | ||
|
||
namespace Temporalio.Client.Interceptors | ||
{ | ||
/// <summary> | ||
/// Plugin for client calls. | ||
/// </summary> | ||
public interface IClientPlugin | ||
{ | ||
public void InitClientPlugin(IClientPlugin nextPlugin); | ||
Check warning on line 10 in src/Temporalio/Client/Interceptors/IClientPlugin.cs
|
||
|
||
public TemporalClientOptions OnCreateClient(TemporalClientOptions options); | ||
Check warning on line 12 in src/Temporalio/Client/Interceptors/IClientPlugin.cs
|
||
|
||
public Task<TemporalConnection> TemporalConnectAsync(TemporalClientConnectOptions options); | ||
Check warning on line 14 in src/Temporalio/Client/Interceptors/IClientPlugin.cs
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would call this |
||
|
||
public TemporalConnection TemporalConnect(TemporalClientConnectOptions options); | ||
Check warning on line 16 in src/Temporalio/Client/Interceptors/IClientPlugin.cs
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,8 @@ | ||||||||||||||
using System; | ||||||||||||||
using System.Linq; | ||||||||||||||
using System.Threading.Tasks; | ||||||||||||||
using Temporalio.Client.Interceptors; | ||||||||||||||
using Temporalio.Common; | ||||||||||||||
|
||||||||||||||
namespace Temporalio.Client | ||||||||||||||
{ | ||||||||||||||
|
@@ -21,6 +23,13 @@ public partial class TemporalClient : ITemporalClient | |||||||||||||
/// <param name="options">Options for this client.</param> | ||||||||||||||
public TemporalClient(ITemporalConnection connection, TemporalClientOptions options) | ||||||||||||||
{ | ||||||||||||||
// Apply plugin modifications to options | ||||||||||||||
if (options.Plugins != null) | ||||||||||||||
{ | ||||||||||||||
options = options.Plugins.Reverse().Aggregate( | ||||||||||||||
options, (clientOptions, plugin) => plugin.OnCreateClient(options)); | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
Connection = connection; | ||||||||||||||
Options = options; | ||||||||||||||
OutboundInterceptor = new Impl(this); | ||||||||||||||
|
@@ -61,10 +70,21 @@ public TemporalClient(ITemporalConnection connection, TemporalClientOptions opti | |||||||||||||
/// <param name="options">Options for connecting.</param> | ||||||||||||||
/// <returns>The connected client.</returns> | ||||||||||||||
public static async Task<TemporalClient> ConnectAsync( | ||||||||||||||
TemporalClientConnectOptions options) => | ||||||||||||||
new( | ||||||||||||||
await TemporalConnection.ConnectAsync(options).ConfigureAwait(false), | ||||||||||||||
TemporalClientConnectOptions options) | ||||||||||||||
{ | ||||||||||||||
IClientPlugin rootPlugin = new RootPlugin(); | ||||||||||||||
if (options.Plugins != null) | ||||||||||||||
{ | ||||||||||||||
foreach (var plugin in options.Plugins) | ||||||||||||||
{ | ||||||||||||||
plugin.InitClientPlugin(rootPlugin); | ||||||||||||||
rootPlugin = plugin; | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
return new( | ||||||||||||||
await rootPlugin.TemporalConnectAsync(options).ConfigureAwait(false), | ||||||||||||||
options.ToClientOptions()); | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/// <summary> | ||||||||||||||
/// Create a client to a Temporal namespace that does not connect until first call. | ||||||||||||||
|
@@ -73,8 +93,19 @@ await TemporalConnection.ConnectAsync(options).ConfigureAwait(false), | |||||||||||||
/// </summary> | ||||||||||||||
/// <param name="options">Options for connecting.</param> | ||||||||||||||
/// <returns>The not-yet-connected client.</returns> | ||||||||||||||
public static TemporalClient CreateLazy(TemporalClientConnectOptions options) => | ||||||||||||||
new(TemporalConnection.CreateLazy(options), options.ToClientOptions()); | ||||||||||||||
public static TemporalClient CreateLazy(TemporalClientConnectOptions options) | ||||||||||||||
{ | ||||||||||||||
IClientPlugin rootPlugin = new RootPlugin(); | ||||||||||||||
if (options.Plugins != null) | ||||||||||||||
{ | ||||||||||||||
foreach (var plugin in options.Plugins) | ||||||||||||||
{ | ||||||||||||||
plugin.InitClientPlugin(rootPlugin); | ||||||||||||||
rootPlugin = plugin; | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
return new(rootPlugin.TemporalConnect(options), options.ToClientOptions()); | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/// <summary> | ||||||||||||||
/// Get a default set of retry options given the optional options. This will not mutate the | ||||||||||||||
|
@@ -112,5 +143,33 @@ internal partial class Impl : Interceptors.ClientOutboundInterceptor | |||||||||||||
/// </summary> | ||||||||||||||
internal TemporalClient Client { get; init; } | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
#pragma warning disable CA1822 // We don't want to force plugin methods to be static | ||||||||||||||
/// <summary> | ||||||||||||||
/// Placeholder. | ||||||||||||||
/// </summary> | ||||||||||||||
internal class RootPlugin : Plugin | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would recommend making this implement the interface w/ a throw |
||||||||||||||
{ | ||||||||||||||
/// <summary> | ||||||||||||||
/// Place. | ||||||||||||||
/// </summary> | ||||||||||||||
/// <param name="options">holder.</param> | ||||||||||||||
/// <returns>Connection.</returns> | ||||||||||||||
public new async Task<TemporalConnection> TemporalConnectAsync(TemporalClientConnectOptions options) | ||||||||||||||
{ | ||||||||||||||
return await TemporalConnection.ConnectAsync(options).ConfigureAwait(false); | ||||||||||||||
} | ||||||||||||||
Comment on lines
+158
to
+161
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Pedantic shortcut |
||||||||||||||
|
||||||||||||||
/// <summary> | ||||||||||||||
/// Place. | ||||||||||||||
/// </summary> | ||||||||||||||
/// <param name="options">Holder.</param> | ||||||||||||||
/// <returns>Connection.</returns> | ||||||||||||||
public new TemporalConnection TemporalConnect(TemporalClientConnectOptions options) | ||||||||||||||
{ | ||||||||||||||
return TemporalConnection.CreateLazy(options); | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
#pragma warning restore CA1822 | ||||||||||||||
} | ||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,6 +46,8 @@ | |
public Converters.DataConverter DataConverter { get; set; } = | ||
Converters.DataConverter.Default; | ||
|
||
public IReadOnlyCollection<Interceptors.IClientPlugin>? Plugins { get; set; } | ||
Check warning on line 49 in src/Temporalio/Client/TemporalClientConnectOptions.cs
|
||
|
||
/// <summary> | ||
/// Gets or sets the interceptors to intercept client calls. | ||
/// </summary> | ||
|
@@ -76,6 +78,7 @@ | |
{ | ||
Namespace = Namespace, | ||
DataConverter = DataConverter, | ||
Plugins = Plugins, | ||
Interceptors = Interceptors, | ||
LoggerFactory = LoggerFactory, | ||
QueryRejectCondition = QueryRejectCondition, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
using System; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Temporalio.Client; | ||
using Temporalio.Client.Interceptors; | ||
using Temporalio.Worker; | ||
using Temporalio.Worker.Interceptors; | ||
|
||
namespace Temporalio.Common | ||
{ | ||
public class Plugin : IClientPlugin, IWorkerPlugin | ||
Check warning on line 11 in src/Temporalio/Common/Plugin.cs
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mark all methods here as virtual |
||
{ | ||
private IWorkerPlugin? nextWorkerPlugin; | ||
private IClientPlugin? nextClientPlugin; | ||
Comment on lines
+13
to
+14
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would recommend these be protected properties, and that they return non-null values and fail if the backing fields are null |
||
|
||
public void InitClientPlugin(IClientPlugin nextPlugin) => nextClientPlugin = nextPlugin; | ||
Check warning on line 16 in src/Temporalio/Common/Plugin.cs
|
||
|
||
public TemporalClientOptions OnCreateClient(TemporalClientOptions options) => options; | ||
Check warning on line 18 in src/Temporalio/Common/Plugin.cs
|
||
|
||
public Task<TemporalConnection> TemporalConnectAsync(TemporalClientConnectOptions options) => | ||
Check warning on line 20 in src/Temporalio/Common/Plugin.cs
|
||
nextClientPlugin!.TemporalConnectAsync(options); | ||
|
||
public TemporalConnection TemporalConnect(TemporalClientConnectOptions options) => | ||
Check warning on line 23 in src/Temporalio/Common/Plugin.cs
|
||
nextClientPlugin!.TemporalConnect(options); | ||
|
||
public void InitWorkerPlugin(IWorkerPlugin nextPlugin) => nextWorkerPlugin = nextPlugin; | ||
Check warning on line 26 in src/Temporalio/Common/Plugin.cs
|
||
|
||
public TemporalWorkerOptions OnCreateWorker(TemporalWorkerOptions options) => | ||
nextWorkerPlugin!.OnCreateWorker(options); | ||
|
||
public Task ExecuteAsync(TemporalWorker worker, Func<Task>? untilComplete, | ||
CancellationToken stoppingToken = default) => | ||
nextWorkerPlugin!.ExecuteAsync(worker, untilComplete, stoppingToken); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
using System; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
#pragma warning disable CA1822 // We don't want to force plugin methods to be static | ||
|
||
namespace Temporalio.Worker.Interceptors | ||
{ | ||
public interface IWorkerPlugin | ||
Check warning on line 8 in src/Temporalio/Worker/Interceptors/IWorkerPlugin.cs
|
||
{ | ||
public void InitWorkerPlugin(IWorkerPlugin nextPlugin); | ||
|
||
public TemporalWorkerOptions OnCreateWorker(TemporalWorkerOptions options); | ||
|
||
public Task ExecuteAsync( | ||
TemporalWorker worker, Func<Task>? untilComplete, CancellationToken stoppingToken = default); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
private readonly object clientLock = new(); | ||
private readonly ActivityWorker? activityWorker; | ||
private readonly WorkflowWorker? workflowWorker; | ||
private readonly IWorkerPlugin plugin; | ||
private readonly bool workflowTracingEventListenerEnabled; | ||
private IWorkerClient client; | ||
private int started; | ||
|
@@ -34,16 +35,40 @@ | |
public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options) | ||
{ | ||
this.client = client; | ||
var loggerFactory = options.LoggerFactory ?? client.Options.LoggerFactory; | ||
// Clone the options to discourage mutation (but we aren't completely disabling mutation | ||
// on the Options field herein). | ||
Options = (TemporalWorkerOptions)options.Clone(); | ||
|
||
var plugins = client.Options.Plugins?.OfType<IWorkerPlugin>() ?? Enumerable.Empty<IWorkerPlugin>(); | ||
if (Options.Plugins != null) | ||
{ | ||
plugins = plugins.Concat(Options.Plugins); | ||
} | ||
|
||
#pragma warning disable CA1851 | ||
plugins = plugins.Reverse(); | ||
Options = plugins | ||
.Aggregate(Options, (workerOptions, plugin) => plugin.OnCreateWorker(workerOptions)); | ||
|
||
IWorkerPlugin rootPlugin = new RootPlugin(); | ||
foreach (var plugin in plugins) | ||
{ | ||
plugin.InitWorkerPlugin(rootPlugin); | ||
rootPlugin = plugin; | ||
} | ||
plugin = rootPlugin; | ||
#pragma warning restore CA1851 | ||
|
||
// Ensure later accesses use the plugin modified options | ||
options = Options; | ||
|
||
var loggerFactory = options.LoggerFactory ?? client.Options.LoggerFactory; | ||
var bridgeClient = client.BridgeClientProvider.BridgeClient ?? | ||
throw new InvalidOperationException("Cannot use unconnected lazy client for worker"); | ||
BridgeWorker = new( | ||
(Bridge.Client)bridgeClient, | ||
client.Options.Namespace, | ||
options, | ||
Options, | ||
loggerFactory); | ||
if (options.Activities.Count == 0 && options.Workflows.Count == 0) | ||
{ | ||
|
@@ -276,6 +301,12 @@ | |
|
||
private async Task ExecuteInternalAsync( | ||
Func<Task>? untilComplete, CancellationToken stoppingToken) | ||
{ | ||
await plugin.ExecuteAsync(this, untilComplete, stoppingToken).ConfigureAwait(false); | ||
} | ||
|
||
private async Task ExecuteFromPluginAsync( | ||
Func<Task>? untilComplete, CancellationToken stoppingToken) | ||
{ | ||
if (Interlocked.Exchange(ref started, 1) != 0) | ||
{ | ||
|
@@ -408,5 +439,16 @@ | |
#pragma warning restore CA1031 | ||
} | ||
} | ||
|
||
internal class RootPlugin : Plugin | ||
Check warning on line 443 in src/Temporalio/Worker/TemporalWorker.cs
|
||
{ | ||
public new async Task ExecuteAsync( | ||
TemporalWorker worker, Func<Task>? untilComplete, CancellationToken stoppingToken = default) | ||
{ | ||
await worker.ExecuteFromPluginAsync( | ||
untilComplete, | ||
stoppingToken).ConfigureAwait(false); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -96,6 +96,8 @@ | |
/// </summary> | ||
public TaskFactory ActivityTaskFactory { get; set; } = Task.Factory; | ||
|
||
public IReadOnlyCollection<Interceptors.IWorkerPlugin>? Plugins { get; set; } | ||
Check warning on line 99 in src/Temporalio/Worker/TemporalWorkerOptions.cs
|
||
|
||
/// <summary> | ||
/// Gets or sets the interceptors. Note this automatically includes any | ||
/// <see cref="Client.TemporalClientOptions.Interceptors" /> that implement | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would not recommend putting this in the
Interceptors
namespace, and would recommend calling thisITemporalClientPlugin
(same two things for worker side)