Durable Command Inbox
Introduced in v4.0, the Durable Command Inbox ensures that critical commands are processed reliably, even if the application fails or restarts. It applies the transactional outbox/inbox pattern to commands and guarantees at-least-once execution.
The Command Inbox is a mechanism for deferred and durable command execution. When a command is marked for the inbox:
- Instead of being processed immediately, it is first serialized and persisted to a durable storage medium (like a database table).
- A background process periodically fetches batches of commands from this store.
- The background process then sends these commands through the normal LiteBus mediation pipeline for execution.
This ensures that once a command is accepted into the inbox, it will eventually be processed.
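The three steps above can be pictured with a minimal in-memory sketch. This is not LiteBus code: InMemoryInboxSketch, StoredCommand, and SendEmail are hypothetical names, a Queue stands in for the durable store, and System.Text.Json is assumed as the serializer.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical sample command and row type, for illustration only.
public sealed record SendEmail(string To);
public sealed record StoredCommand(string TypeName, string Payload);

public sealed class InMemoryInboxSketch
{
    // A real inbox would use durable storage such as a database table.
    private readonly Queue<StoredCommand> _store = new();

    public int Pending => _store.Count;

    // Step 1: serialize and persist the command instead of executing it.
    public void Store(object command)
    {
        var type = command.GetType();
        _store.Enqueue(new StoredCommand(
            type.AssemblyQualifiedName!,
            JsonSerializer.Serialize(command, type)));
    }

    // Steps 2 and 3: fetch up to batchSize commands and dispatch each one.
    public int ProcessBatch(int batchSize, Action<object> dispatch)
    {
        var processed = 0;
        while (processed < batchSize && _store.TryPeek(out var stored))
        {
            var type = Type.GetType(stored.TypeName, throwOnError: true)!;
            var command = JsonSerializer.Deserialize(stored.Payload, type)
                ?? throw new InvalidOperationException("Stored payload deserialized to null.");

            dispatch(command);   // if this throws, the command stays in the store
            _store.Dequeue();    // remove only after successful execution
            processed++;
        }
        return processed;
    }
}
```

Because a command is removed only after dispatch succeeds, a failure leaves it in the store for a later attempt. That is the at-least-once behavior described above.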
Typical use cases include:
- Critical Operations: Processing payments, placing orders, or any other business-critical action that must not be lost.
- Long-Running Tasks: Offloading time-consuming tasks from the initial request thread to a background worker.
- Integration with External Systems: Ensuring that calls to external APIs are reliably executed and can be retried upon failure.
Enabling the inbox for a command is simple: decorate the command class with the [StoreInInbox] attribute.
using LiteBus.Commands.Abstractions;

/// <summary>
/// This command is critical. By applying the [StoreInInbox] attribute,
/// we ensure it's persisted and processed reliably by a background worker.
/// </summary>
[StoreInInbox]
public sealed class ProcessPaymentCommand : ICommand
{
    public required Guid OrderId { get; init; }
    public required decimal Amount { get; init; }
    public required string CreditCardToken { get; init; }
}

When you call SendAsync on an inbox-enabled command, the behavior is different from a regular command:
- The command is not executed immediately.
- It is serialized and passed to your ICommandInbox.StoreAsync implementation.
- SendAsync returns immediately.
  - For ICommand, it returns Task.CompletedTask.
  - For ICommand<TResult>, it returns Task.FromResult(default(TResult)).
This is ideal for APIs that should immediately return an HTTP 202 Accepted response, acknowledging that the request has been accepted for processing.
[HttpPost]
public async Task<IActionResult> SubmitPayment(ProcessPaymentCommand command)
{
    // This will store the command in the inbox and return immediately.
    // The actual payment processing will happen in the background.
    await _commandMediator.SendAsync(command);

    // Return HTTP 202 Accepted to indicate the request is being processed.
    return Accepted();
}

To enable the inbox feature, you must register three components. LiteBus provides the two interfaces and the hosted service; you provide the interface implementations specific to your infrastructure (e.g., EF Core, Dapper, MongoDB).
- ICommandInbox: Responsible for persisting the command.
  public interface ICommandInbox
  {
      Task StoreAsync(ICommand command, CancellationToken cancellationToken = default);
  }
- ICommandInboxProcessor: The long-running service that fetches and processes commands.
  public interface ICommandInboxProcessor
  {
      Task RunAsync(CommandBatchHandler handler, CancellationToken cancellationToken = default);
  }
- CommandInboxProcessorHostedService: The IHostedService that manages the lifecycle of your processor.
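As an illustration of the first component, here is a minimal sketch of an ICommandInbox implementation that appends each command to a JSON Lines file. The ICommand and ICommandInbox declarations are local stand-ins that mirror the interface shown above so the example compiles on its own; in a real application they come from LiteBus. FileCommandInbox, InboxRow, and ShipOrder are hypothetical names, and a production implementation would more likely write to a database table.

```csharp
using System;
using System.IO;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Local stand-ins mirroring the LiteBus interfaces (assumption: same shape as shown above).
public interface ICommand { }

public interface ICommandInbox
{
    Task StoreAsync(ICommand command, CancellationToken cancellationToken = default);
}

// Hypothetical sample command and row format (one JSON object per line).
public sealed record ShipOrder(int OrderNumber) : ICommand;
public sealed record InboxRow(Guid Id, string CommandType, string Payload, DateTimeOffset StoredAt);

public sealed class FileCommandInbox : ICommandInbox
{
    private readonly string _path;
    private readonly SemaphoreSlim _gate = new(1, 1); // serializes concurrent appends

    public FileCommandInbox(string path) => _path = path;

    public async Task StoreAsync(ICommand command, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(command);

        // Serialize using the runtime type so all properties of the concrete command are kept.
        var type = command.GetType();
        var row = new InboxRow(
            Guid.NewGuid(),
            type.AssemblyQualifiedName ?? type.Name,
            JsonSerializer.Serialize(command, type),
            DateTimeOffset.UtcNow);

        await _gate.WaitAsync(cancellationToken);
        try
        {
            var line = JsonSerializer.Serialize(row) + Environment.NewLine;
            await File.AppendAllTextAsync(_path, line, cancellationToken);
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

Storing the command type name next to the payload lets the processor deserialize each row back into the right command class later.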
Register all three with the dependency injection container:

// 1. Register your custom implementations.
// These would typically be scoped to manage database connections.
builder.Services.AddScoped<ICommandInbox, MyDatabaseCommandInbox>();
builder.Services.AddScoped<ICommandInboxProcessor, MyDatabaseCommandProcessor>();

// 2. Register the LiteBus hosted service that runs the processor.
builder.Services.AddHostedService<CommandInboxProcessorHostedService>();

The complete lifecycle of an inbox command is as follows:
- _commandMediator.SendAsync(command) is called.
- CommandMediator detects the [StoreInInbox] attribute.
- It resolves your ICommandInbox implementation and calls StoreAsync(command). The command is saved to your database.
- The CommandInboxProcessorHostedService (running in the background) starts your ICommandInboxProcessor.
- Your processor implementation fetches a batch of unprocessed commands from the database.
- It invokes the CommandBatchHandler delegate provided by the hosted service.
- This delegate creates a new DI scope and, for each command in the batch, calls _commandMediator.SendAsync(command).
- Crucially, this second SendAsync call includes a flag (IsInboxExecution) in ExecutionContext.Items. The CommandMediator detects this flag and processes the command directly instead of re-inboxing it, thus preventing an infinite loop.
- The command proceeds through its normal pipeline (pre-handlers, main handler, post-handlers).
- Your processor marks the commands as processed in the database.
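The loop-prevention step in this lifecycle can be sketched with a toy mediator. This only illustrates the idea and is not LiteBus's implementation: SketchMediator, its Inbox and Handled lists, and the local StoreInInboxAttribute are hypothetical stand-ins, and a plain dictionary plays the role of ExecutionContext.Items.

```csharp
using System;
using System.Collections.Generic;

// Local stand-in for the attribute, so the sketch compiles on its own.
[AttributeUsage(AttributeTargets.Class)]
public sealed class StoreInInboxAttribute : Attribute { }

// Hypothetical sample command.
[StoreInInbox]
public sealed class ChargeCard { }

public sealed class SketchMediator
{
    public const string IsInboxExecution = "IsInboxExecution";

    public List<object> Inbox { get; } = new();    // stands in for the durable store
    public List<object> Handled { get; } = new();  // stands in for the handler pipeline

    // 'items' plays the role of ExecutionContext.Items.
    public void Send(object command, IDictionary<string, object>? items = null)
    {
        var markedForInbox = Attribute.IsDefined(command.GetType(), typeof(StoreInInboxAttribute));
        var fromInbox = items is not null
            && items.TryGetValue(IsInboxExecution, out var flag)
            && flag is true;

        if (markedForInbox && !fromInbox)
        {
            Inbox.Add(command);   // first pass: persist, do not execute
            return;
        }

        Handled.Add(command);     // second pass, or a regular command: execute
    }

    // What the batch handler does for each stored command.
    public void DrainInbox()
    {
        var batch = Inbox.ToArray();
        Inbox.Clear();
        foreach (var command in batch)
        {
            Send(command, new Dictionary<string, object> { [IsInboxExecution] = true });
        }
    }
}
```

Without the flag check, the second Send would store the command again and it would never reach a handler.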
Keep the following practices in mind:
- Idempotent Handlers: Since the inbox guarantees at-least-once delivery, your command handlers should be idempotent. This means they can be safely executed multiple times with the same input without causing incorrect side effects.
- Use for Critical Commands Only: The inbox adds overhead. Use it only for commands that truly require guaranteed execution.
- Transactional Consistency: Ensure that storing the command in the inbox is part of the same database transaction as the business operation that triggers it (the transactional outbox pattern). This prevents scenarios where you save a business entity but fail to save the corresponding command.
- Error Handling: Your ICommandInboxProcessor implementation is responsible for handling errors, retries, and moving failed commands to a dead-letter queue.
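One possible shape for the retry and dead-letter logic is sketched below. Everything here is an assumption for illustration: InboxEntry and RetryingInboxRunner are hypothetical names, in-memory lists stand in for database tables, and the execute delegate stands in for whatever your processor does to run a stored command (for example, invoking the CommandBatchHandler, whose signature is not shown in this page).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical inbox entry; a real one would map to a database row.
public sealed class InboxEntry
{
    public InboxEntry(Guid id, string payload) { Id = id; Payload = payload; }

    public Guid Id { get; }
    public string Payload { get; }
    public int Attempts { get; set; }
    public string? LastError { get; set; }
}

public sealed class RetryingInboxRunner
{
    private readonly int _maxAttempts;

    public RetryingInboxRunner(int maxAttempts) => _maxAttempts = maxAttempts;

    public List<InboxEntry> Pending { get; } = new();
    public List<InboxEntry> Processed { get; } = new();
    public List<InboxEntry> DeadLetters { get; } = new();

    // One polling pass over the pending entries.
    public async Task RunOnceAsync(
        Func<InboxEntry, CancellationToken, Task> execute,
        CancellationToken cancellationToken = default)
    {
        foreach (var entry in Pending.ToArray())
        {
            cancellationToken.ThrowIfCancellationRequested();
            entry.Attempts++;
            try
            {
                await execute(entry, cancellationToken);
                Pending.Remove(entry);
                Processed.Add(entry);          // mark as processed only after success
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                entry.Attempts--;              // shutdown is not a failed attempt
                throw;
            }
            catch (Exception ex)
            {
                entry.LastError = ex.Message;
                if (entry.Attempts >= _maxAttempts)
                {
                    Pending.Remove(entry);
                    DeadLetters.Add(entry);    // give up and park for inspection
                }
                // Otherwise the entry stays pending and is retried on the next pass.
            }
        }
    }
}
```

A command that fails partway through may already have caused side effects before it is retried, which is why the idempotency advice above matters in practice.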