Maven Dependencies and Configuration
Add the following dependency to integrate RabbitMQ functionality:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.2.7</version>
</dependency>
Configure connection parameters in application.yml:
# Connection settings for the RabbitMQ broker used by the examples.
# The keys must be nested under spring.rabbitmq for Spring Boot to bind them.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
Exchange Types Implementation
Fanout Exchange Pattern
Fanout exchanges broadcast messages to all bound queues simultaneously:
/**
 * Declares a fanout exchange together with two queues, each bound to that exchange.
 *
 * <p>The name constants are {@code public} because code outside this class
 * (for example the tests that publish to {@link #EXCHANGE_NAME}) refers to them.
 */
@Configuration
public class BroadcastConfiguration {

    /** Name of the first queue bound to the broadcast exchange. */
    public static final String QUEUE_ONE = "broadcast.queue.one";

    /** Name of the second queue bound to the broadcast exchange. */
    public static final String QUEUE_TWO = "broadcast.queue.two";

    /** Name of the fanout exchange. */
    public static final String EXCHANGE_NAME = "broadcast.exchange";

    /** @return the queue named {@link #QUEUE_ONE} */
    @Bean
    public Queue firstQueue() {
        return new Queue(QUEUE_ONE);
    }

    /** @return the queue named {@link #QUEUE_TWO} */
    @Bean
    public Queue secondQueue() {
        return new Queue(QUEUE_TWO);
    }

    /** @return the fanout exchange named {@link #EXCHANGE_NAME} */
    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    /** @return the binding of the first queue to the fanout exchange (no routing key) */
    @Bean
    public Binding linkFirstQueue() {
        return BindingBuilder.bind(firstQueue()).to(broadcastExchange());
    }

    /** @return the binding of the second queue to the fanout exchange (no routing key) */
    @Bean
    public Binding linkSecondQueue() {
        return BindingBuilder.bind(secondQueue()).to(broadcastExchange());
    }
}
Testing message distribution:
/**
 * Publishes a single text message to the broadcast exchange with an empty routing key.
 */
@Test
public void testBroadcastMessaging() {
    final String exchange = BroadcastConfiguration.EXCHANGE_NAME;
    final String routingKey = "";
    final String payload = "broadcast-message";

    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}
Direct Exchange Routing
Direct exchanges route messages based on exact routing key matches:
/**
 * Declares a direct exchange with two queues: one bound with {@link #PRIMARY_ROUTE}
 * and one bound with the empty routing key.
 *
 * <p>The name constants are {@code public} because code outside this class
 * (for example the routing tests) refers to {@link #ROUTING_EXCHANGE} and
 * {@link #PRIMARY_ROUTE}.
 */
@Configuration
public class RoutingConfiguration {

    /** Name of the queue bound with {@link #PRIMARY_ROUTE}. */
    public static final String PRIMARY_QUEUE = "routing.queue.primary";

    /** Name of the queue bound with the empty routing key. */
    public static final String SECONDARY_QUEUE = "routing.queue.secondary";

    /** Name of the direct exchange. */
    public static final String ROUTING_EXCHANGE = "routing.exchange";

    /** Routing key used for the primary queue binding. */
    public static final String PRIMARY_ROUTE = "route.primary";

    /** @return the queue named {@link #PRIMARY_QUEUE} */
    @Bean
    public Queue primaryQueue() {
        return new Queue(PRIMARY_QUEUE);
    }

    /** @return the queue named {@link #SECONDARY_QUEUE} */
    @Bean
    public Queue secondaryQueue() {
        return new Queue(SECONDARY_QUEUE);
    }

    /** @return the direct exchange named {@link #ROUTING_EXCHANGE} */
    @Bean
    public DirectExchange routingExchange() {
        return new DirectExchange(ROUTING_EXCHANGE);
    }

    /** @return the binding of the primary queue under {@link #PRIMARY_ROUTE} */
    @Bean
    public Binding bindPrimary() {
        return BindingBuilder.bind(primaryQueue()).to(routingExchange()).with(PRIMARY_ROUTE);
    }

