Commit c626bcca by Dave Syer

Add rest template to extract SSE events and assert them

The message has to be sent after the endpoint starts up, which only happens after the first request is accepted, so we put that logic in the response extractor.
parent e2951880
......@@ -15,6 +15,7 @@
<properties>
<main.basedir>${basedir}/..</main.basedir>
<turbine.version>2.0.0-DP.2</turbine.version>
<spring-cloud-contract.version>1.0.5.RELEASE</spring-cloud-contract.version>
</properties>
<build>
<plugins>
......@@ -104,5 +105,11 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-contract-stub-runner</artifactId>
<version>${spring-cloud-contract.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -20,16 +20,17 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.subjects.PublishSubject;
/**
......@@ -38,70 +39,70 @@ import rx.subjects.PublishSubject;
@Component // needed for ServiceActivator to be picked up
public class HystrixStreamAggregator {
private static final Log log = LogFactory.getLog(HystrixStreamAggregator.class);
private static final Log log = LogFactory.getLog(HystrixStreamAggregator.class);
private ObjectMapper objectMapper;
private ObjectMapper objectMapper;
private PublishSubject<Map<String, Object>> subject;
private PublishSubject<Map<String, Object>> subject;
@Autowired
public HystrixStreamAggregator(ObjectMapper objectMapper,
PublishSubject<Map<String, Object>> subject) {
this.objectMapper = objectMapper;
this.subject = subject;
}
@Autowired
public HystrixStreamAggregator(ObjectMapper objectMapper,
PublishSubject<Map<String, Object>> subject) {
this.objectMapper = objectMapper;
this.subject = subject;
}
@ServiceActivator(inputChannel = TurbineStreamClient.INPUT)
public void sendToSubject(@Payload String payload) {
if (payload.startsWith("\"")) {
// Legacy payload from an Angel client
payload = payload.substring(1, payload.length() - 1);
payload = payload.replace("\\\"", "\"");
}
try {
if (payload.startsWith("[")) {
@SuppressWarnings("unchecked")
List<Map<String, Object>> list = this.objectMapper.readValue(payload,
List.class);
for (Map<String, Object> map : list) {
sendMap(map);
}
}
else {
@SuppressWarnings("unchecked")
Map<String, Object> map = this.objectMapper.readValue(payload, Map.class);
sendMap(map);
}
}
catch (IOException ex) {
log.error("Error receiving hystrix stream payload: " + payload, ex);
}
}
@ServiceActivator(inputChannel = TurbineStreamClient.INPUT)
public void sendToSubject(@Payload String payload) {
if (payload.startsWith("\"")) {
// Legacy payload from an Angel client
payload = payload.substring(1, payload.length() - 1);
payload = payload.replace("\\\"", "\"");
}
try {
if (payload.startsWith("[")) {
@SuppressWarnings("unchecked")
List<Map<String, Object>> list = this.objectMapper.readValue(payload,
List.class);
for (Map<String, Object> map : list) {
sendMap(map);
}
}
else {
@SuppressWarnings("unchecked")
Map<String, Object> map = this.objectMapper.readValue(payload, Map.class);
sendMap(map);
}
}
catch (IOException ex) {
log.error("Error receiving hystrix stream payload: " + payload, ex);
}
}
private void sendMap(Map<String, Object> map) {
Map<String, Object> data = getPayloadData(map);
if (log.isDebugEnabled()) {
log.debug("Received hystrix stream payload: " + data);
}
this.subject.onNext(data);
}
private void sendMap(Map<String, Object> map) {
Map<String, Object> data = getPayloadData(map);
if (log.isDebugEnabled()) {
log.debug("Received hystrix stream payload: " + data);
}
this.subject.onNext(data);
}
public static Map<String, Object> getPayloadData(Map<String, Object> jsonMap) {
@SuppressWarnings("unchecked")
Map<String, Object> origin = (Map<String, Object>) jsonMap.get("origin");
String instanceId = null;
if (origin.containsKey("id")) {
instanceId = origin.get("id").toString();
}
if (!StringUtils.hasText(instanceId)) {
// TODO: instanceid template
instanceId = origin.get("serviceId") + ":" + origin.get("host") + ":"
+ origin.get("port");
}
@SuppressWarnings("unchecked")
Map<String, Object> data = (Map<String, Object>) jsonMap.get("data");
data.put("instanceId", instanceId);
return data;
}
public static Map<String, Object> getPayloadData(Map<String, Object> jsonMap) {
@SuppressWarnings("unchecked")
Map<String, Object> origin = (Map<String, Object>) jsonMap.get("origin");
String instanceId = null;
if (origin.containsKey("id")) {
instanceId = origin.get("id").toString();
}
if (!StringUtils.hasText(instanceId)) {
// TODO: instanceid template
instanceId = origin.get("serviceId") + ":" + origin.get("host") + ":"
+ origin.get("port");
}
@SuppressWarnings("unchecked")
Map<String, Object> data = (Map<String, Object>) jsonMap.get("data");
data.put("instanceId", instanceId);
return data;
}
}
......@@ -20,17 +20,17 @@ import java.util.Map;
import javax.annotation.PostConstruct;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.ChannelBindingServiceProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
import rx.subjects.PublishSubject;
/**
......@@ -48,7 +48,7 @@ import rx.subjects.PublishSubject;
public class TurbineStreamAutoConfiguration {
@Autowired
private ChannelBindingServiceProperties bindings;
private BindingServiceProperties bindings;
@Autowired
private TurbineStreamProperties properties;
......
......@@ -16,13 +16,25 @@
package org.springframework.cloud.netflix.turbine.stream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.actuator.HasFeatures;
......@@ -31,16 +43,8 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.SocketUtils;
import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import static io.reactivex.netty.pipeline.PipelineConfigurators.sseServerConfigurator;
import static io.reactivex.netty.pipeline.PipelineConfigurators.serveSseConfigurator;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import rx.Observable;
import rx.subjects.PublishSubject;
......@@ -51,102 +55,102 @@ import rx.subjects.PublishSubject;
@EnableConfigurationProperties(TurbineStreamProperties.class)
public class TurbineStreamConfiguration implements SmartLifecycle {
private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);
private AtomicBoolean running = new AtomicBoolean(false);
@Autowired
private TurbineStreamProperties properties;
private int turbinePort;
@Bean
public HasFeatures Feature() {
return HasFeatures.namedFeature("Turbine (Stream)",
TurbineStreamProperties.class);
}
@Bean
public PublishSubject<Map<String, Object>> hystrixSubject() {
return PublishSubject.create();
}
@Bean
@SuppressWarnings("deprecation")
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
// multicast so multiple concurrent subscribers get the same stream
Observable<Map<String, Object>> publishedStreams = StreamAggregator
.aggregateGroupedStreams(hystrixSubject().groupBy(
data -> InstanceKey.create((String) data.get("instanceId"))))
.doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
.doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
.publish().refCount();
Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS)
.map(count -> Collections.singletonMap("type", (Object) "Ping")).publish()
.refCount();
Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);
this.turbinePort = this.properties.getPort();
if (this.turbinePort <= 0) {
this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
}
HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty
.createHttpServer(this.turbinePort, (request, response) -> {
log.info("SSE Request Received");
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return output
.doOnUnsubscribe(() -> log
.info("Unsubscribing RxNetty server connection"))
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(
null, null, JsonUtility.mapToJson(data))));
}, sseServerConfigurator());
return httpServer;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
aggregatorServer().start();
}
}
@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
try {
aggregatorServer().shutdown();
}
catch (InterruptedException ex) {
log.error("Error shutting down", ex);
}
}
}
@Override
public boolean isRunning() {
return this.running.get();
}
@Override
public int getPhase() {
return 0;
}
public int getTurbinePort() {
return this.turbinePort;
}
private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);
private AtomicBoolean running = new AtomicBoolean(false);
@Autowired
private TurbineStreamProperties properties;
private int turbinePort;
@Bean
public HasFeatures Feature() {
return HasFeatures.namedFeature("Turbine (Stream)",
TurbineStreamProperties.class);
}
@Bean
public PublishSubject<Map<String, Object>> hystrixSubject() {
return PublishSubject.create();
}
@Bean
@SuppressWarnings("deprecation")
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
// multicast so multiple concurrent subscribers get the same stream
Observable<Map<String, Object>> publishedStreams = StreamAggregator
.aggregateGroupedStreams(hystrixSubject().groupBy(
data -> InstanceKey.create((String) data.get("instanceId"))))
.doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
.doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
.publish().refCount();
Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS)
.map(count -> Collections.singletonMap("type", (Object) "Ping")).publish()
.refCount();
Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);
this.turbinePort = this.properties.getPort();
if (this.turbinePort <= 0) {
this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
}
HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty
.createHttpServer(this.turbinePort, (request, response) -> {
log.info("SSE Request Received");
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return output.doOnUnsubscribe(
() -> log.info("Unsubscribing RxNetty server connection"))
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(
Unpooled.copiedBuffer(JsonUtility.mapToJson(data),
StandardCharsets.UTF_8))));
}, serveSseConfigurator());
return httpServer;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
aggregatorServer().start();
}
}
@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
try {
aggregatorServer().shutdown();
}
catch (InterruptedException ex) {
log.error("Error shutting down", ex);
}
}
}
@Override
public boolean isRunning() {
return this.running.get();
}
@Override
public int getPhase() {
return 0;
}
public int getTurbinePort() {
return this.turbinePort;
}
}
......@@ -16,13 +16,13 @@
package org.springframework.cloud.netflix.turbine.stream;
import java.util.Objects;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.netflix.hystrix.HystrixConstants;
import org.springframework.http.MediaType;
import java.util.Objects;
/**
* @author Dave Syer
* @author Gregor Zurowski
......@@ -30,59 +30,58 @@ import java.util.Objects;
@ConfigurationProperties("turbine.stream")
public class TurbineStreamProperties {
@Value("${server.port:8989}")
private int port = 8989;
private String destination = HystrixConstants.HYSTRIX_STREAM_DESTINATION;
private String contentType = MediaType.APPLICATION_JSON_VALUE;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public String getContentType() {
return contentType;
}
public void setContentType(String contentType) {
this.contentType = contentType;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TurbineStreamProperties that = (TurbineStreamProperties) o;
return port == that.port &&
Objects.equals(destination, that.destination) &&
Objects.equals(contentType, that.contentType);
}
@Override
public int hashCode() {
return Objects.hash(port, destination, contentType);
}
@Override
public String toString() {
return new StringBuilder("TurbineStreamProperties{")
.append("port=").append(port).append(", ")
.append("destination='").append(destination).append("', ")
.append("contentType='").append(contentType).append("'}")
.toString();
}
@Value("${server.port:8989}")
private int port = 8989;
private String destination = HystrixConstants.HYSTRIX_STREAM_DESTINATION;
private String contentType = MediaType.APPLICATION_JSON_VALUE;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public String getContentType() {
return contentType;
}
public void setContentType(String contentType) {
this.contentType = contentType;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TurbineStreamProperties that = (TurbineStreamProperties) o;
return port == that.port && Objects.equals(destination, that.destination)
&& Objects.equals(contentType, that.contentType);
}
@Override
public int hashCode() {
return Objects.hash(port, destination, contentType);
}
@Override
public String toString() {
return new StringBuilder("TurbineStreamProperties{").append("port=").append(port)
.append(", ").append("destination='").append(destination).append("', ")
.append("contentType='").append(contentType).append("'}").toString();
}
}
......@@ -16,22 +16,78 @@
package org.springframework.cloud.netflix.turbine.stream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.cloud.contract.stubrunner.StubTrigger;
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpRequest;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.integration.support.management.MessageChannelMetrics;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Spencer Gibb
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = TurbineStreamTests.Application.class, webEnvironment = WebEnvironment.RANDOM_PORT, value = {
"turbine.stream.port=0", "spring.jmx.enabled=true" })
"turbine.stream.port=0",
// TODO: we don't need this if we harmonize the turbine and hystrix destinations
// https://github.com/spring-cloud/spring-cloud-netflix/issues/1948
"spring.cloud.stream.bindings.turbineStreamInput.destination=hystrixStreamOutput",
"logging.level.org.springframework.cloud.netflix.turbine=DEBUG",
"spring.jmx.enabled=true", "stubrunner.workOffline=true",
"stubrunner.ids=org.springframework.cloud:spring-cloud-netflix-hystrix-stream" })
@AutoConfigureStubRunner
public class TurbineStreamTests {
private static Log log = LogFactory.getLog(TurbineStreamTests.class);
@Autowired
StubTrigger stubTrigger;
@Autowired
ObjectMapper mapper;
@Autowired
@Qualifier(TurbineStreamClient.INPUT)
SubscribableChannel input;
RestTemplate rest = new RestTemplate();
@Autowired
TurbineStreamConfiguration turbine;
private CountDownLatch latch = new CountDownLatch(1);
@EnableAutoConfiguration
@EnableTurbineStream
public static class Application {
......@@ -41,7 +97,102 @@ public class TurbineStreamTests {
}
@Test
public void contextLoads() {
public void contextLoads() throws Exception {
rest.getInterceptors().add(new NonClosingInterceptor());
int count = ((MessageChannelMetrics) input).getSendCount();
ResponseEntity<String> response = rest.execute(
new URI("http://localhost:" + turbine.getTurbinePort() + "/"),
HttpMethod.GET, null, this::extract);
assertThat(response.getHeaders().getContentType())
.isEqualTo(MediaType.TEXT_EVENT_STREAM);
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
Map<String, Object> metrics = extractMetrics(response.getBody());
assertThat(metrics).containsEntry("type", "HystrixCommand");
assertThat(((MessageChannelMetrics) input).getSendCount()).isEqualTo(count + 1);
}
@SuppressWarnings("unchecked")
private Map<String, Object> extractMetrics(String body) throws Exception {
String[] split = body.split("data:");
for (String value : split) {
if (value.contains("Ping") || value.length() == 0) {
continue;
}
else {
return mapper.readValue(value, Map.class);
}
}
return null;
}
private ResponseEntity<String> extract(ClientHttpResponse response)
throws IOException {
// The message has to be sent after the endpoint is activated, so this is a
// convenient place to put it
stubTrigger.trigger("metrics");
byte[] bytes = new byte[1024];
StringBuilder builder = new StringBuilder();
int read = 0;
while (read >= 0
&& StringUtils.countOccurrencesOf(builder.toString(), "\n") < 2) {
read = response.getBody().read(bytes, 0, bytes.length);
if (read > 0) {
latch.countDown();
builder.append(new String(bytes, 0, read));
}
log.info("Building: " + builder);
}
log.info("Done: " + builder);
return ResponseEntity.status(response.getStatusCode())
.headers(response.getHeaders()).body(builder.toString());
}
private class NonClosingInterceptor implements ClientHttpRequestInterceptor {
private class NonClosingResponse implements ClientHttpResponse {
private ClientHttpResponse delegate;
public NonClosingResponse(ClientHttpResponse delegate) {
this.delegate = delegate;
}
@Override
public InputStream getBody() throws IOException {
return delegate.getBody();
}
@Override
public HttpHeaders getHeaders() {
return delegate.getHeaders();
}
@Override
public HttpStatus getStatusCode() throws IOException {
return delegate.getStatusCode();
}
@Override
public int getRawStatusCode() throws IOException {
return delegate.getRawStatusCode();
}
@Override
public String getStatusText() throws IOException {
return delegate.getStatusText();
}
@Override
public void close() {
}
}
@Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body,
ClientHttpRequestExecution execution) throws IOException {
return new NonClosingResponse(execution.execute(request, body));
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment