Commit 8f8e738f by Dave Syer

Use JSON for AMQP Hystrix data

Fixes gh-126
parent a9213f24
...@@ -48,6 +48,10 @@ ...@@ -48,6 +48,10 @@
<optional>true</optional> <optional>true</optional>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId> <groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId> <artifactId>hystrix-core</artifactId>
</dependency> </dependency>
......
package org.springframework.netflix.hystrix.amqp; package org.springframework.netflix.hystrix.amqp;
import com.netflix.hystrix.HystrixCircuitBreaker; import javax.annotation.PostConstruct;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
...@@ -16,50 +18,68 @@ import org.springframework.integration.dsl.IntegrationFlows; ...@@ -16,50 +18,68 @@ import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp; import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.hystrix.HystrixCircuitBreaker;
/** /**
* @author Spencer Gibb * @author Spencer Gibb
*/ */
@Configuration @Configuration
@ConditionalOnClass({HystrixCircuitBreaker.class, AmqpTemplate.class}) @ConditionalOnClass({ HystrixCircuitBreaker.class, RabbitTemplate.class })
@ConditionalOnExpression("${hystrix.stream.amqp.enabled:true}")
@IntegrationComponentScan(basePackageClasses = HystrixStreamChannel.class)
@EnableScheduling
public class HystrixStreamAutoConfiguration { public class HystrixStreamAutoConfiguration {
@Configuration
@ConditionalOnExpression("${hystrix.stream.amqp.enabled:true}")
@IntegrationComponentScan(basePackageClasses = HystrixStreamChannel.class)
@EnableScheduling
protected static class HystrixStreamAmqpAutoConfiguration {
@Autowired @Autowired
private AmqpTemplate amqpTemplate; private RabbitTemplate amqpTemplate;
@Autowired(required = false)
private ObjectMapper objectMapper;
@PostConstruct
public void init() {
Jackson2JsonMessageConverter converter = messageConverter();
amqpTemplate.setMessageConverter(converter);
}
@Bean
public HystrixStreamTask hystrixStreamTask() {
return new HystrixStreamTask();
}
@Bean
public DirectChannel hystrixStream() {
return new DirectChannel();
}
@Bean @Bean
public HystrixStreamTask hystrixStreamTask() { public DirectExchange hystrixStreamExchange() {
return new HystrixStreamTask(); DirectExchange exchange = new DirectExchange(Constants.HYSTRIX_STREAM_NAME);
} return exchange;
}
@Bean @Bean
public DirectChannel hystrixStream() { public IntegrationFlow hystrixStreamOutboundFlow() {
return new DirectChannel(); return IntegrationFlows
} .from("hystrixStream")
// TODO: set content type
@Bean /*
public DirectExchange hystrixStreamExchange() { * .enrichHeaders(new ComponentConfigurer<HeaderEnricherSpec>() {
DirectExchange exchange = new DirectExchange(Constants.HYSTRIX_STREAM_NAME); *
return exchange; * @Override public void configure(HeaderEnricherSpec spec) {
} * spec.header("content-type", "application/json", true); } })
*/
.handle(Amqp.outboundAdapter(this.amqpTemplate).exchangeName(
Constants.HYSTRIX_STREAM_NAME)).get();
}
@Bean private Jackson2JsonMessageConverter messageConverter() {
public IntegrationFlow hystrixStreamOutboundFlow() { Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
return IntegrationFlows.from("hystrixStream") if (objectMapper != null) {
//TODO: set content type converter.setJsonObjectMapper(objectMapper);
/*.enrichHeaders(new ComponentConfigurer<HeaderEnricherSpec>() { }
@Override return converter;
public void configure(HeaderEnricherSpec spec) { }
spec.header("content-type", "application/json", true);
}
})*/
.handle(Amqp.outboundAdapter(this.amqpTemplate).exchangeName(Constants.HYSTRIX_STREAM_NAME))
.get();
}
}
} }
package org.springframework.netflix.turbine.amqp; package org.springframework.cloud.netflix.turbine.amqp;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
......
package org.springframework.netflix.turbine.amqp; package org.springframework.cloud.netflix.turbine.amqp;
import org.springframework.context.annotation.Import; import org.springframework.context.annotation.Import;
......
package org.springframework.netflix.turbine.amqp; package org.springframework.cloud.netflix.turbine.amqp;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import javax.annotation.PostConstruct;
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
...@@ -19,52 +23,70 @@ import org.springframework.integration.dsl.IntegrationFlow; ...@@ -19,52 +23,70 @@ import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp; import org.springframework.integration.dsl.amqp.Amqp;
import com.fasterxml.jackson.databind.ObjectMapper;
/** /**
* @author Spencer Gibb * @author Spencer Gibb
*/ */
@Configuration @Configuration
@ConditionalOnClass(AmqpTemplate.class) @ConditionalOnClass(AmqpTemplate.class)
@ConditionalOnExpression("${turbine.amqp.enabled:true}")
public class TurbineAmqpAutoConfiguration { public class TurbineAmqpAutoConfiguration {
@Configuration @Autowired
@ConditionalOnExpression("${turbine.amqp.enabled:true}") private ConnectionFactory connectionFactory;
protected static class HystrixStreamAggregatorAutoConfiguration {
@Autowired
@Autowired private RabbitTemplate amqpTemplate;
private ConnectionFactory connectionFactory;
@Autowired(required = false)
//TODO: how to fail gracefully if no rabbit? private ObjectMapper objectMapper;
@Bean
public DirectExchange hystrixStreamExchange() { @PostConstruct
DirectExchange exchange = new DirectExchange(Constants.HYSTRIX_STREAM_NAME); public void init() {
return exchange; Jackson2JsonMessageConverter converter = messageConverter();
} amqpTemplate.setMessageConverter(converter);
}
@Bean
protected Binding localTurbineAmqpQueueBinding() { @Bean
return BindingBuilder.bind(hystrixStreamQueue()).to(hystrixStreamExchange()).with(""); public DirectExchange hystrixStreamExchange() {
} DirectExchange exchange = new DirectExchange(Constants.HYSTRIX_STREAM_NAME);
return exchange;
@Bean }
public Queue hystrixStreamQueue() {
Map<String, Object> args = new HashMap<>(); @Bean
args.put("x-message-ttl", 60000); //TODO: configure TTL protected Binding localTurbineAmqpQueueBinding() {
Queue queue = new Queue(Constants.HYSTRIX_STREAM_NAME, false, false, false, args); return BindingBuilder.bind(hystrixStreamQueue()).to(hystrixStreamExchange())
return queue; .with("");
} }
@Bean @Bean
public IntegrationFlow hystrixStreamAggregatorInboundFlow() { public Queue hystrixStreamQueue() {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, hystrixStreamQueue())) Map<String, Object> args = new HashMap<>();
.channel("hystrixStreamAggregator") args.put("x-message-ttl", 60000); // TODO: configure TTL
.get(); Queue queue = new Queue(Constants.HYSTRIX_STREAM_NAME, false, false, false, args);
} return queue;
}
@Bean
public Aggregator hystrixStreamAggregator() { @Bean
return new Aggregator(); public IntegrationFlow hystrixStreamAggregatorInboundFlow() {
} return IntegrationFlows
} .from(Amqp.inboundAdapter(connectionFactory, hystrixStreamQueue())
.messageConverter(messageConverter()))
.channel("hystrixStreamAggregator").get();
}
@Bean
public Aggregator hystrixStreamAggregator() {
return new Aggregator();
}
private Jackson2JsonMessageConverter messageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
if (objectMapper != null) {
converter.setJsonObjectMapper(objectMapper);
}
return converter;
}
} }
package org.springframework.netflix.turbine.amqp; package org.springframework.cloud.netflix.turbine.amqp;
import com.netflix.turbine.aggregator.InstanceKey; import static io.reactivex.netty.pipeline.PipelineConfigurators.sseServerConfigurator;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty; import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer; import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent; import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.util.Map;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import rx.Observable; import rx.Observable;
import rx.subjects.PublishSubject; import rx.subjects.PublishSubject;
import java.util.Map; import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import static io.reactivex.netty.pipeline.PipelineConfigurators.sseServerConfigurator; import com.netflix.turbine.internal.JsonUtility;
/** /**
* @author Spencer Gibb * @author Spencer Gibb
*/ */
@Configuration @Configuration
@Slf4j @Slf4j
@ConfigurationProperties("turbine.amqp") @EnableConfigurationProperties(TurbineAmqpProperties.class)
public class TurbineAmqpConfiguration implements SmartLifecycle { public class TurbineAmqpConfiguration implements SmartLifecycle {
private boolean running = false; private boolean running = false;
private int port = 8989; @Autowired
private TurbineAmqpProperties turbine;
@Bean @Bean
public PublishSubject<Map<String, Object>> hystrixSubject() { public PublishSubject<Map<String, Object>> hystrixSubject() {
...@@ -39,19 +44,26 @@ public class TurbineAmqpConfiguration implements SmartLifecycle { ...@@ -39,19 +44,26 @@ public class TurbineAmqpConfiguration implements SmartLifecycle {
@Bean @Bean
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() { public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
// multicast so multiple concurrent subscribers get the same stream // multicast so multiple concurrent subscribers get the same stream
Observable<Map<String, Object>> publishedStreams = StreamAggregator.aggregateGroupedStreams(hystrixSubject() Observable<Map<String, Object>> publishedStreams = StreamAggregator
.groupBy(data -> InstanceKey.create((String) data.get("instanceId")))) .aggregateGroupedStreams(
.doOnUnsubscribe(() -> log.info("AmqpTurbine => Unsubscribing aggregation.")) hystrixSubject().groupBy(
.doOnSubscribe(() -> log.info("AmqpTurbine => Starting aggregation")) data -> InstanceKey.create((String) data
.flatMap(o -> o).publish().refCount(); .get("instanceId"))))
.doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty.createHttpServer(port, (request, response) -> { .doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
log.info("AmqpTurbine => SSE Request Received"); .publish().refCount();
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return publishedStreams HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty.createHttpServer(
.doOnUnsubscribe(() -> log.info("AmqpTurbine => Unsubscribing RxNetty server connection")) turbine.getPort(),
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(null, null, JsonUtility.mapToJson(data)))); (request, response) -> {
}, sseServerConfigurator()); log.info("SSE Request Received");
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return publishedStreams.doOnUnsubscribe(
() -> log.info("Unsubscribing RxNetty server connection"))
.flatMap(
data -> response.writeAndFlush(new ServerSentEvent(
null, null, JsonUtility.mapToJson(data))));
}, sseServerConfigurator());
return httpServer; return httpServer;
} }
...@@ -75,7 +87,8 @@ public class TurbineAmqpConfiguration implements SmartLifecycle { ...@@ -75,7 +87,8 @@ public class TurbineAmqpConfiguration implements SmartLifecycle {
public void stop() { public void stop() {
try { try {
aggregatorServer().shutdown(); aggregatorServer().shutdown();
} catch (InterruptedException e) { }
catch (InterruptedException e) {
log.error("Error shutting down", e); log.error("Error shutting down", e);
} }
running = false; running = false;
......
package org.springframework.cloud.netflix.turbine.amqp;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author Dave Syer
*/
@ConfigurationProperties("turbine.amqp")
@Data
public class TurbineAmqpProperties {
private int port = 8989;
}
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.netflix.turbine.amqp.TurbineAmqpAutoConfiguration org.springframework.cloud.netflix.turbine.amqp.TurbineAmqpAutoConfiguration
...@@ -12,8 +12,7 @@ import com.netflix.turbine.internal.JsonUtility; ...@@ -12,8 +12,7 @@ import com.netflix.turbine.internal.JsonUtility;
import rx.Observable; import rx.Observable;
import rx.observables.GroupedObservable; import rx.observables.GroupedObservable;
import static org.springframework.cloud.netflix.turbine.amqp.Aggregator.getPayloadData;
import static org.springframework.netflix.turbine.amqp.Aggregator.getPayloadData;
public class AggregatorTest { public class AggregatorTest {
......
...@@ -2,6 +2,7 @@ package org.springframework.netflix.turbine.amqp; ...@@ -2,6 +2,7 @@ package org.springframework.netflix.turbine.amqp;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.turbine.amqp.EnableTurbineAmqp;
/** /**
* @author Spencer Gibb * @author Spencer Gibb
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment