diff --git a/src/Api/Monai.Deploy.InformaticsGateway.Api.csproj b/src/Api/Monai.Deploy.InformaticsGateway.Api.csproj index ade970aad..967375a51 100644 --- a/src/Api/Monai.Deploy.InformaticsGateway.Api.csproj +++ b/src/Api/Monai.Deploy.InformaticsGateway.Api.csproj @@ -30,7 +30,7 @@ - + diff --git a/src/Configuration/Monai.Deploy.InformaticsGateway.Configuration.csproj b/src/Configuration/Monai.Deploy.InformaticsGateway.Configuration.csproj index 2ca1f6785..9e1c4b0ba 100644 --- a/src/Configuration/Monai.Deploy.InformaticsGateway.Configuration.csproj +++ b/src/Configuration/Monai.Deploy.InformaticsGateway.Configuration.csproj @@ -30,7 +30,7 @@ - + diff --git a/src/Database/DestinationApplicationEntityConfiguration.cs b/src/Database/DestinationApplicationEntityConfiguration.cs index 80a9f3ccb..a1a131e70 100644 --- a/src/Database/DestinationApplicationEntityConfiguration.cs +++ b/src/Database/DestinationApplicationEntityConfiguration.cs @@ -29,6 +29,9 @@ public void Configure(EntityTypeBuilder builder) builder.Property(j => j.AeTitle).IsRequired(); builder.Property(j => j.Port).IsRequired(); builder.Property(j => j.HostIp).IsRequired(); + + builder.HasIndex(p => p.Name, "idx_destination_name").IsUnique(); + builder.HasIndex(p => new { p.Name, p.AeTitle, p.HostIp, p.Port }, "idx_source_all").IsUnique(); } } } diff --git a/src/Database/InferenceRequestConfiguration.cs b/src/Database/InferenceRequestConfiguration.cs index 87681b112..fd740b7d3 100644 --- a/src/Database/InferenceRequestConfiguration.cs +++ b/src/Database/InferenceRequestConfiguration.cs @@ -70,6 +70,10 @@ public void Configure(EntityTypeBuilder builder) builder.Property(j => j.TryCount).IsRequired(); builder.Ignore(p => p.Application); + + builder.HasIndex(p => p.State, "idx_inferencerequest_state"); + builder.HasIndex(p => p.InferenceRequestId, "idx_inferencerequest_inferencerequestid").IsUnique(); + builder.HasIndex(p => p.TransactionId, "idx_inferencerequest_transactionid").IsUnique(); } } } diff --git a/src/Database/Migrations/20220802200605_R3_0.3.0.Designer.cs b/src/Database/Migrations/20220802200605_R3_0.3.0.Designer.cs index 01b3cb57b..824553cd6 100644 --- a/src/Database/Migrations/20220802200605_R3_0.3.0.Designer.cs +++ b/src/Database/Migrations/20220802200605_R3_0.3.0.Designer.cs @@ -199,4 +199,4 @@ protected override void BuildTargetModel(ModelBuilder modelBuilder) #pragma warning restore 612, 618 } } -} +} \ No newline at end of file diff --git a/src/Database/Migrations/20220802200605_R3_0.3.0.cs b/src/Database/Migrations/20220802200605_R3_0.3.0.cs index c64cbb6f7..fca7a122c 100644 --- a/src/Database/Migrations/20220802200605_R3_0.3.0.cs +++ b/src/Database/Migrations/20220802200605_R3_0.3.0.cs @@ -1,4 +1,4 @@ -/* +/* * Copyright 2022 MONAI Consortium * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -57,4 +57,4 @@ protected override void Down(MigrationBuilder migrationBuilder) defaultValue: ""); } } -} +} \ No newline at end of file diff --git a/src/Database/Migrations/20221010184458_R3_0.3.2.Designer.cs b/src/Database/Migrations/20221010184458_R3_0.3.2.Designer.cs new file mode 100644 index 000000000..e3ff92b16 --- /dev/null +++ b/src/Database/Migrations/20221010184458_R3_0.3.2.Designer.cs @@ -0,0 +1,221 @@ +// +using System; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Monai.Deploy.InformaticsGateway.Database; + +#nullable disable + +namespace Monai.Deploy.InformaticsGateway.Database.Migrations +{ + [DbContext(typeof(InformaticsGatewayContext))] + [Migration("20221010184458_R3_0.3.2")] + partial class R3_032 + { + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder.HasAnnotation("ProductVersion", "6.0.9"); + + modelBuilder.Entity("Monai.Deploy.InformaticsGateway.Api.DestinationApplicationEntity", b => + { + b.Property("Name") + .HasColumnType("TEXT"); + + b.Property("AeTitle") + .IsRequired() + .HasColumnType("TEXT"); + + b.Property("HostIp") + .IsRequired() + .HasColumnType("TEXT"); + + b.Property("Port") + .HasColumnType("INTEGER"); + + b.HasKey("Name"); + + b.HasIndex(new[] { "Name" }, "idx_destination_name") + .IsUnique(); + + b.HasIndex(new[] { "Name", "AeTitle", "HostIp", "Port" }, "idx_source_all") + .IsUnique(); + + b.ToTable("DestinationApplicationEntities"); + }); + + modelBuilder.Entity("Monai.Deploy.InformaticsGateway.Api.MonaiApplicationEntity", b => + { + b.Property("Name") + .HasColumnType("TEXT") + .HasColumnOrder(0); + + b.Property("AeTitle") + .IsRequired() + .HasColumnType("TEXT"); + + b.Property("AllowedSopClasses") + .HasColumnType("TEXT"); + + b.Property("Grouping") + .IsRequired() + .HasColumnType("TEXT"); + + b.Property("IgnoredSopClasses") + .HasColumnType("TEXT"); + + b.Property("Timeout") + .HasColumnType("INTEGER"); + + b.Property("Workflows") + .HasColumnType("TEXT"); + + b.HasKey("Name"); + + b.HasIndex(new[] { "Name" }, "idx_monaiae_name") + .IsUnique(); + + b.ToTable("MonaiApplicationEntities"); + }); + + modelBuilder.Entity("Monai.Deploy.InformaticsGateway.Api.Rest.InferenceRequest", b => + { + b.Property("InferenceRequestId") + .ValueGeneratedOnAdd() + .HasColumnType("TEXT"); + + b.Property("InputMetadata") + .HasColumnType("TEXT"); + + b.Property("InputResources") + .HasColumnType("TEXT"); + + b.Property("OutputResources") + .HasColumnType("TEXT"); + + b.Property("Priority") + .HasColumnType("INTEGER"); + + b.Property("State") + .HasColumnType("INTEGER"); + + b.Property("Status") + .HasColumnType("INTEGER"); + + b.Property("TransactionId") + .IsRequired() + .HasColumnType("TEXT"); + + b.Property("TryCount") + .HasColumnType("INTEGER"); + + b.HasKey("InferenceRequestId"); + + b.HasIndex(new[] { "InferenceRequestId" }, "idx_inferencerequest_inferencerequestid") + .IsUnique(); + + b.HasIndex(new[] { "State" }, "idx_inferencerequest_state"); + + b.HasIndex(new[] { "TransactionId" }, "idx_inferencerequest_transactionid") + .IsUnique(); + + b.ToTable("InferenceRequest"); + }); + + modelBuilder.Entity("Monai.Deploy.InformaticsGateway.Api.SourceApplicationEntity", b => + { + b.Property("Name") + .HasColumnType("TEXT"); + + b.Property("AeTitle") + .IsRequired() + .HasColumnType("TEXT"); + + b.Property("HostIp") + .IsRequired() + .HasColumnType("TEXT"); + + b.HasKey("Name"); + + b.HasIndex(new[] { "Name", "AeTitle", "HostIp" }, "idx_source_all") + .IsUnique() + .HasDatabaseName("idx_source_all1"); + + b.HasIndex(new[] { "Name" }, "idx_source_name") + .IsUnique(); + + b.ToTable("SourceApplicationEntities"); + }); + + modelBuilder.Entity("Monai.Deploy.InformaticsGateway.Api.Storage.Payload", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("TEXT"); + + b.Property("CorrelationId") + .IsRequired() + .HasColumnType("TEXT"); + + b.Property("DateTimeCreated") + .HasColumnType("TEXT"); + + b.Property("Files") + .HasColumnType("TEXT"); + + b.Property("Key") + .IsRequired() + .HasColumnType("TEXT"); + + b.Property("RetryCount") + .HasColumnType("INTEGER"); + + b.Property("State") + .HasColumnType("INTEGER"); + + b.Property("Timeout") + .HasColumnType("INTEGER"); + + b.HasKey("Id"); + + b.HasIndex(new[] { "CorrelationId", "Id" }, "idx_payload_ids") + .IsUnique(); + + b.HasIndex(new[] { "State" }, "idx_payload_state"); + + b.ToTable("Payload"); + }); + + modelBuilder.Entity("Monai.Deploy.InformaticsGateway.Database.StorageMetadataWrapper", b => + { + b.Property("CorrelationId") + .HasColumnType("TEXT"); + + b.Property("Identity") + .HasColumnType("TEXT"); + + b.Property("IsUploaded") + .HasColumnType("INTEGER"); + + b.Property("TypeName") + .HasColumnType("TEXT"); + + b.Property("Value") + .HasColumnType("TEXT"); + + b.HasKey("CorrelationId", "Identity"); + + b.HasIndex(new[] { "CorrelationId" }, "idx_storagemetadata_correlation"); + + b.HasIndex(new[] { "CorrelationId", "Identity" }, "idx_storagemetadata_ids"); + + b.HasIndex(new[] { "IsUploaded" }, "idx_storagemetadata_uploaded"); + + b.ToTable("StorageMetadataWrapper"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/Database/Migrations/20221010184458_R3_0.3.2.cs b/src/Database/Migrations/20221010184458_R3_0.3.2.cs new file mode 100644 index 000000000..92ad58af7 --- /dev/null +++ b/src/Database/Migrations/20221010184458_R3_0.3.2.cs @@ -0,0 +1,19 @@ +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace Monai.Deploy.InformaticsGateway.Database.Migrations +{ + public partial class R3_032 : Migration + { + protected override void Up(MigrationBuilder migrationBuilder) + { + + } + + protected override void Down(MigrationBuilder migrationBuilder) + { + + } + } +} diff --git a/src/Database/Migrations/InformaticsGatewayContextModelSnapshot.cs b/src/Database/Migrations/InformaticsGatewayContextModelSnapshot.cs index 6b9847296..18a940cef 100644 --- a/src/Database/Migrations/InformaticsGatewayContextModelSnapshot.cs +++ b/src/Database/Migrations/InformaticsGatewayContextModelSnapshot.cs @@ -1,20 +1,4 @@ -/* - * Copyright 2022 MONAI Consortium - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// +// using System; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; @@ -31,7 +15,7 @@ partial class InformaticsGatewayContextModelSnapshot : ModelSnapshot protected override void BuildModel(ModelBuilder modelBuilder) { #pragma warning disable 612, 618 - modelBuilder.HasAnnotation("ProductVersion", "6.0.6"); + modelBuilder.HasAnnotation("ProductVersion", "6.0.9"); modelBuilder.Entity("Monai.Deploy.InformaticsGateway.Api.DestinationApplicationEntity", b => { @@ -51,6 +35,12 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Name"); + b.HasIndex(new[] { "Name" }, "idx_destination_name") + .IsUnique(); + + b.HasIndex(new[] { "Name", "AeTitle", "HostIp", "Port" }, "idx_source_all") + .IsUnique(); + b.ToTable("DestinationApplicationEntities"); }); @@ -82,6 +72,9 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Name"); + b.HasIndex(new[] { "Name" }, "idx_monaiae_name") + .IsUnique(); + b.ToTable("MonaiApplicationEntities"); }); @@ -118,6 +111,14 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("InferenceRequestId"); + b.HasIndex(new[] { "InferenceRequestId" }, "idx_inferencerequest_inferencerequestid") + .IsUnique(); + + b.HasIndex(new[] { "State" }, "idx_inferencerequest_state"); + + b.HasIndex(new[] { "TransactionId" }, "idx_inferencerequest_transactionid") + .IsUnique(); + b.ToTable("InferenceRequest"); }); @@ -136,6 +137,13 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Name"); + b.HasIndex(new[] { "Name", "AeTitle", "HostIp" }, "idx_source_all") + .IsUnique() + .HasDatabaseName("idx_source_all1"); + + b.HasIndex(new[] { "Name" }, "idx_source_name") + .IsUnique(); + b.ToTable("SourceApplicationEntities"); }); @@ -170,6 +178,11 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("Id"); + b.HasIndex(new[] { "CorrelationId", "Id" }, "idx_payload_ids") + .IsUnique(); + + b.HasIndex(new[] { "State" }, "idx_payload_state"); + b.ToTable("Payload"); }); @@ -192,6 +205,12 @@ protected override void BuildModel(ModelBuilder modelBuilder) b.HasKey("CorrelationId", "Identity"); + b.HasIndex(new[] { "CorrelationId" }, "idx_storagemetadata_correlation"); + + b.HasIndex(new[] { "CorrelationId", "Identity" }, "idx_storagemetadata_ids"); + + b.HasIndex(new[] { "IsUploaded" }, "idx_storagemetadata_uploaded"); + b.ToTable("StorageMetadataWrapper"); }); #pragma warning restore 612, 618 diff --git a/src/Database/MonaiApplicationEntityConfiguration.cs b/src/Database/MonaiApplicationEntityConfiguration.cs index 7371cd1a6..d04189f6e 100644 --- a/src/Database/MonaiApplicationEntityConfiguration.cs +++ b/src/Database/MonaiApplicationEntityConfiguration.cs @@ -61,6 +61,8 @@ public void Configure(EntityTypeBuilder builder) v => JsonSerializer.Serialize(v, jsonSerializerSettings), v => JsonSerializer.Deserialize>(v, jsonSerializerSettings)) .Metadata.SetValueComparer(valueComparer); + + builder.HasIndex(p => p.Name, "idx_monaiae_name").IsUnique(); } } } diff --git a/src/Database/PayloadConfiguration.cs b/src/Database/PayloadConfiguration.cs index b40c9e7c1..f319d43ae 100644 --- a/src/Database/PayloadConfiguration.cs +++ b/src/Database/PayloadConfiguration.cs @@ -59,6 +59,9 @@ public void Configure(EntityTypeBuilder builder) builder.Ignore(j => j.HasTimedOut); builder.Ignore(j => j.Elapsed); builder.Ignore(j => j.Count); + + builder.HasIndex(p => p.State, "idx_payload_state"); + builder.HasIndex(p => new { p.CorrelationId, p.Id }, "idx_payload_ids").IsUnique(); } } } diff --git a/src/Database/SourceApplicationEntityConfiguration.cs b/src/Database/SourceApplicationEntityConfiguration.cs index b3af99489..ff7b5d160 100644 --- a/src/Database/SourceApplicationEntityConfiguration.cs +++ b/src/Database/SourceApplicationEntityConfiguration.cs @@ -27,6 +27,9 @@ public void Configure(Microsoft.EntityFrameworkCore.Metadata.Builders.EntityType builder.HasKey(j => j.Name); builder.Property(j => j.AeTitle).IsRequired(); builder.Property(j => j.HostIp).IsRequired(); + + builder.HasIndex(p => p.Name, "idx_source_name").IsUnique(); + builder.HasIndex(p => new { p.Name, p.AeTitle, p.HostIp }, "idx_source_all").IsUnique(); } } } diff --git a/src/Database/StorageMetadataWrapperEntityConfiguration.cs b/src/Database/StorageMetadataWrapperEntityConfiguration.cs index dbeb65c7d..f0620e17a 100644 --- a/src/Database/StorageMetadataWrapperEntityConfiguration.cs +++ b/src/Database/StorageMetadataWrapperEntityConfiguration.cs @@ -32,6 +32,10 @@ public void Configure(EntityTypeBuilder builder) builder.Property(j => j.CorrelationId); builder.Property(j => j.Value); builder.Property(j => j.TypeName); + + builder.HasIndex(p => new { p.CorrelationId, p.Identity }, "idx_storagemetadata_ids"); + builder.HasIndex(p => p.CorrelationId, "idx_storagemetadata_correlation"); + builder.HasIndex(p => p.IsUploaded, "idx_storagemetadata_uploaded"); } } } diff --git a/src/InformaticsGateway/Common/PayloadExtensions.cs b/src/InformaticsGateway/Common/PayloadExtensions.cs index 631ea5858..3733f7e94 100644 --- a/src/InformaticsGateway/Common/PayloadExtensions.cs +++ b/src/InformaticsGateway/Common/PayloadExtensions.cs @@ -98,31 +98,5 @@ await Policy .ConfigureAwait(false); } - public static async Task DeletePayloadStorageMetadataObjects(this Payload payload, IEnumerable retryDelays, ILogger logger, IStorageMetadataWrapperRepository repository) - { - Guard.Against.Null(payload, nameof(payload)); - Guard.Against.NullOrEmpty(retryDelays, nameof(retryDelays)); - Guard.Against.Null(logger, nameof(logger)); - Guard.Against.Null(repository, nameof(repository)); - - var retryPolicy = Policy - .Handle() - .WaitAndRetryAsync( - retryDelays, - (exception, timeSpan, retryCount, context) => - { - logger.ErrorDeletingPayloadAssociatedStorageMetadataObjects(payload.Id, timeSpan, retryCount, exception); - }); - - foreach (var metadata in payload.Files) - { - await retryPolicy.ExecuteAsync(async () => - { - await repository.DeleteAsync(metadata.CorrelationId, metadata.Id).ConfigureAwait(false); - logger.StorageMetadataObjectDeleted(metadata.Id); - }) - .ConfigureAwait(false); - } - } } } diff --git a/src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs b/src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs index 2cb300c84..9ad3443eb 100644 --- a/src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs +++ b/src/InformaticsGateway/Logging/Log.3000.PayloadAssembler.cs @@ -21,14 +21,14 @@ namespace Monai.Deploy.InformaticsGateway.Logging { public static partial class Log { - [LoggerMessage(EventId = 3000, Level = LogLevel.Information, Message = "Restoring payloads from database.")] - public static partial void RestorePayloads(this ILogger logger); + [LoggerMessage(EventId = 3000, Level = LogLevel.Information, Message = "[Startup] Removing payloads from database.")] + public static partial void RemovingPendingPayloads(this ILogger logger); - [LoggerMessage(EventId = 3001, Level = LogLevel.Information, Message = "Payload {payloadId} restored from database.")] - public static partial void PayloadRestored(this ILogger logger, Guid payloadId); + [LoggerMessage(EventId = 3001, Level = LogLevel.Information, Message = "[Startup] Payload {payloadId} removed from database.")] + public static partial void PendingPayloadsRemoved(this ILogger logger, Guid payloadId); - [LoggerMessage(EventId = 3002, Level = LogLevel.Information, Message = "{count} payloads restored from database.")] - public static partial void TotalNumberOfPayloadsRestored(this ILogger logger, int count); + [LoggerMessage(EventId = 3002, Level = LogLevel.Information, Message = "[Startup] {count} payloads restored from database.")] + public static partial void TotalNumberOfPayloadsRemoved(this ILogger logger, int count); [LoggerMessage(EventId = 3003, Level = LogLevel.Information, Message = "File added to bucket {key}. Queue size: {count}")] public static partial void FileAddedToBucket(this ILogger logger, string key, int count); @@ -57,9 +57,6 @@ public static partial class Log [LoggerMessage(EventId = 3012, Level = LogLevel.Information, Message = "Bucket {key} created with timeout {timeout}s.")] public static partial void BucketCreated(this ILogger logger, string key, uint timeout); - [LoggerMessage(EventId = 3013, Level = LogLevel.Warning, Message = "Payload {payloadId} deleted at startup as data streams have been lost. Pleaes upload again.")] - public static partial void PayloadDeletedAtStartup(this ILogger logger, Guid payloadId); - [LoggerMessage(EventId = 3014, Level = LogLevel.Error, Message = "Payload deleted due to upload failure(s) {key}.")] public static partial void PayloadRemovedWithFailureUploads(this ILogger logger, string key); } diff --git a/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs b/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs index 702534340..292c02580 100644 --- a/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs +++ b/src/InformaticsGateway/Logging/Log.4000.ObjectUploadService.cs @@ -38,5 +38,14 @@ public static partial class Log [LoggerMessage(EventId = 4005, Level = LogLevel.Debug, Message = "Error uploading temporary store. Waiting {timeSpan} before next retry. Retry attempt {retryCount}.")] public static partial void ErrorUploadingFileToTemporaryStore(this ILogger logger, TimeSpan timespan, int retryCount, Exception ex); + + [LoggerMessage(EventId = 4006, Level = LogLevel.Information, Message = "File uploaded to temporary store at {filePath}.")] + public static partial void UploadedFileToTemporaryStore(this ILogger logger, string filePath); + + [LoggerMessage(EventId = 4007, Level = LogLevel.Information, Message = "Items in queue {count}.")] + public static partial void InstanceInUploadQueue(this ILogger logger, int count); + + [LoggerMessage(EventId = 4008, Level = LogLevel.Error, Message = "Unknown error occurred while uploading.")] + public static partial void ErrorUploading(this ILogger logger, Exception ex); } } diff --git a/src/InformaticsGateway/Monai.Deploy.InformaticsGateway.csproj b/src/InformaticsGateway/Monai.Deploy.InformaticsGateway.csproj index 814d2c8be..0b589c066 100644 --- a/src/InformaticsGateway/Monai.Deploy.InformaticsGateway.csproj +++ b/src/InformaticsGateway/Monai.Deploy.InformaticsGateway.csproj @@ -48,7 +48,7 @@ - + diff --git a/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs b/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs index 9a0052ac8..e35d937d4 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadAssembler.cs @@ -61,7 +61,7 @@ public PayloadAssembler( _workItems = new BlockingCollection(); _payloads = new ConcurrentDictionary>(); - RestoreFromDatabase(); + RemovePendingPayloads(); _timer = new System.Timers.Timer(1000) { @@ -71,36 +71,22 @@ public PayloadAssembler( _timer.Enabled = true; } - private void RestoreFromDatabase() + private void RemovePendingPayloads() { - _logger.RestorePayloads(); + _logger.RemovingPendingPayloads(); var scope = _serviceScopeFactory.CreateScope(); var repository = scope.ServiceProvider.GetRequiredService>(); var payloads = repository.AsQueryable().Where(p => p.State == Payload.PayloadState.Created); - var restored = 0; + var removed = 0; foreach (var payload in payloads) { - if (!payload.IsUploadCompleted()) - { - // if there are any objects in a payload that is still pending upload, - // then it has to be dropped since objects are stored in the memory before uploading - // to the designated temporary location on the storage service. - - payload.DeletePayload(_options.Value.Storage.Retries.RetryDelays, _logger, repository).Wait(); - _logger.PayloadDeletedAtStartup(payload.Id); - continue; - } - - if (_payloads.TryAdd(payload.Key, new AsyncLazy(payload))) - { - _logger.PayloadRestored(payload.Id); - restored++; - } + payload.DeletePayload(_options.Value.Storage.Retries.RetryDelays, _logger, repository).Wait(); + _logger.PendingPayloadsRemoved(payload.Id); } - _logger.TotalNumberOfPayloadsRestored(restored); + _logger.TotalNumberOfPayloadsRemoved(removed); } /// diff --git a/src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs b/src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs index 0ddb5daad..28fd2b93a 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadNotificationActionHandler.cs @@ -73,7 +73,6 @@ public async Task NotifyAsync(Payload payload, ActionBlock notification { await NotifyPayloadReady(payload).ConfigureAwait(false); await DeletePayload(payload).ConfigureAwait(false); - await DeletePayloadStorageMetadataObjects(payload).ConfigureAwait(false); } catch (Exception ex) { @@ -87,14 +86,6 @@ public async Task NotifyAsync(Payload payload, ActionBlock notification } } - private async Task DeletePayloadStorageMetadataObjects(Payload payload) - { - Guard.Against.Null(payload, nameof(payload)); - var scope = _serviceScopeFactory.CreateScope(); - var repository = scope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IStorageMetadataWrapperRepository)); - await payload.DeletePayloadStorageMetadataObjects(_options.Value.Storage.Retries.RetryDelays, _logger, repository).ConfigureAwait(false); - } - private async Task DeletePayload(Payload payload) { Guard.Against.Null(payload, nameof(payload)); diff --git a/src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs b/src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs index b89fa9e92..eaef477cf 100644 --- a/src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs +++ b/src/InformaticsGateway/Services/Connectors/PayloadNotificationService.cs @@ -182,11 +182,8 @@ private void BackgroundProcessing(CancellationToken cancellationToken) try { payload = _payloadAssembler.Dequeue(cancellationToken); - using (_logger.BeginScope(new LoggingDataDictionary { { "Payload", payload.Id }, { "Correlation ID", payload.CorrelationId } })) - { - _moveFileQueue.Post(payload); - _logger.PayloadQueuedForProcessing(payload.Id, ServiceName); - } + _moveFileQueue.Post(payload); + _logger.PayloadQueuedForProcessing(payload.Id, ServiceName); } catch (OperationCanceledException ex) { diff --git a/src/InformaticsGateway/Services/Storage/ObjectUploadQueue.cs b/src/InformaticsGateway/Services/Storage/ObjectUploadQueue.cs index 9c0a44c83..50cf597df 100644 --- a/src/InformaticsGateway/Services/Storage/ObjectUploadQueue.cs +++ b/src/InformaticsGateway/Services/Storage/ObjectUploadQueue.cs @@ -16,6 +16,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.Threading; using Ardalis.GuardClauses; @@ -27,19 +28,19 @@ namespace Monai.Deploy.InformaticsGateway.Services.Storage { internal class ObjectUploadQueue : IObjectUploadQueue { - private readonly BlockingCollection _workItems; + private readonly ConcurrentQueue _workItems; private readonly ILogger _logger; public ObjectUploadQueue(ILogger logger) { - _workItems = new BlockingCollection(); + _workItems = new ConcurrentQueue(); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public void Queue(FileStorageMetadata file) { Guard.Against.Null(file, nameof(file)); - _workItems.Add(file); + _workItems.Enqueue(file); var process = Process.GetCurrentProcess(); @@ -48,7 +49,15 @@ public void Queue(FileStorageMetadata file) public FileStorageMetadata Dequeue(CancellationToken cancellationToken) { - return _workItems.Take(cancellationToken); + while (!cancellationToken.IsCancellationRequested) + { + if (_workItems.TryDequeue(out var reuslt)) + { + _logger.InstanceInUploadQueue(_workItems.Count); + return reuslt; + } + } + throw new OperationCanceledException("Cancellation requested."); } } } diff --git a/src/InformaticsGateway/Services/Storage/ObjectUploadService.cs b/src/InformaticsGateway/Services/Storage/ObjectUploadService.cs index f42c14b71..ba43f3563 100644 --- a/src/InformaticsGateway/Services/Storage/ObjectUploadService.cs +++ b/src/InformaticsGateway/Services/Storage/ObjectUploadService.cs @@ -48,7 +48,6 @@ internal class ObjectUploadService : IHostedService, IMonaiService, IDisposable private readonly CancellationTokenSource _cancellationTokenSource; private readonly IOptions _configuration; private readonly IServiceScope _scope; - private ActionBlock _worker; private bool _disposedValue; public ServiceStatus Status { get; set; } = ServiceStatus.Unknown; @@ -68,62 +67,65 @@ public ObjectUploadService( _uplaodQueue = _scope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IObjectUploadQueue)); _storageService = _scope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IStorageService)); - RemovePendingUploadObjects(); } - /// - /// Removes all uploading pending objects from the database at startup since objects are lost upon service restart (crash). - /// - private void RemovePendingUploadObjects() + private async Task BackgroundProcessing(CancellationToken cancellationToken) { + _logger.ServiceRunning(ServiceName); + var tasks = new List(); try { - using var scope = _serviceScopeFactory.CreateScope(); - var repository = scope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IStorageMetadataWrapperRepository)); - repository.DeletePendingUploadsAsync(); + for (var i = 0; i < _configuration.Value.Storage.ConcurrentUploads; i++) + { + tasks.Add(Task.Run(async () => + { + await StartWorker(i, cancellationToken); + })); + } + + Task.WaitAll(tasks.ToArray()); + } + catch (ObjectDisposedException ex) + { + _logger.ServiceDisposed(ServiceName, ex); } catch (Exception ex) { - _logger.ErrorRemovingPendingUploadObjects(ex); + if (ex is InvalidOperationException || ex is OperationCanceledException) + { + _logger.ServiceInvalidOrCancelled(ServiceName, ex); + } } + Status = ServiceStatus.Cancelled; + _logger.ServiceCancelled(ServiceName); } - private void BackgroundProcessing(CancellationToken cancellationToken) + private async Task StartWorker(int thread, CancellationToken cancellationToken) { - _logger.ServiceRunning(ServiceName); while (!cancellationToken.IsCancellationRequested) { try { - _worker.Post(_uplaodQueue.Dequeue(cancellationToken)); + var item = _uplaodQueue.Dequeue(cancellationToken); + await ProcessObject(item); } - catch (ObjectDisposedException ex) + catch (OperationCanceledException ex) { - _logger.ServiceDisposed(ServiceName, ex); + _logger.ServiceCancelled(ServiceName); + break; } catch (Exception ex) { - if (ex is InvalidOperationException || ex is OperationCanceledException) - { - _logger.ServiceInvalidOrCancelled(ServiceName, ex); - } + _logger.ErrorUploading(ex); } } - Status = ServiceStatus.Cancelled; - _logger.ServiceCancelled(ServiceName); } public Task StartAsync(CancellationToken cancellationToken) { - var task = Task.Run(() => + var task = Task.Run(async () => { - _worker = new ActionBlock(ProcessObject, new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = _configuration.Value.Storage.ConcurrentUploads, - CancellationToken = cancellationToken, - }); - - BackgroundProcessing(cancellationToken); + await BackgroundProcessing(cancellationToken); }, CancellationToken.None); Status = ServiceStatus.Running; @@ -137,7 +139,6 @@ public Task StopAsync(CancellationToken cancellationToken) { _logger.ServiceStopping(ServiceName); _cancellationTokenSource.Cancel(); - _worker.Complete(); Status = ServiceStatus.Stopped; return Task.CompletedTask; } @@ -163,13 +164,10 @@ private async Task ProcessObject(FileStorageMetadata blob) } await UploadData(blob.Id, blob.File, blob.Source, blob.Workflows, _cancellationTokenSource.Token).ConfigureAwait(false); - await UpdateBlob(blob); } catch (Exception ex) { _logger.FailedToUploadFile(blob.Id, ex); - blob.SetFailed(); - await UpdateBlob(blob); } finally { @@ -178,13 +176,6 @@ private async Task ProcessObject(FileStorageMetadata blob) } } - private async Task UpdateBlob(FileStorageMetadata blob) - { - using var scope = _serviceScopeFactory.CreateScope(); - var repository = scope.ServiceProvider.GetService() ?? throw new ServiceNotFoundException(nameof(IStorageMetadataWrapperRepository)); - await repository.AddOrUpdateAsync(blob).ConfigureAwait(false); - } - private async Task UploadData(string identifier, StorageObjectMetadata storageObjectMetadata, string source, List workflows, CancellationToken cancellationToken) { Guard.Against.NullOrWhiteSpace(identifier, nameof(identifier)); diff --git a/src/InformaticsGateway/Test/Services/Connectors/PayloadAssemblerTest.cs b/src/InformaticsGateway/Test/Services/Connectors/PayloadAssemblerTest.cs index f09b7a058..cf1baecfc 100644 --- a/src/InformaticsGateway/Test/Services/Connectors/PayloadAssemblerTest.cs +++ b/src/InformaticsGateway/Test/Services/Connectors/PayloadAssemblerTest.cs @@ -89,23 +89,25 @@ public async Task GivenAFileStorageMetadata_WhenQueueingWihtoutSpecifyingATimeou } [RetryFact] - public async Task GivenFileStorageMetadataInTheDatabase_AtServiceStartup_ExpectFileStorageMetadataToBeRestoredFromTheDatabase() + public async Task GivenFileStorageMetadataInTheDatabase_AtServiceStartup_ExpectPayloadsInCreatedStateToBeRemoved() { var dataset = new List { - new Payload("created-test", Guid.NewGuid().ToString(), 10) { State = Payload.PayloadState.Created }, + new Payload("created-test1", Guid.NewGuid().ToString(), 10) { State = Payload.PayloadState.Created }, + new Payload("created-test2", Guid.NewGuid().ToString(), 10) { State = Payload.PayloadState.Created }, new Payload("upload-test", Guid.NewGuid().ToString(), 10) { State = Payload.PayloadState.Move }, new Payload("notify-test", Guid.NewGuid().ToString(), 10) { State = Payload.PayloadState.Notify }, }; _repository.Setup(p => p.AsQueryable()).Returns(dataset.AsQueryable()); + _repository.Setup(p => p.Remove(It.IsAny())); + var payloadAssembler = new PayloadAssembler(_options, _logger.Object, _serviceScopeFactory.Object); await Task.Delay(250); payloadAssembler.Dispose(); _cancellationTokenSource.Cancel(); - _logger.VerifyLogging($"Restoring payloads from database.", LogLevel.Information, Times.Once()); - _logger.VerifyLogging($"1 payloads restored from database.", LogLevel.Information, Times.Once()); + _repository.Verify(p => p.Remove(It.IsAny()), Times.Exactly(2)); } [RetryFact] diff --git a/src/InformaticsGateway/Test/Services/Connectors/PayloadNotificationActionHandlerTest.cs b/src/InformaticsGateway/Test/Services/Connectors/PayloadNotificationActionHandlerTest.cs index 6ccf719ed..67bc77e7f 100644 --- a/src/InformaticsGateway/Test/Services/Connectors/PayloadNotificationActionHandlerTest.cs +++ b/src/InformaticsGateway/Test/Services/Connectors/PayloadNotificationActionHandlerTest.cs @@ -41,7 +41,6 @@ public class PayloadNotificationActionHandlerTest private readonly Mock _messageBrokerPublisherService; private readonly Mock> _informaticsGatewayReepository; - private readonly Mock _storageReepository; private readonly Mock _serviceScope; private readonly ServiceProvider _serviceProvider; @@ -55,13 +54,11 @@ public PayloadNotificationActionHandlerTest() _messageBrokerPublisherService = new Mock(); _informaticsGatewayReepository = new Mock>(); - _storageReepository = new Mock(); _serviceScope = new Mock(); var services = new ServiceCollection(); services.AddScoped(p => _messageBrokerPublisherService.Object); services.AddScoped(p => _informaticsGatewayReepository.Object); - services.AddScoped(p => _storageReepository.Object); _serviceProvider = services.BuildServiceProvider(); _serviceScopeFactory.Setup(p => p.CreateScope()).Returns(_serviceScope.Object); @@ -196,7 +193,6 @@ public async Task GivenAPayload_WhenMessageIsPublished_ExpectPayloadToBeDeleted( _messageBrokerPublisherService.Verify(p => p.Publish(It.IsAny(), It.IsAny()), Times.AtLeastOnce()); _informaticsGatewayReepository.Verify(p => p.Remove(It.IsAny()), Times.AtLeastOnce()); - _storageReepository.Verify(p => p.DeleteAsync(It.IsAny(), It.IsAny()), Times.AtLeastOnce()); } } } diff --git a/src/InformaticsGateway/Test/Services/Storage/ObjectUploadServiceTest.cs b/src/InformaticsGateway/Test/Services/Storage/ObjectUploadServiceTest.cs index f5a013784..ab2641705 100644 --- a/src/InformaticsGateway/Test/Services/Storage/ObjectUploadServiceTest.cs +++ b/src/InformaticsGateway/Test/Services/Storage/ObjectUploadServiceTest.cs @@ -45,7 +45,6 @@ public class ObjectUploadServiceTest private readonly Mock> _uploadQueueLogger; private readonly IObjectUploadQueue _uploadQueue; private readonly Mock _storageService; - private readonly Mock _storageMetadataWrapperRepository; private readonly CancellationTokenSource _cancellationTokenSource; private readonly ServiceProvider _serviceProvider; private readonly Mock _serviceScope; @@ -58,14 +57,12 @@ public ObjectUploadServiceTest() _storageService = new Mock(); _logger = new Mock>(); _options = Options.Create(new InformaticsGatewayConfiguration()); - _storageMetadataWrapperRepository = new Mock(); _cancellationTokenSource = new CancellationTokenSource(); _serviceScope = new Mock(); var services = new ServiceCollection(); services.AddScoped(p => _uploadQueue); - services.AddScoped(p => _storageMetadataWrapperRepository.Object); services.AddScoped(p => _storageService.Object); _serviceProvider = services.BuildServiceProvider(); _serviceScopeFactory.Setup(p => p.CreateScope()).Returns(_serviceScope.Object); @@ -98,7 +95,6 @@ public void GivenAObjectUploadService_WhenInitialized_ExpectItToRemovingAllPendi { var svc = new ObjectUploadService(_serviceScopeFactory.Object, _logger.Object, _options); - _storageMetadataWrapperRepository.Verify(p => p.DeletePendingUploadsAsync(), Times.Once()); } [Fact] diff --git a/tests/Integration.Test/Monai.Deploy.InformaticsGateway.Integration.Test.csproj b/tests/Integration.Test/Monai.Deploy.InformaticsGateway.Integration.Test.csproj index 152857690..2a9d060ef 100644 --- a/tests/Integration.Test/Monai.Deploy.InformaticsGateway.Integration.Test.csproj +++ b/tests/Integration.Test/Monai.Deploy.InformaticsGateway.Integration.Test.csproj @@ -33,7 +33,7 @@ - +