    /** @return the binding of the secondary queue under the empty routing key */
    @Bean
    public Binding bindSecondary() {
        // The empty string is itself the routing key here, not a wildcard.
        return BindingBuilder.bind(secondaryQueue()).to(routingExchange()).with("");
    }
}
Routing tests:
/**
 * Publishes three messages to the direct exchange: one with the empty routing key,
 * one with the primary routing key, and one with a key that RoutingConfiguration
 * declares no binding for.
 */
@Test
public void testDirectRouting() {
    final String exchange = RoutingConfiguration.ROUTING_EXCHANGE;

    // Empty key: the key the secondary queue is bound with.
    rabbitTemplate.convertAndSend(exchange, "", "default-route-message");

    // Primary key: the key the primary queue is bound with.
    final String primaryKey = RoutingConfiguration.PRIMARY_ROUTE;
    rabbitTemplate.convertAndSend(exchange, primaryKey, "primary-route-message");

    // No binding in RoutingConfiguration uses this key.
    rabbitTemplate.convertAndSend(exchange, "unknown", "no-match-message");
}
Topic Exchange Patterns
Topic exchanges support wildcard-based routing using * (exactly one word) and # (zero or more words):
/**
 * Declares a topic exchange with two queues bound through wildcard patterns.
 *
 * <p>The name constants are {@code public} because code outside this class
 * (for example the pattern tests publishing to {@link #TOPIC_EXCHANGE}) refers to them.
 */
@Configuration
public class TopicConfiguration {

    /** Name of the queue bound with {@link #SINGLE_WORD_PATTERN}. */
    public static final String FIRST_TOPIC_QUEUE = "topic.queue.first";

    /** Name of the queue bound with {@link #MULTI_WORD_PATTERN}. */
    public static final String SECOND_TOPIC_QUEUE = "topic.queue.second";

    /** Name of the topic exchange. */
    public static final String TOPIC_EXCHANGE = "pattern.exchange";

    /** Binding pattern ending in {@code *}, which stands for exactly one word. */
    public static final String SINGLE_WORD_PATTERN = "pattern.route.*";

    /** Binding pattern ending in {@code #}, which stands for zero or more words. */
    public static final String MULTI_WORD_PATTERN = "pattern.route.#";

    /** @return the queue named {@link #FIRST_TOPIC_QUEUE} */
    @Bean
    public Queue patternQueueOne() {
        return new Queue(FIRST_TOPIC_QUEUE);
    }

    /** @return the queue named {@link #SECOND_TOPIC_QUEUE} */
    @Bean
    public Queue patternQueueTwo() {
        return new Queue(SECOND_TOPIC_QUEUE);
    }

    /** @return the topic exchange named {@link #TOPIC_EXCHANGE} */
    @Bean
    public TopicExchange patternExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /** @return the binding of the first queue under {@link #SINGLE_WORD_PATTERN} */
    @Bean
    public Binding connectFirstPattern() {
        return BindingBuilder.bind(patternQueueOne()).to(patternExchange()).with(SINGLE_WORD_PATTERN);
    }

    /** @return the binding of the second queue under {@link #MULTI_WORD_PATTERN} */
    @Bean
    public Binding connectSecondPattern() {
        return BindingBuilder.bind(patternQueueTwo()).to(patternExchange()).with(MULTI_WORD_PATTERN);
    }
}
Pattern matching tests:
/**
 * Publishes two messages to the topic exchange: one whose routing key has a single
 * word after "pattern.route." and one whose key has two words after it.
 */
@Test
public void testTopicPatterns() {
    final String exchange = TopicConfiguration.TOPIC_EXCHANGE;

    final String oneWordKey = "pattern.route.one";
    rabbitTemplate.convertAndSend(exchange, oneWordKey, "single-word-match");

    final String twoWordKey = "pattern.route.one.two";
    rabbitTemplate.convertAndSend(exchange, twoWordKey, "multi-word-match");
}
Headers Exchange Matching
Headers exchanges route messages based on message header attributes rather than routing keys:
/**
 * Declares a headers exchange with three queues, each bound through header conditions
 * instead of a routing key.
 *
 * <p>The name constants are {@code public} because code outside this class
 * (for example the header routing tests) refers to {@link #HEADER_EXCHANGE},
 * {@link #ATTRIBUTE_ONE} and {@link #ATTRIBUTE_TWO}.
 */
