Commit 2bca3504 by Spencer Gibb

Merge pull request #778 from jmnarloch/rxjava-stream

* rxjava-stream: Spring Mvc RxJava integration.
parents 0f61521c bca86191
......@@ -139,7 +139,7 @@ or you will encounter compile problems. It also needs a specific version of mave
enabled. Intellij 14.1+ requires some configuration to ensure these are setup properly.
1. Click Preferences, Plugins. *Ensure Lombok is installed*
2. Click New, Project from Existing Sources, choose your spring-cloud-sleuth directory
2. Click New, Project from Existing Sources, choose your spring-cloud project directory
3. Choose Maven, and select Environment Settings. *Ensure you are using Maven 3.3.3*
4. In the next screen, *Select the profile `spring`* click Next until Finish.
5. Click Preferences, "Build, Execution, Deployment", Compiler, Annotation Processors. *Click Enable Annotation Processing*
......
......@@ -1483,6 +1483,36 @@ info:
url: https://github.com/spring-cloud-samples
----
[[netflix-rxjava-springmvc]]
== RxJava with Spring MVC
Spring Cloud Netflix includes the https://github.com/ReactiveX/RxJava[RxJava].
> RxJava is a Java VM implementation of http://reactivex.io/[Reactive Extensions]: a library for composing asynchronous and event-based programs by using observable sequences.
Spring Cloud Netflix provides support for returning `rx.Single` objects from Spring MVC Controllers. It also supports using `rx.Observable` objects for https://en.wikipedia.org/wiki/Server-sent_events[Server-sent events (SSE)]. This can be very convenient if your internal APIs are already built using RxJava (see <<spring-cloud-feign-hystrix>> for examples).
Here are some examples of using `rx.Single`:
[source,java]
----
include::../../../../spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/rx/SingleReturnValueHandlerTest.java[tags=rx_single,indent=0]
----
If you have an `Observable`, rather than a single, you can use `.toSingle()` or `.toList().toSingle()`. Here are some examples:
[source,java]
----
include::../../../../spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/rx/ObservableReturnValueHandlerTest.java[tags=rx_observable,indent=0]
----
If you have a streaming endpoint and client, SSE could be an option. To convert `rx.Observable` to a Spring `SseEmitter` use `RxResponse.sse()`. Here are some examples:
[source,java]
----
include::../../../../spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/rx/ObservableSseEmitterTest.java[tags=rx_observable_sse,indent=0]
----
[[netflix-metrics]]
== Metrics: Spectator, Servo, and Atlas
When used together, Spectator/Servo and Atlas provide a near real-time operational insight platform.
......@@ -1545,6 +1575,7 @@ If Spring AOP is enabled and `org.aspectj:aspectjweaver` is present on your runt
3. URI, sanitized for Atlas
4. Client name
[[netflix-metrics-spectator]]
=== Metrics Collection: Spectator
To enable Spectator metrics, include a dependency on `spring-boot-starter-spectator`:
......@@ -1626,6 +1657,7 @@ DistributionSummary ds = registry.distributionSummary("dsName", "tagKey1", "tagV
ds.record(request.sizeInBytes());
----
[[netflix-metrics-servo]]
=== Metrics Collection: Servo
WARNING: If your code is compiled on Java 8, please use Spectator instead of Servo as Spectator is destined to replace Servo entirely in the long term.
......@@ -1649,6 +1681,7 @@ Timer timer = new BasicTimer(config);
monitorRegistry.register(timer);
----
[[netflix-metrics-atlas]]
=== Metrics Backend: Atlas
Atlas was developed by Netflix to manage dimensional time series data for near real-time operational insight. Atlas features in-memory data storage, allowing it to gather and report very large numbers of metrics, very quickly.
......
/*
* Copyright 2013-2015 the original author or authors.
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,60 +16,53 @@
package org.springframework.cloud.netflix.rx;
import org.springframework.core.MethodParameter;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
import rx.Observable;
import rx.functions.Action1;
import rx.Subscriber;
import rx.Subscription;
/**
* MVC handler for return values of type {@link rx.Observable}.
* A subscriber that sets the single value produced by the {@link Observable} on the {@link DeferredResult}.
*
* @author Spencer Gibb
* @author Jakub Narloch
* @see DeferredResult
*/
public class ObservableReturnValueHandler
implements AsyncHandlerMethodReturnValueHandler {
class DeferredResultSubscriber<T> extends Subscriber<T> implements Runnable {
@Override
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
return returnValue != null && returnValue instanceof Observable;
}
private final DeferredResult<T> deferredResult;
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return Observable.class.isAssignableFrom(returnType.getParameterType());
}
private final Subscription subscription;
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest)
throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
private boolean completed;
Observable<?> observable = Observable.class.cast(returnValue);
public DeferredResultSubscriber(Observable<T> observable, DeferredResult<T> deferredResult) {
final DeferredResult<Object> deferredResult = new DeferredResult<>();
this.deferredResult = deferredResult;
this.deferredResult.onTimeout(this);
this.deferredResult.onCompletion(this);
this.subscription = observable.subscribe(this);
}
observable.subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
deferredResult.setResult(o);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
deferredResult.setErrorResult(throwable);
}
});
@Override
public void onNext(T value) {
if (!completed) {
deferredResult.setResult(value);
}
}
WebAsyncUtils.getAsyncManager(webRequest)
.startDeferredResultProcessing(deferredResult, mavContainer);
}
@Override
public void onError(Throwable e) {
deferredResult.setErrorResult(e);
}
@Override
public void onCompleted() {
completed = true;
}
@Override
public void run() {
this.subscription.unsubscribe();
}
}
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.rx;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import rx.Observable;
/**
* A specialized {@link SseEmitter} that handles {@link Observable} return types.
*
* @author Jakub Narloch
* @see SseEmitter
*/
class ObservableSseEmitter<T> extends SseEmitter {
private final ResponseBodyEmitterSubscriber<T> subscriber;
public ObservableSseEmitter(Observable<T> observable) {
this(null, observable);
}
public ObservableSseEmitter(MediaType mediaType, Observable<T> observable) {
this(null, mediaType, observable);
}
public ObservableSseEmitter(Long timeout, MediaType mediaType, Observable<T> observable) {
super(timeout);
this.subscriber = new ResponseBodyEmitterSubscriber<>(mediaType, observable, this);
}
}
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.rx;
import java.io.IOException;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
/**
* Subscriber that emits any value produced by the {@link Observable} into the delegated {@link ResponseBodyEmitter}.
*
* @author Jakub Narloch
*/
class ResponseBodyEmitterSubscriber<T> extends Subscriber<T> implements Runnable {
private final MediaType mediaType;
private final Subscription subscription;
private final ResponseBodyEmitter responseBodyEmitter;
private boolean completed;
/**
* Creates new instance of {@link ResponseBodyEmitterSubscriber} with response media type, observable and response
* emitter.
*
* @param mediaType the marshaled object media type
* @param observable the observable
* @param responseBodyEmitter the response emitter
*/
public ResponseBodyEmitterSubscriber(MediaType mediaType, Observable<T> observable, ResponseBodyEmitter responseBodyEmitter) {
this.mediaType = mediaType;
this.responseBodyEmitter = responseBodyEmitter;
this.responseBodyEmitter.onTimeout(this);
this.responseBodyEmitter.onCompletion(this);
this.subscription = observable.subscribe(this);
}
@Override
public void onNext(T value) {
try {
if(!completed) {
responseBodyEmitter.send(value, mediaType);
}
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
public void onError(Throwable e) {
responseBodyEmitter.completeWithError(e);
}
@Override
public void onCompleted() {
if(!completed) {
completed = true;
responseBodyEmitter.complete();
}
}
@Override
public void run() {
subscription.unsubscribe();
}
}
/*
* Copyright 2013-2015 the original author or authors.
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -28,6 +28,7 @@ import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
import rx.Observable;
import rx.Single;
/**
* @author Spencer Gibb
......@@ -39,25 +40,25 @@ public class RxJavaAutoConfiguration {
@Configuration
@ConditionalOnClass(AsyncHandlerMethodReturnValueHandler.class)
protected static class ObservableReturnValueHandlerConfig {
protected static class RxJavaReturnValueHandlerConfig {
@Bean
public ObservableReturnValueHandler observableReturnValueHandler() {
return new ObservableReturnValueHandler();
public SingleReturnValueHandler singleReturnValueHandler() {
return new SingleReturnValueHandler();
}
@Bean
public WebMvcConfigurerAdapter observableMVCConfiguration() {
public WebMvcConfigurerAdapter observableMVCConfiguration(final SingleReturnValueHandler singleReturnValueHandler) {
return new WebMvcConfigurerAdapter() {
@Override
public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
returnValueHandlers.add(observableReturnValueHandler());
returnValueHandlers.add(singleReturnValueHandler);
}
};
}
@Bean
public HasFeatures rxFeature() {
return HasFeatures.namedFeature("MVC Observable", Observable.class);
return HasFeatures.namedFeatures("MVC Observable", Observable.class, "MVC Single", Single.class);
}
}
}
package org.springframework.cloud.netflix.rx;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import rx.Observable;
/**
* A convenient class allowing to wrap either the {@link Observable} into a response supported by the
* Spring MVC.
*
* @author Jakub Narloch
*/
public final class RxResponse {
private RxResponse() {
}
/**
* Wraps the {@link Observable} into a {@link SseEmitter}. Every value produced by the observable will be emitted
* as server side event.
*
* @param observable the observable instance
* @param <T> the result type
* @return the sse emitter
*/
public static <T> SseEmitter sse(Observable<T> observable) {
return new ObservableSseEmitter<>(observable);
}
/**
* Wraps the {@link Observable} into a {@link SseEmitter}. Every value produced by the observable will be emitted
* as server side event.
*
* @param mediaType the media type of produced entry
* @param observable the observable instance
* @param <T> the result type
* @return the sse emitter
*/
public static <T> SseEmitter sse(MediaType mediaType, Observable<T> observable) {
return new ObservableSseEmitter<>(mediaType, observable);
}
/**
* Wraps the {@link Observable} into a {@link SseEmitter}. Every value produced by the observable will be emitted
* as server side event.
*
* @param timeout the response timeout
* @param mediaType the media type of produced entry
* @param observable the observable instance
* @param <T> the result type
* @return the sse emitter
*/
public static <T> SseEmitter sse(long timeout, MediaType mediaType, Observable<T> observable) {
return new ObservableSseEmitter<>(timeout, mediaType, observable);
}
}
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.rx;
import org.springframework.util.Assert;
import org.springframework.web.context.request.async.DeferredResult;
import rx.Single;
/**
* A specialized {@link DeferredResult} that handles {@link Single} return type.
*
* @author Jakub Narloch
* @see DeferredResult
*/
class SingleDeferredResult<T> extends DeferredResult<T> {
private static final Object EMPTY_RESULT = new Object();
private final DeferredResultSubscriber<T> subscriber;
public SingleDeferredResult(Single<T> single) {
this(null, EMPTY_RESULT, single);
}
public SingleDeferredResult(long timeout, Single<T> single) {
this(timeout, EMPTY_RESULT, single);
}
public SingleDeferredResult(Long timeout, Object timeoutResult, Single<T> single) {
super(timeout, timeoutResult);
Assert.notNull(single, "single can not be null");
subscriber = new DeferredResultSubscriber<>(single.toObservable(), this);
}
}
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.rx;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
import rx.Single;
import rx.functions.Func1;
/**
* A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Single} return types.
*
* @author Spencer Gibb
* @author Jakub Narloch
*/
public class SingleReturnValueHandler implements AsyncHandlerMethodReturnValueHandler {
@Override
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
return returnValue != null && supportsReturnType(returnType);
}
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return Single.class.isAssignableFrom(returnType.getParameterType()) || isResponseEntity(returnType);
}
private boolean isResponseEntity(MethodParameter returnType) {
if(ResponseEntity.class.isAssignableFrom(returnType.getParameterType())) {
Class<?> bodyType = ResolvableType.forMethodParameter(returnType).getGeneric(0).resolve();
return bodyType != null && Single.class.isAssignableFrom(bodyType);
}
return false;
}
@SuppressWarnings("unchecked")
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
ResponseEntity<Single<?>> responseEntity = getResponseEntity(returnValue);
if(responseEntity != null) {
returnValue = responseEntity.getBody();
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
}
final Single<?> single = Single.class.cast(returnValue);
WebAsyncUtils.getAsyncManager(webRequest)
.startDeferredResultProcessing(convertToDeferredResult(responseEntity, single), mavContainer);
}
@SuppressWarnings("unchecked")
private ResponseEntity<Single<?>> getResponseEntity(Object returnValue) {
if (ResponseEntity.class.isAssignableFrom(returnValue.getClass())) {
return (ResponseEntity<Single<?>>) returnValue;
}
return null;
}
protected DeferredResult<?> convertToDeferredResult(final ResponseEntity<Single<?>> responseEntity, Single<?> single) {
//TODO: fix when java8 :-)
Single<ResponseEntity> singleResponse = single.map(new Func1<Object, ResponseEntity>() {
@Override
public ResponseEntity call(Object object) {
return new ResponseEntity<>(object, getHttpHeaders(responseEntity), getHttpStatus(responseEntity));
}
});
return new SingleDeferredResult<>(singleResponse);
}
private HttpStatus getHttpStatus(ResponseEntity<?> responseEntity) {
if(responseEntity == null) {
return HttpStatus.OK;
}
return responseEntity.getStatusCode();
}
private HttpHeaders getHttpHeaders(ResponseEntity<?> responseEntity) {
if(responseEntity == null) {
return new HttpHeaders();
}
return responseEntity.getHeaders();
}
}
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.rx;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.IntegrationTest;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.TestRestTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import rx.Observable;
import rx.Single;
import rx.functions.Func1;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
/**
* Tests the demonstrate using {@link Observable} with {@link SingleReturnValueHandler} class.
*
* @author Spencer Gibb
* @author Jakub Narloch
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = ObservableReturnValueHandlerTest.Application.class)
@WebAppConfiguration
@IntegrationTest({"server.port=0"})
@DirtiesContext
public class ObservableReturnValueHandlerTest {
@Value("${local.server.port}")
private int port = 0;
private TestRestTemplate restTemplate = new TestRestTemplate();
@Configuration
@EnableAutoConfiguration
@RestController
protected static class Application {
// tag::rx_observable[]
@RequestMapping(method = RequestMethod.GET, value = "/single")
public Single<String> single() {
return Observable.just("single value").toSingle();
}
@RequestMapping(method = RequestMethod.GET, value = "/multiple")
public Single<List<String>> multiple() {
return Observable.just("multiple", "values").toList().toSingle();
}
@RequestMapping(method = RequestMethod.GET, value = "/responseWithObservable")
public ResponseEntity<Single<String>> responseWithObservable() {
Observable<String> observable = Observable.just("single value");
HttpHeaders headers = new HttpHeaders();
headers.setContentType(APPLICATION_JSON_UTF8);
return new ResponseEntity<>(observable.toSingle(), headers, HttpStatus.CREATED);
}
@RequestMapping(method = RequestMethod.GET, value = "/timeout")
public Observable<String> timeout() {
return Observable.timer(1, TimeUnit.MINUTES).map(new Func1<Long, String>() {
@Override
public String call(Long aLong) {
return "single value";
}
});
}
// end::rx_observable[]
@RequestMapping(method = RequestMethod.GET, value = "/throw")
public Single<Object> error() {
return Observable.error(new RuntimeException("Unexpected")).toSingle();
}
}
@Test
public void shouldRetrieveSingleValue() {
// when
ResponseEntity<String> response = restTemplate.getForEntity(path("/single"), String.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals("single value", response.getBody());
}
@Test
public void shouldRetrieveMultipleValues() {
// when
ResponseEntity<List> response = restTemplate.getForEntity(path("/multiple"), List.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals(Arrays.asList("multiple", "values"), response.getBody());
}
@Test
public void shouldRetrieveSingleValueWithStatusCodeAndCustomHeader() {
// when
ResponseEntity<String> response = restTemplate.getForEntity(path("/responseWithObservable"), String.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.CREATED, response.getStatusCode());
assertEquals(MediaType.APPLICATION_JSON_UTF8, response.getHeaders().getContentType());
assertEquals("single value", response.getBody());
}
@Test
public void shouldRetrieveErrorResponse() {
// when
ResponseEntity<Object> response = restTemplate.getForEntity(path("/throw"), Object.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode());
}
@Test
@Ignore("adds 30s to build")
public void shouldTimeoutOnConnection() {
// when
ResponseEntity<Object> response = restTemplate.getForEntity(path("/timeout"), Object.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode());
}
private String path(String context) {
return String.format("http://localhost:%d%s", port, context);
}
}
\ No newline at end of file
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.rx;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.TimeZone;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.IntegrationTest;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.TestRestTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import rx.Observable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.springframework.http.MediaType.APPLICATION_JSON_UTF8;
/**
* Tests the {@link ObservableSseEmitter} class.
*
* @author Spencer Gibb
* @author Jakub Narloch
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = ObservableSseEmitterTest.Application.class)
@WebAppConfiguration
@IntegrationTest({"server.port=0"})
@DirtiesContext
public class ObservableSseEmitterTest {
@Value("${local.server.port}")
private int port = 0;
private TestRestTemplate restTemplate = new TestRestTemplate();
@Configuration
@EnableAutoConfiguration
@RestController
protected static class Application {
// tag::rx_observable_sse[]
@RequestMapping(method = RequestMethod.GET, value = "/sse")
public SseEmitter single() {
return RxResponse.sse(Observable.just("single value"));
}
@RequestMapping(method = RequestMethod.GET, value = "/messages")
public SseEmitter messages() {
return RxResponse.sse(Observable.just("message 1", "message 2", "message 3"));
}
@RequestMapping(method = RequestMethod.GET, value = "/events")
public SseEmitter event() {
return RxResponse.sse(APPLICATION_JSON_UTF8, Observable.just(
new EventDto("Spring io", getDate(2016, 5, 19)),
new EventDto("SpringOnePlatform", getDate(2016, 8, 1))
));
}
// end::rx_observable_sse[]
}
@Test
public void shouldRetrieveSse() {
// when
ResponseEntity<String> response = restTemplate.getForEntity(path("/sse"), String.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals("data:single value\n\n", response.getBody());
}
@Test
public void shouldRetrieveSseWithMultipleMessages() {
// when
ResponseEntity<String> response = restTemplate.getForEntity(path("/messages"), String.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals("data:message 1\n\ndata:message 2\n\ndata:message 3\n\n", response.getBody());
}
@Test
public void shouldRetrieveJsonOverSseWithMultipleMessages() {
// when
ResponseEntity<String> response = restTemplate.getForEntity(path("/events"), String.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals("data:{\"name\":\"Spring io\",\"date\":1466337600000}\n\ndata:{\"name\":\"SpringOnePlatform\",\"date\":1472731200000}\n\n", response.getBody());
}
private String path(String context) {
return String.format("http://localhost:%d%s", port, context);
}
private static Date getDate(int year, int month, int day) {
GregorianCalendar calendar = new GregorianCalendar(year, month, day, 12, 0, 0);
calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
return calendar.getTime();
}
/**
* A simple DTO used for testing purpose.
*
* @author Jakub Narloch
*/
static class EventDto {
private final String name;
private final Date date;
@JsonCreator
public EventDto(@JsonProperty("name") String name, @JsonProperty("date") Date date) {
this.name = name;
this.date = date;
}
public String getName() {
return name;
}
public Date getDate() {
return date;
}
}
}
\ No newline at end of file
/*
* Copyright 2013-2015 the original author or authors.
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,9 +16,6 @@
package org.springframework.cloud.netflix.rx;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Value;
......@@ -36,36 +33,88 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import rx.Observable;
import rx.Single;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
* Tests the {@link SingleReturnValueHandler} class.
*
* @author Spencer Gibb
* @author Jakub Narloch
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = ObservableReturnValueHandlerTests.Application.class)
@SpringApplicationConfiguration(classes = SingleReturnValueHandlerTest.Application.class)
@WebAppConfiguration
@IntegrationTest({ "server.port=0" })
@IntegrationTest({"server.port=0"})
@DirtiesContext
public class ObservableReturnValueHandlerTests {
@Value("${local.server.port}")
private int port = 0;
@Configuration
@EnableAutoConfiguration
@RestController
protected static class Application {
@RequestMapping(method = RequestMethod.GET, value = "/")
public Observable<String> hi() {
return Observable.just("hello world");
}
}
public class SingleReturnValueHandlerTest {
@Value("${local.server.port}")
private int port = 0;
private TestRestTemplate restTemplate = new TestRestTemplate();
@Configuration
@EnableAutoConfiguration
@RestController
protected static class Application {
// tag::rx_single[]
@RequestMapping(method = RequestMethod.GET, value = "/single")
public Single<String> single() {
return Single.just("single value");
}
@Test
public void observableReturns() {
ResponseEntity<String> response = new TestRestTemplate().getForEntity("http://localhost:" + port, String.class);
assertNotNull("response was null", response);
assertEquals("response code was wrong", HttpStatus.OK, response.getStatusCode());
assertEquals("response was wrong", "hello world", response.getBody());
@RequestMapping(method = RequestMethod.GET, value = "/singleWithResponse")
public ResponseEntity<Single<String>> singleWithResponse() {
return new ResponseEntity<>(Single.just("single value"), HttpStatus.NOT_FOUND);
}
@RequestMapping(method = RequestMethod.GET, value = "/throw")
public Single<Object> error() {
return Single.error(new RuntimeException("Unexpected"));
}
// end::rx_single[]
}
}
@Test
public void shouldRetrieveSingleValue() {
// when
ResponseEntity<String> response = restTemplate.getForEntity(path("/single"), String.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals("single value", response.getBody());
}
@Test
public void shouldRetrieveSingleValueWithStatusCode() {
// when
ResponseEntity<String> response = restTemplate.getForEntity(path("/singleWithResponse"), String.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.NOT_FOUND, response.getStatusCode());
assertEquals("single value", response.getBody());
}
@Test
public void shouldRetrieveErrorResponse() {
// when
ResponseEntity<Object> response = restTemplate.getForEntity(path("/throw"), Object.class);
// then
assertNotNull(response);
assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.getStatusCode());
}
private String path(String context) {
return String.format("http://localhost:%d%s", port, context);
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment