Synchronous vs Asynchronous Communication
In synchronous communication, the sender blocks until it receives a response from the receiver. The caller gets an immediate, definite result, which is why this model is commonly used for user authentication, order processing, database queries, financial transactions, and real-time feedback systems.
Drawbacks include tight coupling, poor scalability, performance bottlenecks due to blocking, and cascading failures.
In contrast, asynchronous communication decouples components via a message broker. The sender dispatches a message and continues without waiting for a reply. This approach improves throughput, enables fault isolation, supports traffic shaping, and avoids blocking, but it introduces eventual consistency and depends heavily on broker reliability.
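The difference can be sketched without any broker at all. The example below is only an illustration: a buffered Go channel stands in for the message broker, and handle, callSync, sendAsync, and drain are hypothetical names invented for this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// handle stands in for the receiver's work (hypothetical).
func handle(req string) string {
	return strings.ToUpper(req)
}

// callSync blocks the caller until the receiver has produced a response.
func callSync(req string) string {
	return handle(req)
}

// sendAsync hands the message to a buffered channel (the stand-in broker)
// and returns immediately; it reports false if the buffer is full.
func sendAsync(broker chan<- string, msg string) bool {
	select {
	case broker <- msg:
		return true
	default:
		return false
	}
}

// drain plays the consumer: it processes everything queued so far.
func drain(broker <-chan string) []string {
	var out []string
	for {
		select {
		case m := <-broker:
			out = append(out, handle(m))
		default:
			return out
		}
	}
}

func main() {
	fmt.Println(callSync("login")) // the caller waits for this result

	broker := make(chan string, 8)
	sendAsync(broker, "order-1") // the caller continues right away
	sendAsync(broker, "order-2")
	fmt.Println(drain(broker)) // the consumer catches up later
}
```

The sender in the asynchronous half never learns the outcome of its message, which is the eventual-consistency trade-off described above.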
Message Queue Options
Common message brokers include:
| Broker | Language | Protocol Support | Throughput | Latency | Reliability |
|---|---|---|---|---|---|
| RabbitMQ | Erlang | AMQP, STOMP, MQTT, etc. | Moderate | Microsecond | High |
| ActiveMQ | Java | OpenWire, AMQP, STOMP, REST, XMPP | Low | Millisecond | Medium |
| RocketMQ | Java | Custom | High | Millisecond | High |
| Kafka | Scala/Java | Custom | Very High | Sub-ms | Medium |
RabbitMQ is often chosen for its robust protocol support, management UI, and strong reliability guarantees.
Setting Up RabbitMQ
Use the management Docker image for built-in monitoring:
docker pull rabbitmq:3.12-management
Run the container with a named volume for plugins and custom credentials:
docker network create mq-net
docker run -d \
--name rabbitmq \
--hostname rabbitmq \
--network mq-net \
-e RABBITMQ_DEFAULT_USER=user \
-e RABBITMQ_DEFAULT_PASS=pass \
-v mq-plugins:/plugins \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3.12-management
Key ports:
- 5672: AMQP client communication
- 15672: HTTP management UI
- 4369 / 25672: Erlang node clustering
- 5671: TLS-secured AMQP
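To support delayed messages, enable the rabbitmq_delayed_message_exchange plugin. It is a community plugin that is not bundled with RabbitMQ, so its .ez file must first be placed in the plugins directory (the mq-plugins volume mounted at /plugins above):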
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Core RabbitMQ Concepts
- Publisher: Sends messages.
- Consumer: Receives and processes messages.
- Queue: Holds messages until consumed.
- Exchange: Routes messages to queues based on rules.
- Binding: Links a queue to an exchange with a routing key.
- Virtual Host: Logical isolation boundary (like a namespace).
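The virtual host is part of the connection URI, and the default virtual host "/" has to be percent-encoded as %2F. The following is a minimal sketch of assembling such a URI with the standard library; amqpURL is a hypothetical helper name, and the "orders" virtual host is an example value only.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// amqpURL is a hypothetical helper that assembles an AMQP connection URI.
// The vhost is path-escaped, so the default vhost "/" becomes "%2F".
func amqpURL(user, pass, host, port, vhost string) string {
	creds := url.UserPassword(user, pass).String() // escapes reserved characters
	return fmt.Sprintf("amqp://%s@%s/%s", creds, net.JoinHostPort(host, port), url.PathEscape(vhost))
}

func main() {
	fmt.Println(amqpURL("user", "pass", "localhost", "5672", "/"))
	fmt.Println(amqpURL("user", "pass", "localhost", "5672", "orders"))
}
```

Escaping the credentials as well avoids broken URIs when a password contains characters such as @ or :.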
Messaging Patterns in Go
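The following examples use the github.com/streadway/amqp client. That repository is no longer actively developed; the RabbitMQ team maintains a fork, github.com/rabbitmq/amqp091-go, with a largely identical API.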
1. Simple (Point-to-Point)
A single queue receives messages from a producer; one consumer processes them.
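// simple/rabbitmq.go
package simple

import (
	"log"

	"github.com/streadway/amqp"
)

// url points at the broker started above; %2F is the URL-encoded default vhost "/".
const url = "amqp://user:pass@localhost:5672/%2F"

// Client bundles one connection, one channel, and the queue it works with.
type Client struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	queue   string
}

// New connects to the broker and opens a channel, exiting on failure.
func New(queue string) *Client {
	conn, err := amqp.Dial(url)
	if err != nil {
		log.Fatal("Dial failed: ", err)
	}
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal("Channel failed: ", err)
	}
	return &Client{conn: conn, channel: ch, queue: queue}
}

// declare creates the durable queue if it does not exist yet.
func (c *Client) declare() amqp.Queue {
	q, err := c.channel.QueueDeclare(c.queue, true, false, false, false, nil)
	if err != nil {
		log.Fatal("QueueDeclare failed: ", err)
	}
	return q
}

// Publish sends msg through the default exchange, which routes by queue name.
func (c *Client) Publish(msg string) {
	q := c.declare()
	err := c.channel.Publish("", q.Name, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(msg),
	})
	if err != nil {
		log.Fatal("Publish failed: ", err)
	}
}

// Consume logs every message from the queue and blocks until the delivery channel closes.
func (c *Client) Consume() {
	q := c.declare()
	// autoAck is true: the broker treats a message as handled as soon as it is sent.
	msgs, err := c.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		log.Fatal("Consume failed: ", err)
	}
	for m := range msgs {
		log.Printf("Received: %s", m.Body)
	}
}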
Producer:
// cmd/simple-pub/main.go
package main

import "your/module/simple"

func main() {
	s := simple.New("task_queue")
	s.Publish("Hello from Go!")
}
Consumer:
// cmd/simple-consume/main.go
package main

import "your/module/simple"

func main() {
	s := simple.New("task_queue")
	s.Consume()
}
2. Work Queues (Competing Consumers)
Multiple consumers share one queue, and each message is delivered to only one of them.
The Client structure is the same as above; simply start several consumer processes against the same queue. By default, RabbitMQ dispatches messages to them in round-robin fashion.
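To make the round-robin behaviour concrete, here is a small in-memory model of it. It illustrates the distribution rule only and is not how the broker is implemented; dispatch is a hypothetical function name.

```go
package main

import "fmt"

// dispatch models round-robin delivery: message i goes to worker i % workers.
// It is an in-memory illustration, not RabbitMQ's actual implementation.
func dispatch(messages []string, workers int) [][]string {
	if workers <= 0 {
		return nil
	}
	assigned := make([][]string, workers)
	for i, m := range messages {
		w := i % workers
		assigned[w] = append(assigned[w], m)
	}
	return assigned
}

func main() {
	tasks := []string{"t1", "t2", "t3", "t4", "t5"}
	for w, got := range dispatch(tasks, 2) {
		fmt.Printf("worker %d: %v\n", w, got)
	}
}
```

Round-robin ignores how long each task takes, so one worker can end up with all the slow ones. With the real client, the usual remedy is to set a prefetch limit with the channel's Qos method and to acknowledge messages manually instead of using auto-ack.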
3. Publish/Subscribe (Fanout)
Messages are broadcast to all bound queues using a fanout exchange.
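// pubsub/client.go (Client is the same struct as in the simple example)

// PublishFanout broadcasts msg; a fanout exchange ignores the routing key.
func (c *Client) PublishFanout(exchange, msg string) {
	if err := c.channel.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil); err != nil {
		log.Fatal("ExchangeDeclare failed: ", err)
	}
	if err := c.channel.Publish(exchange, "", false, false, amqp.Publishing{Body: []byte(msg)}); err != nil {
		log.Fatal("Publish failed: ", err)
	}
}

// ConsumeFanout binds a private queue to the exchange and logs what arrives.
func (c *Client) ConsumeFanout(exchange string) {
	if err := c.channel.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil); err != nil {
		log.Fatal("ExchangeDeclare failed: ", err)
	}
	q, err := c.channel.QueueDeclare("", false, true, true, false, nil) // broker-named, auto-deleted, exclusive
	if err != nil {
		log.Fatal("QueueDeclare failed: ", err)
	}
	if err := c.channel.QueueBind(q.Name, "", exchange, false, nil); err != nil {
		log.Fatal("QueueBind failed: ", err)
	}
	msgs, err := c.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		log.Fatal("Consume failed: ", err)
	}
	for m := range msgs {
		log.Printf("[Fanout] %s", m.Body)
	}
}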
Each consumer creates its own ephemeral queue, ensuring all receive every message.
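The copy-per-queue behaviour can be modelled in memory. The sketch below is a toy stand-in for a fanout exchange, with the type fanoutExchange and the queue names invented for illustration. It also shows that a queue bound late misses earlier messages.

```go
package main

import "fmt"

// fanoutExchange is a toy, in-memory stand-in for a fanout exchange.
type fanoutExchange struct {
	queues map[string][]string // queue name -> messages delivered so far
}

func newFanoutExchange() *fanoutExchange {
	return &fanoutExchange{queues: make(map[string][]string)}
}

// bind attaches a queue; it only sees messages published after this point.
func (e *fanoutExchange) bind(queue string) {
	if _, ok := e.queues[queue]; !ok {
		e.queues[queue] = []string{}
	}
}

// publish copies msg into every bound queue; there is no routing key to check.
func (e *fanoutExchange) publish(msg string) {
	for name := range e.queues {
		e.queues[name] = append(e.queues[name], msg)
	}
}

func main() {
	ex := newFanoutExchange()
	ex.bind("email")
	ex.bind("sms")
	ex.publish("order shipped")
	ex.bind("audit") // bound late, so it misses the first message
	ex.publish("order delivered")

	fmt.Println(ex.queues["email"]) // [order shipped order delivered]
	fmt.Println(ex.queues["sms"])   // [order shipped order delivered]
	fmt.Println(ex.queues["audit"]) // [order delivered]
}
```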
4. Routing (Direct Exchange)
Messages are routed based on an exact routing key match.
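// routing/client.go (Client is the same struct as in the simple example)
func (c *Client) PublishDirect(exchange, key, msg string) {
	if err := c.channel.ExchangeDeclare(exchange, "direct", true, false, false, false, nil); err != nil {
		log.Fatal("ExchangeDeclare failed: ", err)
	}
	if err := c.channel.Publish(exchange, key, false, false, amqp.Publishing{Body: []byte(msg)}); err != nil {
		log.Fatal("Publish failed: ", err)
	}
}

func (c *Client) ConsumeDirect(exchange, key string) {
	if err := c.channel.ExchangeDeclare(exchange, "direct", true, false, false, false, nil); err != nil {
		log.Fatal("ExchangeDeclare failed: ", err)
	}
	q, err := c.channel.QueueDeclare("", false, true, true, false, nil) // private, auto-deleted queue
	if err != nil {
		log.Fatal("QueueDeclare failed: ", err)
	}
	if err := c.channel.QueueBind(q.Name, key, exchange, false, nil); err != nil { // exact-match binding
		log.Fatal("QueueBind failed: ", err)
	}
	msgs, err := c.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		log.Fatal("Consume failed: ", err)
	}
	for m := range msgs {
		log.Printf("[Routing %s] %s", key, m.Body)
	}
}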
Example: publish logs with routing keys such as error and info; each consumer binds only to the keys it cares about.
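The log example can be modelled as a lookup table from routing key to bound queues. This is an in-memory sketch only; directExchange and the queue names alerts and all-logs are hypothetical.

```go
package main

import "fmt"

// directExchange is a toy, in-memory stand-in for a direct exchange.
type directExchange struct {
	bindings map[string][]string // routing key -> bound queue names
	queues   map[string][]string // queue name -> delivered messages
}

func newDirectExchange() *directExchange {
	return &directExchange{
		bindings: make(map[string][]string),
		queues:   make(map[string][]string),
	}
}

// bind links queue to the exchange under an exact routing key.
// Binding the same queue and key twice has no additional effect.
func (e *directExchange) bind(queue, key string) {
	for _, q := range e.bindings[key] {
		if q == queue {
			return
		}
	}
	e.bindings[key] = append(e.bindings[key], queue)
}

// publish delivers msg to every queue bound with exactly this key and
// returns how many queues received it (0 means nothing was bound).
func (e *directExchange) publish(key, msg string) int {
	targets := e.bindings[key]
	for _, q := range targets {
		e.queues[q] = append(e.queues[q], msg)
	}
	return len(targets)
}

func main() {
	ex := newDirectExchange()
	ex.bind("alerts", "error")
	ex.bind("all-logs", "error")
	ex.bind("all-logs", "info")

	fmt.Println(ex.publish("error", "disk full"))  // 2
	fmt.Println(ex.publish("info", "job started")) // 1
	fmt.Println(ex.publish("debug", "ignored"))    // 0
	fmt.Println(ex.queues["all-logs"])             // [disk full job started]
}
```

A queue can be bound under several keys, which is how all-logs receives both severities while alerts sees only errors.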
5. Topics (Topic Exchange)
Uses pattern matching with wildcards:
- * matches exactly one word
- # matches zero or more words
Example binding keys: stock.us.*, audit.#
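// topic/client.go (Client is the same struct as in the simple example)
func (c *Client) PublishTopic(exchange, key, msg string) { // key is a concrete routing key, e.g. "stock.us.nyse"
	if err := c.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil); err != nil {
		log.Fatal("ExchangeDeclare failed: ", err)
	}
	if err := c.channel.Publish(exchange, key, false, false, amqp.Publishing{Body: []byte(msg)}); err != nil {
		log.Fatal("Publish failed: ", err)
	}
}

func (c *Client) ConsumeTopic(exchange, pattern string) { // pattern may contain * and #
	if err := c.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil); err != nil {
		log.Fatal("ExchangeDeclare failed: ", err)
	}
	q, err := c.channel.QueueDeclare("", false, true, true, false, nil) // private, auto-deleted queue
	if err != nil {
		log.Fatal("QueueDeclare failed: ", err)
	}
	if err := c.channel.QueueBind(q.Name, pattern, exchange, false, nil); err != nil {
		log.Fatal("QueueBind failed: ", err)
	}
	msgs, err := c.channel.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		log.Fatal("Consume failed: ", err)
	}
	for m := range msgs {
		log.Printf("[Topic %s] %s", pattern, m.Body)
	}
}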
A consumer bound with *.news receives sports.news and tech.news, but not news.sports. A consumer bound with # receives every message.
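The wildcard rules can be checked with a small matcher. This is a sketch of the matching semantics described above, written from scratch for illustration; it is not the broker's algorithm, and matchTopic and matchWords are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether a routing key satisfies a binding pattern.
// Both are split into dot-separated words before comparison.
func matchTopic(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// "#" may absorb zero words, or one word and then try again.
		if matchWords(p[1:], k) {
			return true
		}
		return len(k) > 0 && matchWords(p, k[1:])
	case "*":
		// "*" must consume exactly one word.
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(matchTopic("*.news", "sports.news"))        // true
	fmt.Println(matchTopic("*.news", "news.sports"))        // false
	fmt.Println(matchTopic("stock.us.*", "stock.us.nyse"))  // true
	fmt.Println(matchTopic("audit.#", "audit.login.failed")) // true
	fmt.Println(matchTopic("#", "anything.at.all"))         // true
}
```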
These patterns form the foundation for building scalable, decoupled systems with RabbitMQ in Go.