Implementando um EventBus com RabbitMQ em C#

Neste post mostro como implementar um EventBus, utilizando RabbitMQ, em C#.

Esta implementação é uma adaptação da entregue pela Microsoft no projeto eShopOnContainers (semelhanças não são mera coincidência).

O que é um EventBus?

De forma simplificada, um EventBus é um artefato de software que permite que um componente publique notificações (eventos) indicando a ocorrência de algo potencialmente relevante para outros. Então, esse “evento” pode ser “escutado” por esses outros componentes que irão realizar alguma ação correlata.

Por exemplo:

  • O “estoque” pode publicar um evento indicando que um produto chegou a seu nível mínimo. Esse evento poderia ser escutado por “compras” que iniciaria o processo para reposição;
  • O “estoque” poderia publicar um evento indicando que um produto foi reposto. Assim, “vendas” poderia iniciar um processo de envio de emails notificando clientes interessados no produto;
  • O “faturamento” poderia publicar um evento indicando que teve problemas para processar um pagamento. Isso poderia fazer com que “relacionamento” enviasse um email para o cliente indicando possíveis alternativas.

Em uma boa implementação, é irrelevante para o componente que está publicando eventos quais componentes os estão “escutando”. Da mesma forma, é irrelevante para os componentes que estão escutando qual foi o componente que realizou a publicação.

Um EventBus facilita a comunicação assíncrona e desacoplada entre os componentes de uma aplicação. Também facilita a escalabilidade do sistema.

Por que usar RabbitMQ para implementar um EventBus?

RabbitMQ é um poderoso mecanismo de mensageria. Ele permite a adoção de padrões flexíveis para troca de mensagens entre componentes de software. É extremamente sólido (escrito em Erlang) e amplamente adotado pela indústria.

Não reinvente a roda!

RabbitMQ facilita consideravelmente a troca de mensagens em um ambiente de sistema distribuído.

Usando RabbitMQ com Docker

Se você, como eu, não gosta da ideia de ficar instalando frameworks e serviços em seu computador, recomendo que utilize RabbitMQ com Docker. É simples assim:

docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Feito isso, você terá o RabbitMQ rodando em seu computador com a ferramenta de administração (em http://localhost:15672).

Se ainda não conhece RabbitMQ, gaste algum tempo em algum tutorial na Internet antes de continuar.

A anatomia de um evento

Um evento é uma mensagem indicando que algo ocorreu. Essas mensagens tem um nome que, geralmente, está no passado.

Na minha implementação, decidi criar uma classe abstrata para os eventos.

public abstract class Event
{
    protected Event()
    {
        Id = Guid.NewGuid();
        CreatedAt = DateTime.UtcNow;
    }

    public Guid Id { get; }
    public DateTime CreatedAt { get; }
}

Essa abordagem define um conjunto mínimo de dados que todo evento deverá possuir para poder ser processado (aqui, optei por um Id e por um registro do momento da ocorrência).

Cada evento que um componente for publicar no EventBus deverá ter uma classe especializada.

public class PaymentFailureEvent : Event
{
    public string TransactionId { get; }
    public string Description { get; }

    public PaymentFailureEvent(
        string transactionId,
        string description
        )
    {
        TransactionId = transactionId;
        Description = description;
    }
}

Como um evento é ouvido por um componente

Para ouvir um evento que será publicado no EventBus, um componente precisa definir um handler. Optei por duas interfaces.

using System.Threading.Tasks;

namespace BuildingBlocks.Messaging.RabbitMQ.EventBus
{
    public interface IEventHandler
    {
        Task Handle(TEvent @event);
    }

    public interface IDynamicEventHandler
    {
        Task Handle(dynamic @event);
    }
}

A ideia da segunda interface é permitir que o componente que está “ouvindo” não precise ter uma classe específica para cada evento.

public class PaymentFailureEventHandler :
    IDynamicEventHandler
{
    public Task Handle(dynamic @event)
    {
        string transactionId = @event.TransactionId;

        // business logic 

        return Task.CompletedTask;
    }
}

É importante indicar que, em um ambiente de microsserviços, considero amplamente recomendável que cada microsserviço defina suas classes “evento” – isso reduz o acoplamento (se as classes tiverem o mesmo nome, e os mesmos elementos, a solução funciona).

A anatomia de um EventBus

O EventBus é um building block que deverá ser consumido por todos os componentes da aplicação (exemplo, microsserviços)

Eis sua definição básica:

public interface IEventBus
{
    void Publish(Event @event);

    void Subscribe<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler;

    void Subscribe(string eventName) 
        where TEventHandler : IDynamicEventHandler;

    void Unsubscribe<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler;

    void Unsubscribe(string eventName) 
        where TEventHandler : IDynamicEventHandler;
}

Basicamente, tenho um método para a publicação de eventos, dois para que “consumidores” possam manifestar seu interesse por um determinado tipo e, finalmente, dois para cancelar o interesse (desassinar).

Por definição, nessa implementação, utilizo o nome do tipo para inferir o nome evento nas implementações com tipagem forte.

A implementação do EventBus

Agora, vamos a um pouco mais de código.

public class RabbitMQEventBus : IDisposable, IEventBus
{
    private readonly ILifetimeScope _autofac;
    private readonly PersisterConnection _connection;
    private readonly ILogger _logger;
    private readonly SubscriptionsManager _subscriptionManager;

    private string _queueName;
    private IModel _consumerChannel;

    public string ExchangeName { get; }

    public RabbitMQEventBus(
        PersisterConnection connection,
        ILogger logger,
        ILifetimeScope autofac,
        string exchangeName = "DefaultExchange"
        )
    {
            
        _connection = connection 
            ?? throw new ArgumentNullException(nameof(connection));

        _logger = logger 
            ?? throw new ArgumentNullException(nameof(logger));

        _autofac = autofac
            ?? throw new ArgumentNullException(nameof(autofac));

        ExchangeName = exchangeName;

        _subscriptionManager = new SubscriptionsManager();
        _subscriptionManager.OnEventRemoved += OnSubscriptionManagerEventRemoved;
        _subscriptionManager.OnEventAdded += OnSubscriptionManagerEventAdded;

        _consumerChannel = CreateConsumerChannel();
    }

    void OnSubscriptionManagerEventAdded(object _, string eventName)
    {
        if (!_connection.IsConnected)
        {
            _connection.TryConnect();
        }

        using (var channel = _connection.CreateModel())
        {
            channel.QueueBind(
                queue: _queueName,
                exchange: ExchangeName,
                routingKey: eventName
                );
        }
    }

    void OnSubscriptionManagerEventRemoved(object _, string eventName)
    {
        if (!_connection.IsConnected)
        {
            _connection.TryConnect();
        }

        using (var channel = _connection.CreateModel())
        {
            channel.QueueUnbind(
                queue: _queueName,
                exchange: ExchangeName,
                routingKey: eventName
            );

            if (!_subscriptionManager.IsEmpty) return;

            _queueName = string.Empty;
            _consumerChannel.Close();
        }
    }

    public void Publish(Event @event)
    {
        if (!_connection.IsConnected)
        {
            _connection.TryConnect();
        }

        var policy = Policy.Handle()
            .Or()
            .WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
            {
                _logger.LogWarning(ex.ToString());
            });

        using (var channel = _connection.CreateModel())
        {
            var eventName = @event.GetType()
                .Name;

            channel.ExchangeDeclare(exchange: ExchangeName,
                type: "direct");

            var message = JsonConvert.SerializeObject(@event);
            var body = Encoding.UTF8.GetBytes(message);

            policy.Execute(() =>
            {
                channel.BasicPublish(exchange: ExchangeName,
                    routingKey: eventName,
                    basicProperties: null,
                    body: body);
            });
        }
    }

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler
    {
        var eventName = typeof(TEvent).Name;
        _subscriptionManager.AddSubscription<TEvent, TEventHandler>();
    }

    public void Subscribe(string eventName)
        where TEventHandler : IDynamicEventHandler
    {
        _subscriptionManager.AddSubscription(eventName);
    }


    public void Unsubscribe<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler
    {
        _subscriptionManager.RemoveSubscription<TEvent, TEventHandler>();
    }

    public void Unsubscribe(string eventName)
        where TEventHandler : IDynamicEventHandler
    {
        _subscriptionManager.RemoveSubscription(eventName);
    }

    private IModel CreateConsumerChannel()
    {
        if (!_connection.IsConnected)
        {
            _connection.TryConnect();
        }

        var channel = _connection.CreateModel();
        channel.ExchangeDeclare(exchange: ExchangeName, type: "direct");
        _queueName = channel.QueueDeclare().QueueName;

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            var eventName = ea.RoutingKey;
            var message = Encoding.UTF8.GetString(ea.Body);

            await HandleEvent(eventName, message);
        };

        channel.BasicConsume(queue: _queueName,
            autoAck: false,
            consumer: consumer);

        channel.CallbackException += (sender, ea) =>
        {
            _consumerChannel.Dispose();
            _consumerChannel = CreateConsumerChannel();
        };

        return channel;
    }

    private async Task HandleEvent(string eventName, string message)
    {
        if (!_subscriptionManager.HasSubscriptionsForEvent(eventName))
        {
            return;
        }

        using (var scope = _autofac.BeginLifetimeScope("RabbitMQEventBus"))
        {
            var subscriptions = _subscriptionManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                await subscription.Handle(message, scope);
            }
        }
    }

    public void Dispose()
    {
        _consumerChannel?.Dispose();
    }
}