@Configuration
public class HeaderConfiguration {

    /** Name of the queue bound on {@link #ATTRIBUTE_ONE}. */
    public static final String FIRST_HEADER_QUEUE = "header.queue.first";

    /** Name of the queue bound on {@link #ATTRIBUTE_TWO}. */
    public static final String SECOND_HEADER_QUEUE = "header.queue.second";

    /** Name of the queue bound on either attribute. */
    public static final String THIRD_HEADER_QUEUE = "header.queue.third";

    /** Name of the headers exchange. */
    public static final String HEADER_EXCHANGE = "attribute.exchange";

    /** Header name used by the first binding. */
    public static final String ATTRIBUTE_ONE = "attr.one";

    /** Header name used by the second binding. */
    public static final String ATTRIBUTE_TWO = "attr.two";

    /** @return the queue named {@link #FIRST_HEADER_QUEUE} */
    @Bean
    public Queue attributeQueueOne() {
        return new Queue(FIRST_HEADER_QUEUE);
    }

    /** @return the queue named {@link #SECOND_HEADER_QUEUE} */
    @Bean
    public Queue attributeQueueTwo() {
        return new Queue(SECOND_HEADER_QUEUE);
    }

    /** @return the queue named {@link #THIRD_HEADER_QUEUE} */
    @Bean
    public Queue attributeQueueThree() {
        return new Queue(THIRD_HEADER_QUEUE);
    }

    /** @return the headers exchange named {@link #HEADER_EXCHANGE} */
    @Bean
    public HeadersExchange attributeExchange() {
        return new HeadersExchange(HEADER_EXCHANGE);
    }

    /** @return the binding that requires header {@link #ATTRIBUTE_ONE} to equal "valueA" */
    @Bean
    public Binding bindAttributeOne() {
        return BindingBuilder.bind(attributeQueueOne()).to(attributeExchange()).where(ATTRIBUTE_ONE).matches("valueA");
    }

    /** @return the binding that requires header {@link #ATTRIBUTE_TWO} to equal "valueB" */
    @Bean
    public Binding bindAttributeTwo() {
        return BindingBuilder.bind(attributeQueueTwo()).to(attributeExchange()).where(ATTRIBUTE_TWO).matches("valueB");
    }

    /**
     * @return the binding of the third queue built with {@code whereAny}, so a message
     *         qualifies when at least one of the two header conditions holds
     */
    @Bean
    public Binding bindCombinedAttributes() {
        Map<String, Object> conditions = new HashMap<>();
        conditions.put(ATTRIBUTE_ONE, "valueA");
        conditions.put(ATTRIBUTE_TWO, "valueB");
        return BindingBuilder.bind(attributeQueueThree()).to(attributeExchange()).whereAny(conditions).match();
    }
}
Header-based routing tests:
/**
 * Publishes two messages to the headers exchange: the first carries only
 * attr.one=valueA, the second carries only attr.two=valueB.
 *
 * <p>Each message gets its own {@code MessageProperties} so that no header from the
 * first message is carried into the second, and the payload bytes are encoded with an
 * explicit charset instead of the platform default.
 */
@Test
public void testHeaderRouting() {
    MessageProperties firstProps = new MessageProperties();
    firstProps.setHeader(HeaderConfiguration.ATTRIBUTE_ONE, "valueA");
    byte[] firstBody = "header-message-A".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    rabbitTemplate.convertAndSend(HeaderConfiguration.HEADER_EXCHANGE, "", new Message(firstBody, firstProps));

    // Fresh properties: attr.one is simply absent rather than set to an empty value.
    MessageProperties secondProps = new MessageProperties();
    secondProps.setHeader(HeaderConfiguration.ATTRIBUTE_TWO, "valueB");
    byte[] secondBody = "header-message-B".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    rabbitTemplate.convertAndSend(HeaderConfiguration.HEADER_EXCHANGE, "", new Message(secondBody, secondProps));
}
Alternate Exchange Mechanism
Alternate exchanges handle unroutable messages by redirecting them to backup queues:
/**
 * Declares a primary direct exchange whose "alternate-exchange" argument points at a
 * fanout fallback exchange, plus one queue behind each exchange.
 *
 * <p>The name constants are {@code public} because code outside this class
 * (for example the fallback tests) refers to {@link #PRIMARY_EXCHANGE_NAME} and
 * {@link #VALID_ROUTE}.
 */
@Configuration
public class FallbackConfiguration {

    /** Name of the queue bound to the fallback exchange. */
    public static final String BACKUP_QUEUE_NAME = "fallback.queue";

    /** Name of the fanout exchange used as the alternate exchange. */
    public static final String BACKUP_EXCHANGE_NAME = "fallback.exchange";

    /** Name of the queue bound to the primary exchange. */
    public static final String PRIMARY_QUEUE_NAME = "primary.queue";

    /** Name of the primary direct exchange. */
    public static final String PRIMARY_EXCHANGE_NAME = "primary.exchange";

    /** Routing key under which the primary queue is bound. */
    public static final String VALID_ROUTE = "valid.route";

    /** @return the durable, non-exclusive, non-auto-delete fallback queue */
    @Bean
    public Queue fallbackQueue() {
        return new Queue(BACKUP_QUEUE_NAME, true, false, false);
    }

    /**
     * The bean is registered under an explicit name because RoutingConfiguration also
     * has a {@code primaryQueue()} bean method; two beans with the same default name
     * cannot coexist in one context unless bean definition overriding is enabled.
     *
     * @return the durable, non-exclusive, non-auto-delete primary queue
     */
    @Bean("fallbackPrimaryQueue")
    public Queue primaryQueue() {
        return new Queue(PRIMARY_QUEUE_NAME, true, false, false);
    }

    /** @return the durable primary exchange, configured with the fallback exchange as its alternate */
    @Bean
    public DirectExchange primaryExchange() {
        Map<String, Object> arguments = new HashMap<>();
        // Messages this exchange cannot route are handed to the named alternate exchange.
        arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return new DirectExchange(PRIMARY_EXCHANGE_NAME, true, false, arguments);
    }

    /** @return the durable fanout exchange that receives unroutable messages */
    @Bean
    public FanoutExchange fallbackExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME, true, false);
    }

    /** @return the binding of the primary queue under {@link #VALID_ROUTE} */
    @Bean
    public Binding connectPrimaryQueue() {
        return BindingBuilder.bind(primaryQueue()).to(primaryExchange()).with(VALID_ROUTE);
    }

    /** @return the binding of the fallback queue to the fallback exchange */
    @Bean
    public Binding connectFallbackQueue() {
        return BindingBuilder.bind(fallbackQueue()).to(fallbackExchange());
    }
}
Fallback testing:
/**
 * Publishes two messages to the primary exchange: one with the routing key the primary
 * queue is bound with, and one with a key FallbackConfiguration declares no binding for.
 */
@Test
public void testFallbackMechanism() {
    final String exchange = FallbackConfiguration.PRIMARY_EXCHANGE_NAME;

    final String boundKey = FallbackConfiguration.VALID_ROUTE;
    rabbitTemplate.convertAndSend(exchange, boundKey, "routed-message");

    final String unboundKey = "invalid.route";
    rabbitTemplate.convertAndSend(exchange, unboundKey, "unrouted-message");
}
Dead Letter Exchange Processing
Dead letter exchanges handle expired, rejected, or overflow messages:
/**
 * Declares a regular queue with a message TTL and dead-letter settings, together with
 * the dead-letter exchange and queue that those settings point at.
 *
 * <p>The name constants are {@code public} because code outside this class
 * (for example the expiration tests) refers to {@link #REGULAR_EXCHANGE} and
 * {@link #REGULAR_ROUTE}.
 */
@Configuration
public class ExpirationConfiguration {

    /** Name of the queue that receives dead-lettered messages. */
    public static final String DLX_QUEUE = "dlx.queue";

    /** Name of the dead-letter exchange. */
    public static final String DLX_EXCHANGE = "dlx.exchange";

    /** Routing key applied to dead-lettered messages. */
    public static final String DLX_ROUTE = "dlx.route";

    /** Name of the queue carrying the TTL and dead-letter arguments. */
    public static final String REGULAR_QUEUE = "regular.queue";

    /** Name of the exchange messages are normally published to. */
    public static final String REGULAR_EXCHANGE = "regular.exchange";

    /** Routing key under which the regular queue is bound. */
    public static final String REGULAR_ROUTE = "regular.route";

    /** @return the durable, non-exclusive, non-auto-delete dead-letter queue */
    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE, true, false, false);
    }

    /** @return the durable regular queue with TTL and dead-letter arguments set */
    @Bean
    public Queue regularQueue() {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("x-message-ttl", 10000); // milliseconds
        parameters.put("x-dead-letter-exchange", DLX_EXCHANGE);
        parameters.put("x-dead-letter-routing-key", DLX_ROUTE);
        return new Queue(REGULAR_QUEUE, true, false, false, parameters);
    }

    /**
     * Declared durable, like the queues that depend on it, so the dead-letter target
     * does not vanish on a broker restart while the durable queues remain.
     * NOTE(review): a broker that already holds a non-durable "dlx.exchange" will
     * refuse this declaration; delete the old exchange first.
     *
     * @return the durable, non-auto-delete dead-letter exchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE, true, false);
    }

    /** @return the durable, non-auto-delete regular exchange */
    @Bean
    public DirectExchange regularExchange() {
        return new DirectExchange(REGULAR_EXCHANGE, true, false);
    }

    /** @return the binding of the dead-letter queue under {@link #DLX_ROUTE} */
    @Bean
    public Binding bindDlxQueue() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTE);
    }

    /** @return the binding of the regular queue under {@link #REGULAR_ROUTE} */
    @Bean
    public Binding bindRegularQueue() {
        return BindingBuilder.bind(regularQueue()).to(regularExchange()).with(REGULAR_ROUTE);
    }
}
Expiration testing:
/**
 * Publishes one message to the regular exchange using the routing key of the
 * TTL-configured regular queue.
 */
@Test
public void testMessageExpiration() {
    final String exchange = ExpirationConfiguration.REGULAR_EXCHANGE;
    final String routingKey = ExpirationConfiguration.REGULAR_ROUTE;
    final String payload = "expiring-message";

    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}
Dynamic Component Creation
Programmatically create messaging components:
@Autowired
private AmqpAdmin adminInterface;
/**
 * Declares a queue, a fanout exchange and a binding between the two at runtime
 * through the injected {@code AmqpAdmin}, in that order.
 */
public void setupMessagingComponents() {
    final String queueName = "dynamic.queue";
    final String exchangeName = "dynamic.exchange";

    // Queue flags in order: durable, exclusive, autoDelete; no extra arguments.
    adminInterface.declareQueue(new Queue(queueName, false, false, false, null));

    // Exchange flags in order: durable, autoDelete; no extra arguments.
    adminInterface.declareExchange(new FanoutExchange(exchangeName, false, false, null));

    // Bind the queue to the exchange with an empty routing key and no arguments.
    Binding queueToExchange =
            new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, "", null);
    adminInterface.declareBinding(queueToExchange);
}
Message Consumption
Enable RabbitMQ processing and implement listeners:
/**
 * Listener for the queue "broadcast.queue.one" that prints every received
 * text payload to standard output.
 */
@Component
@RabbitListener(queues = "broadcast.queue.one")
public class BroadcastConsumer {

