Using RabbitMQ with Go: Synchronous vs Asynchronous Messaging and Common Patterns

Synchronous vs Asynchronous Communication

In synchronous communication, the sender blocks until it receives a response from the receiver. This model ensures data integrity and is commonly used in scenarios like user authentication, order processing, database queries, financial transactions, and real-time feedback systems.

Drawbacks include tight coupling, poor scalability, performance bottlenecks due to blocking, and cascading failures.

In contrast, asynchronous communication decouples components via a message broker. The sender dispatches a message and continues without waiting for a reply. This approach improves throughput, enables fault isolation, supports traffic shaping, and avoids blocking—but introduces eventual consistency and depends heavily on broker reliability.

Message Queue Options

Common message brokers include:

Broker Language Protocol Support Throughput Latency Reliability
RabbitMQ Erlang AMQP, STOMP, MQTT, etc. Moderate Microsecond High
ActiveMQ Java OpenWire, AMQP, STOMP, REST, XMPP Low Millisecond Medium
RocketMQ Java Custom High Millisecond High
Kafka Scala/Java Custom Very High Sub-ms Medium

RabbitMQ is often chosen for its robust protocol support, management UI, and strong reliability guarantees.

Setting Up RabbitMQ

Use the management Docker image for built-in monitoring:

docker pull rabbitmq:3.12-management

Run with persistent plugin volume and custom credentials:

docker network create mq-net

docker run -d \
  --name rabbitmq \
  --hostname rabbitmq \
  --network mq-net \
  -e RABBITMQ_DEFAULT_USER=user \
  -e RABBITMQ_DEFAULT_PASS=pass \
  -v mq-plugins:/plugins \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3.12-management

Key ports:

  • 5672: AMQP client communication
  • 15672: HTTP management UI
  • 4369 / 25672: Erlang node clustering
  • 5671: TLS-secured AMQP

To enable delayed messages, install the rabbitmq_delayed_message_exchange plugin:

docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Core RabbitMQ Concepts

  • Publisher: Sends messages.
  • Consumer: Receives and processes messages.
  • Queue: Holds messages until consumed.
  • Exchange: Routes messages to queues based on rules.
  • Binding: Links a queue to an exchange with a routing key.
  • Virtual Host: Logical isolation boundary (like a namespace).

Messaging Pattersn in Go

The following examples use the github.com/streadway/amqp client.

1. Simple (Point-to-Point)

A single queue receives messages from a producer; one consumer processes them.

// simple/rabbitmq.go
package simple

import (
	"log"
	"github.com/streadway/amqp"
)

const url = "amqp://user:pass@localhost:5672/%2F"

type Client struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	queue   string
}

func New(queue string) *Client {
	conn, err := amqp.Dial(url)
	if err != nil {
		log.Fatal("Dial failed:", err)
	}
	ch, err := conn.Channel()
	if err != nil {
		log.Fatal("Channel failed:", err)
	}
	return &Client{conn: conn, channel: ch, queue: queue}
}

func (c *Client) Publish(msg string) {
	q, _ := c.channel.QueueDeclare(c.queue, true, false, false, false, nil)
	c.channel.Publish("", q.Name, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(msg),
	})
}

func (c *Client) Consume() {
	q, _ := c.channel.QueueDeclare(c.queue, true, false, false, false, nil)
	msgs, _ := c.channel.Consume(q.Name, "", true, false, false, false, nil)

	go func() {
		for m := range msgs {
			log.Printf("Received: %s", m.Body)
		}
	}()
	<-make(chan bool)
}

Producer:

// cmd/simple-pub/main.go
package main

import "your/module/simple"

func main() {
	s := simple.New("task_queue")
	s.Publish("Hello from Go!")
}

Consumer:

// cmd/simple-consume/main.go
package main

import "your/module/simple"

func main() {
	s := simple.New("task_queue")
	s.Consume()
}

2. Work Queues (Competing Consumers)

Multiple consumers share one queue—each message goes to only one worker.

Same Client structure as above, but launch multiple consumers. RabbitMQ distributes messages in round-robin fashion by default.

3. Publish/Subscribe (Fanout)

Messages are broadcast to all bound queues using a fanout exchange.

// pubsub/client.go
func (c *Client) PublishFanout(exchange, msg string) {
	c.channel.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
	c.channel.Publish(exchange, "", false, false, amqp.Publishing{
		Body: []byte(msg),
	})
}

func (c *Client) ConsumeFanout(exchange string) {
	c.channel.ExchangeDeclare(exchange, "fanout", true, false, false, false, nil)
	q, _ := c.channel.QueueDeclare("", false, true, true, false, nil)
	c.channel.QueueBind(q.Name, "", exchange, false, nil)

	msgs, _ := c.channel.Consume(q.Name, "", true, false, false, false, nil)
	for m := range msgs {
		log.Printf("[Fanout] %s", m.Body)
	}
}

Each consumer creates its own ephemeral queue, ensuring all receive every message.

4. Routing (Direct Exchange)

Messages are routed based on an exact routing key match.

// routing/client.go
func (c *Client) PublishDirect(exchange, key, msg string) {
	c.channel.ExchangeDeclare(exchange, "direct", true, false, false, false, nil)
	c.channel.Publish(exchange, key, false, false, amqp.Publishing{Body: []byte(msg)})
}

func (c *Client) ConsumeDirect(exchange, key string) {
	c.channel.ExchangeDeclare(exchange, "direct", true, false, false, false, nil)
	q, _ := c.channel.QueueDeclare("", false, true, true, false, nil)
	c.channel.QueueBind(q.Name, key, exchange, false, nil)

	msgs, _ := c.channel.Consume(q.Name, "", true, false, false, false, nil)
	for m := range msgs {
		log.Printf("[Routing %s] %s", key, m.Body)
	}
}

Example: Send logs with keys like error, info; cnosumers bind to specific keys.

5. Topics (Topic Exchange)

Uses pattern matching with wildcards:

  • * matches exactly one word
  • # matches zero or more words

Example binding keys: stock.us.*, audit.#

// topic/client.go
func (c *Client) PublishTopic(exchange, pattern, msg string) {
	c.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
	c.channel.Publish(exchange, pattern, false, false, amqp.Publishing{Body: []byte(msg)})
}

func (c *Client) ConsumeTopic(exchange, pattern string) {
	c.channel.ExchangeDeclare(exchange, "topic", true, false, false, false, nil)
	q, _ := c.channel.QueueDeclare("", false, true, true, false, nil)
	c.channel.QueueBind(q.Name, pattern, exchange, false, nil)

	msgs, _ := c.channel.Consume(q.Name, "", true, false, false, false, nil)
	for m := range msgs {
		log.Printf("[Topic %s] %s", pattern, m.Body)
	}
}

A consumer binding to *.news receives sports.news and tech.news, but not news.sports. One binding to # receives all message.

These patterns form the foundation for building scalable, decoupled systems with RabbitMQ in Go.

Tags: Go RabbitMQ Message Queue AMQP Asynchronous Messaging

Posted on Thu, 07 May 2026 15:57:29 +0000 by daleks