CompletableFuture

Helper Utility Class

import java.util.StringJoiner;

public class DebugUtil {

    public static void delay(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void logTimeAndThread(String tag) {
        String result = new StringJoiner("\t|\t")
                .add(String.valueOf(System.currentTimeMillis()))
                .add(String.valueOf(Thread.currentThread().getId()))
                .add(Thread.currentThread().getName())
                .add(tag)
                .toString();
        System.out.println(result);
    }
}

1. Creating Asynchronous Tasks

1.1 runAsync

Creates a CompletableFuture<Void> by executing a Runnable.

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

Example:

public static void main(String[] args) throws InterruptedException {
    System.out.println("Main thread starting");

    CompletableFuture.runAsync(() -> {
        System.out.println("Async task running in separate thread");
    });

    Thread.sleep(3000);
    System.out.println("Main thread ending");
}

1.2 supplyAsync

Takes a Supplier<U> and returns a CompletableFuture<U>.

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

Example:

public static void main(String[] args) throws Exception {
    System.out.println("Main thread starting");

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        String threadName = Thread.currentThread().getName();
        System.out.println("Async task executing in: " + threadName);
        return "Async result";
    });

    String result = future.get();
    System.out.println("Result: " + result);
    System.out.println("Main thread ending");
}

1.3 Custom Thread Pool

Both runAsync and supplyAsync use ForkJoinPool.commonPool() by default. You can provide a custom Executor.

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

Example:

public static void main(String[] args) throws Exception {
    System.out.println("Main thread starting");

    ExecutorService executor = Executors.newFixedThreadPool(4);
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        String threadName = Thread.currentThread().getName();
        System.out.println("Async task in: " + threadName);
        return "Async result";
    }, executor);

    String result = future.get();
    System.out.println("Result: " + result);
    executor.shutdown();
    System.out.println("Main thread ending");
}

2. Callback on Asynchronuos Tasks

2.1 thenApply

Transforms the result using a Function<T, U> and returns a new CompletableFuture<U>.

public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
    return uniApplyStage(null, fn);
}

Example:

public static void main(String[] args) throws Exception {
    DebugUtil.logTimeAndThread("Program start");

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        DebugUtil.logTimeAndThread("Async task");
        DebugUtil.delay(2000);
        return "HTTP response";
    }).thenApply(content -> {
        DebugUtil.logTimeAndThread("Callback transformation");
        return content + " processed";
    });

    String result = future.get();
    DebugUtil.logTimeAndThread(result);
    DebugUtil.logTimeAndThread("Program end");
}

Note: thenApply runs on the current thread. Use thenApplyAsync for parallel execution.

2.2 thenAccept

Consumes the result without returning a value.

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

Example:

public static void main(String[] args) throws Exception {
    DebugUtil.logTimeAndThread("Program start");

    CompletableFuture.supplyAsync(() -> {
        DebugUtil.logTimeAndThread("Async task");
        DebugUtil.delay(1000);
        return "HTTP response";
    }).thenApply(content -> {
        DebugUtil.logTimeAndThread("Transformation");
        return content + " processed";
    }).thenAccept(res -> {
        DebugUtil.logTimeAndThread("Final consumption");
    });

    DebugUtil.delay(3000);
    DebugUtil.logTimeAndThread("Program end");
}

2.3 thenRun

Runs a Runnable after the previous stage completes, ignoring the result.

public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

Example:

public static void main(String[] args) throws Exception {
    DebugUtil.logTimeAndThread("Program start");

    CompletableFuture.supplyAsync(() -> {
        DebugUtil.logTimeAndThread("Async task");
        DebugUtil.delay(1000);
        return "HTTP response";
    }).thenRun(() -> {
        DebugUtil.logTimeAndThread("Task completed, sending notification");
    });

    DebugUtil.delay(3000);
    DebugUtil.logTimeAndThread("Program end");
}

2.4 Asynchronous Callbacks for Parallelism

