-
Notifications
You must be signed in to change notification settings - Fork 24
Adding CancellationToken to context.Items to subscribe #105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Adding CancellationToken to context.Items to subscribe #105
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds tracking and cancellation of message lock renewal tokens in the Azure Service Bus transport to ensure that renewal tasks can be stopped when a message is completed, abandoned, or an error occurs.
- Introduces a
_messageRenewerTokenSources
dictionary to holdCancellationTokenSource
instances per message. - Creates a linked cancellation token source in
Receive
and stores it in the context items. - Hooks into the
OnAck
,OnNack
, andOnDisposed
callbacks to remove token sources, and cancels them when renew failures occur.
Comments suppressed due to low confidence (1)
Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs:620
- [nitpick] Consider renaming
renewFailedTokenSource
to something likerenewalTokenSource
ormessageRenewalTokenSource
for clarity.
var renewFailedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationToken);
Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs
Outdated
Show resolved
Hide resolved
Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.cs
Outdated
Show resolved
Hide resolved
Hi @mookid8000 |
We see a problem where Azure Service bus is never going to accept the renewal request for the locks - which results in a state where the message is never going to be completed by the handler.
This is a transient error - so instead we want to be able to fail fast.
This is done by adding a CancellationToken that can be listened to in the Handler to see whether to stop the handling of the message by throwing a OperationCancelException
Enhancements to message lock renewal and cancellation:
ConcurrentDictionary
named_messageRenewerTokenSources
to storeCancellationTokenSource
objects for each message to manage cancellation tokens during lock renewal. (AzureServiceBusTransport.cs
, Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.csR64)Receive
method to create and store a linkedCancellationTokenSource
for each message, which combines the pipeline cancellation token with a token that cancels if lock renewal fails. The token is added to the transaction context for use during message processing. (AzureServiceBusTransport.cs
, Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.csL621-R628)OnAck
,OnNack
, andOnDisposed
callbacks of the transaction context. (AzureServiceBusTransport.cs
, [1] [2] [3]AzureServiceBusTransport.cs
, Rebus.AzureServiceBus/AzureServiceBus/AzureServiceBusTransport.csL968-R985)