Skip to content

Resets ActionBlock if faulted or cancelled. #385

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Api/Storage/Payload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public TimeSpan Elapsed
public string? CalledAeTitle { get => Files.OfType<DicomFileStorageMetadata>().Select(p => p.CalledAeTitle).FirstOrDefault(); }

public int FilesUploaded { get => Files.Count(p => p.IsUploaded); }

public int FilesFailedToUpload { get => Files.Count(p => p.IsUploadFailed); }

public Payload(string key, string correlationId, uint timeout)
Expand Down
10 changes: 10 additions & 0 deletions src/Common/ExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,15 @@ public static async Task<bool> Post<TInput>(this ActionBlock<TInput> actionBlock
await Task.Delay(delay).ConfigureAwait(false);
return actionBlock.Post(input);
}

/// <summary>
/// Checks if a given task is faulted or cancelled.
/// </summary>
/// <param name="task">The task object</param>
/// <returns>True if canceled or faulted. False otherwise.</returns>
public static bool IsCanceledOrFaulted(this Task task)
{
return task.IsCanceled || task.IsFaulted;
}
}
}
2 changes: 1 addition & 1 deletion src/Configuration/DicomWebConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class DicomWebConfiguration
/// Gets or sets the maximum number of simultaneous DICOMweb connections.
/// </summary>
[ConfigurationKeyName("maximumNumberOfConnections")]
public int MaximumNumberOfConnection { get; set; } = 2;
public ushort MaximumNumberOfConnection { get; set; } = 2;

/// <summary>
/// Gets or set the maximum allowed file size in bytes with default to 2GiB.
Expand Down
2 changes: 1 addition & 1 deletion src/Configuration/ScuConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ScuConfiguration
/// Gets or sets the maximum number of simultaneous DICOM associations for the SCU service.
/// </summary>
[ConfigurationKeyName("maximumNumberOfAssociations")]
public int MaximumNumberOfAssociations { get; set; } = 8;
public ushort MaximumNumberOfAssociations { get; set; } = 8;

public ScuConfiguration()
{
Expand Down
50 changes: 50 additions & 0 deletions src/InformaticsGateway/Common/PostPayloadException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System;
using System.Runtime.Serialization;
using Monai.Deploy.InformaticsGateway.Api.Storage;

namespace Monai.Deploy.InformaticsGateway.Common
{
internal class PostPayloadException : Exception
{
public Payload.PayloadState TargetQueue { get; }
public Payload Payload { get; }

public PostPayloadException()
{
}

public PostPayloadException(Api.Storage.Payload.PayloadState targetState, Payload payload)
{
TargetQueue = targetState;
Payload = payload;
}

public PostPayloadException(string message) : base(message)
{
}

public PostPayloadException(string message, Exception innerException) : base(message, innerException)
{
}

protected PostPayloadException(SerializationInfo info, StreamingContext context) : base(info, context)
{
}
}
}
6 changes: 3 additions & 3 deletions src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 MONAI Consortium
* Copyright 2022-2023 MONAI Consortium
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,8 +33,8 @@ public static partial class Log
[LoggerMessage(EventId = 3004, Level = LogLevel.Trace, Message = "Number of incomplete payloads waiting for processing: {count}.")]
public static partial void BucketsActive(this ILogger logger, int count);

[LoggerMessage(EventId = 3005, Level = LogLevel.Trace, Message = "Checking elapsed time for bucket: {key} with timeout set to {timeout}s. Elapsed {elapsed}s with {failedFiles} failures out of {totalNumberOfFiles}.")]
public static partial void BucketElapsedTime(this ILogger logger, string key, uint timeout, double elapsed, int totalNumberOfFiles, int failedFiles);
[LoggerMessage(EventId = 3005, Level = LogLevel.Trace, Message = "Checking elapsed time for bucket: {key} with timeout set to {timeout}s. Elapsed {elapsed}s with {succeededFiles} uplaoded and {failedFiles} failures out of {totalNumberOfFiles}.")]
public static partial void BucketElapsedTime(this ILogger logger, string key, uint timeout, double elapsed, int totalNumberOfFiles, int succeededFiles, int failedFiles);

[LoggerMessage(EventId = 3007, Level = LogLevel.Information, Message = "Bucket {key} sent to processing queue with {count} files.")]
public static partial void BucketReady(this ILogger logger, string key, int count);
Expand Down
6 changes: 6 additions & 0 deletions src/InformaticsGateway/Logging/Log.500.ExportService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,11 @@ public static partial class Log

[LoggerMessage(EventId = 533, Level = LogLevel.Error, Message = "Recovering messaging service connection due to {reason}.")]
public static partial void MessagingServiceErrorRecover(this ILogger logger, string reason);

[LoggerMessage(EventId = 534, Level = LogLevel.Error, Message = "Error posting export job for processing correlation ID {correlationId}, export task ID {exportTaskId}.")]
public static partial void ErrorPostingExportJobToQueue(this ILogger logger, string correlationId, string exportTaskId);

[LoggerMessage(EventId = 535, Level = LogLevel.Warning, Message = "Exceeded maximum number of worker in {serviceName}: {count}.")]
public static partial void ExceededMaxmimumNumberOfWorkers(this ILogger logger, string serviceName, ulong count);
}
}
12 changes: 12 additions & 0 deletions src/InformaticsGateway/Logging/Log.700.PayloadService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,17 @@ public static partial class Log

