Core Concepts
Message tracing captures the complete lifecycle of a message, including:
- Producer send timestamp
- Broker storage details
- Consumer consumption events
- Timing metrics at each stage
Configuration
Broker Setup
# Enable tracing in broker.conf
traceTopicEnable=true
Producer Implementation
public class MessageProducer {
private static final String NAMESRV_ADDR = "localhost:9876";
private static final String TOPIC = "traceDemo";
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer(
"traceProducerGroup",
true, // Enable tracing
"CUSTOM_TRACE_TOPIC" // Optional custom topic
);
producer.setNamesrvAddr(NAMESRV_ADDR);
producer.start();
Message msg = new Message(
TOPIC,
"TagA",
"MSG_" + System.currentTimeMillis(),
"Trace payload".getBytes()
);
SendResult result = producer.send(msg);
Thread.sleep(5000); // Allow async trace dispatch
producer.shutdown();
}
}
Consumer Implementation
public class MessageConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"traceConsumerGroup",
true // Enable tracing
);
consumer.subscribe("traceDemo", "*");
consumer.registerMessageListener((msgs, context) -> {
System.out.println("Received: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
Architecture Deep Dive
Trace Collection Mechanism
The system uses hook patterns to intercept message flow:
// Producer hook registration
if (enableMsgTrace) {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(
producerGroup,
TraceType.PRODUCE,
traceTopic,
rpcHook
);
producerImpl.registerSendMessageHook(
new TraceSendHook(dispatcher)
);
}
Trace Data Structure
Trace records contain:
TraceContext ctx = new TraceContext();
ctx.setTraceBeans(new ArrayList<>(1));
TraceBean bean = new TraceBean();
bean.setTopic(message.getTopic());
bean.setMsgId(sendResult.getMsgId());
bean.setStoreHost(brokerAddress);
bean.setCostTime(sendDuration);
ctx.getTraceBeans().add(bean);
dispatcher.append(ctx);
Asynchronous Dispatch
The system batches trace messages for efficiency:
// In AsyncTraceDispatcher
private final BlockingQueue<TraceContext> traceQueue =
new ArrayBlockingQueue<>(1024);
// Batched send logic
if (currentBatchSize >= maxBatchSize ||
(System.currentTimeMillis() - batchStartTime) > maxBatchDelay) {
submitBatchSendTask();
}