Netflix Hystrix is a latency and fault tolerance library for managing interactions between distributed services. A command can be executed synchronously, asynchronously, or reactively, depending on what your application needs. This guide covers how to implement HystrixCommand and HystrixObservableCommand and how to execute them in each of these modes.
Basic Command Implementation
The standard way to wrap a logic block in Hystrix is by extending HystrixCommand. This is typically used for operations that return a single response.
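import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class GreetingCommand extends HystrixCommand<String> {
    private final String accountName;

    public GreetingCommand(String accountName) {
        // The group key is used to group related commands together.
        super(HystrixCommandGroupKey.Factory.asKey("MessagingService"));
        this.accountName = accountName;
    }

    @Override
    protected String run() {
        // The logic wrapped by the command; returns a single value.
        return "Welcome, " + accountName + "!";
    }
}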
For scenarios requiring reactive streams or multiple emitted values, HystrixObservableCommand is the appropriate choice. Instead of run(), you implement construct(), which returns an Observable.
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ReactiveGreetingCommand extends HystrixObservableCommand<String> {
    private final String accountName;

    public ReactiveGreetingCommand(String accountName) {
        super(HystrixCommandGroupKey.Factory.asKey("MessagingService"));
        this.accountName = accountName;
    }

    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> observer) {
                try {
                    // Emit only while the subscriber is still listening.
                    if (!observer.isUnsubscribed()) {
                        observer.onNext("Connecting...");
                        observer.onNext("Hello, " + accountName);
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        }).subscribeOn(Schedulers.io());
    }
}
Synchronous Execution
The execute() method makes a blocking call: it runs the command and blocks the calling thread until the result is available or an exception is thrown.
String result = new GreetingCommand("Alice").execute();
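Under the hood, execute() calls queue().get(). HystrixObservableCommand has no execute() method, but you can get the same blocking behavior with RxJava's blocking operators. Note that single() throws an exception unless the Observable emits exactly one item. ReactiveGreetingCommand above emits two items, so this example uses last() to wait for the final value:
String result = new ReactiveGreetingCommand("Alice").toObservable().toBlocking().last();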
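The relationship between the blocking and the Future-based forms can be illustrated without Hystrix on the classpath. The following is a minimal JDK-only sketch, not Hystrix's implementation: MiniCommand, MiniGreeting, and MiniCommandDemo are hypothetical names invented for this illustration, and a plain ExecutorService stands in for a Hystrix thread pool.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical JDK-only model of a command; this is NOT Hystrix code.
abstract class MiniCommand<T> {
    // Stand-in for a Hystrix thread pool; daemon threads let the JVM exit.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "mini-command");
        t.setDaemon(true);
        return t;
    });

    // The wrapped logic, analogous to HystrixCommand.run().
    protected abstract T run() throws Exception;

    // Asynchronous form: hand the work to the pool and return immediately.
    Future<T> queue() {
        Callable<T> task = this::run;
        return POOL.submit(task);
    }

    // Blocking form: queue the work, then wait for its result.
    T execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("command failed", e.getCause());
        }
    }
}

// Hypothetical counterpart of GreetingCommand for the model above.
class MiniGreeting extends MiniCommand<String> {
    private final String accountName;

    MiniGreeting(String accountName) {
        this.accountName = accountName;
    }

    @Override
    protected String run() {
        return "Welcome, " + accountName + "!";
    }
}

class MiniCommandDemo {
    public static void main(String[] args) throws Exception {
        // Blocking call: returns only once run() has finished.
        System.out.println(new MiniGreeting("Alice").execute()); // Welcome, Alice!

        // Asynchronous call: the Future is returned before the result is read.
        Future<String> pending = new MiniGreeting("Bob").queue();
        System.out.println(pending.get()); // Welcome, Bob!
    }
}
```

The point of the sketch is only that the blocking call adds nothing beyond waiting on the Future; real Hystrix commands layer timeouts, circuit breaking, and fallbacks on top of this.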
Asynchronous Execution
The queue() method submits the command for execution on a Hystrix thread pool (under the default thread isolation strategy) and immediately returns a Future, so the caller can do other work before collecting the result.
Future<String> futureResult = new GreetingCommand("Bob").queue();
// Perform other operations
String result = futureResult.get();
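For HystrixObservableCommand, you can mimic queue() by converting the Observable to a Future. The Future returned by toFuture() fails if the Observable emits more than one item, so reduce the stream to a single value first, for example with last() (or with toList() to collect every item):
Future<String> future = new ReactiveGreetingCommand("Bob").toObservable().last().toBlocking().toFuture();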
Reactive Execution
Hystrix offers two primary ways to obtain an Observable for reactive processing:
- observe(): Returns a "hot" Observable. The command starts executing immediately, before anyone subscribes. The results are replayed to subscribers, so a subscriber that attaches late still receives every emitted item.
- toObservable(): Returns a "cold" Observable. The command does not execute or emit anything until a subscriber is attached.
Observable<String> hotObservable = new GreetingCommand("Charlie").observe();
hotObservable.subscribe(val -> {
    System.out.println("Received: " + val);
});
With Java 8 lambdas, adding error handling only takes a second argument:
hotObservable.subscribe(
    item -> System.out.println("Success: " + item),
    error -> error.printStackTrace()
);
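The difference between hot and cold sources described above comes down to when the underlying work runs. The following JDK-only sketch models that timing. It is a teaching model under stated assumptions, not the RxJava Observable or Hystrix's implementation: MiniSource, HotColdModel, cold(), hot(), and greet() are hypothetical names, and each source emits a single value.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical single-value source; NOT the RxJava Observable.
interface MiniSource<T> {
    void subscribe(Consumer<T> subscriber);
}

final class HotColdModel {
    // Counts how many times the underlying work has actually run.
    static final AtomicInteger RUNS = new AtomicInteger();

    // Stand-in for the logic a command would wrap.
    static String greet(String name) {
        RUNS.incrementAndGet();
        return "Welcome, " + name + "!";
    }

    // "Cold": nothing runs until subscribe() is called.
    // Unlike this model, a real Hystrix command instance can be executed only once.
    static <T> MiniSource<T> cold(Supplier<T> work) {
        return subscriber -> subscriber.accept(work.get());
    }

    // "Hot": the work runs right away and the cached value is
    // replayed to every subscriber, however late it attaches.
    static <T> MiniSource<T> hot(Supplier<T> work) {
        T value = work.get();
        return subscriber -> subscriber.accept(value);
    }

    public static void main(String[] args) {
        MiniSource<String> lazy = cold(() -> greet("Charlie"));
        System.out.println("runs after cold(): " + RUNS.get());           // 0

        MiniSource<String> eager = hot(() -> greet("Charlie"));
        System.out.println("runs after hot(): " + RUNS.get());            // 1

        eager.subscribe(v -> System.out.println("first: " + v));
        eager.subscribe(v -> System.out.println("second: " + v));
        System.out.println("runs after two hot subscribers: " + RUNS.get()); // 1

        lazy.subscribe(v -> System.out.println("cold: " + v));
        System.out.println("runs after cold subscribe: " + RUNS.get());   // 2
    }
}
```

In this model the hot source pays for the work up front and shares one result, while the cold source defers the work until someone asks for it, which mirrors the choice between observe() and toObservable().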
These reactive patterns allow for powerful compositions, such as merging results from multiple commands or applying transformations before the data reaches the UI or the next service layer.