Commit ebdcca4f by Daniel Lavoie Committed by Ryan Baxter

Hystrix Dashboard Cannot display anything for Turbine Stream (#2484)

Closes #2475
parent 2287e0ca
......@@ -15,6 +15,8 @@
*/
package org.springframework.cloud.netflix.hystrix.contract;
import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
......@@ -22,10 +24,9 @@ import java.util.Map;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.util.StreamUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
* @author Daniel Lavoie
*
*/
public class HystrixContractUtils {
......@@ -41,6 +42,11 @@ public class HystrixContractUtils {
}
}
public static void checkEvent(String event) {
assertThat(event).isNotNull();
assertThat(event).isEqualTo("message");
}
public static void checkOrigin(Map<String, Object> origin) {
assertThat(origin.get("host")).isNotNull();
assertThat(origin.get("port")).isNotNull();
......
......@@ -5,6 +5,7 @@
"serviceId":"application",
"id":"application:0"
},
"event" : "message",
"data":{
"type":"HystrixCommand",
"name":"application.hello",
......
......@@ -138,6 +138,7 @@ public class HystrixStreamTask implements ApplicationContextAware {
json.writeStartObject();
addServiceData(json, registration);
json.writeStringField("event", "message");
json.writeObjectFieldStart("data");
json.writeStringField("type", "HystrixCommand");
String name = key.name();
......
......@@ -27,7 +27,6 @@ import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.serviceregistry.Registration;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
......@@ -43,6 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Spencer Gibb
* @author Daniel Lavoie
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {
......@@ -95,6 +95,8 @@ public class HystrixStreamTests {
JsonNode tree = mapper.readTree((String) message.getPayload());
assertThat(tree.hasNonNull("origin"));
assertThat(tree.hasNonNull("data"));
assertThat(tree.hasNonNull("event"));
assertThat(tree.findValue("event").asText().equals("message"));
}
}
......@@ -68,6 +68,10 @@ public abstract class StreamSourceTestBase {
"application.hello");
}
public void assertEvent(Object input) {
HystrixContractUtils.checkEvent((String) input);
}
@EnableAutoConfiguration
@EnableCircuitBreaker
@RestController
......
......@@ -22,6 +22,7 @@ org.springframework.cloud.contract.spec.Contract.make {
body(HystrixContractUtils.simpleBody())
testMatchers {
jsonPath('$.origin', byCommand('assertOrigin($it)'))
jsonPath('$.event', byCommand('assertEvent($it)'))
jsonPath('$.data', byCommand('assertData($it)'))
}
}
......
......@@ -50,6 +50,7 @@ import rx.subjects.PublishSubject;
/**
* @author Spencer Gibb
* @author Daniel Lavoie
*/
@Configuration
@EnableConfigurationProperties(TurbineStreamProperties.class)
......@@ -103,6 +104,9 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
return output.doOnUnsubscribe(
() -> log.info("Unsubscribing RxNetty server connection"))
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(
null,
Unpooled.copiedBuffer("message",
StandardCharsets.UTF_8),
Unpooled.copiedBuffer(JsonUtility.mapToJson(data),
StandardCharsets.UTF_8))));
}, serveSseConfigurator());
......
......@@ -16,16 +16,15 @@
package org.springframework.cloud.netflix.turbine.stream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -49,13 +48,13 @@ import org.springframework.http.client.ClientHttpResponse;
import org.springframework.integration.support.management.MessageChannelMetrics;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Spencer Gibb
* @author Daniel Lavoie
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = TurbineStreamTests.Application.class, webEnvironment = WebEnvironment.RANDOM_PORT, value = {
......@@ -67,9 +66,6 @@ import static org.assertj.core.api.Assertions.assertThat;
"stubrunner.ids=org.springframework.cloud:spring-cloud-netflix-hystrix-stream:${projectVersion:1.4.0.BUILD-SNAPSHOT}:stubs" })
@AutoConfigureStubRunner
public class TurbineStreamTests {
private static Log log = LogFactory.getLog(TurbineStreamTests.class);
@Autowired
StubTrigger stubTrigger;
......@@ -85,8 +81,6 @@ public class TurbineStreamTests {
@Autowired
TurbineStreamConfiguration turbine;
private CountDownLatch latch = new CountDownLatch(1);
@EnableAutoConfiguration
@EnableTurbineStream
public static class Application {
......@@ -109,16 +103,16 @@ public class TurbineStreamTests {
assertThat(metrics).containsEntry("type", "HystrixCommand");
assertThat(((MessageChannelMetrics) input).getSendCount()).isEqualTo(count + 1);
}
private boolean containsMetrics(String line) {
return line.startsWith("data:") && !line.contains("Ping");
}
@SuppressWarnings("unchecked")
private Map<String, Object> extractMetrics(String body) throws Exception {
String[] split = body.split("data:");
for (String value : split) {
if (value.contains("Ping") || value.length() == 0) {
continue;
}
else {
return mapper.readValue(value, Map.class);
for (String value : body.split("\n")) {
if (containsMetrics(value)) {
return mapper.readValue(value.split("data:")[1], Map.class);
}
}
return null;
......@@ -129,21 +123,23 @@ public class TurbineStreamTests {
// The message has to be sent after the endpoint is activated, so this is a
// convenient place to put it
stubTrigger.trigger("metrics");
byte[] bytes = new byte[1024];
StringBuilder builder = new StringBuilder();
int read = 0;
while (read >= 0
&& StringUtils.countOccurrencesOf(builder.toString(), "\n") < 2) {
read = response.getBody().read(bytes, 0, bytes.length);
if (read > 0) {
latch.countDown();
builder.append(new String(bytes, 0, read));
String responseBody = "";
boolean metricFound = false;
try (BufferedReader buffer = new BufferedReader(
new InputStreamReader(response.getBody()))) {
do {
String line = buffer.readLine();
responseBody += line + "\n";
if (containsMetrics(line)) {
metricFound = true;
}
}
log.debug("Building: " + builder);
while (!metricFound);
}
log.debug("Done: " + builder);
return ResponseEntity.status(response.getStatusCode())
.headers(response.getHeaders()).body(builder.toString());
.headers(response.getHeaders()).body(responseBody);
}
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment