Introduction
In concurrent programming, a common approach is to submit tasks to a thread pool and collect Future objects in a collection. After all tasks are submitted, we iterate through the Future collection, calling future.get() to retrieve each task's result. This method forces earlier submitted tasks to wait for completion, even if later submitted tasks finish first.
ExecutorCompletionService provides a solution that processes results as soon as they're completed, regardless of the submission order. This article explores how this utility works and when to use it effectively.
Implementation Analysis
ExecutorCompletionService implements the CompletionService interface, which defines methods for submitting tasks and retrieving results:
public interface CompletionService<V> {
// Submit a task
Future<V> submit(Callable<V> task);
// Submit a task
Future<V> submit(Runnable task, V result);
// Retrieve task result (blocking)
Future<V> take() throws InterruptedException;
// Retrieve task result (non-blocking)
Future<V> poll();
// Retrieve task result with timeout
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
The implementation in ExecutorCompletionService is straightforward:
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() {
completionQueue.add(task);
}
private final Future<V> task;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}
The operation of ExecutorCompletionService follows these steps:
- When creating a ExecutorCompletionService, you provide an executor (thread pool) or a blocking queue.
- When submitting tasks, they're wrapped in QueueingFuture objects and executed by the specified thread pool.
- Up on task completion, the QueueingFuture's done() method is triggered, adding the completed task to the blocking queue.
- You can retrieve completed tasks using poll() or take() methods, allowing you to process results as soon as they're available.
Usage Scenarios
Scenario 1: Processing Results in Completion Order
When you don't care about the submission order and want to process results as they complete:
void processTasks(Executor executor, Collection<Callable<Result>> tasks)
throws InterruptedException, ExecutionException {
CompletionService<Result> completionService =
new ExecutorCompletionService<Result>(executor);
// Submit all tasks
for (Callable<Result> task : tasks) {
completionService.submit(task);
}
// Process results as they complete
int taskCount = tasks.size();
for (int i = 0; i < taskCount; ++i) {
Result result = completionService.take().get();
if (result != null) {
handleResult(result);
}
}
}
Scenario 2: Retrieving Only the First Completed Result
When you need only the first completed result and can cancel the remaining tasks:
void getFirstResult(Executor executor, Collection<Callable<Result>> tasks)
throws InterruptedException {
CompletionService<Result> completionService =
new ExecutorCompletionService<Result>(executor);
int taskCount = tasks.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(taskCount);
Result firstResult = null;
try {
// Submit all tasks
for (Callable<Result> task : tasks) {
futures.add(completionService.submit(task));
}
// Get first completed result
for (int i = 0; i < taskCount; ++i) {
try {
Result result = completionService.take().get();
if (result != null) {
firstResult = result;
break;
}
} catch (ExecutionException ignore) {
// Handle or ignore exceptions
}
}
} finally {
// Cancel remaining tasks
for (Future<Result> future : futures) {
future.cancel(true);
}
}
if (firstResult != null) {
useResult(firstResult);
}
}
Practical Implementation
Consider a batch order processing scenario where each order needs to be updated individual, and progress should be tracked in real-time:
protected void processOrdersInParallel(Map<String, OrderData> orderDataMap,
Map<String, OrderTaskDetail> taskDetailMap) {
long startTime = System.currentTimeMillis();
log.info("{} Parallel order processing started: {}", traceId, taskNo);
int totalOrders = orderDataMap.size();
BlockingQueue<Future<String>> resultQueue =
new LinkedBlockingQueue<>(totalOrders + 2);
ExecutorCompletionService<String> completionService =
new ExecutorCompletionService<>(orderProcessingExecutor, resultQueue);
// Submit all order processing tasks
for (Map.Entry<String, OrderTaskDetail> entry : taskDetailMap.entrySet()) {
String orderNumber = entry.getKey();
OrderData orderData = orderDataMap.get(orderNumber);
OrderTaskDetail taskDetail = entry.getValue();
completionService.submit(() -> this.processSingleOrder(orderData, taskDetail), "completed");
}
// Process results as they complete
for (int current = 0; current < taskDetailMap.size(); current++) {
try {
completionService.take().get();
} catch (Exception e) {
log.error("{} Error processing parallel order result: {}",
traceId, e.getMessage(), e);
} finally {
progressTracker.increment(taskNo);
}
}
long endTime = System.currentTimeMillis();
log.info("{} Parallel order processing completed: {} in {} ms",
traceId, taskNo, (endTime - startTime));
}