[LoggerMessage(EventId = 743, Level = LogLevel.Error, Message = "Exception moving payload.")]
public static partial void PayloadMoveException(this ILogger logger, Exception ex);

[LoggerMessage(EventId = 744, Level = LogLevel.Warning, Message = "PayloadNotification move payload queue: faulted: {isFauled}, cancelled: {isCancelled}.")]
public static partial void MoveQueueFaulted(this ILogger logger, bool isFauled, bool isCancelled);

[LoggerMessage(EventId = 745, Level = LogLevel.Warning, Message = "PayloadNotification publishing payload queue: faulted: {isFauled}, cancelled: {isCancelled}.")]
public static partial void PublishQueueFaulted(this ILogger logger, bool isFauled, bool isCancelled);

[LoggerMessage(EventId = 746, Level = LogLevel.Error, Message = "Error posting payload to move queue.")]
public static partial void ErrorPostingJobToMovePayloadsQueue(this ILogger logger);

[LoggerMessage(EventId = 747, Level = LogLevel.Error, Message = "Error posting payload to publish queue.")]
public static partial void ErrorPostingJobToPublishPayloadsQueue(this ILogger logger);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private async void OnTimedEvent(Object source, System.Timers.ElapsedEventArgs e)
var payload = await _payloads[key].Task.ConfigureAwait(false);
using var loggerScope = _logger.BeginScope(new LoggingDataDictionary<string, object> { { "CorrelationId", payload.CorrelationId } });

