RocketMQ Fundamentals: Installation, Messaging Patterns, and Core Features

  1. Overview ===========

MQ (Message Queue) is a mechanism for storing and distributing message data in a structured queue format. A queue, as a fundamental data structure, follows the First-In-First-Out (FIFO) principle.

  1. Purpose of Message Queues ============================
  • Application decoupling (essential for distributed systems)
  • Facilitate rapid application changes and maintenance
  • Traffic peak shaving and load balancing
  1. Advantages and Disadvantages ===============================

Disadvantages include:

1 Decreased system availability: requires clustering
2 Increased system complexity: requires skilled developers
3 Asynchronous messaging challenges:
    Message ordering
    Message loss
    Message consistency
    Message duplication

  1. Common Message Queue Products ================================
ActiveMQ: Java-based, thousands of messages/second throughput, ms-level latency, master-slave architecture, high maturity
RabbitMQ: Erlang-based, thousands of messages/second throughput, μs-level latency, master-slave architecture
RocketMQ: Java-based, hundred-thousands of messages/second throughput, ms-level latency, distributed architecture, powerful features, high scalability
Kafka: Scala-based, hundred-thousands of messages/second throughput, ms-level latency, distributed architecture, feature-rich for big data scenarios

Introduction to RocketMQ

RocketMQ is an open-source distributed messaging system originally developed at Alibaba (derived from MetaQ). It was donated to the Apache Software Foundation as an incubated project and achieved top-level project status with in just over a year. The system handles massive workloads internally at Alibaba, having processed trillions of messages during Double Eleven shopping festivals with peak TPS reaching 56 million in 2017.

  1. Installation ===============

JDK Setup

# Extract JDK package
tar -zxvf jdk-8u171-linux-x64.tar.gz

# Configure environment variables
vim /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_171
export PATH=$PATH:${JAVA_HOME}/bin

# Reload configuration
source /etc/profile
java -version

Troubleshooting

# If java -version shows openjdk after installation (needs removal)
# The OS ships with openjdk by default

# Check installed packages
rpm -qa | grep java

# Remove conflicting packages
rpm -e --nodeps java-1.8.0-openjdk-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.241-2.6.20.0.el7_7.x86_64
rmp -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64

RocketMQ Setup

# Extract package
unzip rocketmq-all-4.5.2-bin-release.zip

# Rename directory
mv rocketmq-all-4.5.2-bin-release rocketmq

# Adjust JVM memory settings to 128m
runserver.sh
runbroker.sh

If running alongside Docker, modify the broker configuration:

conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.31.80
namesrvAddr=192.168.31.80:9876

Starting Services

# Start nameserver
sh mqnamesrv

# Start broker with configuration
sh mqbroker -n localhost:9876 -c ../conf/broker.conf

# Disable firewall
systemctl stop firewalld.service

Testing Installation

export NAMESRV_ADDR=localhost:9876

# Run producer and consumer examples from bin directory
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer

  1. Usage ========

6.1 Load Balancing

Environment Setup

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

Sending a Single Message

public static void main(String[] args) throws Exception {
    // Create producer instance
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    // Configure nameserver address
    producer.setNamesrvAddr("192.168.31.80:9876");
    // Start the producer
    producer.start();
    // Create message with topic and body content
    Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
    // Send message
    SendResult result = producer.send(msg);
    System.out.println("Result: " + result);
    // Cleanup
    producer.shutdown();
}

Sending Multiple Messages

for (int i = 1; i <= 10; i++) {
    Message msg = new Message("topic1", ("Producer: hello rocketmq " + i).getBytes("UTF-8"));
    SendResult result = producer.send(msg);
    System.out.println("Result: " + result);
}

Consumer Implementation

public static void main(String[] args) throws Exception {
    // Create consumer instance
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    // Configure nameserver address
    consumer.setNamesrvAddr("192.168.31.80:9876");
    // Subscribe to topic with wildcard tag filter
    consumer.subscribe("topic1", "*");
    // Register message listener
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, 
                ConsumeConcurrentlyContext context) {
            for (MessageExt msg : list) {
                System.out.println("Message: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Start consumer
    consumer.start();
    System.out.println("Consumer service started");
}

6.2 Broadcasting Mode

Producer

Same as standard producer implementation.

Consumer

public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("192.168.31.80:9876");
    consumer.subscribe("topic1", "*");
    
    // Set consumption mode to broadcast
    consumer.setMessageModel(MessageModel.BROADCASTING);
    
    // Register message listener
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, 
                ConsumeConcurrentlyContext context) {
            for (MessageExt msg : list) {
                System.out.println("Consumer 1: " + new String(msg.getBody()));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    
    consumer.start();
    System.out.println("Consumer service started");
}

Important: In broadcasting mode, consumers must be started before the producer sends messages. If messages are sent first, they will only be consumed once.

6.3 Message Send Types

  • Synchronous: High urgency, critical messages requiring acknowledgment (e.g., SMS, transaction notifications)
  • Asynchronous: Lower urgency but requiring acknowledgment (e.g., certain order information)
  • One-way: No acknowledgment needed (e.g., logging operations)
public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    producer.setNamesrvAddr("192.168.31.80:9876");
    producer.start();
    
    for (int i = 1; i <= 5; i++) {
        // Synchronous message
        Message msg = new Message("topic2", ("Sync message: hello rocketmq " + i).getBytes("UTF-8"));
        SendResult result = producer.send(msg);
        System.out.println("Result: " + result);
        
        // Asynchronous message
        Message msg2 = new Message("topic2", ("Async message: hello rocketmq " + i).getBytes("UTF-8"));
        producer.send(msg2, new SendCallback() {
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            public void onException(Throwable t) {
                System.out.println(t);
            }
        });
        
        // One-way message (no response)
        Message msg3 = new Message("topic2", ("One-way message: hello rocketmq " + i).getBytes("UTF-8"));
        producer.sendOneway(msg3);
    }
    
    // Wait for async callbacks to complete
    TimeUnit.SECONDS.sleep(10);
    
    producer.shutdown();
}

6.4 Delayed Messages

Messages are sent immediately but hidden from consumers for a specified duration.

Use Cases: Order cancellation after payment timeout, scheduled notifications, etc.

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    producer.setNamesrvAddr("192.168.31.80:9876");
    producer.start();
    
    for (int i = 1; i <= 5; i++) {
        Message msg = new Message("topic3", ("Non-delayed message: hello rocketmq " + i).getBytes("UTF-8"));
        // Set delay level (message becomes visible after delay expires)
        msg.setDelayTimeLevel(3); // Level 3 = 30 seconds
        // Available levels: 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
        SendResult result = producer.send(msg);
        System.out.println("Result: " + result);
    }
    
    producer.shutdown();
}

The message remains in the queue during the delay period and becomes visible to consumers only after the specified time elapses.

6.5 Batch Messages

// Create a message collection
List<Message> msgList = new ArrayList<Message>();
// Add messages to list
SendResult send = producer.send(msgList);

Constraints: Total message size must not exceed 4MB, including topic bytes, body length, message properties (key-value pairs), and log overhead (20 bytes fixed).

6.6 Message Filtering

1. Tag-Based Filtering

Producer

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    producer.setNamesrvAddr("192.168.31.80:9876");
    producer.start();
    
    // Create message with topic and tag
    Message msg = new Message("topic6", "tag2", ("Filtered by tag: hello rocketmq 2").getBytes("UTF-8"));
    SendResult send = producer.send(msg);
    System.out.println(send);
    
    producer.shutdown();
}

Consumer

// Subscribe with tag filter (* for any, "tag1 || tag2" for multiple)
consumer.subscribe("topic6", "tag1 || tag2");

2. SQL-Based Filtering

Producer

// Add custom properties to message
msg.putUserProperty("vip", "1");
msg.putUserProperty("age", "20");

Consumer

// Use SQL-style selector for property-based filtering
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));

Note: SQL filtering requires server-side support. Enable in broker configuration:

enablePropertyFilter=true

Start the broker with this configuration:

sh mqbroker -n localhost:9876 -c ../conf/broker.conf

6.7 Ordered Messages

By default, messages distribute across multiple queues with concurrent consumption, which cannot guarantee ordering. To ensure message ordering:

  1. Route messages to specific queues on the producer side
  2. Consume messages from each queue sequentially using single-threaded consumers

Producer

