Implementing Message Tracing in RocketMQ

Core Concepts

Message tracing captures the complete lifecycle of a message, including:

  • Producer send timestamp
  • Broker storage details
  • Consumer consumption events
  • Timing metrics at each stage

Configuration

Broker Setup

# Enable tracing in broker.conf
traceTopicEnable=true

Producer Implementation

public class MessageProducer {
    private static final String NAMESRV_ADDR = "localhost:9876";
    private static final String TOPIC = "traceDemo";
    
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(
            "traceProducerGroup", 
            true,  // Enable tracing
            "CUSTOM_TRACE_TOPIC" // Optional custom topic
        );
        
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.start();
        
        Message msg = new Message(
            TOPIC,
            "TagA",
            "MSG_" + System.currentTimeMillis(),
            "Trace payload".getBytes()
        );
        
        SendResult result = producer.send(msg);
        Thread.sleep(5000); // Allow async trace dispatch
        producer.shutdown();
    }
}

Consumer Implementation

public class MessageConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
            "traceConsumerGroup",
            true  // Enable tracing
        );
        
        consumer.subscribe("traceDemo", "*");
        consumer.registerMessageListener((msgs, context) -> {
            System.out.println("Received: " + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

Architecture Deep Dive

Trace Collection Mechanism

The system uses hook patterns to intercept message flow:

// Producer hook registration
if (enableMsgTrace) {
    AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(
        producerGroup, 
        TraceType.PRODUCE,
        traceTopic,
        rpcHook
    );
    
    producerImpl.registerSendMessageHook(
        new TraceSendHook(dispatcher)
    );
}

Trace Data Structure

Trace records contain:

TraceContext ctx = new TraceContext();
ctx.setTraceBeans(new ArrayList<>(1));

TraceBean bean = new TraceBean();
bean.setTopic(message.getTopic());
bean.setMsgId(sendResult.getMsgId());
bean.setStoreHost(brokerAddress);
bean.setCostTime(sendDuration);

ctx.getTraceBeans().add(bean);
dispatcher.append(ctx);

Asynchronous Dispatch

The system batches trace messages for efficiency:

// In AsyncTraceDispatcher
private final BlockingQueue<TraceContext> traceQueue = 
    new ArrayBlockingQueue<>(1024);

// Batched send logic
if (currentBatchSize >= maxBatchSize || 
    (System.currentTimeMillis() - batchStartTime) > maxBatchDelay) {
    submitBatchSendTask();
}

Tags: rocketmq MessageTracing DistributedSystems messaging MQTT

Posted on Sat, 13 Jun 2026 17:29:59 +0000 by sinter4911