For message processing in Java applications, developers often consider heavyweight solutions such as message queues. However, Java's built-in concurrency utilities can handle many in-process scenarios with lower overhead.
The java.util.concurrent.Flow API (introduced in Java 9) provides a lightweight alternative for implementing the publisher-subscriber pattern. Here's a practical implementation:
Message Entity
package messaging.flow;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class EventMessage {
    private int processedCount;
    private final int totalSubscribers;
    private final String payload;
    private final Map<String, Boolean> acknowledgmentMap;

    public EventMessage(String payload, List<String> subscriberIds) {
        this.payload = payload;
        this.totalSubscribers = subscriberIds.size();
        this.processedCount = 0;
        this.acknowledgmentMap = new ConcurrentHashMap<>();
        subscriberIds.forEach(id -> acknowledgmentMap.put(id, false));
    }

    // Accessor used by both the subscriber and the publisher.
    public String getPayload() {
        return payload;
    }

    // Records one acknowledgment per known subscriber; repeated or unknown ids are ignored.
    public synchronized void markProcessed(String subscriberId) {
        if (processedCount < totalSubscribers &&
                acknowledgmentMap.containsKey(subscriberId) &&
                !acknowledgmentMap.get(subscriberId)) {
            acknowledgmentMap.put(subscriberId, true);
            processedCount++;
            System.out.println("Subscriber " + subscriberId + " processed: " + payload);
            if (processedCount == totalSubscribers) {
                System.out.println("\nAll subscribers completed processing: " + payload + "\n");
            }
        }
    }

    // Synchronized so callers on other threads see the latest count.
    public synchronized boolean isComplete() {
        return processedCount == totalSubscribers;
    }
}
Subscriber Implementation
package messaging.flow;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class EventSubscriber implements Subscriber<EventMessage> {
    private Subscription subscription;
    private final String subscriberId;

    public EventSubscriber(String subscriberId) {
        this.subscriberId = subscriberId;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println(subscriberId + " subscribed");
        this.subscription = subscription;
        subscription.request(1); // ask for the first message
    }

    @Override
    public void onNext(EventMessage message) {
        try {
            // Simulate the work first, so the acknowledgment reflects finished processing.
            Thread.sleep(message.getPayload().contains("long") ? 1300 : 200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            subscription.cancel(); // stop receiving messages if interrupted
            return;
        }
        message.markProcessed(subscriberId);
        subscription.request(1); // ask for the next message only when ready
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println(subscriberId + " completed processing");
    }
}
Publisher Implementation
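package messaging.flow;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class EventPublisher {
    public static void main(String[] args) throws InterruptedException {
        List<String> subscriberIds = Arrays.asList("sub1", "sub2", "sub3", "sub4");
        try (SubmissionPublisher<EventMessage> publisher = new SubmissionPublisher<>()) {
            subscriberIds.forEach(id ->
                publisher.subscribe(new EventSubscriber(id)));
            List<EventMessage> messages = Arrays.asList(
                new EventMessage("quick task", subscriberIds),
                new EventMessage("long running task", subscriberIds),
                new EventMessage("standard task", subscriberIds)
            );
            messages.forEach(msg -> {
                System.out.println("Publishing: " + msg.getPayload());
                publisher.submit(msg);
            });
            // Delivery is asynchronous: keep the publisher open until every
            // subscriber has acknowledged every message.
            while (!messages.stream().allMatch(EventMessage::isComplete)) {
                Thread.sleep(50);
            }
        } // close() signals onComplete to all subscribers
        // The default executor normally runs on daemon threads of the common pool,
        // so give the onComplete callbacks a chance to run before main returns.
        ForkJoinPool.commonPool().awaitQuiescence(5, TimeUnit.SECONDS);
    }
}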
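Sample Output
Subscribers run concurrently on pool threads, so the interleaving of lines from different subscribers varies between runs. One possible run: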
sub1 subscribed
sub2 subscribed
sub3 subscribed
sub4 subscribed
Publishing: quick task
Publishing: long running task
Publishing: standard task
Subscriber sub1 processed: quick task
Subscriber sub2 processed: quick task
Subscriber sub3 processed: quick task
Subscriber sub4 processed: quick task
All subscribers completed processing: quick task
Subscriber sub1 processed: long running task
Subscriber sub2 processed: long running task
Subscriber sub3 processed: long running task
Subscriber sub4 processed: long running task
All subscribers completed processing: long running task
Subscriber sub1 processed: standard task
Subscriber sub2 processed: standard task
Subscriber sub3 processed: standard task
Subscriber sub4 processed: standard task
All subscribers completed processing: standard task
sub1 completed processing
sub2 completed processing
sub3 completed processing
sub4 completed processing
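Because delivery is asynchronous, a program has to stay alive until the subscribers have finished. One way to wait, sketched here as an aside and not as part of the example above, is SubmissionPublisher.consume, which subscribes a simple consumer and returns a CompletableFuture that completes once the publisher has been closed and all items have been delivered. The class name ConsumeSketch and the helper publishAndCollect are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the messaging.flow example.
class ConsumeSketch {

    // Publishes the payloads and returns them in the order the consumer saw them.
    static List<String> publishAndCollect(List<String> payloads) throws Exception {
        List<String> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done;
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // consume() subscribes a consumer that requests items without limit.
            done = publisher.consume(seen::add);
            payloads.forEach(publisher::submit);
        } // close() signals onComplete after the buffered items
        done.get(5, TimeUnit.SECONDS); // block until delivery has finished
        return seen;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(publishAndCollect(
                List.of("quick task", "long running task", "standard task")));
    }
}
```

A single subscriber always receives items in the order they were submitted, so the returned list matches the input order.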
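This implementation demonstrates the core publisher-subscriber pattern using Java's Flow API: a SubmissionPublisher delivers each message to every subscriber asynchronously, and each subscriber pulls one message at a time with request(1). It provides a foundation that can be extended for more complex scenarios.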
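One such extension is handling subscribers that cannot keep up. The sketch below is a minimal illustration under stated assumptions, not a production design: it gives the publisher a small buffer and uses offer instead of submit, so that items are dropped instead of blocking the publisher when the buffer is full. OfferSketch, PausedSubscriber, and run are hypothetical names, and the subscriber deliberately signals no demand until the publisher has finished offering, to force the buffer to fill.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the messaging.flow example.
class OfferSketch {

    // Subscriber that requests nothing until release() is called.
    static class PausedSubscriber implements Flow.Subscriber<String> {
        final CountDownLatch subscribed = new CountDownLatch(1);
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicInteger delivered = new AtomicInteger();
        private volatile Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            subscribed.countDown();
        }
        @Override public void onNext(String item) { delivered.incrementAndGet(); }
        @Override public void onError(Throwable t) { finished.countDown(); }
        @Override public void onComplete() { finished.countDown(); }

        void release() { subscription.request(Long.MAX_VALUE); }
    }

    // Offers `total` items into a bounded buffer; returns {delivered, dropped}.
    static int[] run(int total, int bufferCapacity) throws InterruptedException {
        PausedSubscriber subscriber = new PausedSubscriber();
        int dropped = 0;
        try (SubmissionPublisher<String> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferCapacity)) {
            publisher.subscribe(subscriber);
            subscriber.subscribed.await(5, TimeUnit.SECONDS);
            for (int i = 0; i < total; i++) {
                // The handler returns false: drop the item, do not retry.
                int result = publisher.offer("msg-" + i, (sub, item) -> false);
                if (result < 0) {
                    dropped += -result; // a negative result is the number of drops
                }
            }
            subscriber.release(); // let the buffered items through
        } // close() signals onComplete after the buffered items
        subscriber.finished.await(5, TimeUnit.SECONDS);
        return new int[] { subscriber.delivered.get(), dropped };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(5, 2);
        System.out.println("delivered=" + result[0] + ", dropped=" + result[1]);
    }
}
```

Whether dropping, retrying, or blocking (as submit does) is the right policy depends on the application; the drop handler passed to offer is where that decision would live.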