Neste post mostro como implementar um EventBus, utilizando RabbitMQ, em C#.
Esta implementação é uma adaptação da entregue pela Microsoft no projeto eShopOnContainers (semelhanças não são mera coincidência).
O que é um EventBus?
De forma simplificada, um EventBus é um artefato de software que permite que um componente publique notificações (eventos) indicando a ocorrência de algo potencialmente relevante para outros. Então, esse “evento” pode ser “escutado” por esses outros componentes que irão realizar alguma ação correlata.
Por exemplo:
- O “estoque” pode publicar um evento indicando que um produto chegou a seu nível mínimo. Esse evento poderia ser escutado por “compras” que iniciaria o processo para reposição;
- O “estoque” poderia publicar um evento indicando que um produto foi reposto. Assim, “vendas” poderia iniciar um processo de envio de emails notificando clientes interessados no produto;
- O “faturamento” poderia publicar um evento indicando que teve problemas para processar um pagamento. Isso poderia fazer com que “relacionamento” enviasse um email para o cliente indicando possíveis alternativas.
Em uma boa implementação, é irrelevante para o componente que está publicando eventos quais componentes os estão “escutando”. Da mesma forma, é irrelevante para os componentes que estão escutando qual foi o componente que realizou a publicação.
Um EventBus facilita a comunicação assíncrona e desacoplada entre os componentes de uma aplicação. Também facilita a escalabilidade do sistema.
Por que usar RabbitMQ para implementar um EventBus?
RabbitMQ é um poderoso mecanismo de mensageria. Ele permite a adoção de padrões flexíveis para troca de mensagens entre componentes de software. É extremamente sólido (escrito em Erlang) e amplamente adotado pela indústria.
Não reinvente a roda!
RabbitMQ facilita consideravelmente a troca de mensagens em um ambiente de sistema distribuído.
Usando RabbitMQ com Docker
Se você, como eu, não gosta da ideia de ficar instalando frameworks e serviços em seu computador, recomendo que utilize RabbitMQ com Docker. É simples assim:
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Feito isso, você terá o RabbitMQ rodando em seu computador com a ferramenta de administração (em http://localhost:15672).
Se ainda não conhece RabbitMQ, gaste algum tempo em algum tutorial na Internet antes de continuar.
A anatomia de um evento
Um evento é uma mensagem indicando que algo ocorreu. Essas mensagens tem um nome que, geralmente, está no passado.
Na minha implementação, decidi criar uma classe abstrata para os eventos.
public abstract class Event { protected Event() { Id = Guid.NewGuid(); CreatedAt = DateTime.UtcNow; } public Guid Id { get; } public DateTime CreatedAt { get; } }
Essa abordagem define um conjunto mínimo de dados que todo evento deverá possuir para poder ser processado (aqui, optei por um Id e por um registro do momento da ocorrência).
Cada evento que um componente for publicar no EventBus deverá ter uma classe especializada.
public class PaymentFailureEvent : Event { public string TransactionId { get; } public string Description { get; } public PaymentFailureEvent( string transactionId, string description ) { TransactionId = transactionId; Description = description; } }
Como um evento é ouvido por um componente
Para ouvir um evento que será publicado no EventBus, um componente precisa definir um handler. Optei por duas interfaces.
using System.Threading.Tasks; namespace BuildingBlocks.Messaging.RabbitMQ.EventBus { public interface IEventHandler { Task Handle(TEvent @event); } public interface IDynamicEventHandler { Task Handle(dynamic @event); } }
A ideia da segunda interface é permitir que o componente que está “ouvindo” não precise ter uma classe específica para cada evento.
public class PaymentFailureEventHandler : IDynamicEventHandler { public Task Handle(dynamic @event) { string transactionId = @event.TransactionId; // business logic return Task.CompletedTask; } }
É importante indicar que, em um ambiente de microsserviços, considero amplamente recomendável que cada microsserviço defina suas classes “evento” – isso reduz o acoplamento (se as classes tiverem o mesmo nome, e os mesmos elementos, a solução funciona).
A anatomia de um EventBus
O EventBus é um building block que deverá ser consumido por todos os componentes da aplicação (exemplo, microsserviços)
Eis sua definição básica:
public interface IEventBus { void Publish(Event @event); void Subscribe<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler; void Subscribe(string eventName) where TEventHandler : IDynamicEventHandler; void Unsubscribe<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler; void Unsubscribe(string eventName) where TEventHandler : IDynamicEventHandler; }
Basicamente, tenho um método para a publicação de eventos, dois para que “consumidores” possam manifestar seu interesse por um determinado tipo e, finalmente, dois para cancelar o interesse (desassinar).
Por definição, nessa implementação, utilizo o nome do tipo para inferir o nome evento nas implementações com tipagem forte.
A implementação do EventBus
Agora, vamos a um pouco mais de código.
public class RabbitMQEventBus : IDisposable, IEventBus { private readonly ILifetimeScope _autofac; private readonly PersisterConnection _connection; private readonly ILogger _logger; private readonly SubscriptionsManager _subscriptionManager; private string _queueName; private IModel _consumerChannel; public string ExchangeName { get; } public RabbitMQEventBus( PersisterConnection connection, ILogger logger, ILifetimeScope autofac, string exchangeName = "DefaultExchange" ) { _connection = connection ?? throw new ArgumentNullException(nameof(connection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _autofac = autofac ?? throw new ArgumentNullException(nameof(autofac)); ExchangeName = exchangeName; _subscriptionManager = new SubscriptionsManager(); _subscriptionManager.OnEventRemoved += OnSubscriptionManagerEventRemoved; _subscriptionManager.OnEventAdded += OnSubscriptionManagerEventAdded; _consumerChannel = CreateConsumerChannel(); } void OnSubscriptionManagerEventAdded(object _, string eventName) { if (!_connection.IsConnected) { _connection.TryConnect(); } using (var channel = _connection.CreateModel()) { channel.QueueBind( queue: _queueName, exchange: ExchangeName, routingKey: eventName ); } } void OnSubscriptionManagerEventRemoved(object _, string eventName) { if (!_connection.IsConnected) { _connection.TryConnect(); } using (var channel = _connection.CreateModel()) { channel.QueueUnbind( queue: _queueName, exchange: ExchangeName, routingKey: eventName ); if (!_subscriptionManager.IsEmpty) return; _queueName = string.Empty; _consumerChannel.Close(); } } public void Publish(Event @event) { if (!_connection.IsConnected) { _connection.TryConnect(); } var policy = Policy.Handle() .Or() .WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex.ToString()); }); using (var channel = _connection.CreateModel()) { var eventName = @event.GetType() .Name; channel.ExchangeDeclare(exchange: ExchangeName, type: "direct"); var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); policy.Execute(() => { channel.BasicPublish(exchange: ExchangeName, routingKey: eventName, basicProperties: null, body: body); }); } } public void Subscribe<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler { var eventName = typeof(TEvent).Name; _subscriptionManager.AddSubscription<TEvent, TEventHandler>(); } public void Subscribe(string eventName) where TEventHandler : IDynamicEventHandler { _subscriptionManager.AddSubscription(eventName); } public void Unsubscribe<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler { _subscriptionManager.RemoveSubscription<TEvent, TEventHandler>(); } public void Unsubscribe(string eventName) where TEventHandler : IDynamicEventHandler { _subscriptionManager.RemoveSubscription(eventName); } private IModel CreateConsumerChannel() { if (!_connection.IsConnected) { _connection.TryConnect(); } var channel = _connection.CreateModel(); channel.ExchangeDeclare(exchange: ExchangeName, type: "direct"); _queueName = channel.QueueDeclare().QueueName; var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { var eventName = ea.RoutingKey; var message = Encoding.UTF8.GetString(ea.Body); await HandleEvent(eventName, message); }; channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer); channel.CallbackException += (sender, ea) => { _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); }; return channel; } private async Task HandleEvent(string eventName, string message) { if (!_subscriptionManager.HasSubscriptionsForEvent(eventName)) { return; } using (var scope = _autofac.BeginLifetimeScope("RabbitMQEventBus")) { var subscriptions = _subscriptionManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { await subscription.Handle(message, scope); } } } public void Dispose() { _consumerChannel?.Dispose(); } }
Algumas notas:
- Optei por manter uma topologia simples. Tenho um único Exchange fazendo encaminhamento direct para as queues dos assinantes. Cada assinante possui sua Queue. Há um Binding para cada tipo de evento que a Queue assina.
- Utilizei o modelo de conexão persistente indicado em um post anterior
- Quando o processo se encerra (fechando o canal), a queue também é encerrada (eventos só são processados quando o consumer está em operação).
- Utilzo AutoFac para fazer a carga do handler. Gosto muito da abordagem por permitir fácil injeção de dependências.
As assinaturas no EventBus são mantidas por um objeto auxiliar (SubscriptionManager)
public class SubscriptionsManager { private readonly IDictionary<string, IList> _handlers = new Dictionary<string, IList>(); public bool IsEmpty => !_handlers.Keys.Any(); public event EventHandler OnEventRemoved; public event EventHandler OnEventAdded; public void AddSubscription(string eventName) where TEventHandler : IDynamicEventHandler { AddSubscription( typeof(TEventHandler), eventName ); } public void AddSubscription<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler { AddSubscription( typeof(TEventHandler), typeof(TEvent).Name, typeof(TEvent) ); } private void AddSubscription(Type handlerType, string eventName, Type eventType = null) { if (!HasSubscriptionsForEvent(eventName)) { _handlers.Add(eventName, new List()); OnEventAdded?.Invoke(this, eventName); } if (_handlers[eventName].Any(s => s.HandlerType == handlerType)) { throw new ArgumentException( $"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType)); } _handlers[eventName].Add(Subscription.New( handlerType, eventType) ); } public void RemoveSubscription(string eventName) where TEventHandler : IDynamicEventHandler { var handlerToRemove = FindSubscriptionToRemove(eventName); RemoveSubscription(eventName, handlerToRemove); } public void RemoveSubscription<TEvent, TEventHandler>() where TEvent : Event where TEventHandler : IEventHandler { var eventName = typeof(TEvent).Name; var handlerToRemove = FindSubscriptionToRemove(eventName); RemoveSubscription(eventName, handlerToRemove); } Subscription FindSubscriptionToRemove(string eventName) { if (!HasSubscriptionsForEvent(eventName)) { return null; } return _handlers[eventName].SingleOrDefault(s => s.HandlerType == typeof(TEventHandler)); } private void RemoveSubscription( string eventName, Subscription subsToRemove ) { if (subsToRemove == null) return; _handlers[eventName].Remove(subsToRemove); if (_handlers[eventName].Any()) return; _handlers.Remove(eventName); OnEventRemoved?.Invoke(this, eventName); } public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName); public IEnumerable GetHandlersForEvent() where TEvent : Event { var key = typeof(TEvent).Name; return GetHandlersForEvent(key); } public IEnumerable GetHandlersForEvent(string eventName) => _handlers[eventName]; public Type GetEventTypeByName(string eventName) => _handlers[eventName] ?.FirstOrDefault(handler => !handler.IsDynamic) ?.EventType; }
Cada assinatura é mantida em um objeto especialista com acesso ao Handler
public class Subscription { public bool IsDynamic => EventType == null; public Type HandlerType { get; } public Type EventType { get; } private Subscription(Type handlerType, Type eventType = null) { HandlerType = handlerType; EventType = eventType; } public async Task Handle(string message, ILifetimeScope scope) { if (IsDynamic) { dynamic eventData = JObject.Parse(message); if (scope.ResolveOptional(HandlerType) is IDynamicEventHandler handler) await handler.Handle(eventData); } else { var eventData = JsonConvert.DeserializeObject(message, EventType); var handler = scope.ResolveOptional(HandlerType); var concreteType = typeof(IEventHandler<>).MakeGenericType(EventType); await(Task) concreteType.GetMethod("Handle") .Invoke(handler, new[] { eventData }); } } public static Subscription New(Type handlerType, Type eventType) => new Subscription(handlerType, eventType); }
Concluindo
Em um ambiente de múltiplos componentes, sobretudo Microsserviços, o EventBus é um componente essencial para facilitar escalabilidade e resiliência. RabbitMQ é uma excelente opção para implementação do EventBus.
É importante atentar que a má utilização do EventBus pode gerar algumas dores de cabeça, sobretudo em cenários onde os microsserviços precisem colaborar para completar uma atividade atômica… Mas, sobre SAGAS falaremos em outra oportunidade.
Capa: Joanna Kosinska
Gostaria de perguntar se seria possível disponibilizar a demo deste post
O código esta disponível no GIT?