Skip to content
This repository was archived by the owner on Nov 17, 2023. It is now read-only.

Use AsyncEventingBasicConsumer in RabbitMQ #987

Merged
merged 1 commit into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 33 additions & 14 deletions src/BuildingBlocks/EventBus/EventBusRabbitMQ/EventBusRabbitMQ.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,27 +178,46 @@ private void StartBasicConsume()
{
if (_consumerChannel != null)
{
var consumer = new EventingBasicConsumer(_consumerChannel);
consumer.Received += async (model, ea) =>
{
var eventName = ea.RoutingKey;
var message = Encoding.UTF8.GetString(ea.Body);

await ProcessEvent(eventName, message);
var consumer = new AsyncEventingBasicConsumer(_consumerChannel);

_consumerChannel.BasicAck(ea.DeliveryTag, multiple: false);
};
consumer.Received += Consumer_Received;

_consumerChannel.BasicConsume(queue: _queueName,
autoAck: false,
consumer: consumer);
_consumerChannel.BasicConsume(
queue: _queueName,
autoAck: false,
consumer: consumer);
}
else
{
_logger.LogError("StartBasicConsume can not call on _consumerChannelCreated == false");
_logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
}
}

private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
var eventName = eventArgs.RoutingKey;
var message = Encoding.UTF8.GetString(eventArgs.Body);

try
{
if (message.ToLowerInvariant().Contains("throw-fake-exception"))
{
throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
}

await ProcessEvent(eventName, message);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message);
}

// Even on exception we take the message off the queue.
// in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX).
// For more information see: https://www.rabbitmq.com/dlx.html
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
}

private IModel CreateConsumerChannel()
{
if (!_persistentConnection.IsConnected)
Expand All @@ -209,7 +228,7 @@ private IModel CreateConsumerChannel()
var channel = _persistentConnection.CreateModel();

channel.ExchangeDeclare(exchange: BROKER_NAME,
type: "direct");
type: "direct");

channel.QueueDeclare(queue: _queueName,
durable: true,
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Basket/Basket.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)

var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};

if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Catalog/Catalog.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,8 @@ public static IServiceCollection AddIntegrationServices(this IServiceCollection

var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"]
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};

if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Location/Locations.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)

var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};

if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Marketing/Marketing.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)

var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};

if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Ordering/Ordering.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ public static IServiceCollection AddCustomIntegrations(this IServiceCollection s

var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"]
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};

if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Ordering/Ordering.BackgroundTasks/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)

var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};

if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Ordering/Ordering.SignalrHub/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)

var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};

if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Payment/Payment.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public IServiceProvider ConfigureServices(IServiceCollection services)
var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
var factory = new ConnectionFactory()
{
HostName = Configuration["EventBusConnection"]
HostName = Configuration["EventBusConnection"],
DispatchConsumersAsync = true
};

if (!string.IsNullOrEmpty(Configuration["EventBusUserName"]))
Expand Down
3 changes: 2 additions & 1 deletion src/Services/Webhooks/Webhooks.API/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ public static IServiceCollection AddIntegrationServices(this IServiceCollection

var factory = new ConnectionFactory()
{
HostName = configuration["EventBusConnection"]
HostName = configuration["EventBusConnection"],
DispatchConsumersAsync = true
};

if (!string.IsNullOrEmpty(configuration["EventBusUserName"]))
Expand Down