RabbitMQ supports four exchange types: Direct, Fanout, Topic, and Headers. This article demonstrates a C# implementation using the Direct exchange.
Establish a connection to RabbitMQ using the following helper class:
/// <summary>
/// Helper that opens connections to a RabbitMQ broker using fixed settings:
/// host "localhost" with the "guest" / "guest" credentials.
/// </summary>
public class RabbitMqConnector
{
    /// <summary>
    /// Builds a <c>ConnectionFactory</c> with the fixed settings and opens a new connection.
    /// </summary>
    /// <returns>The connection produced by the factory; the caller is responsible for disposing it.</returns>
    public IConnection EstablishConnection()
    {
        var factory = new ConnectionFactory();
        factory.HostName = "localhost";
        factory.UserName = "guest";
        factory.Password = "guest";

        return factory.CreateConnection();
    }
}
Define a model for log messages:
/// <summary>
/// A single log message: its severity plus the raw bytes of the message body.
/// </summary>
public class LogEntry
{
/// <summary>
/// Severity label of the message (e.g. "debug", "info", "warning", "error").
/// The publisher in this article passes it as the routing key when publishing.
/// </summary>
public string Severity { get; set; }
/// <summary>
/// Message body as bytes. The publisher in this article fills it with UTF-8 encoded text.
/// </summary>
public byte[] Content { get; set; }
}
Create a publisher that sends messages through a Direct exchange:
/// <summary>
/// Publishes generated log messages through a durable Direct exchange,
/// using each message's severity as its routing key.
/// </summary>
public class DirectExchangePublisher
{
    /// <summary>
    /// Declares two durable queues and a durable direct exchange, binds
    /// "AllLogsQueue" to every severity and "ErrorLogsQueue" to "error" only,
    /// then publishes 100 generated messages and echoes each one to the console.
    /// </summary>
    public void SendLogMessages()
    {
        const string exchangeName = "LogsDirectExchange";
        const string allLogsQueue = "AllLogsQueue";
        const string errorLogsQueue = "ErrorLogsQueue";
        const int messageCount = 100;

        // Order matters: position k is the severity assigned to messages where i % 4 == k.
        string[] severityLevels = { "debug", "info", "warning", "error" };

        using (var connection = new RabbitMqConnector().EstablishConnection())
        using (var channel = connection.CreateModel())
        {
            // Topology: queues first, then the exchange, then the bindings.
            channel.QueueDeclare(queue: allLogsQueue, durable: true, exclusive: false, autoDelete: false);
            channel.QueueDeclare(queue: errorLogsQueue, durable: true, exclusive: false, autoDelete: false);
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true);

            for (int k = 0; k < severityLevels.Length; k++)
            {
                channel.QueueBind(queue: allLogsQueue, exchange: exchangeName, routingKey: severityLevels[k]);
            }

            channel.QueueBind(queue: errorLogsQueue, exchange: exchangeName, routingKey: "error");

            List<LogEntry> pending = BuildLogEntries(severityLevels, messageCount);

            for (int k = 0; k < pending.Count; k++)
            {
                LogEntry current = pending[k];
                channel.BasicPublish(exchange: exchangeName, routingKey: current.Severity, body: current.Content);
                Console.WriteLine($"Sent: {Encoding.UTF8.GetString(current.Content)}");
            }
        }
    }

    /// <summary>
    /// Generates messages numbered 1..<paramref name="count"/>; message i gets the
    /// severity at index i modulo the number of severities and a UTF-8 body of
    /// the form "{severity} message {i}".
    /// </summary>
    private static List<LogEntry> BuildLogEntries(string[] severities, int count)
    {
        var entries = new List<LogEntry>();

        for (int number = 1; number <= count; number++)
        {
            string severity = severities[number % severities.Length];

            var entry = new LogEntry();
            entry.Severity = severity;
            entry.Content = Encoding.UTF8.GetBytes($"{severity} message {number}");
            entries.Add(entry);
        }

        return entries;
    }
}
This publisher distributes 100 log messages evenly across four severity levels. Messages are routed to queues based on their routing keys: all messages go to AllLogsQueue, while error messages also go to ErrorLogsQueue.
Implement a consumer to process messages from the AllLogsQueue:
/// <summary>
/// Consumes log messages from "AllLogsQueue" and writes each one to the console.
/// </summary>
public class DirectExchangeConsumer
{
    /// <summary>
    /// Declares the queue, exchange and bindings it depends on, subscribes a
    /// handler to "AllLogsQueue", and then blocks until the user presses Enter.
    /// </summary>
    /// <remarks>
    /// <c>BasicConsume</c> only registers the consumer; messages are delivered to the
    /// <c>Received</c> handler asynchronously afterwards. The method therefore has to
    /// keep the connection and channel open while messages arrive. Returning right
    /// after <c>BasicConsume</c> would dispose both and stop delivery.
    /// </remarks>
    public void ProcessLogMessages()
    {
        var connectionSettings = new ConnectionFactory
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "guest"
        };

        using (var connection = connectionSettings.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Same declarations as the publisher, so the consumer can be started first.
            channel.QueueDeclare(queue: "AllLogsQueue", durable: true, exclusive: false, autoDelete: false);
            channel.ExchangeDeclare(exchange: "LogsDirectExchange", type: ExchangeType.Direct, durable: true);

            string[] severityLevels = { "debug", "info", "warning", "error" };
            foreach (var level in severityLevels)
            {
                channel.QueueBind(queue: "AllLogsQueue", exchange: "LogsDirectExchange", routingKey: level);
            }

            var messageHandler = new EventingBasicConsumer(channel);
            messageHandler.Received += (model, eventArgs) =>
            {
                var messageContent = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
                Console.WriteLine($"Processed: [{messageContent}]");
            };

            // autoAck: true — the handler does not acknowledge messages itself.
            channel.BasicConsume(queue: "AllLogsQueue", autoAck: true, consumer: messageHandler);

            // Keep the channel and connection alive while deliveries come in;
            // leaving the using blocks disposes them and ends consumption.
            Console.WriteLine("Press [Enter] to stop consuming.");
            Console.ReadLine();
        }
    }
}
When executed, the consumer retrieves and processes all messages from the queue. Declarations of exchanges and queues are idempotent, so repeated calls do not create duplicates.