_logger.BucketElapsedTime(key, payload.Timeout, payload.ElapsedTime().TotalSeconds, payload.Files.Count, payload.FilesFailedToUpload);
_logger.BucketElapsedTime(key, payload.Timeout, payload.ElapsedTime().TotalSeconds, payload.Files.Count, payload.FilesUploaded, payload.FilesFailedToUpload);
// Wait for timer window closes before sending payload for processing
if (payload.HasTimedOut)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ public async Task MoveFilesAsync(Payload payload, ActionBlock<Payload> moveQueue
var action = await UpdatePayloadState(payload, ex, cancellationToken).ConfigureAwait(false);
if (action == PayloadAction.Updated)
{
await moveQueue.Post(payload, _options.Value.Storage.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false);
if (!await moveQueue.Post(payload, _options.Value.Storage.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false))
{
throw new PostPayloadException(Payload.PayloadState.Move, payload);
}
}
}
finally
Expand All @@ -111,7 +114,11 @@ private async Task NotifyIfCompleted(Payload payload, ActionBlock<Payload> notif
await repository.UpdateAsync(payload, cancellationToken).ConfigureAwait(false);
_logger.PayloadSaved(payload.PayloadId);

notificationQueue.Post(payload);
if (!notificationQueue.Post(payload))
{
throw new PostPayloadException(Payload.PayloadState.Notify, payload);
}

_logger.PayloadReadyToBePublished(payload.PayloadId);
}
else // we should never hit this else block.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ public async Task NotifyAsync(Payload payload, ActionBlock<Payload> notification
var action = await UpdatePayloadState(payload, cancellationToken).ConfigureAwait(false);
if (action == PayloadAction.Updated)
{
await notificationQueue.Post(payload, _options.Value.Messaging.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false);
_logger.FailedToPublishWorkflowRequest(payload.PayloadId, ex);
if (!await notificationQueue.Post(payload, _options.Value.Messaging.Retries.RetryDelays.ElementAt(payload.RetryCount - 1)).ConfigureAwait(false))
{
throw new PostPayloadException(Payload.PayloadState.Notify, payload);
}
}
}
}
Expand Down
127 changes: 103 additions & 24 deletions src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,38 @@ public PayloadNotificationService(IServiceScopeFactory serviceScopeFactory,
_cancellationTokenSource = new CancellationTokenSource();
}

public async Task StartAsync(CancellationToken cancellationToken)
public Task StartAsync(CancellationToken cancellationToken)
{
_moveFileQueue = new ActionBlock<Payload>(
MoveActionHandler,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = _options.Value.Storage.PayloadProcessThreads,
MaxMessagesPerTask = 1,
CancellationToken = cancellationToken
});
SetupQueues(cancellationToken);

var task = Task.Run(async () =>
{
await RestoreFromDatabaseAsync(cancellationToken).ConfigureAwait(false);
BackgroundProcessing(cancellationToken);
}, CancellationToken.None);

Status = ServiceStatus.Running;
_logger.ServiceStarted(ServiceName);

if (task.IsCompleted)
return task;

return Task.CompletedTask;
}

private void SetupQueues(CancellationToken cancellationToken)
{
ResetMoveQueue(cancellationToken);
ResetPublishQueue(cancellationToken);
}

private void ResetPublishQueue(CancellationToken cancellationToken)
{
if (_publishQueue is not null)
{
_logger.PublishQueueFaulted(_publishQueue.Completion.IsFaulted, _publishQueue.Completion.IsCanceled);
_publishQueue.Complete();
}

_publishQueue = new ActionBlock<Payload>(
NotificationHandler,
Expand All @@ -107,21 +129,24 @@ public async Task StartAsync(CancellationToken cancellationToken)
MaxMessagesPerTask = 1,
CancellationToken = cancellationToken
});
}

await RestoreFromDatabaseAsync(cancellationToken).ConfigureAwait(false);

var task = Task.Run(() =>
private void ResetMoveQueue(CancellationToken cancellationToken)
{
if (_moveFileQueue is not null)
{
BackgroundProcessing(cancellationToken);
}, CancellationToken.None);

Status = ServiceStatus.Running;
_logger.ServiceStarted(ServiceName);

if (task.IsCompleted)
await task.ConfigureAwait(false);
_logger.MoveQueueFaulted(_moveFileQueue.Completion.IsFaulted, _moveFileQueue.Completion.IsCanceled);
_moveFileQueue.Complete();
}

await Task.CompletedTask.ConfigureAwait(false);
_moveFileQueue = new ActionBlock<Payload>(
MoveActionHandler,
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = _options.Value.Storage.PayloadProcessThreads,
MaxMessagesPerTask = 1,
CancellationToken = cancellationToken
});
}

