I'm building a piece of software responsible for processing different types of events from my system. This component is hosted as an Azure Function App, triggered by an Azure Storage Queue. The challenge is that I can have many different event types enqueued into the same queue.
To handle this, I've created a base message record:
public abstract record BaseMessage { public virtual string MessageName { get; init; } = nameof(BaseMessage); public Guid CorrelationId { get; init; } public Guid ProcessId { get; init; } public int DequeueCount { get; init; } } And specific records for different events, for example:
public record DataProcessingRequestedEvent : BaseMessage { public Guid ProcessDataId { get; init; } public override string MessageName => $"{nameof(DataProcessingRequestedEvent)}"; } Azure Function Implementation:
[Function("MessageBrokerFunction")] public void Run([QueueTrigger("message-broker", Connection = "StorageQueue")] QueueMessage queueMessage) { var anyEventResult = AnyMessage.Create(queueMessage.MessageId, queueMessage.MessageText); if (anyEventResult.IsFailed) { _logger.LogError($"Failed to parse message: {anyEventResult.Errors}"); return; } _messageBroker.Handle(anyEventResult.Value); } To remove dependency on queue-specific types, I convert QueueMessage into an AnyMessage class:
public class AnyMessage { public string MessageId { get; init; } public JObject MessageBody { get; init; } public string MessageName { get; init; } public static Result<AnyMessage> Create(string messageId, string messageBody) { try { var parsedMessageBody = JObject.Parse(messageBody); var messageName = parsedMessageBody.Properties() .FirstOrDefault(p => string.Equals(p.Name, nameof(BaseMessage.MessageName), StringComparison.OrdinalIgnoreCase)) ?.Value?.ToString(); if (string.IsNullOrEmpty(messageName)) { return Result.Fail(new MissingMessageTypeError(messageBody)); } return Result.Ok(new AnyMessage { MessageId = messageId, MessageBody = parsedMessageBody, MessageName = messageName }); } catch (Exception ex) { return Result.Fail(new ExceptionOccuredWhileParsingMessageError(messageBody, ex)); } } } Event Handling and Dispatching:
public interface IEventHandler<T> where T : BaseMessage { Task Handle(T eventMessage, EventExecutionContext context); } public interface IEventDispatcher { Task<EventExecutionResult> Dispatch<T>(T message, EventExecutionContext context) where T : BaseMessage; } public class EventDispatcher : IEventDispatcher { private readonly ILogger<EventDispatcher> _logger; private readonly IServiceProvider _serviceProvider; public EventDispatcher(ILogger<EventDispatcher> logger, IServiceProvider serviceProvider) { _logger = logger; _serviceProvider = serviceProvider; } public async Task<EventExecutionResult> Dispatch<T>(T message, EventExecutionContext context) where T : BaseMessage { try { var handlerType = typeof(IEventHandler<>).MakeGenericType(typeof(T)); var handler = _serviceProvider.GetService(handlerType); var handleMethod = handler.GetType().GetMethod("Handle"); var task = (Task)handleMethod.Invoke(handler, new object[] { message, context }); await task.ConfigureAwait(false); return new EventExecutionResult(); } catch (Exception ex) { // TODO: Handle exceptions appropriately _logger.LogError(ex, "Error dispatching event."); return new EventExecutionResult(); } } } The Core Issue: The challenge lies in determining where to convert the AnyMessage into a specific event type and then invoke the event dispatcher.
One approach is to introduce an additional layer, such as an IMessagePreProcessor for each MessageName. This would receive the AnyMessage, deserialize it into the specific event type, and then invoke the EventDispatcher.
However, I'm wondering if there is a better, more dynamic solution that:
Doesn't rely on compile-time types Can dynamically deserialize the message and invoke the appropriate event handler at runtime
Any suggestions or recommendations would be greatly appreciated!