ExecutorCompletionService: Efficient Task Result Handling in Concurrent Programming

Introduction

In concurrent programming, a common approach is to submit tasks to a thread pool and collect the returned Future objects in a list. After all tasks are submitted, we iterate over the list and call future.get() on each element to retrieve that task's result. Because get() blocks until that particular task finishes, the caller is stuck waiting on earlier-submitted tasks even when later-submitted tasks have already completed.
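To make that cost concrete, here is a minimal sketch of the submission-order pattern. The class name, task labels, and sleep durations are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmissionOrderDemo {
    // Returns the task labels in the order the caller was able to consume them.
    static List<String> consumeInSubmissionOrder() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<String>> futures = new ArrayList<>();
            // Illustrative tasks: the first one submitted is the slowest.
            futures.add(pool.submit(() -> { Thread.sleep(400); return "slow"; }));
            futures.add(pool.submit(() -> { Thread.sleep(50); return "fast"; }));

            List<String> consumed = new ArrayList<>();
            for (Future<String> future : futures) {
                // get() blocks on "slow" first, even though "fast" finished long before.
                consumed.add(future.get());
            }
            return consumed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(consumeInSubmissionOrder()); // prints [slow, fast]
    }
}
```

The "fast" result is ready after roughly 50 ms, but the loop cannot touch it until the 400 ms task has finished.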

ExecutorCompletionService provides a solution that processes results as soon as they're completed, regardless of the submission order. This article explores how this utility works and when to use it effectively.

Implementation Analysis

ExecutorCompletionService implements the CompletionService interface, which defines methods for submitting tasks and retrieving results:

public interface CompletionService<V> {
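    // Submit a value-returning task for execution
    Future<V> submit(Callable<V> task);
    // Submit a Runnable; the returned Future yields the given result on completion
    Future<V> submit(Runnable task, V result);
    // Retrieve and remove the Future of the next completed task, waiting if none is ready
    Future<V> take() throws InterruptedException;
    // Retrieve and remove the Future of the next completed task, or null if none is ready
    Future<V> poll();
    // Like poll(), but wait up to the given timeout before returning null
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;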
}
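The difference between the three retrieval methods is easiest to see side by side. The following is a small sketch rather than production code; the class name and the latch used to hold the task back are illustrative assumptions.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class RetrievalMethodsDemo {
    static String run() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The task cannot finish until the latch is released.
            service.submit(() -> { release.await(); return "done"; });

            // Nothing has completed yet, so poll() returns null immediately.
            Future<String> tooEarly = service.poll();

            release.countDown();
            // take() blocks until the task has completed.
            Future<String> completed = service.take();

            // The only task was already consumed, so the timed poll gives up with null.
            Future<String> nothingLeft = service.poll(100, TimeUnit.MILLISECONDS);

            return (tooEarly == null) + "," + completed.get() + "," + (nothingLeft == null);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints true,done,true
    }
}
```

Note that all three methods return a Future, not the value itself; you still call get() on it, which returns immediately because the task is already finished.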

The implementation in ExecutorCompletionService is straightforward:

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        
        protected void done() { 
            completionQueue.add(task); 
        }
        private final Future<V> task;
    }

    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }

    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
            return aes.newTaskFor(task, result);
    }

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
}

The operation of ExecutorCompletionService follows these steps:

  1. When creating an ExecutorCompletionService, you provide an executor (thread pool) and, optionally, a blocking queue. If you omit the queue, an unbounded LinkedBlockingQueue is used.
  2. When you submit a task, it is wrapped in a QueueingFuture object and handed to the specified executor for execution.
  3. Upon task completion, the QueueingFuture's done() method is triggered, adding the completed task's Future to the blocking queue.
  4. You can retrieve completed tasks using poll() or take() methods, allowing you to process results as soon as they're available.
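The core of the steps above is FutureTask's done() hook. The sketch below isolates that mechanism without using ExecutorCompletionService at all. SelfQueueingTask is a hypothetical helper written for this example, not a JDK class, and unlike the JDK's QueueingFuture it enqueues itself instead of wrapping another task.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

class DoneHookSketch {
    // Hypothetical helper, not JDK code: a FutureTask that enqueues itself once it is done.
    static class SelfQueueingTask<V> extends FutureTask<V> {
        private final BlockingQueue<Future<V>> completionQueue;

        SelfQueueingTask(Callable<V> callable, BlockingQueue<Future<V>> completionQueue) {
            super(callable);
            this.completionQueue = completionQueue;
        }

        @Override
        protected void done() {
            // FutureTask calls this on normal completion, failure, or cancellation.
            completionQueue.add(this);
        }
    }

    static int sumOfResults() throws Exception {
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            pool.execute(new SelfQueueingTask<Integer>(() -> 1, queue));
            pool.execute(new SelfQueueingTask<Integer>(() -> 2, queue));
            // Each take() hands back whichever task finished next.
            return queue.take().get() + queue.take().get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfResults()); // prints 3
    }
}
```

Because done() also fires for failed and cancelled tasks, those show up in the queue too; their get() throws ExecutionException or CancellationException respectively.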

Usage Scenarios

Scenario 1: Processing Results in Completion Order

When you don't care about the submission order and want to process results as they complete:

void processTasks(Executor executor, Collection<Callable<Result>> tasks) 
    throws InterruptedException, ExecutionException {
    CompletionService<Result> completionService = 
        new ExecutorCompletionService<Result>(executor);
    
    // Submit all tasks
    for (Callable<Result> task : tasks) {
        completionService.submit(task);
    }
    
    // Process results as they complete
    int taskCount = tasks.size();
    for (int i = 0; i < taskCount; ++i) {
        Result result = completionService.take().get();
        if (result != null) {
            handleResult(result);
        }
    }
}
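One thing to keep in mind with the loop above: the first ExecutionException propagates out of the method, so the remaining results are never handled. If one failed task should not abort the rest, a variant along the following lines may help. This is a sketch with made-up task labels and a made-up class name, using String results in place of the Result type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FailureIsolationDemo {
    // Returns one entry per task, in completion order, including failed tasks.
    static List<String> run() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<String> service = new ExecutorCompletionService<>(pool);
        try {
            // Illustrative tasks: one slow, one failing, one fast.
            service.submit(() -> { Thread.sleep(400); return "slow"; });
            service.submit(() -> { throw new IllegalStateException("boom"); });
            service.submit(() -> { Thread.sleep(50); return "fast"; });

            List<String> seen = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                try {
                    seen.add(service.take().get());
                } catch (ExecutionException e) {
                    // Only this task failed; record it and keep draining the others.
                    seen.add("failed: " + e.getCause().getMessage());
                }
            }
            return seen;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // "fast" is listed before "slow" although it was submitted last.
        System.out.println(run());
    }
}
```

Failed tasks are delivered through take() like any other completion, so the loop count still matches the number of submitted tasks.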

Scenario 2: Retrieving Only the First Completed Result

When you need only the first completed result and can cancel the remaining tasks:

void getFirstResult(Executor executor, Collection<Callable<Result>> tasks) 
    throws InterruptedException {
    CompletionService<Result> completionService = 
        new ExecutorCompletionService<Result>(executor);
    int taskCount = tasks.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(taskCount);
    Result firstResult = null;
    
    try {
        // Submit all tasks
        for (Callable<Result> task : tasks) {
            futures.add(completionService.submit(task));
        }

        // Get first completed result
        for (int i = 0; i < taskCount; ++i) {
            try {
                Result result = completionService.take().get();
                if (result != null) {
                    firstResult = result;
                    break;
                }
            } catch (ExecutionException ignore) {
                // Handle or ignore exceptions
            }
        }
    } finally {
        // Cancel remaining tasks
        for (Future<Result> future : futures) {
            future.cancel(true);
        }
    }

    if (firstResult != null) {
        useResult(firstResult);
    }
}
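The version above waits indefinitely if every task is slow. Where an overall deadline is wanted, the timed poll() can replace take(). The sketch below is one possible way to do that; the method name firstResult, the timeoutMillis parameter, and the demo tasks are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class FirstResultWithDeadline {
    // Returns the first non-null result, or null if none arrives before the deadline.
    static <T> T firstResult(Executor executor, List<Callable<T>> tasks, long timeoutMillis)
            throws InterruptedException {
        CompletionService<T> service = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (Callable<T> task : tasks) {
                futures.add(service.submit(task));
            }
            for (int i = 0; i < tasks.size(); i++) {
                // Wait only for the time that is left until the overall deadline.
                Future<T> next = service.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
                if (next == null) {
                    return null; // deadline passed with nothing usable
                }
                try {
                    T result = next.get();
                    if (result != null) {
                        return result;
                    }
                } catch (ExecutionException ignored) {
                    // This task failed; fall through and wait for the next completion.
                }
            }
            return null;
        } finally {
            // Cancel whatever is still running; a no-op for finished tasks.
            for (Future<T> future : futures) {
                future.cancel(true);
            }
        }
    }

    static String demo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = Arrays.asList(
                () -> { Thread.sleep(5_000); return "slow"; },
                () -> { Thread.sleep(50); return "fast"; });
            return firstResult(pool, tasks, 2_000);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints fast
    }
}
```

In the demo, cancel(true) interrupts the sleeping "slow" task, so the pool can shut down promptly instead of waiting out the full five seconds.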

Practical Implementation

Consider a batch order processing scenario where each order needs to be updated individually, and progress should be tracked in real time:

protected void processOrdersInParallel(Map<String, OrderData> orderDataMap, 
                                      Map<String, OrderTaskDetail> taskDetailMap) {
    long startTime = System.currentTimeMillis();
    log.info("{} Parallel order processing started: {}", traceId, taskNo);
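    // Size the completion queue from the number of tasks actually submitted below:
    // QueueingFuture.done() uses add(), which throws if a bounded queue is full
    int totalOrders = taskDetailMap.size();
    BlockingQueue<Future<String>> resultQueue =
        new LinkedBlockingQueue<>(totalOrders + 2);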
    ExecutorCompletionService<String> completionService = 
        new ExecutorCompletionService<>(orderProcessingExecutor, resultQueue);
    
    // Submit all order processing tasks
    for (Map.Entry<String, OrderTaskDetail> entry : taskDetailMap.entrySet()) {
        String orderNumber = entry.getKey();
        OrderData orderData = orderDataMap.get(orderNumber);
        OrderTaskDetail taskDetail = entry.getValue();
        completionService.submit(() -> this.processSingleOrder(orderData, taskDetail), "completed");
    }

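    // Process results as they complete
    for (int current = 0; current < taskDetailMap.size(); current++) {
        try {
            completionService.take().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop waiting for further results
            Thread.currentThread().interrupt();
            log.error("{} Interrupted while waiting for parallel order result: {}",
                     traceId, taskNo, e);
            break;
        } catch (Exception e) {
            // A failed task surfaces here as an ExecutionException wrapping the cause
            log.error("{} Error processing parallel order result: {}", 
                     traceId, e.getMessage(), e);
        } finally {
            progressTracker.increment(taskNo);
        }
    }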

    long endTime = System.currentTimeMillis();
    log.info("{} Parallel order processing completed: {} in {} ms", 
             traceId, taskNo, (endTime - startTime));
}
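A note on the bounded completion queue used above: completed tasks are placed on the queue with add(), so the capacity must cover every task that can finish before the consumer drains it. The sketch below deliberately undersizes the queue to show what goes wrong; the class name and the capacity of 1 are illustrative assumptions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedQueuePitfall {
    // Returns how many completed tasks actually reached the completion queue.
    static int completionsDelivered() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Deliberately too small: two tasks, but only one slot.
        BlockingQueue<Future<String>> queue = new LinkedBlockingQueue<>(1);
        CompletionService<String> service = new ExecutorCompletionService<>(pool, queue);

        service.submit(() -> "first");
        service.submit(() -> "second");

        // Wait until both tasks have run before draining anything.
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);

        int delivered = 0;
        while (service.poll() != null) {
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) throws InterruptedException {
        // The second add() fails with IllegalStateException on the worker thread,
        // which the JVM reports on stderr; only one completion is delivered.
        System.out.println(completionsDelivered()); // prints 1
    }
}
```

A consumer that calls take() once per submitted task would block forever on the missing completion, which is why sizing the queue to at least the number of submitted tasks (or using the default unbounded queue) is the safer choice.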

Tags: java concurrent-programming Executor thread-pool completion-service

Posted on Mon, 29 Jun 2026 16:45:00 +0000 by Vern1271