    /**
     * Prints the received payload.
     *
     * @param content the message body as text
     */
    @RabbitHandler
    public void processMessage(String content) {
        final String line = "Received from broadcast.queue.one: " + content;
        System.out.println(line);
    }
}
Message Acknowledgment Strategies
Producer Confirmation Setup
YAML configuration for message reliability:
# Broker connection plus publisher confirms/returns and manual consumer acknowledgment.
# The keys must be nested under spring.rabbitmq for Spring Boot to bind them.
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
Producer confirmation implementation:
/**
 * Registers itself on the injected {@code RabbitTemplate} as both the confirm callback
 * and the returns callback, and prints each callback invocation to standard output.
 */
@Configuration
public class ConfirmationConfiguration implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate messagingTemplate;

    /** Installs this object as the returns callback and then as the confirm callback. */
    @PostConstruct
    public void configureCallbacks() {
        messagingTemplate.setReturnsCallback(this);
        messagingTemplate.setConfirmCallback(this);
    }

    /**
     * Prints the outcome of a publisher confirm.
     *
     * @param correlation    correlation data supplied with the send
     * @param acknowledgment the acknowledgment flag passed to the callback
     * @param failureReason  the reason passed to the callback
     */
    @Override
    public void confirm(CorrelationData correlation, boolean acknowledgment, String failureReason) {
        final String details =
                String.format("Correlation: %s, Ack: %s, Reason: %s", correlation, acknowledgment, failureReason);
        System.out.println("Producer confirmation callback triggered");
        System.out.println(details);
    }

    /**
     * Prints a message that was returned to the publisher.
     *
     * @param returned the returned message and its return details
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("Unroutable message callback triggered");
        final String description = returned.toString();
        System.out.println(description);
    }
}
Consumer Acknowledgment Handling
Manual acknowledgment implementation:
/**
 * Listener for the queue "broadcast.queue.one" that settles each delivery by hand,
 * choosing the action from the message text.
 */
@Component
@RabbitListener(queues = "broadcast.queue.one")
public class AcknowledgingConsumer {

    /**
     * Prints the payload and delivery tag, then acknowledges, negatively acknowledges
     * (with requeue) or rejects (without requeue) the delivery depending on whether the
     * payload is "success", "retry" or "discard" (case-insensitive). Any other payload
     * is left unsettled by this method.
     *
     * @param content  the message body as text
     * @param channel  the channel the delivery arrived on
     * @param metadata the message headers, including the delivery tag
     */
    @RabbitHandler
    public void handleMessage(String content, Channel channel, @Headers Map<String, Object> metadata) {
        System.out.println("Processing: " + content);
        System.out.println("Delivery tag: " + metadata.get(AmqpHeaders.DELIVERY_TAG));
        try {
            // The three payloads are mutually exclusive, so at most one branch runs.
            if ("success".equalsIgnoreCase(content)) {
                channel.basicAck(deliveryTag(metadata), false);
            } else if ("retry".equalsIgnoreCase(content)) {
                channel.basicNack(deliveryTag(metadata), false, true);
            } else if ("discard".equalsIgnoreCase(content)) {
                channel.basicReject(deliveryTag(metadata), false);
            }
        } catch (IOException exception) {
            exception.printStackTrace();
        }
    }

    /** Reads the delivery tag header; only called from a branch that settles the delivery. */
    private static long deliveryTag(Map<String, Object> metadata) {
        return (long) metadata.get(AmqpHeaders.DELIVERY_TAG);
    }
}
Testing acknowledgment scenarios:
/**
 * Publishes a "success" and a "retry" message to the broadcast exchange, each with its
 * own correlation data ("1" and "2").
 */
@Test
public void testAcknowledgments() {
    final String exchange = BroadcastConfiguration.EXCHANGE_NAME;
    final String routingKey = "";

    CorrelationData firstCorrelation = new CorrelationData("1");
    rabbitTemplate.convertAndSend(exchange, routingKey, "success", firstCorrelation);

    CorrelationData secondCorrelation = new CorrelationData("2");
    rabbitTemplate.convertAndSend(exchange, routingKey, "retry", secondCorrelation);
}