public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("group1");
    producer.setNamesrvAddr("192.168.31.80:9876");
    producer.start();
    
    List<Order> orderList = new ArrayList<>();
    
    Order order11 = new Order();
    order11.setId("a");
    order11.setMsg("Main order-1");
    orderList.add(order11);
    
    Order order12 = new Order();
    order12.setId("a");
    order12.setMsg("Sub order-2");
    orderList.add(order12);
    
    Order order13 = new Order();
    order13.setId("a");
    order13.setMsg("Payment-3");
    orderList.add(order13);
    
    Order order14 = new Order();
    order14.setId("a");
    order14.setMsg("Notification-4");
    orderList.add(order14);
    
    Order order21 = new Order();
    order21.setId("b");
    order21.setMsg("Main order-1");
    orderList.add(order21);
    
    Order order22 = new Order();
    order22.setId("b");
    order22.setMsg("Sub order-2");
    orderList.add(order22);
    
    Order order31 = new Order();
    order31.setId("c");
    order31.setMsg("Main order-1");
    orderList.add(order31);
    
    Order order32 = new Order();
    order32.setId("c");
    order32.setMsg("Sub order-2");
    orderList.add(order32);
    
    Order order33 = new Order();
    order33.setId("c");
    order33.setMsg("Payment-3");
    orderList.add(order33);
    
    // Route messages to specific queues
    for (final Order order : orderList) {
        Message msg = new Message("orderTopic", order.toString().getBytes());
        SendResult result = producer.send(msg, new MessageQueueSelector() {
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                int mqIndex = order.getId().hashCode() % list.size();
                return list.get(mqIndex);
            }
        }, null);
        System.out.println(result);
    }
    
    producer.shutdown();
}

Consumer

public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    consumer.setNamesrvAddr("192.168.31.80:9876");
    consumer.subscribe("orderTopic", "*");
    
    // Use MessageListenerOrderly to consume each queue in single-threaded mode
    consumer.registerMessageListener(new MessageListenerOrderly() {
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, 
                ConsumeOrderlyContext context) {
            for (MessageExt msg : list) {
                System.out.println(Thread.currentThread().getName() + " Message: " 
                        + new String(msg.getBody()));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    
    consumer.start();
    System.out.println("Consumer service started");
}

6.8 Transaction Messages

RocketMQ supports transactional messaging similar to database transactions, ensuring consistency between local operations and message delivery.

Transaction Flow

Normal transaction process when local operations succeed.

Transaction Compensation

Handles cases where local operations fail by rolling back messages.

Transaction States

  • COMMIT: Message anters the queue (equivalent to non-transactional messages)
  • ROLLBACK: Message does not enter the queue (effectively not sent)
  • UNKNOWN: Half message sent, awaiting status confirmation (triggers compensation)

Note: Transaction messages only affect producers; consumers operate normally.

Producer Implementation

public static void main(String[] args) throws Exception {
    TransactionMQProducer producer = new TransactionMQProducer("group1");
    producer.setNamesrvAddr("192.168.184.128:9876");
    
    producer.setTransactionListener(new TransactionListener() {
        // Execute local transaction
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            // Implement local business logic here
            // Return COMMIT if successful, ROLLBACK if failed
            // Return UNKNOWN for async operations pending confirmation
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        
        // Transaction compensation (check transaction status)
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            return null;
        }
    });
    
    producer.start();
    
    Message msg = new Message("topic8", ("Transaction message: hello rocketmq").getBytes("UTF-8"));
    SendResult result = producer.sendMessageInTransaction(msg, null);
    System.out.println("Result: " + result);
    
    producer.shutdown();
}

Compensation Implementation

public static void main(String[] args) throws Exception {
    TransactionMQProducer producer = new TransactionMQProducer("group1");
    producer.setNamesrvAddr("192.168.184.128:9876");
    
    producer.setTransactionListener(new TransactionListener() {
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            return LocalTransactionState.UNKNOW;
        }
        
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            System.out.println("Executing transaction compensation");
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    });
    
    producer.start();
    
    Message msg = new Message("topic11", ("Transaction message: hello rocketmq").getBytes("UTF-8"));
    SendResult result = producer.sendMessageInTransaction(msg, null);
    System.out.println("Result: " + result);
}

Tags: rocketmq message-queue apache distributed-systems messaging

Posted on Mon, 11 May 2026 08:59:27 +0000 by MattMan