Commit a67a0779 by Stephane Maldini Committed by Spencer Gibb

fix overflow issue with hystrix timeout operator (#2391)

- The fix adds a necessary buffer between the rx-rs adapter and the timeout operator introduced in AbstractCommand. Unfortunately, the hystrix operator does not comply with RS backpressure requirements. It will always request an unbounded demand to a source regardless of the command consumer demand. With this commit we protect RS adapter and downstream flow from overflow. Ideally a rework of the operator should be planned.
parent eb07e2b0
......@@ -45,6 +45,11 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
<optional>true</optional>
......
......@@ -104,7 +104,7 @@ public class HystrixCommands {
if (this.eager) {
observable = command.observe();
} else {
observable = command.toObservable();
observable = command.toObservable().onBackpressureBuffer();
}
return RxReactiveStreams.toPublisher(observable);
}
......
......@@ -16,18 +16,19 @@
package org.springframework.cloud.netflix.hystrix;
import java.time.Duration;
import java.util.List;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThat;
public class HystrixCommandsTests {
......@@ -76,6 +77,18 @@ public class HystrixCommandsTests {
}
@Test
public void fluxWorksDeferredRequest() {
StepVerifier.create(HystrixCommands.from(Flux.just("1", "2"))
.commandName("multiflux")
.build(), 1)
.expectNext("1")
.thenAwait(Duration.ofSeconds(1))
.thenRequest(1)
.expectNext("2")
.verifyComplete();
}
@Test
public void eagerFluxWorks() {
List<String> list = HystrixCommands.from( Flux.just("1", "2"))
.commandName("multiflux")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment