Courier Feature

1. Overview

The Courier feature in Vidyano provides a reliable, message-based processing system built on top of RavenDB. It enables asynchronous communication between different parts of your application through a publish-subscribe pattern, supporting immediate and delayed message delivery with at-least-once guarantees.

2. Enabling the Courier Feature

To enable the Courier feature in your Vidyano application:

  1. Register the feature in your Startup.cs file:

public void ConfigureServices(IServiceCollection services)
{
    // Add other services...
    
    // Register your Courier message handlers
    services.AddRecipient<YourMessage, YourMessageHandler>();
}

3. How It Works

The Courier feature works as follows:

  1. Message Publishing: Messages are stored in RavenDB as CourierMessage documents.

  2. Message Processing:

    • A RavenDB subscription monitors the database for new messages

    • When a message is detected, the system finds the appropriate recipient(s)

    • Each recipient processes the message through its ReceiveAsync method

    • After processing, messages are marked as processed with an expiration time

  3. Message Lifecycle:

    • Created → Waiting for Processing → Processed → Expired

    • Or: Created → Delayed → Waiting for Processing → Processed → Expired

    • Or: Created → Waiting for Processing → Failed → Retry → Processed → Expired

4. Key Features

4.1 Message Delivery Patterns

  • Immediate Delivery: Messages are sent and processed as soon as possible

  • Delayed Delivery: Messages are processed after a specified time delay

  • At-Least-Once Delivery: Messages are guaranteed to be delivered at least once

4.2 Idempotent Messages

Messages can implement the IIdempotentMessage interface to ensure they are only processed once, identified by a unique identifier.

4.3 Message Retry

Failed messages can be automatically retried based on configurable retry strategies.

4.4 Message Expiration

Processed messages are automatically removed from the system after a configurable expiration period.

5. Usage Examples

5.1 Defining a Message

// Regular message
public record NotificationMessage(string Subject, string Body, string RecipientEmail);

// Idempotent message
public record OrderProcessedMessage(string OrderId, DateTimeOffset ProcessedOn) : IIdempotentMessage
{
    // Implementation of IIdempotentMessage.Identifier
    public string Identifier => $"order-processed-{OrderId}";
}

5.2 Creating a Message Handler

public class NotificationMessageHandler : IRecipient<NotificationMessage>
{
    private readonly IEmailService _emailService;
    
    public NotificationMessageHandler(IEmailService emailService)
    {
        _emailService = emailService;
    }
    
    public async Task ReceiveAsync(ReceiveArgs<NotificationMessage> args, CancellationToken cancellationToken)
    {
        var message = args.Message;
        
        // Process the message
        await _emailService.SendEmailAsync(
            message.RecipientEmail, 
            message.Subject, 
            message.Body, 
            cancellationToken);
        
        // Configure message expiration (optional, defaults to 1 day)
        args.ExpireAfter = TimeSpan.FromDays(7);
        
        // Example of sending a follow-up message
        args.Courier.Send(new NotificationLoggedMessage(message.RecipientEmail));
    }
}

5.3 Sending Messages

// Get the courier service
using var _ = ServiceLocator.GetScopedRequiredService<ICourier>(out var courier);

// Send immediate message
courier.Send(new NotificationMessage(
    "Welcome!", 
    "Thank you for registering.", 
    "user@example.com"));

// Send delayed message (relative time)
courier.DelaySend(
    new ReminderMessage("Don't forget to complete your profile."), 
    TimeSpan.FromDays(3));

// Send delayed message (absolute time)
var nextMonday = GetNextMonday();
courier.DelaySend(
    new WeeklyReportMessage(), 
    nextMonday);

// Send idempotent message
// This will only be processed once, even if sent multiple times with the same ID
courier.Send(new OrderProcessedMessage("ORD-12345", DateTimeOffset.UtcNow));

5.4 Chain of Messages

public class OrderCreatedMessageHandler : IRecipient<OrderCreatedMessage>
{
    public Task ReceiveAsync(ReceiveArgs<OrderCreatedMessage> args, CancellationToken cancellationToken)
    {
        // Process the order
        ProcessOrder(args.Message);
        
        // Start order fulfillment process by sending another message
        args.Courier.Send(new InitiateFulfillmentMessage(
            args.Message.OrderId, 
            args.Message.Items));
            
        // Schedule a follow-up message after 3 days
        args.Courier.DelaySend(
            new OrderFollowUpMessage(args.Message.OrderId, args.Message.CustomerEmail),
            TimeSpan.FromDays(3));
            
        return Task.CompletedTask;
    }
}

6. Configuration Options

The Courier system can be configured in your appsettings.json file:

{
  "Vidyano": {
    "Courier": {
      "RetryWaitTime": "00:00:30",
      "MaxDownTime": "00:05:00",
      "MaxDocsPerBatch": 25
    }
  }
}

Configuration options:

  • RetryWaitTime: Time to wait before retrying subscription connection (default: 30 seconds)

  • MaxDownTime: Maximum period the system will be down before giving up (default: 5 minutes)

  • MaxDocsPerBatch: Maximum number of messages processed in a batch (default: 25)

7. Best Practices

  1. Keep Messages Small: Store only essential information in messages.

  2. Make Messages Immutable: Use C# record types or immutable classes.

  3. Design for Idempotency: Ensure that processing a message multiple times does not cause issues.

  4. Handle Exceptions: Implement proper exception handling in your message handlers.

  5. Use Message Chains: Break complex workflows into chains of simple messages.

  6. Consider Message Expiration: Set appropriate expiration times for processed messages.

  7. Monitor the System: Implement logging and monitoring for message processing.

8. Troubleshooting

If messages are not being processed as expected:

  1. Check Subscription Status: Ensure the RavenDB subscription is active.

  2. Verify Handler Registration: Confirm that your message handlers are registered correctly.

  3. Review Message Serialization: Check that your message classes can be properly serialized/deserialized by JSON.NET.

  4. Examine Expired Messages: Look for messages that have expired before processing.

  5. Look for Exceptions: Check exception logs for errors in message handlers.

  6. Inspect RetryWaitTime: Adjust RetryWaitTime if messages are not being retried quickly enough.

  7. Disable Specific Message Types: Use the Courier{MessageTypeName}Disabled setting to temporarily disable processing of specific message types.

Was this helpful?