private async Task NotificationHandler(Payload payload)
Expand All @@ -134,6 +159,10 @@ private async Task NotificationHandler(Payload payload)
{
await _payloadNotificationActionHandler.NotifyAsync(payload, _publishQueue, _cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (PostPayloadException ex)
{
HandlePostPayloadException(ex);
}
catch (Exception ex)
{
if (ex is PayloadNotifyException payloadMoveException &&
Expand All @@ -158,6 +187,10 @@ private async Task MoveActionHandler(Payload payload)
{
await _payloadMoveActionHandler.MoveFilesAsync(payload, _moveFileQueue, _publishQueue, _cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (PostPayloadException ex)
{
HandlePostPayloadException(ex);
}
catch (Exception ex)
{
if (ex is PayloadNotifyException payloadMoveException &&
Expand All @@ -172,17 +205,45 @@ private async Task MoveActionHandler(Payload payload)
}
}

private void HandlePostPayloadException(PostPayloadException ex)
{
Guard.Against.Null(ex);

if (ex.TargetQueue == Payload.PayloadState.Move)
{
ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, CancellationToken.None);
if (!_moveFileQueue.Post(ex.Payload))
{
_logger.ErrorPostingJobToMovePayloadsQueue();
}
}
else if (ex.TargetQueue == Payload.PayloadState.Notify)
{
ResetIfFaultedOrCancelled(_publishQueue, ResetPublishQueue, CancellationToken.None);
if (!_publishQueue.Post(ex.Payload))
{
_logger.ErrorPostingJobToPublishPayloadsQueue();
}
}
}

private void BackgroundProcessing(CancellationToken cancellationToken)
{
_logger.ServiceRunning(ServiceName);

while (!cancellationToken.IsCancellationRequested)
{
ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, cancellationToken);
ResetIfFaultedOrCancelled(_publishQueue, ResetPublishQueue, cancellationToken);

Payload payload = null;
try
{
payload = _payloadAssembler.Dequeue(cancellationToken);
_moveFileQueue.Post(payload);
while (!_moveFileQueue.Post(payload))
{
ResetIfFaultedOrCancelled(_moveFileQueue, ResetMoveQueue, cancellationToken);
}
_logger.PayloadQueuedForProcessing(payload.PayloadId, ServiceName);
}
catch (OperationCanceledException ex)
Expand All @@ -202,6 +263,18 @@ private void BackgroundProcessing(CancellationToken cancellationToken)
_logger.ServiceCancelled(ServiceName);
}

private static void ResetIfFaultedOrCancelled(ActionBlock<Payload> queue, Action<CancellationToken> resetFunction, CancellationToken cancellationToken)
{
Guard.Against.Null(queue);
Guard.Against.Null(resetFunction);

if (queue.Completion.IsCanceledOrFaulted())
{
resetFunction(cancellationToken);
}
}


private async Task RestoreFromDatabaseAsync(CancellationToken cancellationToken)
{
_logger.StartupRestoreFromDatabase();
Expand All @@ -214,11 +287,17 @@ private async Task RestoreFromDatabaseAsync(CancellationToken cancellationToken)
{
if (payload.State == Payload.PayloadState.Move)
{
_moveFileQueue.Post(payload);
if (!_moveFileQueue.Post(payload))
{
_logger.ErrorPostingJobToMovePayloadsQueue();
}
}
else if (payload.State == Payload.PayloadState.Notify)
{
_publishQueue.Post(payload);
if (!_publishQueue.Post(payload))
{
_logger.ErrorPostingJobToPublishPayloadsQueue();
}
}
}
_logger.RestoredFromDatabase(payloads.Count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ internal class DicomWebExportService : ExportServiceBase
private readonly IOptions<InformaticsGatewayConfiguration> _configuration;
private readonly IDicomToolkit _dicomToolkit;

protected override int Concurrency { get; }
protected override ushort Concurrency { get; }
public override string RoutingKey { get; }
public override string ServiceName => "DICOMweb Export Service";

Expand Down
Loading