Adds toObservable function to HystrixCommands.

Lets user have full control over how the Observable is created with sane defaults for eager/non-eager cases.
parent c6a00c1a
......@@ -16,6 +16,8 @@
package org.springframework.cloud.netflix.hystrix;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
......@@ -48,6 +50,7 @@ public class HystrixCommands {
private Publisher<T> fallback;
private Setter setter;
private boolean eager = false;
private Function<HystrixObservableCommand<T>, Observable<T>> toObservable;
public PublisherBuilder(Publisher<T> publisher) {
this.publisher = publisher;
......@@ -78,10 +81,38 @@ public class HystrixCommands {
return this;
}
public PublisherBuilder<T> toObservable(Function<HystrixObservableCommand<T>, Observable<T>> toObservable) {
this.toObservable = toObservable;
return this;
}
public Publisher<T> build() {
if (!StringUtils.hasText(commandName) && setter == null) {
throw new IllegalStateException("commandName and setter can not both be empty");
}
Setter setterToUse = getSetter();
PublisherHystrixCommand<T> command = new PublisherHystrixCommand<>(setterToUse, this.publisher, this.fallback);
Observable<T> observable = getObservableFunction().apply(command);
return RxReactiveStreams.toPublisher(observable);
}
public Function<HystrixObservableCommand<T>, Observable<T>> getObservableFunction() {
Function<HystrixObservableCommand<T>, Observable<T>> observableFunc;
if (this.toObservable != null) {
observableFunc = this.toObservable;
} else if (this.eager) {
observableFunc = cmd -> cmd.observe();
} else { // apply a default onBackpressureBuffer if not eager
observableFunc = cmd -> cmd.toObservable().onBackpressureBuffer();
}
return observableFunc;
}
public Setter getSetter() {
Setter setterToUse;
if (this.setter != null) {
setterToUse = this.setter;
......@@ -97,16 +128,7 @@ public class HystrixCommands {
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(this.commandName);
setterToUse = Setter.withGroupKey(groupKey).andCommandKey(commandKey);
}
PublisherHystrixCommand<T> command = new PublisherHystrixCommand<>(setterToUse, this.publisher, this.fallback);
Observable<T> observable;
if (this.eager) {
observable = command.observe();
} else {
observable = command.toObservable().onBackpressureBuffer();
}
return RxReactiveStreams.toPublisher(observable);
return setterToUse;
}
public Flux<T> toFlux() {
......@@ -119,30 +141,6 @@ public class HystrixCommands {
}
public static <T> Mono<T> inject(String commandName, Mono<T> mono, Mono<T> fallback, boolean eager) {
String groupName = commandName + "group";
PublisherHystrixCommand<T> command = createHystrixCommand(commandName, groupName,
mono, fallback);
Observable<T> observable;
if (eager) {
observable = command.observe();
} else {
observable = command.toObservable();
}
return Mono.from(RxReactiveStreams.toPublisher(observable));
}
private static <T> PublisherHystrixCommand<T> createHystrixCommand(String commandName,
String groupName, Publisher<T> publisher, Publisher<T> fallback) {
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupName);
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(commandName);
Setter setter = Setter
.withGroupKey(groupKey).andCommandKey(commandKey);
return new PublisherHystrixCommand<>(setter, publisher, fallback);
}
private static class PublisherHystrixCommand<T> extends HystrixObservableCommand<T> {
private Publisher<T> publisher;
......
......@@ -89,6 +89,18 @@ public class HystrixCommandsTests {
}
@Test
public void toObservableFunctionWorks() {
StepVerifier.create(HystrixCommands.from(Flux.just("1", "2"))
.commandName("multiflux")
.toObservable(cmd -> cmd.toObservable())
.build(), 1)
.expectNext("1")
.thenAwait(Duration.ofSeconds(1))
.thenRequest(1)
.expectError();
}
@Test
public void eagerFluxWorks() {
StepVerifier.create(HystrixCommands.from( Flux.just("1", "2"))
.commandName("multiflux")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment