Integrating RabbitMQ with Spring Boot Applications

Maven Dependencies and Configuration

Add the following dependency to integrate RabbitMQ functionality:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.2.7</version>
</dependency>

Configure connection parameters in application.yml:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

Exchange Types Implementation

Fanout Exchange Pattern

Fanout exchanges broadcast messages to all bound queues simultaneously:

@Configuration
public class BroadcastConfiguration {

    private static final String QUEUE_ONE = "broadcast.queue.one";
    private static final String QUEUE_TWO = "broadcast.queue.two";
    private static final String EXCHANGE_NAME = "broadcast.exchange";

    @Bean
    public Queue firstQueue() {
        return new Queue(QUEUE_ONE);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(QUEUE_TWO);
    }

    @Bean
    public FanoutExchange broadcastExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding linkFirstQueue() {
        return BindingBuilder.bind(firstQueue()).to(broadcastExchange());
    }

    @Bean
    public Binding linkSecondQueue() {
        return BindingBuilder.bind(secondQueue()).to(broadcastExchange());
    }
}

Testing message distribution:

@Test
public void testBroadcastMessaging() {
    rabbitTemplate.convertAndSend(BroadcastConfiguration.EXCHANGE_NAME, "", "broadcast-message");
}

Direct Exchange Routing

Direct exchanges route messages based on exact routing key matches:

@Configuration
public class RoutingConfiguration {

    private static final String PRIMARY_QUEUE = "routing.queue.primary";
    private static final String SECONDARY_QUEUE = "routing.queue.secondary";
    private static final String ROUTING_EXCHANGE = "routing.exchange";
    private static final String PRIMARY_ROUTE = "route.primary";

    @Bean
    public Queue primaryQueue() {
        return new Queue(PRIMARY_QUEUE);
    }

    @Bean
    public Queue secondaryQueue() {
        return new Queue(SECONDARY_QUEUE);
    }

    @Bean
    public DirectExchange routingExchange() {
        return new DirectExchange(ROUTING_EXCHANGE);
    }

    @Bean
    public Binding bindPrimary() {
        return BindingBuilder.bind(primaryQueue()).to(routingExchange()).with(PRIMARY_ROUTE);
    }

    @Bean
    public Binding bindSecondary() {
        return BindingBuilder.bind(secondaryQueue()).to(routingExchange()).with("");
    }
}

Routing tests:

@Test
public void testDirectRouting() {
    rabbitTemplate.convertAndSend(RoutingConfiguration.ROUTING_EXCHANGE, "", "default-route-message");
    rabbitTemplate.convertAndSend(RoutingConfiguration.ROUTING_EXCHANGE, RoutingConfiguration.PRIMARY_ROUTE, "primary-route-message");
    rabbitTemplate.convertAndSend(RoutingConfiguration.ROUTING_EXCHANGE, "unknown", "no-match-message");
}

Topic Exchange Patterns

Topic exchanges support wildcard-based routing using * (single word) and # (multiple words):

@Configuration
public class TopicConfiguration {

    private static final String FIRST_TOPIC_QUEUE = "topic.queue.first";
    private static final String SECOND_TOPIC_QUEUE = "topic.queue.second";
    private static final String TOPIC_EXCHANGE = "pattern.exchange";
    private static final String SINGLE_WORD_PATTERN = "pattern.route.*";
    private static final String MULTI_WORD_PATTERN = "pattern.route.#";

    @Bean
    public Queue patternQueueOne() {
        return new Queue(FIRST_TOPIC_QUEUE);
    }

    @Bean
    public Queue patternQueueTwo() {
        return new Queue(SECOND_TOPIC_QUEUE);
    }

    @Bean
    public TopicExchange patternExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Binding connectFirstPattern() {
        return BindingBuilder.bind(patternQueueOne()).to(patternExchange()).with(SINGLE_WORD_PATTERN);
    }

    @Bean
    public Binding connectSecondPattern() {
        return BindingBuilder.bind(patternQueueTwo()).to(patternExchange()).with(MULTI_WORD_PATTERN);
    }
}

Pattern matching tests:

@Test
public void testTopicPatterns() {
    rabbitTemplate.convertAndSend(TopicConfiguration.TOPIC_EXCHANGE, "pattern.route.one", "single-word-match");
    rabbitTemplate.convertAndSend(TopicConfiguration.TOPIC_EXCHANGE, "pattern.route.one.two", "multi-word-match");
}

Headers Exchange Matching

Headers exchanges route messages based on message header attributes rather than routing keys:

@Configuration
public class HeaderConfiguration {