Algumas notas:

  • Optei por manter uma topologia simples. Tenho um único Exchange fazendo encaminhamento direct para as queues dos assinantes. Cada assinante possui sua Queue. Há um Binding para cada tipo de evento que a Queue assina.
  • Utilizei o modelo de conexão persistente indicado em um post anterior
  • Quando o processo se encerra (fechando o canal), a queue também é encerrada (eventos só são processados quando o consumer está em operação).
  • Utilzo AutoFac para fazer a carga do handler. Gosto muito da abordagem por permitir fácil injeção de dependências.

As assinaturas no EventBus são mantidas por um objeto auxiliar (SubscriptionManager)

public class SubscriptionsManager
{
    private readonly IDictionary<string, IList> _handlers
        = new Dictionary<string, IList>();

    public bool IsEmpty => !_handlers.Keys.Any();

    public event EventHandler OnEventRemoved;
    public event EventHandler OnEventAdded;

    public void AddSubscription(string eventName)
        where TEventHandler : IDynamicEventHandler
    {
        AddSubscription(
            typeof(TEventHandler),
            eventName
            );
    }

    public void AddSubscription<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler
    {
        AddSubscription(
            typeof(TEventHandler),
            typeof(TEvent).Name,
            typeof(TEvent)
            );
    }

    private void AddSubscription(Type handlerType, string eventName, Type eventType = null)
    {
        if (!HasSubscriptionsForEvent(eventName))
        {
            _handlers.Add(eventName, new List());
            OnEventAdded?.Invoke(this, eventName);
        }

        if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
        {
            throw new ArgumentException(
                $"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
        }

        _handlers[eventName].Add(Subscription.New(
            handlerType, eventType)
            );
    }

    public void RemoveSubscription(string eventName)
        where TEventHandler : IDynamicEventHandler
    {
        var handlerToRemove = FindSubscriptionToRemove(eventName);
        RemoveSubscription(eventName, handlerToRemove);
    }


    public void RemoveSubscription<TEvent, TEventHandler>()
        where TEvent : Event
        where TEventHandler : IEventHandler
    {

        var eventName = typeof(TEvent).Name;
        var handlerToRemove = FindSubscriptionToRemove(eventName);
        RemoveSubscription(eventName, handlerToRemove);
    }

    Subscription FindSubscriptionToRemove(string eventName)
    {
        if (!HasSubscriptionsForEvent(eventName))
        {
            return null;
        }

        return _handlers[eventName].SingleOrDefault(s => s.HandlerType == typeof(TEventHandler));
    }

    private void RemoveSubscription(
        string eventName, 
        Subscription subsToRemove
        )
    {
        if (subsToRemove == null) return;
        _handlers[eventName].Remove(subsToRemove);

        if (_handlers[eventName].Any()) return;

        _handlers.Remove(eventName);
        OnEventRemoved?.Invoke(this, eventName);
    }

    public bool HasSubscriptionsForEvent(string eventName) => 
        _handlers.ContainsKey(eventName);

    public IEnumerable GetHandlersForEvent() 
        where TEvent : Event
    {
        var key = typeof(TEvent).Name;
        return GetHandlersForEvent(key);
    }

    public IEnumerable GetHandlersForEvent(string eventName) 
        => _handlers[eventName];

    public Type GetEventTypeByName(string eventName) => _handlers[eventName]
        ?.FirstOrDefault(handler => !handler.IsDynamic)
        ?.EventType;
}

Cada assinatura é mantida em um objeto especialista com acesso ao Handler

public class Subscription
{
    public bool IsDynamic => EventType == null;
    public Type HandlerType { get; }
    public Type EventType { get;  }

