Implementing Direct Exchanges in RabbitMQ with C#

RabbitMQ supports four exchange types: Direct, Fenout, Topic, and Header. This article demonstrates a C# implementation using the Direct exchange.

Establish a connection to RabbitMQ using the following helper class:

public class RabbitMqConnector
{
    public IConnection EstablishConnection()
    {
        var connectionSettings = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest"
        };
        return connectionSettings.CreateConnection();
    }
}

Define a model for log messages:

public class LogEntry
{
    public string Severity { get; set; }
    public byte[] Content { get; set; }
}

Create a publisher that sends messages through a Direct exchange:

public class DirectExchangePublisher
{
    public void SendLogMessages()
    {
        var connector = new RabbitMqConnector();
        using (var connection = connector.EstablishConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "AllLogsQueue", durable: true, exclusive: false, autoDelete: false);
            channel.QueueDeclare(queue: "ErrorLogsQueue", durable: true, exclusive: false, autoDelete: false);

            channel.ExchangeDeclare(exchange: "LogsDirectExchange", type: ExchangeType.Direct, durable: true);

            string[] severityLevels = { "debug", "info", "warning", "error" };
            foreach (var level in severityLevels)
            {
                channel.QueueBind(queue: "AllLogsQueue", exchange: "LogsDirectExchange", routingKey: level);
            }
            channel.QueueBind(queue: "ErrorLogsQueue", exchange: "LogsDirectExchange", routingKey: "error");

            var logEntries = new List<LogEntry>();
            for (int i = 1; i <= 100; i++)
            {
                string severity;
                switch (i % 4)
                {
                    case 0: severity = "debug"; break;
                    case 1: severity = "info"; break;
                    case 2: severity = "warning"; break;
                    default: severity = "error"; break;
                }
                logEntries.Add(new LogEntry
                {
                    Severity = severity,
                    Content = Encoding.UTF8.GetBytes($"{severity} message {i}")
                });
            }

            foreach (var entry in logEntries)
            {
                channel.BasicPublish(exchange: "LogsDirectExchange", routingKey: entry.Severity, body: entry.Content);
                Console.WriteLine($"Sent: {Encoding.UTF8.GetString(entry.Content)}");
            }
        }
    }
}

This publisher distributes 100 log messages evenly across four severity levels. Message are routed to queues based on their routing keys: all messages go to AllLogsQueue, while error messages also go to ErrorLogsQueue.

Implement a consumer to process messages from the AllLogsQueue:

public class DirectExchangeConsumer
{
    public void ProcessLogMessages()
    {
        var connectionSettings = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest"
        };

        using (var connection = connectionSettings.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "AllLogsQueue", durable: true, exclusive: false, autoDelete: false);
            channel.ExchangeDeclare(exchange: "LogsDirectExchange", type: ExchangeType.Direct, durable: true);

            string[] severityLevels = { "debug", "info", "warning", "error" };
            foreach (var level in severityLevels)
            {
                channel.QueueBind(queue: "AllLogsQueue", exchange: "LogsDirectExchange", routingKey: level);
            }

            var messageHandler = new EventingBasicConsumer(channel);
            messageHandler.Received += (model, eventArgs) =>
            {
                var messageContent = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
                Console.WriteLine($"Processed: [{messageContent}]");
            };
            channel.BasicConsume(queue: "AllLogsQueue", autoAck: true, consumer: messageHandler);
        }
    }
}

When executed, the consumer retrieves and processes all messages from the queue. Declarations of exchanges and queues are idempotent, so repeated calls do not create duplicates.

Tags: RabbitMQ Direct Exchange C# Message Queue logging

Posted on Mon, 11 May 2026 05:50:18 +0000 by F.Danials