Implementing High-Performance Queues with Disruptor

Disruptor is a high-performance inter-thread messaging library developed by LMAX. It's widely used in projects like Log4j2 and Storm for its exceptional throughput characteristics.

Ring Buffer Architecture

Disruptor employs a ring buffer structure with several performance advantages:

  • Array-based storage: Uses fixed-size arrays instead of linked lists to avoid garbage collection overhead
  • Efficient indexing: Array size is always a power of two, enabling fast bitwise position calculation
  • Lock-free design: Threads claim array positions through sequence numbers before performing operations, avoiding lock contention

Basic Implementation Example

Maven Dependency

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.4.4</version>
</dependency>

Event Definition

/**
 * Mutable event object that carries a single {@code long} payload.
 */
public class NumericData {
    /** Payload most recently stored via {@link #assignValue(long)}; 0 until first assigned. */
    private long numericValue;

    /**
     * Overwrites the payload held by this event.
     *
     * @param value the new payload
     */
    public void assignValue(long value) {
        numericValue = value;
    }

    /**
     * Renders the event as {@code NumericData{value=N}}, where N is the current payload.
     *
     * @return the textual form of this event
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("NumericData{value=");
        text.append(numericValue);
        text.append('}');
        return text.toString();
    }
}

Event Factory

/**
 * {@link EventFactory} that supplies fresh, empty {@link NumericData} instances.
 */
public class NumericFactory implements EventFactory<NumericData> {
    /**
     * Creates one new event with its default (unassigned) payload.
     *
     * @return a newly constructed {@link NumericData}
     */
    @Override
    public NumericData newInstance() {
        final NumericData blankEvent = new NumericData();
        return blankEvent;
    }
}

Event Handler

/**
 * Event handler that prints each received {@link NumericData} together with the
 * name of the thread that is handling it.
 */
public class DataProcessor implements EventHandler<NumericData> {
    /**
     * Writes one line to standard output describing the handling thread and the event.
     *
     * @param event      the event to report
     * @param sequence   sequence number of the event (not used here)
     * @param endOfBatch batch-end flag (not used here)
     */
    @Override
    public void onEvent(NumericData event, long sequence, boolean endOfBatch) {
        final String threadName = Thread.currentThread().getName();
        final StringBuilder line = new StringBuilder("Processing thread: ");
        line.append(threadName);
        line.append(" Data: ");
        line.append(event);
        System.out.println(line.toString());
    }
}

Main Execution

/**
 * Minimal single-producer demo: wires a {@link DataProcessor} to a 1024-slot ring
 * buffer and publishes an ever-increasing counter once per second.
 */
public class DisruptorDemo {
    /**
     * Starts the Disruptor and publishes counter values forever.
     *
     * @param args ignored
     * @throws InterruptedException if the publishing thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        final int ringSize = 1024;
        final Disruptor<NumericData> disruptor = new Disruptor<>(
            new NumericFactory(),
            ringSize,
            DaemonThreadFactory.INSTANCE,
            ProducerType.SINGLE,
            new BlockingWaitStrategy()
        );

        // Register the consumer before starting the Disruptor.
        disruptor.handleEventsWith(new DataProcessor());
        disruptor.start();

        final RingBuffer<NumericData> ring = disruptor.getRingBuffer();
        // 8-byte scratch buffer, reused on every iteration to hand the counter to the translator.
        final ByteBuffer scratch = ByteBuffer.allocate(8);

        long counter = 0;
        while (true) {
            scratch.putLong(0, counter);
            ring.publishEvent(
                (slot, sequence, source) -> slot.assignValue(source.getLong(0)),
                scratch);
            Thread.sleep(1000);
            counter++;
        }
    }
}

Log Processing Application

Manager Class

/**
 * Builds and starts a multi-producer Disruptor whose events are consumed by a pool
 * of {@link EventWorker}s, and exposes the {@link EventProducer} used to publish into it.
 *
 * <p>Typical use: construct, call {@link #initialize()} once, then obtain the producer
 * via {@link #getProducer()}.
 *
 * @param <T> type of the payload carried by each {@code MessageEvent}
 */
public class DisruptorController<T> {
    /** Worker count used by {@link #DisruptorController(DataHandler)}. */
    private static final int DEFAULT_WORKERS = 4;
    /** Ring buffer size used by {@link #DisruptorController(DataHandler)}; a power of two. */
    private static final int DEFAULT_BUFFER_SIZE = 16384;

