Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ public record KubernetesRuntimeConfiguration
{

/// <summary>
/// Gets the default worker <see cref="V1Pod"/>
/// Gets the default worker <see cref="V1PodTemplateSpec"/>
/// </summary>
public static readonly V1Pod DefaultPodTemplate = new()
public static readonly V1PodTemplateSpec DefaultPodTemplate = new()
{
Metadata = new(),
Spec = new()
Expand Down Expand Up @@ -79,7 +79,7 @@ public KubernetesRuntimeConfiguration()
/// Gets/sets the template to use to create runner containers
/// </summary>
[DataMember(Order = 2, Name = "podTemplate"), JsonPropertyOrder(2), JsonPropertyName("podTemplate"), YamlMember(Order = 2, Alias = "podTemplate")]
public virtual V1Pod PodTemplate { get; set; } = LoadPodTemplate();
public virtual V1PodTemplateSpec PodTemplate { get; set; } = LoadPodTemplate();

/// <summary>
/// Gets/sets the configuration of the secrets used by the Kubernetes runtime
Expand All @@ -103,12 +103,12 @@ public KubernetesRuntimeConfiguration()
/// Loads the runner container template
/// </summary>
/// <returns>The runner container template</returns>
public static V1Pod LoadPodTemplate()
public static V1PodTemplateSpec LoadPodTemplate()
{
var templateFilePath = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runtime.Kubernetes.Pod);
if (string.IsNullOrWhiteSpace(templateFilePath) || !File.Exists(templateFilePath)) return DefaultPodTemplate;
var yaml = File.ReadAllText(templateFilePath);
return YamlSerializer.Default.Deserialize<V1Pod>(yaml)!;
return YamlSerializer.Default.Deserialize<V1PodTemplateSpec>(yaml)!;
}

}
22 changes: 14 additions & 8 deletions src/core/Synapse.Core/Resources/WorkflowInstanceStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,54 +26,60 @@ public record WorkflowInstanceStatus
[DataMember(Order = 1, Name = "phase"), JsonPropertyName("phase"), JsonPropertyOrder(1), YamlMember(Alias = "phase", Order = 1)]
public virtual string? Phase { get; set; }

/// <summary>
/// Gets/sets the unique identifier of the process that is executing the workflow instance, if applicable
/// </summary>
[DataMember(Name = "processId", Order = 2), JsonPropertyName("processId"), JsonPropertyOrder(2), YamlMember(Alias = "processId", Order = 2)]
public virtual string? ProcessId { get; set; }

/// <summary>
/// Gets/sets the date and time the task has been started at, if applicable
/// </summary>
[DataMember(Name = "startedAt", Order = 2), JsonPropertyName("startedAt"), JsonPropertyOrder(2), YamlMember(Alias = "startedAt", Order = 2)]
[DataMember(Name = "startedAt", Order = 3), JsonPropertyName("startedAt"), JsonPropertyOrder(3), YamlMember(Alias = "startedAt", Order = 3)]
public virtual DateTimeOffset? StartedAt { get; set; }

/// <summary>
/// Gets/sets the date and time the task has ended, if applicable
/// </summary>
[DataMember(Name = "endedAt", Order = 3), JsonPropertyName("endedAt"), JsonPropertyOrder(3), YamlMember(Alias = "endedAt", Order = 3)]
[DataMember(Name = "endedAt", Order = 4), JsonPropertyName("endedAt"), JsonPropertyOrder(4), YamlMember(Alias = "endedAt", Order = 4)]
public virtual DateTimeOffset? EndedAt { get; set; }

/// <summary>
/// Gets/sets a list containing the tasks that are being performed -or already have been performed- by the workflow
/// </summary>
[DataMember(Order = 4, Name = "tasks"), JsonPropertyName("tasks"), JsonPropertyOrder(4), YamlMember(Alias = "tasks", Order = 4)]
[DataMember(Order = 5, Name = "tasks"), JsonPropertyName("tasks"), JsonPropertyOrder(5), YamlMember(Alias = "tasks", Order = 5)]
public virtual EquatableList<TaskInstance>? Tasks { get; set; }

/// <summary>
/// Gets/sets a list that contains the workflow's runs, if any
/// </summary>
[DataMember(Order = 5, Name = "runs"), JsonPropertyName("runs"), JsonPropertyOrder(5), YamlMember(Alias = "runs", Order = 5)]
[DataMember(Order = 6, Name = "runs"), JsonPropertyName("runs"), JsonPropertyOrder(6), YamlMember(Alias = "runs", Order = 6)]
public virtual EquatableList<WorkflowRun>? Runs { get; set; }

/// <summary>
/// Gets/sets a name/context map that contains the workflow's pending correlations
/// </summary>
[DataMember(Order = 6, Name = "correlation"), JsonPropertyName("correlation"), JsonPropertyOrder(6), YamlMember(Alias = "correlation", Order = 6)]
[DataMember(Order = 7, Name = "correlation"), JsonPropertyName("correlation"), JsonPropertyOrder(7), YamlMember(Alias = "correlation", Order = 7)]
public virtual WorkflowInstanceCorrelationStatus? Correlation { get; set; }

/// <summary>
/// Gets/sets the error, if any, that has occurred during the workflow's execution
/// </summary>
[DataMember(Name = "error", Order = 7), JsonPropertyName("error"), JsonPropertyOrder(7), YamlMember(Alias = "error", Order = 7)]
[DataMember(Name = "error", Order = 8), JsonPropertyName("error"), JsonPropertyOrder(8), YamlMember(Alias = "error", Order = 8)]
public virtual Error? Error { get; set; }

/// <summary>
/// Gets/sets a reference to the workflow's context data, if any
/// </summary>
[Required, MinLength(1)]
[DataMember(Order = 8, Name = "contextReference"), JsonPropertyName("contextReference"), JsonPropertyOrder(8), YamlMember(Alias = "contextReference", Order = 8)]
[DataMember(Order = 9, Name = "contextReference"), JsonPropertyName("contextReference"), JsonPropertyOrder(9), YamlMember(Alias = "contextReference", Order = 9)]
public virtual string ContextReference { get; set; } = null!;

/// <summary>
/// Gets/sets a reference to the workflow's context data, if any
/// </summary>
[Required, MinLength(1)]
[DataMember(Order = 9, Name = "outputReference"), JsonPropertyName("outputReference"), JsonPropertyOrder(9), YamlMember(Alias = "outputReference", Order = 9)]
[DataMember(Order = 10, Name = "outputReference"), JsonPropertyName("outputReference"), JsonPropertyOrder(10), YamlMember(Alias = "outputReference", Order = 10)]
public virtual string? OutputReference { get; set; }

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ namespace Synapse.Operator.Services;
/// <param name="loggerFactory">The service used to create <see cref="ILogger"/>s</param>
/// <param name="controllerOptions">The service used to access the current <see cref="IOptions{TOptions}"/></param>
/// <param name="repository">The service used to manage <see cref="IResource"/>s</param>
/// <param name="operatorController">The service used to access the current <see cref="Resources.Operator"/></param>
/// <param name="workflowController">The service used to access all monitored <see cref="Workflow"/>s</param>
/// <param name="operator">The service used to access the current <see cref="Resources.Operator"/></param>
/// <param name="workflows">The service used to access all monitored <see cref="Workflow"/>s</param>
/// <param name="workflowRuntime">The service used to run workflows</param>
/// <param name="documents">The <see cref="IRepository"/> used to manage <see cref="Document"/>s</param>
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IWorkflowController workflowController, IRepository<Document, string> documents)
public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions<ResourceControllerOptions<WorkflowInstance>> controllerOptions, IResourceRepository repository, IOperatorController @operator, IWorkflowController workflows, IWorkflowRuntime workflowRuntime, IRepository<Document, string> documents)
: ResourceController<WorkflowInstance>(loggerFactory, controllerOptions, repository)
{

Expand All @@ -37,12 +38,17 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
/// <summary>
/// Gets the service used to monitor the current <see cref="Operator"/>
/// </summary>
protected IResourceMonitor<Resources.Operator> Operator => operatorController.Operator;
protected IResourceMonitor<Resources.Operator> Operator => @operator.Operator;

/// <summary>
/// Gets a dictionary containing all monitored <see cref="Workflow"/>s
/// </summary>
protected IReadOnlyDictionary<string, Workflow> Workflows => workflowController.Workflows;
protected IReadOnlyDictionary<string, Workflow> Workflows => workflows.Workflows;

/// <summary>
/// Gets the service used to run workflows
/// </summary>
protected IWorkflowRuntime WorkflowRuntime { get; } = workflowRuntime;

/// <summary>
/// Gets the <see cref="IRepository"/> used to manage <see cref="Document"/>s
Expand Down Expand Up @@ -231,11 +237,18 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
var selectors = new LabelSelector[]
{
new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
};
await foreach (var correlation in this.Repository.GetAllAsync<Correlation>(null, selectors, cancellationToken: cancellationToken))
{
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
try
{
await this.Repository.RemoveAsync<Correlation>(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
}
catch(Exception ex)
{
Logger.LogWarning(ex, "Failed to delete correlation '{correlation}' for workflow instance '{workflowInstance}'", correlation.GetQualifiedName(), workflowInstance.GetQualifiedName());
}
}
if (workflowInstance.Status != null)
{
Expand All @@ -251,12 +264,33 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
if (!string.IsNullOrWhiteSpace(task.OutputReference)) documentReferences.Add(task.OutputReference);
}
}
foreach (var documentReference in documentReferences.Distinct()) await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false);
foreach (var documentReference in documentReferences.Distinct())
{
try
{
await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
Logger.LogWarning(ex, "Failed to delete document '{document}' for workflow instance '{workflowInstance}'", documentReference, workflowInstance.GetQualifiedName());
}
}
if (!string.IsNullOrWhiteSpace(workflowInstance.Status.ProcessId))
{
try
{
await WorkflowRuntime.DeleteProcessAsync(workflowInstance.Status.ProcessId, cancellationToken).ConfigureAwait(false);
}
catch(Exception ex)
{
Logger.LogWarning(ex, "Failed to delete process with id '{processId}' for workflow instance '{workflowInstance}'", workflowInstance.Status.ProcessId, workflowInstance.GetQualifiedName());
}
}
}
}
catch(Exception ex)
{
this.Logger.LogError("An error occured while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
Logger.LogError("An error occurred while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
}
}

Expand Down
33 changes: 33 additions & 0 deletions src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ protected virtual async Task StartProcessAsync(CancellationToken cancellationTok
this.LogSubscription?.Dispose();
var workflow = await this.GetWorkflowAsync(cancellationToken).ConfigureAwait(false);
var serviceAccount = await this.GetServiceAccountAsync(cancellationToken).ConfigureAwait(false);
if (!string.IsNullOrWhiteSpace(WorkflowInstance.Resource.Status?.ProcessId)) await Runtime.DeleteProcessAsync(WorkflowInstance.Resource.Status.ProcessId, cancellationToken).ConfigureAwait(false);
this.Process = await this.Runtime.CreateProcessAsync(workflow, this.WorkflowInstance.Resource, serviceAccount, cancellationToken).ConfigureAwait(false);
if (!string.IsNullOrWhiteSpace(this.Process.Id)) await UpdateWorkflowInstanceStatusAsync(status =>
{
status.ProcessId = this.Process.Id;
}, cancellationToken).ConfigureAwait(false);
await this.Process.StartAsync(cancellationToken).ConfigureAwait(false);
this.LogSubscription = this.Process.StandardOutput?.Subscribe(this.LogBatchQueue.Enqueue);
this.LogBatchTimer ??= new(async _ => await this.OnPersistLogBatchAsync(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
Expand Down Expand Up @@ -205,6 +210,34 @@ protected virtual async Task OnPersistLogBatchAsync()
this._persistingLogs = false;
}

/// <summary>
/// Updates the status of the handled <see cref="Resources.WorkflowInstance"/>
/// </summary>
/// <param name="statusUpdate">An <see cref="Action{T}"/> used to update the <see cref="Resources.WorkflowInstance"/>'s status</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
protected virtual async Task UpdateWorkflowInstanceStatusAsync(Action<WorkflowInstanceStatus> statusUpdate, CancellationToken cancellationToken)
{
ArgumentNullException.ThrowIfNull(statusUpdate);
var maxRetries = 3;
for (var attempt = 0; attempt < maxRetries; attempt++)
{
try
{
var original = this.WorkflowInstance.Resource;
var updated = original.Clone()!;
updated.Status ??= new();
statusUpdate(updated.Status);
var patch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated);
await this.Resources.PatchAsync<WorkflowInstance>(new Patch(PatchType.JsonPatch, patch), updated.GetName(), updated.GetNamespace(), original.Metadata.ResourceVersion, false, cancellationToken).ConfigureAwait(false);
}
catch (ConcurrencyException) when (attempt + 1 < maxRetries)
{
await Task.Delay(TimeSpan.FromMilliseconds(100 * (attempt + 1)), cancellationToken).ConfigureAwait(false);
}
}
}

/// <summary>
/// Disposes of the <see cref="WorkflowInstanceHandler"/>
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ public interface IWorkflowRuntime
/// <returns>A new <see cref="IWorkflowProcess"/></returns>
Task<IWorkflowProcess> CreateProcessAsync(Workflow workflow, WorkflowInstance workflowInstance, ServiceAccount serviceAccount, CancellationToken cancellationToken = default);

/// <summary>
/// Deletes the <see cref="IWorkflowProcess"/> with the specified id
/// </summary>
/// <param name="processId">The unique identifier of the process to delete</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
/// <returns>A new awaitable <see cref="Task"/></returns>
Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default);

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ protected WorkflowRuntimeBase(ILoggerFactory loggerFactory)
/// <inheritdoc/>
public abstract Task<IWorkflowProcess> CreateProcessAsync(Workflow workflow, WorkflowInstance workflowInstance, ServiceAccount serviceAccount, CancellationToken cancellationToken = default);

/// <inheritdoc/>
public abstract Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default);

/// <summary>
/// Disposes of the <see cref="WorkflowProcessBase"/>
/// </summary>
Expand Down
24 changes: 23 additions & 1 deletion src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using Docker.DotNet.Models;
using Microsoft.Extensions.DependencyInjection;
using Synapse.Runtime.Services;
using static Synapse.SynapseDefaults.Resources;
using System.Net;

namespace Synapse.Runtime.Docker.Services;
Expand Down Expand Up @@ -143,6 +142,29 @@ public override async Task<IWorkflowProcess> CreateProcessAsync(Workflow workflo
}
}

/// <inheritdoc/>
public override async Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(processId);
try
{
Logger.LogDebug("Deleting the Docker process with id '{processId}'...", processId);
await Docker!.Containers.RemoveContainerAsync(processId, new()
{
Force = true,
RemoveVolumes = true,
RemoveLinks = true
}, cancellationToken).ConfigureAwait(false);
Processes.TryRemove(processId, out _);
Logger.LogDebug("The Docker process with id '{processId}' has been successfully deleted", processId);
}
catch(Exception ex)
{
Logger.LogError("An error occurred while deleting the Docker process with id '{processId}': {ex}", processId, ex);
throw;
}
}

/// <inheritdoc/>
protected override ValueTask DisposeAsync(bool disposing)
{
Expand Down
Loading
Loading