Disruptor is a high-performence inter-thread messaging library developed by LMAX. It's widely used in projects like Log4j2 and Storm for its exceptional throughput characteristics.
Ring Buffer Architecture
Disruptor employs a ring buffer structure with several performence advantages:
- Array-based storage: Uses fixed-size arrays instead of linked lists to avoid garbage collection overhead
- Efficient indexing: Array size is always a power of two, enabling fast bitwise position calculation
- Lock-free design: Threads reserve array positions before performing operations, eliminating contention
Basic Implementation Example
Maven Dependency
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
Event Definition
public class NumericData {
private long numericValue;
public void assignValue(long value) {
this.numericValue = value;
}
@Override
public String toString() {
return "NumericData{value=" + numericValue + "}";
}
}
Event Factory
public class NumericFactory implements EventFactory<NumericData> {
@Override
public NumericData newInstance() {
return new NumericData();
}
}
Event Handler
public class DataProcessor implements EventHandler<NumericData> {
@Override
public void onEvent(NumericData event, long sequence, boolean endOfBatch) {
System.out.println("Processing thread: " + Thread.currentThread().getName() +
" Data: " + event);
}
}
Main Execution
public class DisruptorDemo {
public static void main(String[] args) throws InterruptedException {
int ringSize = 1024;
Disruptor<NumericData> disruptor = new Disruptor<>(
new NumericFactory(),
ringSize,
DaemonThreadFactory.INSTANCE,
ProducerType.SINGLE,
new BlockingWaitStrategy()
);
disruptor.handleEventsWith(new DataProcessor());
disruptor.start();
RingBuffer<NumericData> buffer = disruptor.getRingBuffer();
ByteBuffer byteContainer = ByteBuffer.allocate(8);
for (long counter = 0; ; counter++) {
byteContainer.putLong(0, counter);
buffer.publishEvent((event, seq, container) ->
event.assignValue(container.getLong(0)), byteContainer);
Thread.sleep(1000);
}
}
}
Log Processing Application
Manager Class
public class DisruptorController<T> {
private static final int DEFAULT_WORKERS = 4;
private static final int DEFAULT_BUFFER_SIZE = 16384;
private DataHandler<T> dataHandler;
private EventProducer<T> producer;
private int bufferSize;
private int workerCount;
public DisruptorController(DataHandler<T> handler, int workers, int bufferSize) {
this.dataHandler = handler;
this.workerCount = workers;
this.bufferSize = bufferSize;
}
public void initialize() {
EventFactory<MessageEvent<T>> factory = new MessageEventFactory<>();
Disruptor<MessageEvent<T>> disruptor = new Disruptor<>(
factory,
bufferSize,
CustomThreadFactory.create("processor", true),
ProducerType.MULTI,
new SleepingWaitStrategy()
);
EventWorker<T>[] workers = new EventWorker[workerCount];
for (int i = 0; i < workerCount; i++) {
workers[i] = new EventWorker<>(dataHandler);
}
disruptor.handleEventsWithWorkerPool(workers);
disruptor.start();
RingBuffer<MessageEvent<T>> ringBuffer = disruptor.getRingBuffer();
this.producer = new EventProducer<>(ringBuffer, disruptor);
}
public EventProducer<T> getProducer() {
return producer;
}
}
Worker Implementation
public class EventWorker<T> implements WorkHandler<MessageEvent<T>> {
private DataHandler<T> dataHandler;
public EventWorker(DataHandler<T> handler) {
this.dataHandler = handler;
}
@Override
public void onEvent(MessageEvent<T> event) throws Exception {
if (event != null) {
dataHandler.process(event);
}
}
}
Producer Implementation
public class EventProducer<T> {
private final RingBuffer<MessageEvent<T>> ringBuffer;
private final EventTranslatorOneArg<MessageEvent<T>, T> translator =
(event, sequence, data) -> event.setContent(data);
public EventProducer(RingBuffer<MessageEvent<T>> buffer, Disruptor<MessageEvent<T>> disruptor) {
this.ringBuffer = buffer;
}
public void sendData(T data) {
try {
ringBuffer.publishEvent(translator, data);
} catch (Exception e) {
// Handle publication errors
}
}
}
Spring Configuartion
@Configuration
public class DisruptorSetup {
@Bean
public DataHandler<String> createHandler() {
return new DataHandler<String>() {
@Override
public void process(MessageEvent<String> event) {
// Process log data
System.out.println("Processing: " + event.getContent());
}
};
}
@Bean
public EventProducer<String> createProducer(DataHandler<String> handler) {
DisruptorController<String> controller = new DisruptorController<>(
handler, 8, 1048576);
controller.initialize();
return controller.getProducer();
}
}
REST Controller
@RestController
public class LogController {
@Autowired
private EventProducer<String> logProducer;
@PostMapping("/logs")
public ResponseEntity<?> receiveLog(@RequestBody String logEntry) {
logProducer.sendData(logEntry);
return ResponseEntity.ok().build();
}
}