Implementing Publisher-Subscriber Pattern with Java Flow API

For message processing in Java applications, developers often consdier heavyweight solutions like message queues. However, Java's built-in concurrency utilities can effectively handle many scenarios with lower overhead.

The java.util.concurrant.Flow API (introduced in Java 9) provides a lightweight alternative for implementing the publisher-subscribre pattern. Here's a practical implementation:

Message Entity

package messaging.flow;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EventMessage {
    private int processedCount;
    private final int totalSubscribers;
    private final String payload;
    private final Map<String, Boolean> acknowledgmentMap;

    public EventMessage(String payload, List<String> subscriberIds) {
        this.payload = payload;
        this.totalSubscribers = subscriberIds.size();
        this.processedCount = 0;
        this.acknowledgmentMap = new ConcurrentHashMap<>();
        subscriberIds.forEach(id -> acknowledgmentMap.put(id, false));
    }

    public synchronized void markProcessed(String subscriberId) {
        if (processedCount < totalSubscribers && 
            acknowledgmentMap.containsKey(subscriberId) && 
            !acknowledgmentMap.get(subscriberId)) {
            
            acknowledgmentMap.put(subscriberId, true);
            processedCount++;
            System.out.println("Subscriber " + subscriberId + " processed: " + payload);
            
            if (processedCount == totalSubscribers) {
                System.out.println("\nAll subscribers completed processing: " + payload + "\n");
            }
        }
    }
    
    public boolean isComplete() {
        return processedCount == totalSubscribers;
    }
}

Subscriber Implementation

package messaging.flow;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class EventSubscriber implements Subscriber<EventMessage> {
    private Subscription subscription;
    private final String subscriberId;

    public EventSubscriber(String subscriberId) {
        this.subscriberId = subscriberId;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println(subscriberId + " subscribed");
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(EventMessage message) {
        message.markProcessed(subscriberId);
        subscription.request(1);
        
        try {
            Thread.sleep(message.getPayload().contains("long") ? 1300 : 200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println(subscriberId + " completed processing");
    }
}

Publisher Implementation

package messaging.flow;

import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import java.util.Arrays;

public class EventPublisher {
    public static void main(String[] args) {
        List<String> subscriberIds = Arrays.asList("sub1", "sub2", "sub3", "sub4");
        
        try (SubmissionPublisher<EventMessage> publisher = new SubmissionPublisher<>()) {
            subscriberIds.forEach(id -> 
                publisher.subscribe(new EventSubscriber(id)));
            
            List<EventMessage> messages = Arrays.asList(
                new EventMessage("quick task", subscriberIds),
                new EventMessage("long running task", subscriberIds),
                new EventMessage("standard task", subscriberIds)
            );
            
            messages.forEach(msg -> {
                System.out.println("Publishing: " + msg.getPayload());
                publisher.submit(msg);
            });
        }
    }
}

Sample Output

sub1 subscribed
sub2 subscribed
sub3 subscribed
sub4 subscribed
Publishing: quick task
Publishing: long running task
Publishing: standard task
Subscriber sub1 processed: quick task
Subscriber sub2 processed: quick task
Subscriber sub3 processed: quick task
Subscriber sub4 processed: quick task

All subscribers completed processing: quick task

Subscriber sub1 processed: standard task
Subscriber sub2 processed: standard task
Subscriber sub3 processed: standard task
Subscriber sub4 processed: standard task

All subscribers completed processing: standard task

Subscriber sub1 processed: long running task
Subscriber sub2 processed: long running task
Subscriber sub3 processed: long running task
Subscriber sub4 processed: long running task

All subscribers completed processing: long running task

sub1 completed processing
sub2 completed processing
sub3 completed processing
sub4 completed processing

This implementation demonstrates the core publisher-subscriber pattern using Java's Flow API, providing a foundation that can be extended for more complex scenarios.

Tags: java-concurrency flow-api publisher-subscriber java-util-concurrent

Posted on Mon, 18 May 2026 13:05:29 +0000 by kwilder