    private Subscription(Type handlerType, Type eventType = null)
    {
        HandlerType = handlerType;
        EventType = eventType;
    }

    public async Task Handle(string message, ILifetimeScope scope)
    {
        if (IsDynamic)
        {
            dynamic eventData = JObject.Parse(message);
            if (scope.ResolveOptional(HandlerType) is IDynamicEventHandler handler)
                await handler.Handle(eventData);
        }
        else
        {
            var eventData = JsonConvert.DeserializeObject(message, EventType);
            var handler = scope.ResolveOptional(HandlerType);
            var concreteType = typeof(IEventHandler<>).MakeGenericType(EventType);
            await(Task) concreteType.GetMethod("Handle")
                .Invoke(handler, new[] { eventData });
        }
    }

    public static Subscription New(Type handlerType, Type eventType) =>
        new Subscription(handlerType, eventType);
}

Concluindo

Em um ambiente de múltiplos componentes, sobretudo Microsserviços, o EventBus é um componente essencial para facilitar escalabilidade e resiliência. RabbitMQ é uma excelente opção para implementação do EventBus.

É importante atentar que a má utilização do EventBus pode gerar algumas dores de cabeça, sobretudo em cenários onde os microsserviços precisem colaborar para completar uma atividade atômica… Mas, sobre SAGAS falaremos em outra oportunidade.

Capa: Joanna Kosinska

Compartilhe este insight:

Comentários

Participe deixando seu comentário sobre este artigo a seguir:

Subscribe
Notify of
guest
2 Comentários
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
José Nilson
José Nilson
5 anos atrás

Gostaria de perguntar se seria possível disponibilizar a demo deste post

Lucas
Lucas
5 anos atrás

O código esta disponível no GIT?

AUTOR

Elemar Júnior
Fundador e CEO da EximiaCo atua como tech trusted advisor ajudando empresas e profissionais a gerar mais resultados através da tecnologia.

NOVOS HORIZONTES PARA O SEU NEGÓCIO

Nosso time está preparado para superar junto com você grandes desafios tecnológicos.

Entre em contato e vamos juntos utilizar a tecnologia do jeito certo para gerar mais resultados.

Insights EximiaCo

Confira os conteúdos de negócios e tecnologia desenvolvidos pelos nossos consultores:

Arquivo

Pós-pandemia, trabalho remoto e a retenção dos profissionais de TI

CTO Consulting e Especialista em Execução em TI
2
0
Queremos saber a sua opinião, deixe seu comentáriox

A sua inscrição foi realizada com sucesso!

O link de acesso à live foi enviado para o seu e-mail. Nos vemos no dia da live.

Oferta de pré-venda!

Mentoria em
Arquitetura de Software

Práticas, padrões & técnicas para Arquitetura de Software, de maneira efetiva, com base em cenários reais para profissionais envolvidos no projeto e implantação de software.

Muito obrigado!

Deu tudo certo com seu envio!
Logo entraremos em contato

Implementando um EventBus com RabbitMQ em C#

Para se candidatar nesta turma aberta, preencha o formulário a seguir:

Implementando um EventBus com RabbitMQ em C#

Para se candidatar nesta turma aberta, preencha o formulário a seguir:

Condição especial de pré-venda: R$ 14.000,00 - contratando a mentoria até até 31/01/2023 e R$ 15.000,00 - contratando a mentoria a partir de 01/02/2023, em até 12x com taxas.

Tenho interesse nessa capacitação

Para solicitar mais informações sobre essa capacitação para a sua empresa, preencha o formulário a seguir:

Tenho interesse em conversar

Se você está querendo gerar resultados através da tecnologia, preencha este formulário que um de nossos consultores entrará em contato com você:

O seu insight foi excluído com sucesso!

O seu insight foi excluído e não está mais disponível.

O seu insight foi salvo com sucesso!

Ele está na fila de espera, aguardando ser revisado para ter sua publicação programada.

Tenho interesse em conversar

Se você está querendo gerar resultados através da tecnologia, preencha este formulário que um de nossos consultores entrará em contato com você:

Tenho interesse nessa solução

Se você está procurando este tipo de solução para o seu negócio, preencha este formulário que um de nossos consultores entrará em contato com você:

Tenho interesse neste serviço

Se você está procurando este tipo de solução para o seu negócio, preencha este formulário que um de nossos consultores entrará em contato com você:

× Precisa de ajuda?