    private final DataHandler<T> dataHandler;
    private final int bufferSize;
    private final int workerCount;
    /** Set by {@link #initialize()}; {@code null} until then. */
    private EventProducer<T> producer;

    /**
     * Creates a controller with the default worker count and ring buffer size.
     *
     * @param handler callback invoked by every worker for each event; must not be {@code null}
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    public DisruptorController(DataHandler<T> handler) {
        this(handler, DEFAULT_WORKERS, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a controller with explicit sizing. Arguments are validated here so that a
     * misconfiguration fails at construction instead of inside {@link #initialize()}.
     *
     * @param handler    callback invoked by every worker for each event; must not be {@code null}
     * @param workers    number of {@link EventWorker}s in the pool; must be at least 1
     * @param bufferSize number of ring buffer slots; must be a positive power of two
     * @throws NullPointerException     if {@code handler} is {@code null}
     * @throws IllegalArgumentException if {@code workers} or {@code bufferSize} is out of range
     */
    public DisruptorController(DataHandler<T> handler, int workers, int bufferSize) {
        if (handler == null) {
            throw new NullPointerException("handler must not be null");
        }
        if (workers < 1) {
            // A pool with no workers would accept events that nothing ever consumes.
            throw new IllegalArgumentException("workers must be >= 1 but was " + workers);
        }
        if (bufferSize < 1 || Integer.bitCount(bufferSize) != 1) {
            throw new IllegalArgumentException(
                "bufferSize must be a positive power of two but was " + bufferSize);
        }
        this.dataHandler = handler;
        this.workerCount = workers;
        this.bufferSize = bufferSize;
    }

    /**
     * Creates the Disruptor, attaches the worker pool, starts it and creates the producer.
     *
     * @throws IllegalStateException if this controller has already been initialized
     */
    public void initialize() {
        if (producer != null) {
            // A second call would start another Disruptor and drop the only reference
            // to the first one without ever stopping it.
            throw new IllegalStateException("DisruptorController is already initialized");
        }

        EventFactory<MessageEvent<T>> factory = new MessageEventFactory<>();
        Disruptor<MessageEvent<T>> disruptor = new Disruptor<>(
            factory,
            bufferSize,
            CustomThreadFactory.create("processor", true),
            ProducerType.MULTI,
            new SleepingWaitStrategy()
        );

        // Generic array creation is unavoidable for the varargs worker-pool API; the array
        // only ever holds EventWorker<T> instances created in the loop below.
        @SuppressWarnings("unchecked")
        EventWorker<T>[] workers = new EventWorker[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new EventWorker<>(dataHandler);
        }

        disruptor.handleEventsWithWorkerPool(workers);
        disruptor.start();

        RingBuffer<MessageEvent<T>> ringBuffer = disruptor.getRingBuffer();
        this.producer = new EventProducer<>(ringBuffer, disruptor);
    }

    /**
     * Returns the producer created by {@link #initialize()}.
     *
     * @return the producer, or {@code null} if {@link #initialize()} has not been called yet
     */
    public EventProducer<T> getProducer() {
        return producer;
    }
}

Worker Implementation

/**
 * Work handler that forwards every non-null event to a {@link DataHandler}.
 *
 * @param <T> type of the payload carried by each {@code MessageEvent}
 */
public class EventWorker<T> implements WorkHandler<MessageEvent<T>> {
    /** Receiver of all events seen by this worker. */
    private DataHandler<T> delegate;

    /**
     * @param handler the handler that will process each event
     */
    public EventWorker(DataHandler<T> handler) {
        delegate = handler;
    }

