- Overview ===========
MQ (Message Queue) is a mechanism for storing and distributing message data in a structured queue format. A queue, as a fundamental data structure, follows the First-In-First-Out (FIFO) principle.
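As a minimal illustration of the FIFO principle, the sketch below uses `java.util.ArrayDeque` as an in-memory queue. The class name `FifoDemo` and the message strings are made up for this example and are unrelated to any MQ product.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

class FifoDemo {
    // Enqueue every message, then dequeue until empty and return the dequeue order.
    static List<String> roundTrip(List<String> messages) {
        Queue<String> queue = new ArrayDeque<>();
        for (String m : messages) {
            queue.offer(m);            // add at the tail
        }
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll());     // remove from the head
        }
        return out;
    }

    public static void main(String[] args) {
        // Messages leave the queue in the order they entered it.
        System.out.println(roundTrip(Arrays.asList("m1", "m2", "m3"))); // [m1, m2, m3]
    }
}
```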
- Purpose of Message Queues ============================
- Application decoupling (essential for distributed systems)
- Facilitate rapid application changes and maintenance
- Traffic peak shaving and load balancing
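Peak shaving can be pictured as a buffer between a bursty producer and a consumer that works at a fixed rate. The following single-threaded simulation is only a sketch of that idea; `PeakShavingDemo`, `ticksToDrain`, and the notion of a "tick" are assumptions introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class PeakShavingDemo {
    // Returns how many processing ticks the consumer needs to drain a burst
    // when it handles at most perTick messages per tick.
    static int ticksToDrain(int burstSize, int perTick) {
        if (perTick <= 0) {
            throw new IllegalArgumentException("perTick must be positive");
        }
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int i = 0; i < burstSize; i++) {
            buffer.offer(i);           // the producer only enqueues and moves on
        }
        int ticks = 0;
        while (!buffer.isEmpty()) {
            for (int i = 0; i < perTick && !buffer.isEmpty(); i++) {
                buffer.poll();         // the consumer drains at its own steady rate
            }
            ticks++;
        }
        return ticks;
    }

    public static void main(String[] args) {
        // A burst of 1000 requests is spread over 10 ticks of 100 each.
        System.out.println(ticksToDrain(1000, 100)); // 10
    }
}
```

The burst never reaches the downstream system all at once; the queue absorbs it and the backlog is worked off over time.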
- Advantages and Disadvantages ===============================
Disadvantages include:
1. Decreased system availability: requires clustering
2. Increased system complexity: requires skilled developers
3. Asynchronous messaging challenges:
   - Message ordering
   - Message loss
   - Message consistency
   - Message duplication
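Message duplication is commonly handled by making the consumer idempotent. The sketch below remembers a business key per message and skips keys it has already seen. The class `IdempotentConsumerDemo` and its in-memory `HashSet` are assumptions for illustration; a real system would more likely rely on a durable store such as a database unique constraint.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class IdempotentConsumerDemo {
    private final Set<String> processedKeys = new HashSet<>();
    private final List<String> applied = new ArrayList<>();

    // Returns true if the message was applied, false if it was a duplicate.
    boolean handle(String businessKey, String payload) {
        if (!processedKeys.add(businessKey)) {
            return false;              // key already seen: skip the side effect
        }
        applied.add(payload);          // stand-in for the real business operation
        return true;
    }

    List<String> applied() {
        return applied;
    }

    public static void main(String[] args) {
        IdempotentConsumerDemo consumer = new IdempotentConsumerDemo();
        System.out.println(consumer.handle("order-1", "pay")); // true
        System.out.println(consumer.handle("order-1", "pay")); // false (redelivery)
        System.out.println(consumer.applied());                // [pay]
    }
}
```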
- Common Message Queue Products ================================
ActiveMQ: Java-based, thousands of messages/second throughput, ms-level latency, master-slave architecture, high maturity
RabbitMQ: Erlang-based, thousands of messages/second throughput, μs-level latency, master-slave architecture
RocketMQ: Java-based, hundreds of thousands of messages/second throughput, ms-level latency, distributed architecture, powerful features, high scalability
Kafka: Scala-based, hundreds of thousands of messages/second throughput, ms-level latency, distributed architecture, feature-rich for big data scenarios
Introduction to RocketMQ
RocketMQ is an open-source distributed messaging system originally developed at Alibaba (derived from MetaQ). It was donated to the Apache Software Foundation as an incubator project and achieved top-level project status within just over a year. The system handles massive workloads internally at Alibaba, having processed trillions of messages during Double Eleven shopping festivals with peak TPS reaching 56 million in 2017.
- Installation ===============
JDK Setup
# Extract JDK package
tar -zxvf jdk-8u171-linux-x64.tar.gz
# Configure environment variables: open /etc/profile and append the two export lines below
vim /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_171
export PATH=$PATH:${JAVA_HOME}/bin
# Reload configuration
source /etc/profile
java -version
Troubleshooting
# If java -version still reports OpenJDK after installation, the OpenJDK
# packages that ship with the OS by default need to be removed
# Check installed packages
rpm -qa | grep java
# Remove conflicting packages
rpm -e --nodeps java-1.8.0-openjdk-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.232.b09-0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.241-2.6.20.0.el7_7.x86_64
rpm -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64
RocketMQ Setup
# Extract package
unzip rocketmq-all-4.5.2-bin-release.zip
# Rename directory
mv rocketmq-all-4.5.2-bin-release rocketmq
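# Adjust JVM memory settings (e.g. to 128m) by lowering the heap options
# (-Xms/-Xmx) in JAVA_OPT inside these two scripts in the bin directory
runserver.sh
runbroker.sh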
If running alongside Docker, modify the broker configuration:
conf/broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=192.168.31.80
namesrvAddr=192.168.31.80:9876
Starting Services
# Start nameserver
sh mqnamesrv
# Start broker with configuration
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
# Disable firewall
systemctl stop firewalld.service
Testing Installation
export NAMESRV_ADDR=localhost:9876
# Run producer and consumer examples from bin directory
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
- Usage ========
6.1 Load Balancing
Environment Setup
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
Sending a Single Message
public static void main(String[] args) throws Exception {
// Create producer instance
DefaultMQProducer producer = new DefaultMQProducer("group1");
// Configure nameserver address
producer.setNamesrvAddr("192.168.31.80:9876");
// Start the producer
producer.start();
// Create message with topic and body content
Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
// Send message
SendResult result = producer.send(msg);
System.out.println("Result: " + result);
// Cleanup
producer.shutdown();
}
Sending Multiple Messages
for (int i = 1; i <= 10; i++) {
Message msg = new Message("topic1", ("Producer: hello rocketmq " + i).getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("Result: " + result);
}
Consumer Implementation
public static void main(String[] args) throws Exception {
// Create consumer instance
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// Configure nameserver address
consumer.setNamesrvAddr("192.168.31.80:9876");
// Subscribe to topic with wildcard tag filter
consumer.subscribe("topic1", "*");
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : list) {
System.out.println("Message: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Start consumer
consumer.start();
System.out.println("Consumer service started");
}
6.2 Broadcasting Mode
Producer
Same as standard producer implementation.
Consumer
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.31.80:9876");
consumer.subscribe("topic1", "*");
// Set consumption mode to broadcast
consumer.setMessageModel(MessageModel.BROADCASTING);
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : list) {
System.out.println("Consumer 1: " + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer service started");
}
Important: In broadcasting mode, start the consumers before the producer sends messages. Messages sent before the consumers are running are consumed only once instead of being delivered to every consumer.
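The difference between the load-balancing mode of 6.1 and the broadcasting mode can be simulated without a broker. The sketch below is an illustration only: `DeliveryModeDemo` is a made-up class, and the per-message round-robin is a simplifying assumption (RocketMQ itself assigns whole queues to consumers rather than distributing individual messages).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DeliveryModeDemo {
    // Load balancing: every message is handled by exactly one consumer of the group.
    static Map<String, List<String>> clustering(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            String target = consumers.get(i % consumers.size()); // round-robin stand-in
            received.get(target).add(messages.get(i));
        }
        return received;
    }

    // Broadcasting: every consumer receives its own copy of every message.
    static Map<String, List<String>> broadcasting(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = emptyInboxes(consumers);
        for (String consumer : consumers) {
            received.get(consumer).addAll(messages);
        }
        return received;
    }

    private static Map<String, List<String>> emptyInboxes(List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        for (String consumer : consumers) {
            inboxes.put(consumer, new ArrayList<String>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("c1", "c2");
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println(clustering(consumers, messages));   // {c1=[m1, m3], c2=[m2, m4]}
        System.out.println(broadcasting(consumers, messages)); // {c1=[m1, m2, m3, m4], c2=[m1, m2, m3, m4]}
    }
}
```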
6.3 Message Send Types
- Synchronous: High urgency, critical messages requiring acknowledgment (e.g., SMS, transaction notifications)
- Asynchronous: Lower urgency but requiring acknowledgment (e.g., certain order information)
- One-way: No acknowledgment needed (e.g., logging operations)
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
// Synchronous message
Message msg = new Message("topic2", ("Sync message: hello rocketmq " + i).getBytes("UTF-8"));
SendResult result = producer.send(msg);
System.out.println("Result: " + result);
// Asynchronous message
Message msg2 = new Message("topic2", ("Async message: hello rocketmq " + i).getBytes("UTF-8"));
producer.send(msg2, new SendCallback() {
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
public void onException(Throwable t) {
System.out.println(t);
}
});
// One-way message (no response)
Message msg3 = new Message("topic2", ("One-way message: hello rocketmq " + i).getBytes("UTF-8"));
producer.sendOneway(msg3);
}
// Wait for async callbacks to complete
TimeUnit.SECONDS.sleep(10);
producer.shutdown();
}
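The three calling styles differ only in how the caller learns about the outcome. The broker-free sketch below mimics them with `CompletableFuture`; `SendStyleDemo` and `fakeSend` are hypothetical stand-ins and not part of the RocketMQ API.

```java
import java.util.concurrent.CompletableFuture;

class SendStyleDemo {
    // Hypothetical stand-in for a network send; not a RocketMQ call.
    static String fakeSend(String body) {
        return "SEND_OK:" + body;
    }

    // Synchronous: the caller blocks until the result is available.
    static String sendSync(String body) {
        return fakeSend(body);
    }

    // Asynchronous: the caller receives a future and is notified on completion.
    static CompletableFuture<String> sendAsync(String body) {
        return CompletableFuture.supplyAsync(() -> fakeSend(body));
    }

    // One-way: the send is handed off and no result is reported back.
    static void sendOneway(String body) {
        CompletableFuture.runAsync(() -> fakeSend(body));
    }

    public static void main(String[] args) {
        System.out.println(sendSync("a"));
        sendAsync("b").thenAccept(System.out::println).join();
        sendOneway("c");
    }
}
```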
6.4 Delayed Messages
Messages are sent immediately but hidden from consumers for a specified duration.
Use Cases: Order cancellation after payment timeout, scheduled notifications, etc.
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
for (int i = 1; i <= 5; i++) {
Message msg = new Message("topic3", ("Delayed message: hello rocketmq " + i).getBytes("UTF-8"));
// Set delay level (message becomes visible after delay expires)
msg.setDelayTimeLevel(3); // Level 3 = 10 seconds (levels are 1-based positions in the list below)
// Available levels: 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
SendResult result = producer.send(msg);
System.out.println("Result: " + result);
}
producer.shutdown();
}
The message remains in the queue during the delay period and becomes visible to consumers only after the specified time elapses.
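Delay levels are 1-based positions in the list of durations given in the code comment above, so a small lookup table makes the mapping explicit. The helper below is a sketch; `DelayLevelDemo` and its method names are assumptions for illustration, and the table simply copies the eighteen levels listed in this section.

```java
class DelayLevelDemo {
    // Durations in seconds for levels 1..18, in the order listed above.
    private static final int[] LEVEL_SECONDS = {
        1, 5, 10, 30, 60, 120, 180, 240, 300, 360,
        420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    // Delay in seconds for a given 1-based level.
    static int secondsForLevel(int level) {
        if (level < 1 || level > LEVEL_SECONDS.length) {
            throw new IllegalArgumentException("level must be between 1 and " + LEVEL_SECONDS.length);
        }
        return LEVEL_SECONDS[level - 1];
    }

    // Smallest level whose delay is at least the requested number of seconds.
    static int levelAtLeast(int seconds) {
        for (int i = 0; i < LEVEL_SECONDS.length; i++) {
            if (LEVEL_SECONDS[i] >= seconds) {
                return i + 1;
            }
        }
        throw new IllegalArgumentException("no level covers " + seconds + " seconds");
    }

    public static void main(String[] args) {
        System.out.println(secondsForLevel(3)); // 10
        System.out.println(secondsForLevel(4)); // 30
        System.out.println(levelAtLeast(45));   // 5 (the 1m level)
    }
}
```

Because only these fixed levels are available, an arbitrary delay such as 45 seconds has to be rounded to a neighbouring level.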
6.5 Batch Messages
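// Create a message collection (all messages in one batch must share the same topic)
List<Message> msgList = new ArrayList<Message>();
// Add messages to list ("topic5" is an illustrative topic name)
msgList.add(new Message("topic5", "Batch message: hello rocketmq 1".getBytes("UTF-8")));
msgList.add(new Message("topic5", "Batch message: hello rocketmq 2".getBytes("UTF-8")));
msgList.add(new Message("topic5", "Batch message: hello rocketmq 3".getBytes("UTF-8")));
// Send the whole list in a single call
SendResult send = producer.send(msgList);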
Constraints: Total message size must not exceed 4MB, including topic bytes, body length, message properties (key-value pairs), and log overhead (20 bytes fixed).
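When a list may exceed the size limit, it can be split into sub-batches before sending. The sketch below estimates each message's size from the four components named above and starts a new batch whenever the limit would be crossed. `BatchSplitterDemo` and its `Msg` class are simplified stand-ins introduced for illustration, not RocketMQ types, and the size estimate is an approximation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class BatchSplitterDemo {
    static final int LOG_OVERHEAD_BYTES = 20;

    // Minimal stand-in for a message: topic, body, and string properties.
    static final class Msg {
        final String topic;
        final byte[] body;
        final Map<String, String> properties;

        Msg(String topic, byte[] body, Map<String, String> properties) {
            this.topic = topic;
            this.body = body;
            this.properties = properties;
        }
    }

    // Topic bytes + body length + property keys and values + fixed log overhead.
    static int estimateSize(Msg m) {
        int size = m.topic.getBytes(StandardCharsets.UTF_8).length + m.body.length + LOG_OVERHEAD_BYTES;
        for (Map.Entry<String, String> e : m.properties.entrySet()) {
            size += e.getKey().getBytes(StandardCharsets.UTF_8).length;
            size += e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return size;
    }

    // Greedily packs messages, in order, into batches of at most limitBytes.
    static List<List<Msg>> split(List<Msg> messages, int limitBytes) {
        List<List<Msg>> batches = new ArrayList<>();
        List<Msg> current = new ArrayList<>();
        int currentSize = 0;
        for (Msg m : messages) {
            int size = estimateSize(m);
            if (size > limitBytes) {
                throw new IllegalArgumentException("a single message exceeds the limit");
            }
            if (currentSize + size > limitBytes) {
                batches.add(current);  // close the full batch and start a new one
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(m);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Each resulting sub-list could then be passed to `producer.send(...)` in turn, with `limitBytes` set to 4 * 1024 * 1024 for the 4MB limit.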
6.6 Message Filtering
1. Tag-Based Filtering
Producer
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
// Create message with topic and tag
Message msg = new Message("topic6", "tag2", ("Filtered by tag: hello rocketmq 2").getBytes("UTF-8"));
SendResult send = producer.send(msg);
System.out.println(send);
producer.shutdown();
}
Consumer
// Subscribe with tag filter (* for any, "tag1 || tag2" for multiple)
consumer.subscribe("topic6", "tag1 || tag2");
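The meaning of a tag expression such as `tag1 || tag2` can be spelled out in a few lines. The matcher below only illustrates the semantics (a message passes if its tag equals any listed alternative, and `*` accepts everything); `TagFilterDemo` is a hypothetical class and does not reflect how the broker implements filtering internally.

```java
class TagFilterDemo {
    // Returns true if a message with the given tag passes the subscription expression.
    static boolean matches(String expression, String messageTag) {
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;               // wildcard: accept every message
        }
        if (messageTag == null) {
            return false;              // untagged messages cannot match a specific tag
        }
        for (String alternative : expression.split("\\|\\|")) {
            if (alternative.trim().equals(messageTag)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("tag1 || tag2", "tag2")); // true
        System.out.println(matches("tag1 || tag2", "tag3")); // false
        System.out.println(matches("*", "tag3"));            // true
    }
}
```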
2. SQL-Based Filtering
Producer
// Add custom properties to message
msg.putUserProperty("vip", "1");
msg.putUserProperty("age", "20");
Consumer
// Use SQL-style selector for property-based filtering
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
Note: SQL filtering requires server-side support. Enable in broker configuration:
enablePropertyFilter=true
Restart the broker so that it picks up this configuration:
sh mqbroker -n localhost:9876 -c ../conf/broker.conf
6.7 Ordered Messages
By default, messages distribute across multiple queues with concurrent consumption, which cannot guarantee ordering. To ensure message ordering:
- Route messages to specific queues on the producer side
- Consume messages from each queue sequentially using single-threaded consumers
Producer
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.31.80:9876");
producer.start();
List<Order> orderList = new ArrayList<>();
Order order11 = new Order();
order11.setId("a");
order11.setMsg("Main order-1");
orderList.add(order11);
Order order12 = new Order();
order12.setId("a");
order12.setMsg("Sub order-2");
orderList.add(order12);
Order order13 = new Order();
order13.setId("a");
order13.setMsg("Payment-3");
orderList.add(order13);
Order order14 = new Order();
order14.setId("a");
order14.setMsg("Notification-4");
orderList.add(order14);
Order order21 = new Order();
order21.setId("b");
order21.setMsg("Main order-1");
orderList.add(order21);
Order order22 = new Order();
order22.setId("b");
order22.setMsg("Sub order-2");
orderList.add(order22);
Order order31 = new Order();
order31.setId("c");
order31.setMsg("Main order-1");
orderList.add(order31);
Order order32 = new Order();
order32.setId("c");
order32.setMsg("Sub order-2");
orderList.add(order32);
Order order33 = new Order();
order33.setId("c");
order33.setMsg("Payment-3");
orderList.add(order33);
// Route messages to specific queues
for (final Order order : orderList) {
Message msg = new Message("orderTopic", order.toString().getBytes());
SendResult result = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
// floorMod keeps the index non-negative even when hashCode() is negative
int mqIndex = Math.floorMod(order.getId().hashCode(), list.size());
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
producer.shutdown();
}
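The routing step relies on the same order ID always mapping to the same queue index. One detail worth checking in isolation: `String.hashCode()` can be negative, and Java's `%` keeps the sign of the dividend, so a plain `hashCode() % size` may yield a negative index. The sketch below uses `Math.floorMod` instead. `QueueSelectorDemo` is a made-up class, and the queue count of 4 is an assumption for the example.

```java
class QueueSelectorDemo {
    // Maps an order ID to a queue index in [0, queueCount).
    static int selectIndex(String orderId, int queueCount) {
        return indexForHash(orderId.hashCode(), queueCount);
    }

    // Separated out so the negative-hash case can be shown directly.
    static int indexForHash(int hash, int queueCount) {
        return Math.floorMod(hash, queueCount);
    }

    public static void main(String[] args) {
        System.out.println(selectIndex("a", 4));  // 1, because "a".hashCode() is 97
        System.out.println(-7 % 4);               // -3: plain % keeps the sign
        System.out.println(indexForHash(-7, 4));  // 1
    }
}
```

All messages for order `a` land on one queue, which is what allows the consumer to see them in the order they were sent.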
Consumer
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.31.80:9876");
consumer.subscribe("orderTopic", "*");
// Use MessageListenerOrderly to consume each queue in single-threaded mode
consumer.registerMessageListener(new MessageListenerOrderly() {
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,
ConsumeOrderlyContext context) {
for (MessageExt msg : list) {
System.out.println(Thread.currentThread().getName() + " Message: "
+ new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer service started");
}
6.8 Transaction Messages
RocketMQ supports transactional messaging similar to database transactions, ensuring consistency between local operations and message delivery.
Transaction Flow
Normal transaction process when local operations succeed.
Transaction Compensation
Handles cases where local operations fail by rolling back messages.
Transaction States
- COMMIT: Message enters the queue (equivalent to non-transactional messages)
- ROLLBACK: Message does not enter the queue (effectively not sent)
- UNKNOWN: Half message sent, awaiting status confirmation (triggers compensation)
Note: Transaction messages only affect producers; consumers operate normally.
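The three states can be read as a small decision rule: the half message becomes visible only if the final answer is COMMIT, and an UNKNOWN answer defers the decision to a later status check. The sketch below models just that rule. `TransactionStateDemo` and its enum are illustrative assumptions that mirror the state names above, not RocketMQ classes, and the repeated re-checking a real broker may perform is left out.

```java
class TransactionStateDemo {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // localResult: what the producer reports after running its local transaction.
    // checkResult: what a later status check returns; only consulted when the
    // first answer was UNKNOWN.
    static boolean isDelivered(State localResult, State checkResult) {
        State finalState = (localResult == State.UNKNOWN) ? checkResult : localResult;
        return finalState == State.COMMIT;
    }

    public static void main(String[] args) {
        System.out.println(isDelivered(State.COMMIT, null));            // true
        System.out.println(isDelivered(State.ROLLBACK, State.COMMIT));  // false
        System.out.println(isDelivered(State.UNKNOWN, State.COMMIT));   // true
        System.out.println(isDelivered(State.UNKNOWN, State.ROLLBACK)); // false
    }
}
```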
Producer Implementation
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.184.128:9876");
producer.setTransactionListener(new TransactionListener() {
// Execute local transaction
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// Implement local business logic here
// Return COMMIT if successful, ROLLBACK if failed
// Return UNKNOW (the enum constant's actual spelling) for async operations pending confirmation
return LocalTransactionState.COMMIT_MESSAGE;
}
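// Transaction compensation (check transaction status)
// Not expected to run in this example, because the local transaction commits immediately
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return LocalTransactionState.UNKNOW;
}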
});
producer.start();
Message msg = new Message("topic8", ("Transaction message: hello rocketmq").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("Result: " + result);
producer.shutdown();
}
Compensation Implementation
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.184.128:9876");
producer.setTransactionListener(new TransactionListener() {
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.UNKNOW;
}
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("Executing transaction compensation");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("topic11", ("Transaction message: hello rocketmq").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg, null);
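System.out.println("Result: " + result);
// The producer is deliberately not shut down here: it must keep running to answer the broker's status check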
}