To execute callbacks in a separate thread, use thenApplyAsync, thenAcceptAsync, or thenRunAsync.

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
    return uniApplyStage(asyncPool, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
    return uniApplyStage(screenExecutor(executor), fn);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}

3. Composing Asynchronous Tasks

3.1 thenCompose

Chains two dependent tasks where the second task uses the result of the first. Returns a CompletableFuture<U>.

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

Example:

public static CompletableFuture<Integer> fetchPrice() {
    return CompletableFuture.supplyAsync(() -> {
        DebugUtil.logTimeAndThread("Fetching price");
        DebugUtil.delay(2000);
        return 200;
    });
}

public static CompletableFuture<Integer> applyDiscount(Integer price) {
    return CompletableFuture.supplyAsync(() -> {
        DebugUtil.logTimeAndThread("Applying discount");
        DebugUtil.delay(2000);
        return price - 50;
    });
}

public static void main(String[] args) throws Exception {
    DebugUtil.logTimeAndThread("Program start");

    CompletableFuture<Integer> finalPrice = fetchPrice().thenCompose(price -> applyDiscount(price));
    System.out.println(finalPrice.get());
}

3.2 thenCombine

Combines two independent tasks and processes both results.

public <U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
    return biApplyStage(null, other, fn);
}

Example:

public static void main(String[] args) throws Exception {
    DebugUtil.logTimeAndThread("Program start");
    
    CompletableFuture<Integer> priceFuture = CompletableFuture.supplyAsync(() -> {
        DebugUtil.logTimeAndThread("Fetching price");
        DebugUtil.delay(2000);
        return 200;
    });

    CompletableFuture<Integer> discountFuture = CompletableFuture.supplyAsync(() -> {
        DebugUtil.logTimeAndThread("Fetching discount");
        DebugUtil.delay(2000);
        return 50;
    });

    CompletableFuture<Integer> combinedFuture = priceFuture.thenCombine(discountFuture, (price, discount) -> {
        DebugUtil.logTimeAndThread("Calculating final price");
        return price - discount;
    });

    Integer finalPrice = combinedFuture.join();
    DebugUtil.logTimeAndThread(String.valueOf(finalPrice));
    DebugUtil.logTimeAndThread("Program end");
}

3.3 allOf

Waits for multiple independent tasks to complete.

public static CompletableFuture<String> fetchData(String id) {
    return CompletableFuture.supplyAsync(() -> {
        DebugUtil.logTimeAndThread("Fetching data for " + id);
        DebugUtil.delay(2000);
        return id + " response";
    });
}

public static void main(String[] args) throws Exception {
    DebugUtil.logTimeAndThread("Program start");

    List<String> ids = Arrays.asList("req1", "req2", "req3");
    List<CompletableFuture<String>> futures = ids.stream()
            .map(id -> fetchData(id))
            .collect(Collectors.toList());

    CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0]));

    CompletableFuture<Long> countFuture = allFutures.thenApply(v ->
            futures.stream().map(CompletableFuture::join).count());

    Long count = countFuture.join();
    System.out.println(count);
    DebugUtil.logTimeAndThread("Program end");
}

3.4 anyOf

Returns the result of the first completed task.

public static CompletableFuture<String> fetchDataWithDelay(Integer delay) {
    return CompletableFuture.supplyAsync(() -> {
        DebugUtil.logTimeAndThread("Fetching with delay " + delay);
        DebugUtil.delay(delay);
        return delay + " response";
    });
}

public static void main(String[] args) throws Exception {
    DebugUtil.logTimeAndThread("Program start");

    List<Integer> delays = Arrays.asList(2000, 4000, 10000);
    List<CompletableFuture<String>> futures = delays.stream()
            .map(delay -> fetchDataWithDelay(delay))
            .collect(Collectors.toList());

    CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(
            futures.toArray(new CompletableFuture[0]));

    Object result = anyFuture.get();
    System.out.println(result);
    DebugUtil.logTimeAndThread("Program end");
}

Tags: java CompletableFuture Async Concurrency

Posted on Tue, 09 Jun 2026 18:10:10 +0000 by BlueSkyIS