Skip to content

Commit 81fef60

Browse files
author
Felipe Mattioli Dos Santos (MSC Technology Italia)
committed
FEAT: Adding new logic to work with middlewares
1 parent cc286ec commit 81fef60

File tree

8 files changed

+130
-37
lines changed

8 files changed

+130
-37
lines changed

src/AzureServiceBusFlow.Sample/AzureServiceBusFlow.Sample.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
</ItemGroup>
1515

1616
<ItemGroup>
17-
<PackageReference Include="Scalar.AspNetCore" Version="2.8.10" />
17+
<PackageReference Include="Scalar.AspNetCore" Version="2.9.0" />
1818
<PackageReference Include="Swashbuckle.AspNetCore" Version="9.0.6" />
19-
<PackageReference Include="Mattioli.Configurations" Version="1.59.0" />
19+
<PackageReference Include="Mattioli.Configurations" Version="1.61.0" />
2020
</ItemGroup>
2121

2222
<ItemGroup>

src/AzureServiceBusFlow.Sample/Commands/ExampleCommandHandler.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ public class CommandExemple1Handler : IMessageHandler<ExampleCommand1>
88
{
99
public Task HandleAsync(ExampleCommand1 message, ServiceBusReceivedMessage rawMessage, CancellationToken cancellationToken)
1010
{
11-
throw new Exception("Erro");
1211
return Task.CompletedTask;
1312
}
1413
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using Azure.Messaging.ServiceBus;
2+
using AzureServiceBusFlow.Abstractions;
3+
using AzureServiceBusFlow.Middlewar;
4+
5+
namespace AzureServiceBusFlow.Sample.Middlewares
6+
{
7+
/// <summary>
8+
/// Sample middleware that adds a custom property
9+
/// to every message published to Azure Service Bus.
10+
/// </summary>
11+
public class AsbSampleMiddleware : IProducerMiddleware
12+
{
13+
public async Task InvokeAsync(ServiceBusMessage message, Func<Task> next)
14+
{
15+
// Add a custom property if it doesn't exist
16+
if (!message.ApplicationProperties.ContainsKey("SampleMiddleware"))
17+
{
18+
message.ApplicationProperties["SampleMiddleware"] = "Executed";
19+
}
20+
21+
// Add a UTC timestamp for debugging or tracking
22+
message.ApplicationProperties["ProcessedAtUtc"] = DateTime.UtcNow.ToString("O");
23+
24+
// Continue the pipeline (invoke the next middleware or the actual send)
25+
await next();
26+
}
27+
}
28+
}

src/AzureServiceBusFlow.Sample/Program.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using AzureServiceBusFlow.Models;
33
using AzureServiceBusFlow.Sample.Commands;
44
using AzureServiceBusFlow.Sample.Events;
5-
5+
using AzureServiceBusFlow.Sample.Middlewares;
66
using Mattioli.Configurations.Transformers;
77

88
using Scalar.AspNetCore;
@@ -27,6 +27,7 @@
2727
.AddProducer<ExampleCommand1>(p => p
2828
.EnsureQueueExists("command-queue-one")
2929
.WithCommandProducer()
30+
//.UseMiddleware<AsbSampleMiddleware>()
3031
.ToQueue("command-queue-one"))
3132
.AddProducer<ExampleCommand2>(p => p
3233
.EnsureQueueExists("command-queue-two")

src/AzureServiceBusFlow/AzureServiceBusFlow.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
<ItemGroup>
1717
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.20.1" />
1818
<PackageReference Include="Microsoft.Azure.ServiceBus" Version="5.2.0" />
19-
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.9" />
20-
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.9" />
19+
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.10" />
20+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.10" />
2121
<PackageReference Include="Polly" Version="8.6.4" />
2222
</ItemGroup>
2323

src/AzureServiceBusFlow/Builders/ServiceBusProducerConfigurationBuilder.cs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using AzureServiceBusFlow.Abstractions;
2+
using AzureServiceBusFlow.Middlewar;
23
using AzureServiceBusFlow.Models;
34
using AzureServiceBusFlow.Producers;
45
using Microsoft.Azure.ServiceBus.Management;
@@ -8,13 +9,21 @@
89
namespace AzureServiceBusFlow.Builders
910
{
1011
public class ServiceBusProducerConfigurationBuilder<TMessage>(AzureServiceBusConfiguration azureServiceBusConfiguration, IServiceCollection services)
11-
where TMessage : class, IServiceBusMessage
12+
where TMessage : class, IServiceBusMessage
1213
{
1314
private readonly AzureServiceBusConfiguration _azureServiceBusConfiguration = azureServiceBusConfiguration;
1415
private readonly IServiceCollection _services = services;
1516

1617
private string? _topicName;
1718
private string? _queueName;
19+
private readonly List<Type> _middlewares = [];
20+
21+
public ServiceBusProducerConfigurationBuilder<TMessage> UseMiddleware<TMiddleware>()
22+
where TMiddleware : IProducerMiddleware
23+
{
24+
_middlewares.Add(typeof(TMiddleware));
25+
return this;
26+
}
1827

1928
public ServiceBusProducerConfigurationBuilder<TMessage> WithTopic(string topic)
2029
{
@@ -80,29 +89,26 @@ public ServiceBusProducerConfigurationBuilder<TMessage> WithQueue(string queue)
8089

8190
internal void Build()
8291
{
83-
if (!string.IsNullOrEmpty(_queueName))
84-
{
85-
_services.AddSingleton<IServiceBusProducer<TMessage>>(sp =>
86-
{
87-
var logger = sp.GetRequiredService<ILogger<ServiceBusProducer<TMessage>>>();
88-
return new ServiceBusProducer<TMessage>(_azureServiceBusConfiguration, _queueName, logger);
89-
});
92+
if (string.IsNullOrEmpty(_queueName) && string.IsNullOrEmpty(_topicName))
93+
throw new InvalidOperationException("Either topic or queue name must be specified.");
9094

91-
return;
92-
}
93-
94-
if (!string.IsNullOrEmpty(_topicName))
95+
foreach (var middlewareType in _middlewares)
9596
{
96-
_services.AddSingleton<IServiceBusProducer<TMessage>>(sp =>
97-
{
98-
var logger = sp.GetRequiredService<ILogger<ServiceBusProducer<TMessage>>>();
99-
return new ServiceBusProducer<TMessage>(_azureServiceBusConfiguration, _topicName, logger);
100-
});
101-
102-
return;
97+
_services.AddSingleton(typeof(IProducerMiddleware), middlewareType);
10398
}
10499

105-
throw new InvalidOperationException("Either topic or queue name must be specified.");
100+
_services.AddSingleton<IServiceBusProducer<TMessage>>(sp =>
101+
{
102+
var logger = sp.GetRequiredService<ILogger<ServiceBusProducer<TMessage>>>();
103+
var middlewares = sp.GetServices<IProducerMiddleware>();
104+
var name = _queueName ?? _topicName!;
105+
return new ServiceBusProducer<TMessage>(
106+
_azureServiceBusConfiguration,
107+
name,
108+
logger,
109+
middlewares);
110+
});
106111
}
107112
}
113+
108114
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using Azure.Messaging.ServiceBus;
2+
3+
namespace AzureServiceBusFlow.Middlewar
4+
{
5+
public interface IProducerMiddleware
6+
{
7+
Task InvokeAsync(ServiceBusMessage message, Func<Task> next);
8+
}
9+
10+
}

src/AzureServiceBusFlow/Producers/ServiceBusProducer.cs

Lines changed: 60 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,29 @@
11
using Azure.Messaging.ServiceBus;
22
using AzureServiceBusFlow.Abstractions;
3+
using AzureServiceBusFlow.Middlewar;
34
using AzureServiceBusFlow.Models;
45
using Microsoft.Extensions.Logging;
56
using Newtonsoft.Json;
67

78
namespace AzureServiceBusFlow.Producers
89
{
9-
public class ServiceBusProducer<TMessage> : IServiceBusProducer<TMessage> where TMessage : class, IServiceBusMessage
10+
public class ServiceBusProducer<TMessage> : IServiceBusProducer<TMessage>
11+
where TMessage : class, IServiceBusMessage
1012
{
1113
private readonly ServiceBusSender _sender;
1214
private readonly ILogger _logger;
15+
private readonly IEnumerable<IProducerMiddleware>? _middlewares;
1316

14-
public ServiceBusProducer(AzureServiceBusConfiguration azureServiceBusConfiguration, string queueOrTopicName, ILogger logger)
17+
public ServiceBusProducer(
18+
AzureServiceBusConfiguration azureServiceBusConfiguration,
19+
string queueOrTopicName,
20+
ILogger logger,
21+
IEnumerable<IProducerMiddleware>? middlewares = null)
1522
{
1623
var client = new ServiceBusClient(azureServiceBusConfiguration.ConnectionString);
1724
_sender = client.CreateSender(queueOrTopicName);
18-
1925
_logger = logger;
26+
_middlewares = middlewares;
2027
}
2128

2229
public async Task ProduceAsync(TMessage message, CancellationToken cancellationToken)
@@ -32,12 +39,31 @@ public async Task ProduceAsync(TMessage message, CancellationToken cancellationT
3239
}
3340
};
3441

35-
await _sender.SendMessageAsync(serviceBusMessage, cancellationToken);
42+
async Task finalStep()
43+
{
44+
await _sender.SendMessageAsync(serviceBusMessage, cancellationToken);
45+
_logger.LogInformation("Message {MessageType} published successfully!", message.GetType().Name);
46+
}
3647

37-
_logger.LogInformation("Message {MessageType} published with successfully!", message.GetType().Name);
48+
// Run middlewares, if it exist
49+
if (_middlewares != null && _middlewares.Any())
50+
{
51+
Func<Task> next = finalStep;
52+
foreach (var middleware in _middlewares.Reverse())
53+
{
54+
var current = middleware;
55+
var prevNext = next;
56+
next = () => current.InvokeAsync(serviceBusMessage, prevNext);
57+
}
58+
await next();
59+
}
60+
else
61+
{
62+
await finalStep();
63+
}
3864
}
3965

40-
public Task ProduceAsync(TMessage message, MessageOptions producerOptions, CancellationToken cancellationToken)
66+
public async Task ProduceAsync(TMessage message, MessageOptions producerOptions, CancellationToken cancellationToken)
4167
{
4268
var json = JsonConvert.SerializeObject(message);
4369
var serviceBusMessage = new ServiceBusMessage(json)
@@ -52,18 +78,41 @@ public Task ProduceAsync(TMessage message, MessageOptions producerOptions, Cance
5278

5379
if (producerOptions?.ApplicationProperties is not null)
5480
{
55-
producerOptions?.ApplicationProperties?
56-
.Where(kvp => !serviceBusMessage.ApplicationProperties.ContainsKey(kvp.Key))
57-
.ToList()
58-
.ForEach(kvp => serviceBusMessage.ApplicationProperties.Add(kvp.Key, kvp.Value));
81+
foreach (var kvp in from kvp in producerOptions.ApplicationProperties
82+
where !serviceBusMessage.ApplicationProperties.ContainsKey(kvp.Key)
83+
select kvp)
84+
{
85+
serviceBusMessage.ApplicationProperties.Add(kvp.Key, kvp.Value);
86+
}
5987
}
6088

6189
if (producerOptions?.Delay is not null)
6290
{
6391
serviceBusMessage.ScheduledEnqueueTime = DateTimeOffset.UtcNow.Add(producerOptions.Delay.Value);
6492
}
6593

66-
return _sender.SendMessageAsync(serviceBusMessage, cancellationToken);
94+
// Mesmo pipeline de middlewares para as versões com opções
95+
Func<Task> finalStep = async () =>
96+
{
97+
await _sender.SendMessageAsync(serviceBusMessage, cancellationToken);
98+
_logger.LogInformation("Message {MessageType} published successfully!", message.GetType().Name);
99+
};
100+
101+
if (_middlewares != null && _middlewares.Any())
102+
{
103+
Func<Task> next = finalStep;
104+
foreach (var middleware in _middlewares.Reverse())
105+
{
106+
var current = middleware;
107+
var prevNext = next;
108+
next = () => current.InvokeAsync(serviceBusMessage, prevNext);
109+
}
110+
await next();
111+
}
112+
else
113+
{
114+
await finalStep();
115+
}
67116
}
68117

69118
public Task ProduceAsync(TMessage message, TimeSpan delay, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)