Skip to content

NMS 2.0 #214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFramework>$(TargetFullFrameworkVersion)</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Apache.NMS" Version="1.8.0" />
<PackageReference Include="Apache.NMS" Version="2.0.0" />
<PackageReference Include="Apache.NMS.ActiveMQ" Version="1.7.2" />
</ItemGroup>
<ItemGroup>
Expand Down
40 changes: 37 additions & 3 deletions src/Spring/Spring.Core/Threading/ThreadStaticStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,23 @@
#region License
// /*
// * Copyright 2022 the original author or authors.
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
#endregion

using System.Collections;
using System.Threading;

namespace Spring.Threading
{
Expand All @@ -9,14 +28,29 @@ namespace Spring.Threading
public class ThreadStaticStorage : IThreadStorage
{
[ThreadStatic]
private static Hashtable data;
private static Hashtable _dataThreadStatic;
// AsyncLocal for it to work in async NMS lib
private static AsyncLocal<Hashtable> _dataAsyncLocal = new AsyncLocal<Hashtable>();

/// <summary>
/// Allows to switch how context is being held, if true, then it will use AsyncLocal
/// </summary>
public static bool UseAsyncLocal { get; set; } = false;

private static Hashtable Data
{
get
{
if (data == null) data = new Hashtable();
return data;
if (UseAsyncLocal)
{
if (_dataAsyncLocal.Value == null) _dataAsyncLocal.Value = new Hashtable();
return _dataAsyncLocal.Value;
}
else
{
if (_dataThreadStatic == null) _dataThreadStatic = new Hashtable();
return _dataThreadStatic;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,16 @@ public IMessageConsumer Target
get { return target; }
}

public string MessageSelector
{
get
{
return target.MessageSelector;
}
}

/// <summary>
/// Register for message events.
/// Register for message events.
/// </summary>
public event MessageListener Listener
{
Expand All @@ -75,6 +83,11 @@ public IMessage Receive()
return this.target.Receive();
}

public Task<IMessage> ReceiveAsync()
{
return this.target.ReceiveAsync();
}

/// <summary>
/// Receives the next message that arrives within the specified timeout interval.
/// </summary>
Expand All @@ -85,6 +98,11 @@ public IMessage Receive(TimeSpan timeout)
return this.target.Receive(timeout);
}

public Task<IMessage> ReceiveAsync(TimeSpan timeout)
{
return this.target.ReceiveAsync(timeout);
}

/// <summary>
/// Receives the next message if one is immediately available.
/// </summary>
Expand All @@ -102,6 +120,12 @@ public void Close()
// It's a cached MessageConsumer...
}

public Task CloseAsync()
{
// It's a cached MessageConsumer...
return Task.FromResult(true);
}

/// <summary>
/// A Delegate that is called each time a Message is dispatched to allow the client to do
/// any necessary transformations on the received message before it is delivered.
Expand Down Expand Up @@ -131,4 +155,4 @@ public override string ToString()
return "Cached NMS MessageConsumer: " + this.target;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,32 @@ public void Send(IDestination destination, IMessage message, MsgDeliveryMode del
target.Send(destination, message, deliveryMode, priority, timeToLive);
}

public TimeSpan DeliveryDelay
{
get { return target.DeliveryDelay; }
set { target.DeliveryDelay = value; }
}

public Task SendAsync(IMessage message)
{
return target.SendAsync(message);
}

public Task SendAsync(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
return target.SendAsync(message, deliveryMode, priority, timeToLive);
}

public Task SendAsync(IDestination destination, IMessage message)
{
return target.SendAsync(destination, message);
}

public Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
{
return target.SendAsync(destination, message, deliveryMode, priority, timeToLive);
}

#region Odd Message Creationg Methods on IMessageProducer - not in-line with JMS APIs.
/// <summary>
/// Creates the message.
Expand All @@ -121,6 +147,11 @@ public IMessage CreateMessage()
return target.CreateMessage();
}

public Task<IMessage> CreateMessageAsync()
{
return target.CreateMessageAsync();
}

/// <summary>
/// Creates the text message.
/// </summary>
Expand All @@ -130,6 +161,11 @@ public ITextMessage CreateTextMessage()
return target.CreateTextMessage();
}

public Task<ITextMessage> CreateTextMessageAsync()
{
return target.CreateTextMessageAsync();
}

/// <summary>
/// Creates the text message.
/// </summary>
Expand All @@ -140,6 +176,11 @@ public ITextMessage CreateTextMessage(string text)
return target.CreateTextMessage(text);
}

public Task<ITextMessage> CreateTextMessageAsync(string text)
{
return target.CreateTextMessageAsync(text);
}

/// <summary>
/// Creates the map message.
/// </summary>
Expand All @@ -149,6 +190,11 @@ public IMapMessage CreateMapMessage()
return target.CreateMapMessage();
}

public Task<IMapMessage> CreateMapMessageAsync()
{
return target.CreateMapMessageAsync();
}

/// <summary>
/// Creates the object message.
/// </summary>
Expand All @@ -159,6 +205,11 @@ public IObjectMessage CreateObjectMessage(object body)
return target.CreateObjectMessage(body);
}

public Task<IObjectMessage> CreateObjectMessageAsync(object body)
{
return target.CreateObjectMessageAsync(body);
}

/// <summary>
/// Creates the bytes message.
/// </summary>
Expand All @@ -168,6 +219,11 @@ public IBytesMessage CreateBytesMessage()
return target.CreateBytesMessage();
}

public Task<IBytesMessage> CreateBytesMessageAsync()
{
return target.CreateBytesMessageAsync();
}

/// <summary>
/// Creates the bytes message.
/// </summary>
Expand All @@ -178,6 +234,11 @@ public IBytesMessage CreateBytesMessage(byte[] body)
return target.CreateBytesMessage(body);
}

public Task<IBytesMessage> CreateBytesMessageAsync(byte[] body)
{
return target.CreateBytesMessageAsync(body);
}

/// <summary>
/// Creates the stream message.
/// </summary>
Expand All @@ -187,6 +248,12 @@ public IStreamMessage CreateStreamMessage()
return target.CreateStreamMessage();
}

public Task<IStreamMessage> CreateStreamMessageAsync()
{
return target.CreateStreamMessageAsync();
}


/// <summary>
/// A delegate that is called each time a Message is sent from this Producer which allows
/// the application to perform any needed transformations on the Message before it is sent.
Expand Down Expand Up @@ -302,7 +369,13 @@ public void Close()
originalDisableMessageTimestamp = null;
}
}


public Task CloseAsync()
{
Close();
return Task.FromResult(true);
}

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
Expand All @@ -321,4 +394,4 @@ public override string ToString()
return "Cached NMS MessageProducer: " + this.target;
}
}
}
}
Loading