Courier Feature
1. Overview
The Courier feature in Vidyano provides a reliable, message-based processing system built on top of RavenDB. It enables asynchronous communication between different parts of your application through a publish-subscribe pattern, supporting immediate and delayed message delivery with at-least-once guarantees.
2. Enabling the Courier Feature
To enable the Courier feature in your Vidyano application:
Register the feature in your
Startup.cs
file:
public void ConfigureServices(IServiceCollection services)
{
// Add other services...
// Register your Courier message handlers
services.AddRecipient<YourMessage, YourMessageHandler>();
}
3. How It Works
The Courier feature works as follows:
Message Publishing: Messages are stored in RavenDB as
CourierMessage
documents.Message Processing:
A RavenDB subscription monitors the database for new messages
When a message is detected, the system finds the appropriate recipient(s)
Each recipient processes the message through its
ReceiveAsync
methodAfter processing, messages are marked as processed with an expiration time
Message Lifecycle:
Created → Waiting for Processing → Processed → Expired
Or: Created → Delayed → Waiting for Processing → Processed → Expired
Or: Created → Waiting for Processing → Failed → Retry → Processed → Expired
4. Key Features
4.1 Message Delivery Patterns
Immediate Delivery: Messages are sent and processed as soon as possible
Delayed Delivery: Messages are processed after a specified time delay
At-Least-Once Delivery: Messages are guaranteed to be delivered at least once
4.2 Idempotent Messages
Messages can implement the IIdempotentMessage
interface to ensure they are only processed once, identified by a unique identifier.
4.3 Message Retry
Failed messages can be automatically retried based on configurable retry strategies.
4.4 Message Expiration
Processed messages are automatically removed from the system after a configurable expiration period.
5. Usage Examples
5.1 Defining a Message
// Regular message
public record NotificationMessage(string Subject, string Body, string RecipientEmail);
// Idempotent message
public record OrderProcessedMessage(string OrderId, DateTimeOffset ProcessedOn) : IIdempotentMessage
{
// Implementation of IIdempotentMessage.Identifier
public string Identifier => $"order-processed-{OrderId}";
}
5.2 Creating a Message Handler
public class NotificationMessageHandler : IRecipient<NotificationMessage>
{
private readonly IEmailService _emailService;
public NotificationMessageHandler(IEmailService emailService)
{
_emailService = emailService;
}
public async Task ReceiveAsync(ReceiveArgs<NotificationMessage> args, CancellationToken cancellationToken)
{
var message = args.Message;
// Process the message
await _emailService.SendEmailAsync(
message.RecipientEmail,
message.Subject,
message.Body,
cancellationToken);
// Configure message expiration (optional, defaults to 1 day)
args.ExpireAfter = TimeSpan.FromDays(7);
// Example of sending a follow-up message
args.Courier.Send(new NotificationLoggedMessage(message.RecipientEmail));
}
}
5.3 Sending Messages
// Get the courier service
using var _ = ServiceLocator.GetScopedRequiredService<ICourier>(out var courier);
// Send immediate message
courier.Send(new NotificationMessage(
"Welcome!",
"Thank you for registering.",
"user@example.com"));
// Send delayed message (relative time)
courier.DelaySend(
new ReminderMessage("Don't forget to complete your profile."),
TimeSpan.FromDays(3));
// Send delayed message (absolute time)
var nextMonday = GetNextMonday();
courier.DelaySend(
new WeeklyReportMessage(),
nextMonday);
// Send idempotent message
// This will only be processed once, even if sent multiple times with the same ID
courier.Send(new OrderProcessedMessage("ORD-12345", DateTimeOffset.UtcNow));
5.4 Chain of Messages
public class OrderCreatedMessageHandler : IRecipient<OrderCreatedMessage>
{
public Task ReceiveAsync(ReceiveArgs<OrderCreatedMessage> args, CancellationToken cancellationToken)
{
// Process the order
ProcessOrder(args.Message);
// Start order fulfillment process by sending another message
args.Courier.Send(new InitiateFulfillmentMessage(
args.Message.OrderId,
args.Message.Items));
// Schedule a follow-up message after 3 days
args.Courier.DelaySend(
new OrderFollowUpMessage(args.Message.OrderId, args.Message.CustomerEmail),
TimeSpan.FromDays(3));
return Task.CompletedTask;
}
}
6. Configuration Options
The Courier system can be configured in your appsettings.json
file:
{
"Vidyano": {
"Courier": {
"RetryWaitTime": "00:00:30",
"MaxDownTime": "00:05:00",
"MaxDocsPerBatch": 25
}
}
}
Configuration options:
RetryWaitTime: Time to wait before retrying subscription connection (default: 30 seconds)
MaxDownTime: Maximum period the system will be down before giving up (default: 5 minutes)
MaxDocsPerBatch: Maximum number of messages processed in a batch (default: 25)
7. Best Practices
Keep Messages Small: Store only essential information in messages.
Make Messages Immutable: Use C# record types or immutable classes.
Design for Idempotency: Ensure that processing a message multiple times does not cause issues.
Handle Exceptions: Implement proper exception handling in your message handlers.
Use Message Chains: Break complex workflows into chains of simple messages.
Consider Message Expiration: Set appropriate expiration times for processed messages.
Monitor the System: Implement logging and monitoring for message processing.
8. Troubleshooting
If messages are not being processed as expected:
Check Subscription Status: Ensure the RavenDB subscription is active.
Verify Handler Registration: Confirm that your message handlers are registered correctly.
Review Message Serialization: Check that your message classes can be properly serialized/deserialized by JSON.NET.
Examine Expired Messages: Look for messages that have expired before processing.
Look for Exceptions: Check exception logs for errors in message handlers.
Inspect RetryWaitTime: Adjust RetryWaitTime if messages are not being retried quickly enough.
Disable Specific Message Types: Use the
Courier{MessageTypeName}Disabled
setting to temporarily disable processing of specific message types.
Was this helpful?