    private static final String FIRST_HEADER_QUEUE = "header.queue.first";
    private static final String SECOND_HEADER_QUEUE = "header.queue.second";
    private static final String THIRD_HEADER_QUEUE = "header.queue.third";
    private static final String HEADER_EXCHANGE = "attribute.exchange";
    private static final String ATTRIBUTE_ONE = "attr.one";
    private static final String ATTRIBUTE_TWO = "attr.two";

    @Bean
    public Queue attributeQueueOne() {
        return new Queue(FIRST_HEADER_QUEUE);
    }

    @Bean
    public Queue attributeQueueTwo() {
        return new Queue(SECOND_HEADER_QUEUE);
    }

    @Bean
    public Queue attributeQueueThree() {
        return new Queue(THIRD_HEADER_QUEUE);
    }

    @Bean
    public HeadersExchange attributeExchange() {
        return new HeadersExchange(HEADER_EXCHANGE);
    }

    @Bean
    public Binding bindAttributeOne() {
        return BindingBuilder.bind(attributeQueueOne()).to(attributeExchange()).where(ATTRIBUTE_ONE).matches("valueA");
    }

    @Bean
    public Binding bindAttributeTwo() {
        return BindingBuilder.bind(attributeQueueTwo()).to(attributeExchange()).where(ATTRIBUTE_TWO).matches("valueB");
    }

    @Bean
    public Binding bindCombinedAttributes() {
        Map<String, Object> conditions = new HashMap<>();
        conditions.put(ATTRIBUTE_ONE, "valueA");
        conditions.put(ATTRIBUTE_TWO, "valueB");
        return BindingBuilder.bind(attributeQueueThree()).to(attributeExchange()).whereAny(conditions).match();
    }
}

Header-based routing tests:

@Test
public void testHeaderRouting() {
    MessageProperties props = new MessageProperties();
    props.setHeader(HeaderConfiguration.ATTRIBUTE_ONE, "valueA");
    rabbitTemplate.convertAndSend(HeaderConfiguration.HEADER_EXCHANGE, "", new Message("header-message-A".getBytes(), props));
    
    props.setHeader(HeaderConfiguration.ATTRIBUTE_ONE, "");
    props.setHeader(HeaderConfiguration.ATTRIBUTE_TWO, "valueB");
    rabbitTemplate.convertAndSend(HeaderConfiguration.HEADER_EXCHANGE, "", new Message("header-message-B".getBytes(), props));
}

Alternate Exchange Mechanism

Alternate exchanges handle unroutable messages by redirecting them to backup queues:

@Configuration
public class FallbackConfiguration {

    private static final String BACKUP_QUEUE_NAME = "fallback.queue";
    private static final String BACKUP_EXCHANGE_NAME = "fallback.exchange";
    private static final String PRIMARY_QUEUE_NAME = "primary.queue";
    private static final String PRIMARY_EXCHANGE_NAME = "primary.exchange";
    private static final String VALID_ROUTE = "valid.route";

    @Bean
    public Queue fallbackQueue() {
        return new Queue(BACKUP_QUEUE_NAME, true, false, false);
    }

    @Bean
    public Queue primaryQueue() {
        return new Queue(PRIMARY_QUEUE_NAME, true, false, false);
    }

    @Bean
    public DirectExchange primaryExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return new DirectExchange(PRIMARY_EXCHANGE_NAME, true, false, arguments);
    }

    @Bean
    public FanoutExchange fallbackExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME, true, false);
    }

    @Bean
    public Binding connectPrimaryQueue() {
        return BindingBuilder.bind(primaryQueue()).to(primaryExchange()).with(VALID_ROUTE);
    }

    @Bean
    public Binding connectFallbackQueue() {
        return BindingBuilder.bind(fallbackQueue()).to(fallbackExchange());
    }
}

Fallback testing:

@Test
public void testFallbackMechanism() {
    rabbitTemplate.convertAndSend(FallbackConfiguration.PRIMARY_EXCHANGE_NAME, FallbackConfiguration.VALID_ROUTE, "routed-message");
    rabbitTemplate.convertAndSend(FallbackConfiguration.PRIMARY_EXCHANGE_NAME, "invalid.route", "unrouted-message");
}

Dead Letter Exchange Processing

Dead letter exchanges handle expired, rejected, or overflow messages:

@Configuration
public class ExpirationConfiguration {

    private static final String DLX_QUEUE = "dlx.queue";
    private static final String DLX_EXCHANGE = "dlx.exchange";
    private static final String DLX_ROUTE = "dlx.route";
    private static final String REGULAR_QUEUE = "regular.queue";
    private static final String REGULAR_EXCHANGE = "regular.exchange";
    private static final String REGULAR_ROUTE = "regular.route";

    @Bean
    public Queue dlxQueue() {
        return new Queue(DLX_QUEUE, true, false, false);
    }

    @Bean
    public Queue regularQueue() {
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("x-message-ttl", 10000);
        parameters.put("x-dead-letter-exchange", DLX_EXCHANGE);
        parameters.put("x-dead-letter-routing-key", DLX_ROUTE);
        return new Queue(REGULAR_QUEUE, true, false, false, parameters);
    }

    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(DLX_EXCHANGE, false, false);
    }

    @Bean
    public DirectExchange regularExchange() {
        return new DirectExchange(REGULAR_EXCHANGE, true, false);
    }

    @Bean
    public Binding bindDlxQueue() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTE);
    }

    @Bean
    public Binding bindRegularQueue() {
        return BindingBuilder.bind(regularQueue()).to(regularExchange()).with(REGULAR_ROUTE);
    }
}

Expiration testing:

@Test
public void testMessageExpiration() {
    rabbitTemplate.convertAndSend(ExpirationConfiguration.REGULAR_EXCHANGE, ExpirationConfiguration.REGULAR_ROUTE, "expiring-message");
}

Dynamic Component Creation

Programmatically create messaging components:

@Autowired
private AmqpAdmin adminInterface;

public void setupMessagingComponents() {
    String queueIdentifier = "dynamic.queue";
    String exchangeIdentifier = "dynamic.exchange";
    
    Queue dynamicQueue = new Queue(queueIdentifier, false, false, false, null);
    adminInterface.declareQueue(dynamicQueue);
    
    FanoutExchange dynamicExchange = new FanoutExchange(exchangeIdentifier, false, false, null);
    adminInterface.declareExchange(dynamicExchange);
    
    adminInterface.declareBinding(new Binding(queueIdentifier, Binding.DestinationType.QUEUE, exchangeIdentifier, "", null));
}

Message Consumption

Enable RabbitMQ processing and implement listeners:

@Component
@RabbitListener(queues = "broadcast.queue.one")
public class BroadcastConsumer {

    @RabbitHandler
    public void processMessage(String content) {
        System.out.println("Received from broadcast.queue.one: " + content);
    }
}

Message Acknowledgment Strategies

Producer Confirmation Setup

YAML conifguration for message reliability:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual

Producer confirmation implementation:

@Configuration
public class ConfirmationConfiguration implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate messagingTemplate;

    @PostConstruct
    public void configureCallbacks() {
        messagingTemplate.setReturnsCallback(this);
        messagingTemplate.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlation, boolean acknowledgment, String failureReason) {
        System.out.println("Producer confirmation callback triggered");
        System.out.println(String.format("Correlation: %s, Ack: %s, Reason: %s", correlation, acknowledgment, failureReason));
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("Unroutable message callback triggered");
        System.out.println(returned.toString());
    }
}

Consumer Acknowledgment Handling

Manual acknowledgment implementation:

@Component
@RabbitListener(queues = "broadcast.queue.one")
public class AcknowledgingConsumer {

    @RabbitHandler
    public void handleMessage(String content, Channel channel, @Headers Map<String, Object> metadata) {
        System.out.println("Processing: " + content);
        System.out.println("Delivery tag: " + metadata.get(AmqpHeaders.DELIVERY_TAG));
        
        try {
            if ("success".equalsIgnoreCase(content)) {
                channel.basicAck((long) metadata.get(AmqpHeaders.DELIVERY_TAG), false);
            }
            if ("retry".equalsIgnoreCase(content)) {
                channel.basicNack((long) metadata.get(AmqpHeaders.DELIVERY_TAG), false, true);
            }
            if ("discard".equalsIgnoreCase(content)) {
                channel.basicReject((long) metadata.get(AmqpHeaders.DELIVERY_TAG), false);
            }
        } catch (IOException exception) {
            exception.printStackTrace();
        }
    }
}

Testing acknowledgment scenarios:

@Test
public void testAcknowledgments() {
    rabbitTemplate.convertAndSend(BroadcastConfiguration.EXCHANGE_NAME, "", "success", new CorrelationData("1"));
    rabbitTemplate.convertAndSend(BroadcastConfiguration.EXCHANGE_NAME, "", "retry", new CorrelationData("2"));
}

Tags: spring-boot RabbitMQ message-queue AMQP messaging

Posted on Thu, 07 May 2026 16:36:16 +0000 by dta