Commit 62b6d6de by Dave Syer

Fix hystrix tests broken by 2.0 migration

parent 8d886eed
...@@ -46,8 +46,8 @@ import static org.assertj.core.api.Assertions.assertThat; ...@@ -46,8 +46,8 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Spencer Gibb * @author Spencer Gibb
*/ */
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = HystrixStreamTests.Application.class, webEnvironment = WebEnvironment.RANDOM_PORT, @SpringBootTest(classes = HystrixStreamTests.Application.class, webEnvironment = WebEnvironment.RANDOM_PORT, properties = {
properties = { "spring.jmx.enabled=true", "spring.application.name=mytestapp" }) "spring.jmx.enabled=true", "spring.application.name=mytestapp" })
@DirtiesContext @DirtiesContext
public class HystrixStreamTests { public class HystrixStreamTests {
...@@ -86,15 +86,16 @@ public class HystrixStreamTests { ...@@ -86,15 +86,16 @@ public class HystrixStreamTests {
@Test @Test
public void contextLoads() throws Exception { public void contextLoads() throws Exception {
this.application.hello(); this.application.hello();
//It is important that local service instance resolves for metrics // It is important that local service instance resolves for metrics
//origin details to be populated // origin details to be populated
ServiceInstance localServiceInstance = discoveryClient.getLocalServiceInstance(); ServiceInstance localServiceInstance = discoveryClient.getLocalServiceInstance();
assertThat(localServiceInstance).isNotNull(); assertThat(localServiceInstance).isNotNull();
assertThat(localServiceInstance.getServiceId()).isEqualTo("mytestapp"); assertThat(localServiceInstance.getServiceId()).isEqualTo("mytestapp");
this.task.gatherMetrics(); this.task.gatherMetrics();
Message<?> message = this.collector.forChannel(output).take(); Message<?> message = this.collector.forChannel(output).take();
assertThat(message.getPayload()).isInstanceOf(String.class); // TODO: possible regression with Edgware?
JsonNode tree = mapper.readTree((String) message.getPayload()); assertThat(message.getPayload()).isInstanceOf(byte[].class);
JsonNode tree = mapper.readTree(new String((byte[]) message.getPayload()));
assertThat(tree.hasNonNull("origin")); assertThat(tree.hasNonNull("origin"));
assertThat(tree.hasNonNull("data")); assertThat(tree.hasNonNull("data"));
} }
......
...@@ -17,20 +17,39 @@ ...@@ -17,20 +17,39 @@
package org.springframework.cloud.netflix.hystrix.stream; package org.springframework.cloud.netflix.hystrix.stream;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand; import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker; import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.contract.verifier.messaging.MessageVerifier;
import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier; import org.springframework.cloud.contract.verifier.messaging.boot.AutoConfigureMessageVerifier;
import org.springframework.cloud.contract.verifier.messaging.stream.StreamStubMessages;
import org.springframework.cloud.netflix.hystrix.contract.HystrixContractUtils; import org.springframework.cloud.netflix.hystrix.contract.HystrixContractUtils;
import org.springframework.cloud.netflix.hystrix.stream.StreamSourceTestBase.TestApplication; import org.springframework.cloud.netflix.hystrix.stream.StreamSourceTestBase.TestApplication;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
...@@ -83,5 +102,133 @@ public abstract class StreamSourceTestBase { ...@@ -83,5 +102,133 @@ public abstract class StreamSourceTestBase {
SpringApplication.run(TestApplication.class, args); SpringApplication.run(TestApplication.class, args);
} }
// TODO: remove this as soon as contract 2.0.0 is available
@Bean
MessageVerifier<Message<?>> contractVerifierMessageExchange(
ApplicationContext applicationContext) {
return new PatchedStubMessages(applicationContext);
}
}
static class PatchedStubMessages implements MessageVerifier<Message<?>> {
private static final Logger log = LoggerFactory
.getLogger(StreamStubMessages.class);
private final ApplicationContext context;
private final MessageCollector messageCollector;
private final ContractVerifierStreamMessageBuilder builder = new ContractVerifierStreamMessageBuilder();
public PatchedStubMessages(ApplicationContext context) {
this.context = context;
this.messageCollector = context.getBean(MessageCollector.class);
}
@Override
public <T> void send(T payload, Map<String, Object> headers, String destination) {
send(this.builder.create(payload, headers), destination);
}
@Override
public void send(Message<?> message, String destination) {
try {
MessageChannel messageChannel = this.context
.getBean(resolvedDestination(destination), MessageChannel.class);
messageChannel.send(message);
}
catch (Exception e) {
log.error(
"Exception occurred while trying to send a message [" + message
+ "] " + "to a channel with name [" + destination + "]",
e);
throw e;
}
}
@Override
public Message<?> receive(String destination, long timeout, TimeUnit timeUnit) {
try {
MessageChannel messageChannel = this.context
.getBean(resolvedDestination(destination), MessageChannel.class);
Message<?> message = this.messageCollector.forChannel(messageChannel)
.poll(timeout, timeUnit);
if (message == null) {
return message;
}
Object fromMessage = converter().fromMessage(message, String.class);
return MessageBuilder.createMessage(fromMessage, message.getHeaders());
}
catch (Exception e) {
log.error("Exception occurred while trying to read a message from "
+ " a channel with name [" + destination + "]", e);
throw new IllegalStateException(e);
}
}
private String resolvedDestination(String destination) {
try {
BindingServiceProperties channelBindingServiceProperties = this.context
.getBean(BindingServiceProperties.class);
for (Map.Entry<String, BindingProperties> entry : channelBindingServiceProperties
.getBindings().entrySet()) {
if (destination.equals(entry.getValue().getDestination())) {
if (log.isDebugEnabled()) {
log.debug("Found a channel named [{}] with destination [{}]",
entry.getKey(), destination);
}
return entry.getKey();
}
}
}
catch (Exception e) {
log.error(
"Exception took place while trying to resolve the destination. Will assume the name ["
+ destination + "]",
e);
}
if (log.isDebugEnabled()) {
log.debug("No destination named [" + destination
+ "] was found. Assuming that the destination equals the channel name",
destination);
}
return destination;
}
@Override
public Message<?> receive(String destination) {
return receive(destination, 5, TimeUnit.SECONDS);
}
private MappingJackson2MessageConverter converter() {
ObjectMapper mapper = null;
try {
mapper = this.context.getBean(ObjectMapper.class);
}
catch (NoSuchBeanDefinitionException e) {
}
MappingJackson2MessageConverter converter = createJacksonConverter();
if (mapper != null) {
converter.setObjectMapper(mapper);
}
return converter;
}
protected MappingJackson2MessageConverter createJacksonConverter() {
DefaultContentTypeResolver resolver = new DefaultContentTypeResolver();
resolver.setDefaultMimeType(MimeTypeUtils.APPLICATION_JSON);
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setContentTypeResolver(resolver);
return converter;
}
}
static class ContractVerifierStreamMessageBuilder {
public <T> Message<?> create(T payload, Map<String, Object> headers) {
return MessageBuilder.createMessage(payload, new MessageHeaders(headers));
}
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment