diff --git a/src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs b/src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs
index d15c4b27f..4f5514bd6 100644
--- a/src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs
+++ b/src/core/Synapse.Core/Resources/KubernetesRuntimeConfiguration.cs
@@ -24,9 +24,9 @@ public record KubernetesRuntimeConfiguration
{
///
- /// Gets the default worker
+ /// Gets the default worker
///
- public static readonly V1Pod DefaultPodTemplate = new()
+ public static readonly V1PodTemplateSpec DefaultPodTemplate = new()
{
Metadata = new(),
Spec = new()
@@ -79,7 +79,7 @@ public KubernetesRuntimeConfiguration()
/// Gets/sets the template to use to create runner containers
///
[DataMember(Order = 2, Name = "podTemplate"), JsonPropertyOrder(2), JsonPropertyName("podTemplate"), YamlMember(Order = 2, Alias = "podTemplate")]
- public virtual V1Pod PodTemplate { get; set; } = LoadPodTemplate();
+ public virtual V1PodTemplateSpec PodTemplate { get; set; } = LoadPodTemplate();
///
/// Gets/sets the configuration of the secrets used by the Kubernetes runtime
@@ -103,12 +103,12 @@ public KubernetesRuntimeConfiguration()
/// Loads the runner container template
///
/// The runner container template
- public static V1Pod LoadPodTemplate()
+ public static V1PodTemplateSpec LoadPodTemplate()
{
var templateFilePath = Environment.GetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.Runtime.Kubernetes.Pod);
if (string.IsNullOrWhiteSpace(templateFilePath) || !File.Exists(templateFilePath)) return DefaultPodTemplate;
var yaml = File.ReadAllText(templateFilePath);
- return YamlSerializer.Default.Deserialize(yaml)!;
+ return YamlSerializer.Default.Deserialize(yaml)!;
}
}
diff --git a/src/core/Synapse.Core/Resources/WorkflowInstanceStatus.cs b/src/core/Synapse.Core/Resources/WorkflowInstanceStatus.cs
index 06b52549e..222b2cb8b 100644
--- a/src/core/Synapse.Core/Resources/WorkflowInstanceStatus.cs
+++ b/src/core/Synapse.Core/Resources/WorkflowInstanceStatus.cs
@@ -26,54 +26,60 @@ public record WorkflowInstanceStatus
[DataMember(Order = 1, Name = "phase"), JsonPropertyName("phase"), JsonPropertyOrder(1), YamlMember(Alias = "phase", Order = 1)]
public virtual string? Phase { get; set; }
+ /// <summary>
+ /// Gets/sets the unique identifier of the process that is executing the workflow instance, if applicable
+ /// </summary>
+ [DataMember(Name = "processId", Order = 2), JsonPropertyName("processId"), JsonPropertyOrder(2), YamlMember(Alias = "processId", Order = 2)]
+ public virtual string? ProcessId { get; set; }
+
///
/// Gets/sets the date and time the task has been started at, if applicable
///
- [DataMember(Name = "startedAt", Order = 2), JsonPropertyName("startedAt"), JsonPropertyOrder(2), YamlMember(Alias = "startedAt", Order = 2)]
+ [DataMember(Name = "startedAt", Order = 3), JsonPropertyName("startedAt"), JsonPropertyOrder(3), YamlMember(Alias = "startedAt", Order = 3)]
public virtual DateTimeOffset? StartedAt { get; set; }
///
/// Gets/sets the date and time the task has ended, if applicable
///
- [DataMember(Name = "endedAt", Order = 3), JsonPropertyName("endedAt"), JsonPropertyOrder(3), YamlMember(Alias = "endedAt", Order = 3)]
+ [DataMember(Name = "endedAt", Order = 4), JsonPropertyName("endedAt"), JsonPropertyOrder(4), YamlMember(Alias = "endedAt", Order = 4)]
public virtual DateTimeOffset? EndedAt { get; set; }
///
/// Gets/sets a list containing the tasks that are being performed -or already have been performed- by the workflow
///
- [DataMember(Order = 4, Name = "tasks"), JsonPropertyName("tasks"), JsonPropertyOrder(4), YamlMember(Alias = "tasks", Order = 4)]
+ [DataMember(Order = 5, Name = "tasks"), JsonPropertyName("tasks"), JsonPropertyOrder(5), YamlMember(Alias = "tasks", Order = 5)]
public virtual EquatableList? Tasks { get; set; }
///
/// Gets/sets a list that contains the workflow's runs, if any
///
- [DataMember(Order = 5, Name = "runs"), JsonPropertyName("runs"), JsonPropertyOrder(5), YamlMember(Alias = "runs", Order = 5)]
+ [DataMember(Order = 6, Name = "runs"), JsonPropertyName("runs"), JsonPropertyOrder(6), YamlMember(Alias = "runs", Order = 6)]
public virtual EquatableList? Runs { get; set; }
///
/// Gets/sets a name/context map that contains the workflow's pending correlations
///
- [DataMember(Order = 6, Name = "correlation"), JsonPropertyName("correlation"), JsonPropertyOrder(6), YamlMember(Alias = "correlation", Order = 6)]
+ [DataMember(Order = 7, Name = "correlation"), JsonPropertyName("correlation"), JsonPropertyOrder(7), YamlMember(Alias = "correlation", Order = 7)]
public virtual WorkflowInstanceCorrelationStatus? Correlation { get; set; }
///
/// Gets/sets the error, if any, that has occurred during the workflow's execution
///
- [DataMember(Name = "error", Order = 7), JsonPropertyName("error"), JsonPropertyOrder(7), YamlMember(Alias = "error", Order = 7)]
+ [DataMember(Name = "error", Order = 8), JsonPropertyName("error"), JsonPropertyOrder(8), YamlMember(Alias = "error", Order = 8)]
public virtual Error? Error { get; set; }
///
/// Gets/sets a reference to the workflow's context data, if any
///
[Required, MinLength(1)]
- [DataMember(Order = 8, Name = "contextReference"), JsonPropertyName("contextReference"), JsonPropertyOrder(8), YamlMember(Alias = "contextReference", Order = 8)]
+ [DataMember(Order = 9, Name = "contextReference"), JsonPropertyName("contextReference"), JsonPropertyOrder(9), YamlMember(Alias = "contextReference", Order = 9)]
public virtual string ContextReference { get; set; } = null!;
///
/// Gets/sets a reference to the workflow's context data, if any
///
[Required, MinLength(1)]
- [DataMember(Order = 9, Name = "outputReference"), JsonPropertyName("outputReference"), JsonPropertyOrder(9), YamlMember(Alias = "outputReference", Order = 9)]
+ [DataMember(Order = 10, Name = "outputReference"), JsonPropertyName("outputReference"), JsonPropertyOrder(10), YamlMember(Alias = "outputReference", Order = 10)]
public virtual string? OutputReference { get; set; }
}
diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs
index 8f8b8b834..f2731e801 100644
--- a/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs
+++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceController.cs
@@ -22,10 +22,11 @@ namespace Synapse.Operator.Services;
/// The service used to create s
/// The service used to access the current
/// The service used to manage s
-/// The service used to access the current
-/// The service used to access all monitored s
+/// The service used to access the current
+/// The service used to access all monitored s
+/// The service used to run workflows
/// The used to manage s
-public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository repository, IOperatorController operatorController, IWorkflowController workflowController, IRepository documents)
+public class WorkflowInstanceController(IServiceProvider serviceProvider, ILoggerFactory loggerFactory, IOptions> controllerOptions, IResourceRepository repository, IOperatorController @operator, IWorkflowController workflows, IWorkflowRuntime workflowRuntime, IRepository documents)
: ResourceController(loggerFactory, controllerOptions, repository)
{
@@ -37,12 +38,17 @@ public class WorkflowInstanceController(IServiceProvider serviceProvider, ILogge
///
/// Gets the service used to monitor the current
///
- protected IResourceMonitor Operator => operatorController.Operator;
+ protected IResourceMonitor Operator => @operator.Operator;
///
/// Gets a dictionary containing all monitored s
///
- protected IReadOnlyDictionary Workflows => workflowController.Workflows;
+ protected IReadOnlyDictionary Workflows => workflows.Workflows;
+
+ /// <summary>
+ /// Gets the service used to run workflows
+ /// </summary>
+ protected IWorkflowRuntime WorkflowRuntime { get; } = workflowRuntime;
///
/// Gets the used to manage s
@@ -231,11 +237,18 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
if (this.Handlers.TryRemove(workflowInstance.GetQualifiedName(), out var process)) await process.DisposeAsync().ConfigureAwait(false);
var selectors = new LabelSelector[]
{
- new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
+ new(SynapseDefaults.Resources.Labels.WorkflowInstance, LabelSelectionOperator.Equals, workflowInstance.GetQualifiedName())
};
await foreach (var correlation in this.Repository.GetAllAsync(null, selectors, cancellationToken: cancellationToken))
{
- await this.Repository.RemoveAsync(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
+ try
+ {
+ await this.Repository.RemoveAsync(correlation.GetName(), correlation.GetNamespace(), false, cancellationToken).ConfigureAwait(false);
+ }
+ catch(Exception ex)
+ {
+ Logger.LogWarning(ex, "Failed to delete correlation '{correlation}' for workflow instance '{workflowInstance}'", correlation.GetQualifiedName(), workflowInstance.GetQualifiedName());
+ }
}
if (workflowInstance.Status != null)
{
@@ -251,12 +264,33 @@ protected override async Task OnResourceDeletedAsync(WorkflowInstance workflowIn
if (!string.IsNullOrWhiteSpace(task.OutputReference)) documentReferences.Add(task.OutputReference);
}
}
- foreach (var documentReference in documentReferences.Distinct()) await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false);
+ foreach (var documentReference in documentReferences.Distinct())
+ {
+ try
+ {
+ await this.Documents.RemoveAsync(documentReference, cancellationToken).ConfigureAwait(false);
+ }
+ catch (Exception ex)
+ {
+ Logger.LogWarning(ex, "Failed to delete document '{document}' for workflow instance '{workflowInstance}'", documentReference, workflowInstance.GetQualifiedName());
+ }
+ }
+ if (!string.IsNullOrWhiteSpace(workflowInstance.Status.ProcessId))
+ {
+ try
+ {
+ await WorkflowRuntime.DeleteProcessAsync(workflowInstance.Status.ProcessId, cancellationToken).ConfigureAwait(false);
+ }
+ catch(Exception ex)
+ {
+ Logger.LogWarning(ex, "Failed to delete process with id '{processId}' for workflow instance '{workflowInstance}'", workflowInstance.Status.ProcessId, workflowInstance.GetQualifiedName());
+ }
+ }
}
}
catch(Exception ex)
{
- this.Logger.LogError("An error occured while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
+ Logger.LogError("An error occurred while handling the deletion of workflow instance '{workflowInstance}': {ex}", workflowInstance.GetQualifiedName(), ex);
}
}
diff --git a/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs b/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs
index a556834fb..45e4b8b47 100644
--- a/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs
+++ b/src/operator/Synapse.Operator/Services/WorkflowInstanceHandler.cs
@@ -133,7 +133,12 @@ protected virtual async Task StartProcessAsync(CancellationToken cancellationTok
this.LogSubscription?.Dispose();
var workflow = await this.GetWorkflowAsync(cancellationToken).ConfigureAwait(false);
var serviceAccount = await this.GetServiceAccountAsync(cancellationToken).ConfigureAwait(false);
+ if (!string.IsNullOrWhiteSpace(WorkflowInstance.Resource.Status?.ProcessId)) await Runtime.DeleteProcessAsync(WorkflowInstance.Resource.Status.ProcessId, cancellationToken).ConfigureAwait(false);
this.Process = await this.Runtime.CreateProcessAsync(workflow, this.WorkflowInstance.Resource, serviceAccount, cancellationToken).ConfigureAwait(false);
+ if (!string.IsNullOrWhiteSpace(this.Process.Id)) await UpdateWorkflowInstanceStatusAsync(status =>
+ {
+ status.ProcessId = this.Process.Id;
+ }, cancellationToken).ConfigureAwait(false);
await this.Process.StartAsync(cancellationToken).ConfigureAwait(false);
this.LogSubscription = this.Process.StandardOutput?.Subscribe(this.LogBatchQueue.Enqueue);
this.LogBatchTimer ??= new(async _ => await this.OnPersistLogBatchAsync(), null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
@@ -205,6 +210,34 @@ protected virtual async Task OnPersistLogBatchAsync()
this._persistingLogs = false;
}
+ /// <summary>
+ /// Updates the status of the handled workflow instance, retrying up to three times when a concurrent update is detected
+ /// </summary>
+ /// <param name="statusUpdate">An <see cref="Action{T}"/> used to update the workflow instance's status</param>
+ /// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
+ /// <returns>A new awaitable <see cref="Task"/></returns>
+ protected virtual async Task UpdateWorkflowInstanceStatusAsync(Action<WorkflowInstanceStatus> statusUpdate, CancellationToken cancellationToken)
+ {
+     ArgumentNullException.ThrowIfNull(statusUpdate);
+     const int maxRetries = 3;
+     for (var attempt = 0; attempt < maxRetries; attempt++)
+     {
+         try
+         {
+             var original = this.WorkflowInstance.Resource; // re-read on every attempt so a retry diffs against the latest monitored state
+             var updated = original.Clone()!;
+             statusUpdate(updated.Status ??= new());
+             var patch = JsonPatchUtility.CreateJsonPatchFromDiff(original, updated);
+             await this.Resources.PatchAsync<WorkflowInstance>(new Patch(PatchType.JsonPatch, patch), updated.GetName(), updated.GetNamespace(), original.Metadata.ResourceVersion, false, cancellationToken).ConfigureAwait(false);
+             return; // the patch was accepted: stop here instead of re-applying it on the remaining attempts
+         }
+         catch (ConcurrencyException) when (attempt + 1 < maxRetries) // on the last attempt the filter is false, so the exception reaches the caller
+         {
+             await Task.Delay(TimeSpan.FromMilliseconds(100 * (attempt + 1)), cancellationToken).ConfigureAwait(false);
+         }
+     }
+ }
+
///
/// Disposes of the
///
diff --git a/src/runtime/Synapse.Runtime.Abstractions/Services/Interfaces/IWorkflowRuntime.cs b/src/runtime/Synapse.Runtime.Abstractions/Services/Interfaces/IWorkflowRuntime.cs
index c8118859c..4067f91c3 100644
--- a/src/runtime/Synapse.Runtime.Abstractions/Services/Interfaces/IWorkflowRuntime.cs
+++ b/src/runtime/Synapse.Runtime.Abstractions/Services/Interfaces/IWorkflowRuntime.cs
@@ -30,4 +30,12 @@ public interface IWorkflowRuntime
/// A new
Task CreateProcessAsync(Workflow workflow, WorkflowInstance workflowInstance, ServiceAccount serviceAccount, CancellationToken cancellationToken = default);
+ /// <summary>
+ /// Deletes the workflow process with the specified id
+ /// </summary>
+ /// <param name="processId">The unique identifier of the process to delete</param>
+ /// <param name="cancellationToken">A <see cref="CancellationToken"/></param>
+ /// <returns>A new awaitable <see cref="Task"/></returns>
+ Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default);
+
}
diff --git a/src/runtime/Synapse.Runtime.Abstractions/Services/WorkflowRuntimeBase.cs b/src/runtime/Synapse.Runtime.Abstractions/Services/WorkflowRuntimeBase.cs
index a10332f6a..b85f00d93 100644
--- a/src/runtime/Synapse.Runtime.Abstractions/Services/WorkflowRuntimeBase.cs
+++ b/src/runtime/Synapse.Runtime.Abstractions/Services/WorkflowRuntimeBase.cs
@@ -39,6 +39,9 @@ protected WorkflowRuntimeBase(ILoggerFactory loggerFactory)
///
public abstract Task CreateProcessAsync(Workflow workflow, WorkflowInstance workflowInstance, ServiceAccount serviceAccount, CancellationToken cancellationToken = default);
+ /// <inheritdoc/>
+ public abstract Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default);
+
///
/// Disposes of the
///
diff --git a/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs b/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs
index 2f177f6e7..c20fbb57e 100644
--- a/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs
+++ b/src/runtime/Synapse.Runtime.Docker/Services/DockerRuntime.cs
@@ -15,7 +15,6 @@
using Docker.DotNet.Models;
using Microsoft.Extensions.DependencyInjection;
using Synapse.Runtime.Services;
-using static Synapse.SynapseDefaults.Resources;
using System.Net;
namespace Synapse.Runtime.Docker.Services;
@@ -143,6 +142,29 @@ public override async Task CreateProcessAsync(Workflow workflo
}
}
+ /// <inheritdoc/>
+ public override async Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default)
+ {
+     ArgumentException.ThrowIfNullOrWhiteSpace(processId);
+     try
+     {
+         Logger.LogDebug("Deleting the Docker process with id '{processId}'...", processId);
+         await Docker!.Containers.RemoveContainerAsync(processId, new()
+         {
+             // RemoveLinks is deliberately not set: it asks the daemon to remove a link instead of the container, which fails for a plain container id
+             Force = true,
+             RemoveVolumes = true
+         }, cancellationToken).ConfigureAwait(false);
+         Processes.TryRemove(processId, out _);
+         Logger.LogDebug("The Docker process with id '{processId}' has been successfully deleted", processId);
+     }
+     catch(Exception ex)
+     {
+         Logger.LogError("An error occurred while deleting the Docker process with id '{processId}': {ex}", processId, ex);
+         throw;
+     }
+ }
+
///
protected override ValueTask DisposeAsync(bool disposing)
{
diff --git a/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesRuntime.cs b/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesRuntime.cs
index ca610e774..441ad73e7 100644
--- a/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesRuntime.cs
+++ b/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesRuntime.cs
@@ -82,14 +82,14 @@ public override async Task CreateProcessAsync(Workflow workflo
ArgumentNullException.ThrowIfNull(serviceAccount);
try
{
- this.Logger.LogDebug("Creating a new Kubernetes pod for workflow instance '{workflowInstance}'...", workflowInstance.GetQualifiedName());
+ this.Logger.LogDebug("Creating a new Kubernetes job for workflow instance '{workflowInstance}'...", workflowInstance.GetQualifiedName());
if (this.Kubernetes == null) await this.InitializeAsync(cancellationToken).ConfigureAwait(false);
var workflowDefinition = workflow.Spec.Versions.Get(workflowInstance.Spec.Definition.Version) ?? throw new NullReferenceException($"Failed to find version '{workflowInstance.Spec.Definition.Version}' of workflow '{workflow.GetQualifiedName()}'");
var pod = this.Runner.Runtime.Kubernetes!.PodTemplate.Clone()!;
pod.Metadata ??= new();
pod.Metadata.Name = $"{workflowInstance.GetQualifiedName()}-{Guid.NewGuid().ToString("N")[..12].ToLowerInvariant()}";
if (!string.IsNullOrWhiteSpace(this.Runner.Runtime.Kubernetes.Namespace)) pod.Metadata.NamespaceProperty = this.Runner.Runtime.Kubernetes.Namespace;
- if (pod.Spec == null || pod.Spec.Containers == null || !pod.Spec.Containers.Any()) throw new InvalidOperationException("The specified Kubernetes runtime pod template is not valid");
+ if (pod.Spec == null || pod.Spec.Containers == null || !pod.Spec.Containers.Any()) throw new InvalidOperationException("The configured Kubernetes runtime pod template is not valid");
var volumeMounts = new List();
pod.Spec.Volumes ??= [];
if (workflowDefinition.Use?.Secrets?.Count > 0)
@@ -131,7 +131,7 @@ public override async Task CreateProcessAsync(Workflow workflo
if (this.Runner.Certificates?.Validate == false) container.SetEnvironmentVariable(SynapseDefaults.EnvironmentVariables.SkipCertificateValidation, "true");
container.VolumeMounts = volumeMounts;
}
- if(this.Runner.ContainerPlatform == ContainerPlatform.Kubernetes) pod.Spec.ServiceAccountName = this.Runner.Runtime.Kubernetes.ServiceAccount;
+ if (this.Runner.ContainerPlatform == ContainerPlatform.Kubernetes) pod.Spec.ServiceAccountName = this.Runner.Runtime.Kubernetes.ServiceAccount;
var process = ActivatorUtilities.CreateInstance(this.ServiceProvider, this.Kubernetes!, pod);
this.Processes.AddOrUpdate(process.Id, _ => process, (key, current) =>
{
@@ -141,7 +141,7 @@ public override async Task CreateProcessAsync(Workflow workflo
#pragma warning restore CA2012 // Use ValueTasks correctly
return process;
});
- this.Logger.LogDebug("A new container with id '{id}' has been successfully created to run workflow instance '{workflowInstance}'", process.Id, workflowInstance.GetQualifiedName());
+ this.Logger.LogDebug("A new job with id '{id}' has been successfully created to run workflow instance '{workflowInstance}'", process.Id, workflowInstance.GetQualifiedName());
return process;
}
catch (Exception ex)
@@ -151,6 +151,45 @@ public override async Task CreateProcessAsync(Workflow workflo
}
}
+ /// <inheritdoc/>
+ public override async Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default)
+ {
+     ArgumentException.ThrowIfNullOrWhiteSpace(processId);
+     try
+     {
+         Logger.LogDebug("Deleting the Kubernetes process with id '{processId}'...", processId);
+         var separatorIndex = processId.LastIndexOf('.'); // ids are '<name>.<namespace>': the name may itself contain dots, a namespace (DNS label) cannot, so split on the last one
+         if (separatorIndex <= 0 || separatorIndex == processId.Length - 1) throw new ArgumentException($"The specified value '{processId}' is not valid Kubernetes process id", nameof(processId));
+         var name = processId[..separatorIndex];
+         var @namespace = processId[(separatorIndex + 1)..];
+         if (Processes.TryGetValue(processId, out var process))
+         {
+             try
+             {
+                 Logger.LogDebug("Attempting graceful shutdown for process '{processId}'...", processId);
+                 await process.StopAsync(cancellationToken).ConfigureAwait(false);
+             }
+             catch (Exception ex)
+             {
+                 Logger.LogWarning("Failed to gracefully stop process '{processId}': {ex}", processId, ex); // best effort: the forced deletion below still runs
+             }
+         }
+         await Kubernetes!.BatchV1.DeleteNamespacedJobAsync(name, @namespace, new()
+         {
+             GracePeriodSeconds = 0,
+             PropagationPolicy = "Foreground",
+             IgnoreStoreReadErrorWithClusterBreakingPotential = true,
+         }, cancellationToken: cancellationToken).ConfigureAwait(false);
+         Processes.TryRemove(processId, out _);
+         Logger.LogDebug("The Kubernetes process with id '{processId}' has been successfully deleted", processId);
+     }
+     catch (Exception ex)
+     {
+         Logger.LogError("An error occurred while deleting the Kubernetes process with id '{processId}': {ex}", processId, ex);
+         throw;
+     }
+ }
+
///
protected override ValueTask DisposeAsync(bool disposing)
{
diff --git a/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesWorkflowProcess.cs b/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesWorkflowProcess.cs
index 5d57bbc51..7469028b9 100644
--- a/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesWorkflowProcess.cs
+++ b/src/runtime/Synapse.Runtime.Kubernetes/Services/KubernetesWorkflowProcess.cs
@@ -22,22 +22,27 @@ namespace Synapse.Runtime.Kubernetes.Services;
///
/// Represents the Kubernetes implementation of the interface
///
-/// The associated with the
+/// The associated with the
/// The service used to perform logging
/// The service used to interact with the Kubernetes API
-public class KubernetesWorkflowProcess(V1Pod pod, ILogger logger, IKubernetes kubernetes)
+public class KubernetesWorkflowProcess(V1PodTemplateSpec pod, ILogger logger, IKubernetes kubernetes)
: WorkflowProcessBase
{
long? _exitCode;
///
- public override string Id => $"{this.Pod.Name()}.{this.Pod.Namespace()}";
+ public override string Id => $"{this.PodTemplate.Name()}.{this.PodTemplate.Namespace()}";
///
- /// Gets the the container belongs to
+ /// Gets the used to configure the Kubernetes pod
///
- protected V1Pod Pod { get; set; } = pod;
+ protected V1PodTemplateSpec PodTemplate { get; } = pod;
+
+ /// <summary>
+ /// Gets the Kubernetes <see cref="V1Job"/> associated with the workflow process; assigned by StartAsync once the job has been created
+ /// </summary>
+ protected V1Job? Job { get; set; }
///
/// Gets the service used to perform logging
@@ -78,13 +83,30 @@ public override async Task StartAsync(CancellationToken cancellationToken = defa
{
try
{
- this.Logger.LogDebug("Creating pod '{pod}'...", $"{this.Pod.Name()}.{this.Pod.Namespace()}");
- this.Pod = await this.Kubernetes.CoreV1.CreateNamespacedPodAsync(this.Pod, this.Pod.Namespace(), cancellationToken: cancellationToken);
- this.Logger.LogDebug("The pod '{pod}' has been successfully created", $"{this.Pod.Name()}.{this.Pod.Namespace()}");
+ this.Logger.LogDebug("Creating pod '{pod}'...", $"{this.PodTemplate.Name()}.{this.PodTemplate.Namespace()}");
+ if (this.PodTemplate.Spec is not null) this.PodTemplate.Spec.RestartPolicy = "Never"; // must be set BEFORE the create call: changing the returned object never reaches the API server, and jobs only accept Never/OnFailure
+ this.Job = await this.Kubernetes.BatchV1.CreateNamespacedJobAsync(new()
+ {
+     Metadata = new V1ObjectMeta()
+     {
+         Name = this.PodTemplate.Name(),
+         NamespaceProperty = this.PodTemplate.Namespace(),
+         Labels = this.PodTemplate.Metadata?.Labels ?? new Dictionary<string, string>()
+     },
+     Spec = new V1JobSpec()
+     {
+         Template = PodTemplate,
+         BackoffLimit = 0, // a failed runner pod is not retried
+         Completions = 1,
+         Parallelism = 1,
+         TtlSecondsAfterFinished = 7 * 24 * 60 * 60 // finished jobs are kept for seven days
+     }
+ }, this.PodTemplate.Namespace(), cancellationToken: cancellationToken);
+ this.Logger.LogDebug("The job '{job}' has been successfully created", $"{this.PodTemplate.Name()}.{this.PodTemplate.Namespace()}");
}
catch(Exception ex)
{
- this.Logger.LogError("An error occurred while creating the specified pod '{pod}': {ex}", $"{this.Pod.Name()}.{this.Pod.Namespace()}", ex);
+ this.Logger.LogError("An error occurred while creating the specified job '{job}': {ex}", $"{this.PodTemplate.Name()}.{this.PodTemplate.Namespace()}", ex);
}
_ = Task.Run(() => this.ReadPodLogsAsync(cancellationToken), cancellationToken);
}
@@ -97,7 +119,7 @@ public override async Task StartAsync(CancellationToken cancellationToken = defa
protected virtual async Task ReadPodLogsAsync(CancellationToken cancellationToken = default)
{
await this.WaitForReadyAsync(cancellationToken);
- var logStream = await this.Kubernetes.CoreV1.ReadNamespacedPodLogAsync(this.Pod.Name(), this.Pod.Namespace(), cancellationToken: cancellationToken).ConfigureAwait(false);
+ var logStream = await this.Kubernetes.CoreV1.ReadNamespacedPodLogAsync(this.PodTemplate.Name(), this.PodTemplate.Namespace(), cancellationToken: cancellationToken).ConfigureAwait(false);
using var reader = new StreamReader(logStream);
string? line;
while ((line = await reader.ReadLineAsync(cancellationToken).ConfigureAwait(false)) != null) this.StandardOutputSubject.OnNext(line);
@@ -110,20 +132,23 @@ protected virtual async Task ReadPodLogsAsync(CancellationToken cancellationToke
/// A new awaitable
protected virtual async Task WaitForReadyAsync(CancellationToken cancellationToken = default)
{
- this.Logger.LogDebug("Waiting for pod '{pod}'...", $"{this.Pod.Name()}.{this.Pod.Namespace()}");
- this.Pod = await this.Kubernetes.CoreV1.ReadNamespacedPodAsync(this.Pod.Name(), this.Pod.Namespace(), cancellationToken: cancellationToken);
- while (this.Pod.Status.Phase == "Pending")
+ this.Logger.LogDebug("Waiting for job '{job}'...", $"{this.Job.Name()}.{this.Job.Namespace()}");
+ V1Job? job;
+ do
{
+ job = await this.Kubernetes.BatchV1.ReadNamespacedJobAsync(this.Job.Name(), this.Job.Namespace(), cancellationToken: cancellationToken);
+ var jobStatus = job?.Status;
+ if (jobStatus != null && (jobStatus.Active > 0 || jobStatus.Succeeded > 0 || jobStatus.Failed > 0)) break;
await Task.Delay(100, cancellationToken).ConfigureAwait(false);
- this.Pod = await this.Kubernetes.CoreV1.ReadNamespacedPodAsync(this.Pod.Name(), this.Pod.Namespace(), cancellationToken: cancellationToken);
}
- this.Logger.LogDebug("The pod '{pod}' is up and running", $"{this.Pod.Name()}.{this.Pod.Namespace()}");
+ while (!cancellationToken.IsCancellationRequested);
+ this.Logger.LogDebug("The job '{job}' is up and running", $"{this.Job.Name()}.{this.Job.Namespace()}");
}
///
public virtual async Task WaitForExitAsync(CancellationToken cancellationToken = default)
{
- var response = this.Kubernetes.CoreV1.ListNamespacedPodWithHttpMessagesAsync(this.Pod.Namespace(), fieldSelector: $"metadata.name={Pod.Name()}", watch: true, cancellationToken: cancellationToken);
+ var response = this.Kubernetes.CoreV1.ListNamespacedPodWithHttpMessagesAsync(this.PodTemplate.Namespace(), fieldSelector: $"metadata.name={PodTemplate.Name()}", watch: true, cancellationToken: cancellationToken);
await foreach (var (_, item) in response.WatchAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
{
if (item.Status.Phase != "Succeeded" && item.Status.Phase != "Failed") continue;
@@ -136,7 +161,7 @@ public virtual async Task WaitForExitAsync(CancellationToken cancellationToken =
///
public override async Task StopAsync(CancellationToken cancellationToken = default)
{
- await this.Kubernetes.CoreV1.DeleteNamespacedPodAsync(this.Pod.Name(), this.Pod.Namespace(), cancellationToken: cancellationToken).ConfigureAwait(false);
+ await this.Kubernetes.BatchV1.DeleteNamespacedJobAsync(this.PodTemplate.Name(), this.PodTemplate.Namespace(), cancellationToken: cancellationToken).ConfigureAwait(false);
await this.CancellationTokenSource.CancelAsync().ConfigureAwait(false);
}
diff --git a/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs b/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs
index 556b0d052..7309c26b2 100644
--- a/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs
+++ b/src/runtime/Synapse.Runtime.Native/Services/NativeRuntime.cs
@@ -12,7 +12,6 @@
// limitations under the License.
using Neuroglia.Data.Infrastructure.ResourceOriented;
-using System.ComponentModel;
namespace Synapse.Runtime.Services;
@@ -89,6 +88,13 @@ public override Task CreateProcessAsync(Workflow workflow, Wor
return Task.FromResult(this.Processes.AddOrUpdate(workflowInstance.GetQualifiedName(), new NativeProcess(process), (key, current) => current));
}
+ /// <inheritdoc/>
+ public override async Task DeleteProcessAsync(string processId, CancellationToken cancellationToken = default)
+ {
+     ArgumentException.ThrowIfNullOrWhiteSpace(processId);
+     if (Processes.TryRemove(processId, out var tracked)) await tracked.DisposeAsync().ConfigureAwait(false); // ids that are not tracked are ignored
+ }
+
///
/// Handles the exit of a
///