1. Guaranteed Delivery
RabbitMQ offers two callbacks to ensure a message is never lost on its way from publisher to consumer.
| Checkpoint | Callback | Trigger |
|---|---|---|
| producer → exchange | ConfirmCallback |
broker ack/nack |
| exchange → queue | ReturnCallback |
unroutable message |
1.1 Publisher Confirms (ConfirmCallback)
Spring XML snippet
<rabbit:connection-factory id="cf"
publisher-confirms="true"
publisher-returns="true" />
<rabbit:template id="template" connection-factory="cf" />
<rabbit:queue name="q.confirm" />
<rabbit:direct-exchange name="x.confirm">
<rabbit:bindings>
<rabbit:binding queue="q.confirm" key="rk.confirm" />
</rabbit:bindings>
</rabbit:direct-exchange>
Java test
@Test
public void confirmDemo() {
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("Broker received the message");
} else {
log.error("Broker rejected: {}", cause);
// retry or alert
}
});
template.convertAndSend("x.confirm", "rk.confirm", "payload");
}
1.2 Unroutable Messages (ReturnCallback)
@Test
public void returnDemo() {
template.setMandatory(true); // do NOT drop unroutable
template.setReturnCallback((msg, code, text, ex, rk) -> {
log.warn("Message {} returned: {} - {}", msg, code, text);
});
template.convertAndSend("missing.exchange", "rk", "payload");
}
2. Consumer Acknowledgements
| Mode | Attribute | Behaviour |
|---|---|---|
| auto | acknowledge="none" |
message removed as soon as delivered |
| manual | acknowledge="manual" |
application decides when to ack/nack |
Spring configuration
<rabbit:listener-container connection-factory="cf"
acknowledge="manual"
prefetch="1">
<rabbit:listener ref="myListener" queue-names="q.confirm" />
</rabbit:listener-container>
Listener implementation
@Component
public class MyListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message m, Channel ch) throws IOException {
long tag = m.getMessageProperties().getDeliveryTag();
try {
process(m);
ch.basicAck(tag, false); // success
} catch (Exception ex) {
ch.basicNack(tag, false, true); // requeue = true
}
}
}
3. Consumer-side Flow Control
Limit how many un-acked messages a consumer can hold at once.
<rabbit:listener-container connection-factory="cf"
acknowledge="manual"
prefetch="10"> <!-- max 10 in-flight -->
...
</rabbit:listener-container>
4. Time-to-Live (TTL)
TTL can be declared on either the queue or the individual message; whichever expires first wins.
Queue-level TTL
<rabbit:queue name="q.ttl">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="60000" value-type="java.lang.Integer"/>
</rabbit:queue-arguments>
</rabbit:queue>
Message-level TTL
MessagePostProcessor mpp = msg -> {
msg.getMessageProperties().setExpiration("5000"); // 5 s
return msg;
};
template.convertAndSend("x.ttl", "rk", "payload", mpp);
5. Dead-Letter Handling (DLX)
A message becomes a dead letter when:
- TTL exceeded
- Rjeected with
requeue=false - Queue overflow (
x-max-length)
Declare a DLX topology
<!-- normal queue -->
<rabbit:queue name="q.biz">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="dlx"/>
<entry key="x-dead-letter-routing-key" value="failed"/>
<entry key="x-message-ttl" value="30000"/>
<entry key="x-max-length" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- dead-letter exchange & queue -->
<rabbit:topic-exchange name="dlx"/>
<rabbit:queue name="q.dlx"/>
<rabbit:bindings>
<rabbit:binding exchange="dlx" pattern="failed" queue="q.dlx"/>
</rabbit:bindings>
6. Delayed Retry via TTL + DLX
Simulate a delayed queue by letting messages expire in a normal queue and then route them to the DLX where the real consumer is listening.
Producer
template.convertAndSend("order.exchange",
"order.create",
"orderId=1234",
m -> {
m.getMessageProperties().setExpiration("1800000"); // 30 min
return m;
});
Consumer bound to order.dlx receives the message only after 30 minutes and can cancel the unpaid order.
7. Idempotency & Deduplication
Consume the same message many times without side effects.
Common pattern:
- Attach a unique id (UUID) to every message.
- Store processed ids in a cache/database.
- Skip duplicates.
@Component
public class IdempotentListener implements ChannelAwareMessageListener {
private final Set<String> seen = ConcurrentHashMap.newKeySet();
public void onMessage(Message m, Channel ch) throws IOException {
String id = m.getMessageProperties().getMessageId();
if (seen.add(id)) { // returns false if already present
process(m);
}
ch.basicAck(m.getMessageProperties().getDeliveryTag(), false);
}
}
8. Clustering
8.1 Single-node Multi-instance Setup
Start two nodes on one host:
# node 1
RABBITMQ_NODE_PORT=5673 \
RABBITMQ_NODENAME=rabbit@node1 \
rabbitmq-server -detached
# node 2
RABBITMQ_NODE_PORT=5674 \
RABBITMQ_NODENAME=rabbit@node2 \
RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" \
rabbitmq-server -detached
# join cluster
rabbitmqctl -n rabbit@node2 stop_app
rabbitmqctl -n rabbit@node2 reset
rabbitmqctl -n rabbit@node2 join_cluster rabbit@node1
rabbitmqctl -n rabbit@node2 start_app
Verify:
rabbitmqctl cluster_status -n rabbit@node1
8.2 High-Availability Queues (Mirrored)
Enable policy via CLI:
rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'
or via web UI: Admin → Policies → add policy.
8.3 Load Balancer (HAProxy)
Minimal HAProxy configuration
listen rabbit_cluster
bind *:5672
mode tcp
balance roundrobin
server r1 127.0.0.1:5673 check
server r2 127.0.0.1:5674 check
listen stats
bind *:8100
mode http
stats enable
stats uri /stats
Start HAProxy:
haproxy -f /etc/haproxy/rabbit.cfg
Clients now connect to localhost:5672; HAProxy spreads connnections across the cluster nodes.