    /**
     * Passes the event to the handler; a {@code null} event is ignored.
     *
     * @param event the event to process
     * @throws Exception declared by the {@code WorkHandler} contract
     */
    @Override
    public void onEvent(MessageEvent<T> event) throws Exception {
        if (event == null) {
            return;
        }
        delegate.process(event);
    }
}

Producer Implementation

/**
 * Publishes payloads of type {@code T} into a ring buffer of {@code MessageEvent}s.
 *
 * @param <T> type of the payload carried by each {@code MessageEvent}
 */
public class EventProducer<T> {
    // Fully-qualified JDK logger so this class needs no additional imports.
    private static final java.util.logging.Logger LOGGER =
        java.util.logging.Logger.getLogger(EventProducer.class.getName());

    private final RingBuffer<MessageEvent<T>> ringBuffer;
    /** Retained so the owning Disruptor can be stopped via {@link #shutdown()}; may be {@code null}. */
    private final Disruptor<MessageEvent<T>> disruptor;
    /** Copies the payload into the claimed event slot. */
    private final EventTranslatorOneArg<MessageEvent<T>, T> translator =
        (event, sequence, data) -> event.setContent(data);

    /**
     * @param buffer    ring buffer that events are published to
     * @param disruptor the Disruptor that owns {@code buffer}; used only by {@link #shutdown()}
     */
    public EventProducer(RingBuffer<MessageEvent<T>> buffer, Disruptor<MessageEvent<T>> disruptor) {
        this.ringBuffer = buffer;
        this.disruptor = disruptor;
    }

    /**
     * Publishes one payload. Publication failures are logged rather than propagated, so
     * this method keeps its best-effort contract but failures are no longer invisible.
     *
     * @param data the payload to publish
     */
    public void sendData(T data) {
        try {
            ringBuffer.publishEvent(translator, data);
        } catch (RuntimeException e) {
            LOGGER.log(java.util.logging.Level.SEVERE,
                "Failed to publish event to the ring buffer", e);
        }
    }

    /**
     * Stops the Disruptor that was supplied at construction by delegating to its
     * {@code shutdown()} method. Does nothing if no Disruptor was supplied.
     */
    public void shutdown() {
        if (disruptor != null) {
            disruptor.shutdown();
        }
    }
}

Spring Configuration

/**
 * Spring configuration that exposes the log {@link DataHandler} and the
 * {@link EventProducer} used to publish log lines.
 */
@Configuration
public class DisruptorSetup {

    /** Number of workers handed to the {@link DisruptorController}. */
    private static final int LOG_WORKER_COUNT = 8;
    /** Ring buffer size handed to the {@link DisruptorController}. */
    private static final int LOG_BUFFER_SIZE = 1048576;

    /**
     * @return a handler that prints the content of every event to standard output
     */
    @Bean
    public DataHandler<String> createHandler() {
        return new DataHandler<String>() {
            @Override
            public void process(MessageEvent<String> event) {
                // Process log data
                final String content = event.getContent();
                System.out.println("Processing: " + content);
            }
        };
    }

    /**
     * Builds and initializes a {@link DisruptorController} around the given handler.
     *
     * @param handler the handler invoked for each published log line
     * @return the producer obtained from the initialized controller
     */
    @Bean
    public EventProducer<String> createProducer(DataHandler<String> handler) {
        final DisruptorController<String> pipeline =
            new DisruptorController<>(handler, LOG_WORKER_COUNT, LOG_BUFFER_SIZE);
        pipeline.initialize();
        final EventProducer<String> logProducer = pipeline.getProducer();
        return logProducer;
    }
}

REST Controller

/**
 * REST endpoint that accepts log entries and hands them to the {@link EventProducer}.
 */
@RestController
public class LogController {

    /** Producer the received log entries are forwarded to; injected by Spring. */
    @Autowired
    private EventProducer<String> logProducer;

    /**
     * Handles {@code POST /logs}: forwards the request body to the producer and
     * answers with an empty 200 response.
     *
     * @param logEntry the raw request body
     * @return an OK response without a body
     */
    @PostMapping("/logs")
    public ResponseEntity<?> receiveLog(@RequestBody String logEntry) {
        this.logProducer.sendData(logEntry);
        final ResponseEntity<?> okResponse = ResponseEntity.ok().build();
        return okResponse;
    }
}

Tags: disruptor java Performance Concurrency Queue

Posted on Sat, 30 May 2026 19:12:15 +0000 by dustinnoe