Commit 639a87bf by Spencer Gibb

Port *-amqp to spring-cloud-stream.

Move turbine/hystrix amqp specific implementations to spring-cloud-stream based implementations and let the user decide which broker to use. fixes gh-545
parent 9e817a7d
......@@ -23,6 +23,7 @@
<bintray.package>netflix</bintray.package>
<spring-cloud-commons.version>1.1.0.BUILD-SNAPSHOT</spring-cloud-commons.version>
<spring-cloud-config.version>1.1.0.BUILD-SNAPSHOT</spring-cloud-config.version>
<spring-cloud-stream.version>1.0.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
<main.basedir>${basedir}</main.basedir>
<archaius.version>0.6.5</archaius.version>
<eureka.version>1.2.0</eureka.version>
......@@ -83,87 +84,97 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix-dashboard</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-ribbon</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-turbine</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-turbine-amqp</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<artifactId>spring-cloud-starter-turbine-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zuul</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-core</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-eureka-server</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-hystrix-dashboard</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-hystrix-amqp</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<artifactId>spring-cloud-netflix-hystrix-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-sidecar</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-turbine</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-turbine-amqp</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<artifactId>spring-cloud-netflix-turbine-stream</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-zuul-server</artifactId>
<version>1.1.0.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
<dependency>
<groupId>com.netflix.archaius</groupId>
......@@ -379,10 +390,10 @@
<modules>
<module>spring-cloud-netflix-core</module>
<module>spring-cloud-netflix-hystrix-dashboard</module>
<module>spring-cloud-netflix-hystrix-amqp</module>
<module>spring-cloud-netflix-hystrix-stream</module>
<module>spring-cloud-netflix-eureka-server</module>
<module>spring-cloud-netflix-turbine</module>
<module>spring-cloud-netflix-turbine-amqp</module>
<module>spring-cloud-netflix-turbine-stream</module>
<module>spring-cloud-netflix-sidecar</module>
<module>spring-cloud-starter-eureka</module>
<module>spring-cloud-starter-eureka-server</module>
......@@ -391,7 +402,7 @@
<module>spring-cloud-starter-hystrix-dashboard</module>
<module>spring-cloud-starter-ribbon</module>
<module>spring-cloud-starter-turbine</module>
<module>spring-cloud-starter-turbine-amqp</module>
<module>spring-cloud-starter-turbine-stream</module>
<module>spring-cloud-starter-zuul</module>
<module>docs</module>
</modules>
......
......@@ -21,6 +21,6 @@ package org.springframework.cloud.netflix.hystrix;
*/
public interface HystrixConstants {
String HYSTRIX_STREAM_NAME = "spring.cloud.hystrix.stream";
String HYSTRIX_STREAM_DESTINATION = "springCloudHystrixStream";
}
......@@ -8,10 +8,10 @@
<version>1.1.0.BUILD-SNAPSHOT</version>
<relativePath>..</relativePath> <!-- lookup parent from repository -->
</parent>
<artifactId>spring-cloud-netflix-hystrix-amqp</artifactId>
<artifactId>spring-cloud-netflix-hystrix-stream</artifactId>
<packaging>jar</packaging>
<name>Spring Cloud Netflix Hystrix AMQP</name>
<description>Spring Cloud Netflix Hystrix AMQP</description>
<name>Spring Cloud Netflix Hystrix Stream</name>
<description>Spring Cloud Netflix Hystrix Stream</description>
<properties>
<main.basedir>${basedir}/..</main.basedir>
</properties>
......@@ -22,24 +22,20 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-commons</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-core</artifactId>
<artifactId>spring-cloud-commons</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
......@@ -85,5 +81,10 @@
<artifactId>spring-boot-starter-actuator</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.hystrix.amqp;
package org.springframework.cloud.netflix.hystrix.stream;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
......
......@@ -14,31 +14,24 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.hystrix.amqp;
package org.springframework.cloud.netflix.hystrix.stream;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.netflix.hystrix.HystrixConstants;
import org.springframework.context.ApplicationContext;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.config.ChannelBindingProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.hystrix.HystrixCircuitBreaker;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.HashMap;
/**
* Autoconfiguration for a Spring Cloud Hystrix on AMQP. Enabled by default if
* spring-rabbit is on the classpath, and can be switched off with
......@@ -56,41 +49,26 @@ import com.netflix.hystrix.HystrixCircuitBreaker;
* @author Dave Syer
*/
@Configuration
@ConditionalOnClass({ HystrixCircuitBreaker.class, RabbitTemplate.class })
@ConditionalOnProperty(value = "hystrix.stream.amqp.enabled", matchIfMissing = true)
@IntegrationComponentScan(basePackageClasses = HystrixStreamChannel.class)
@ConditionalOnClass({ HystrixCircuitBreaker.class, EnableBinding.class })
@ConditionalOnProperty(value = "hystrix.stream.queue.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@EnableScheduling
@EnableBinding(HystrixStreamClient.class)
public class HystrixStreamAutoConfiguration {
@Autowired(required = false)
@HystrixConnectionFactory
private ConnectionFactory hystrixConnectionFactory;
@Autowired(required = false)
private ConnectionFactory primaryConnectionFactory;
@Autowired
private ApplicationContext context;
private ChannelBindingProperties bindings;
@Autowired(required = false)
private ObjectMapper objectMapper;
private RabbitTemplate amqpTemplate;
public RabbitTemplate amqpTemplate() {
if (this.amqpTemplate == null) {
RabbitTemplate amqpTemplate = new RabbitTemplate(connectionFactory());
Jackson2JsonMessageConverter converter = messageConverter();
amqpTemplate.setMessageConverter(converter);
this.amqpTemplate = amqpTemplate;
}
return this.amqpTemplate;
@PostConstruct
public void init() {
this.bindings.getBindings().put(HystrixStreamClient.OUTPUT,
new HashMap<>(Collections.singletonMap("destination",
HystrixStreamClient.HYSTRIX_STREAM_DESTINATION)));
}
@Bean
public HystrixStreamAmqpProperties hystrixStreamAmqpProperties() {
return new HystrixStreamAmqpProperties();
public HystrixStreamProperties hystrixStreamProperties() {
return new HystrixStreamProperties();
}
@Bean
......@@ -98,49 +76,4 @@ public class HystrixStreamAutoConfiguration {
return new HystrixStreamTask();
}
@Bean
public DirectChannel hystrixStream() {
return new DirectChannel();
}
@Bean
public DirectExchange hystrixStreamExchange() {
DirectExchange exchange = new DirectExchange(HystrixConstants.HYSTRIX_STREAM_NAME);
return exchange;
}
@Bean
public IntegrationFlow hystrixStreamOutboundFlow() {
return IntegrationFlows
.from("hystrixStream")
// TODO: set content type
/*
* .enrichHeaders(new ComponentConfigurer<HeaderEnricherSpec>() {
*
* @Override public void configure(HeaderEnricherSpec spec) {
* spec.header("content-type", "application/json", true); } })
*/
.handle(Amqp.outboundAdapter(amqpTemplate()).exchangeName(
HystrixConstants.HYSTRIX_STREAM_NAME)).get();
}
private ConnectionFactory connectionFactory() {
if (this.hystrixConnectionFactory != null) {
RabbitAdmin amqpAdmin = new RabbitAdmin(this.hystrixConnectionFactory);
hystrixStreamExchange().setAdminsThatShouldDeclare(amqpAdmin);
amqpAdmin.setApplicationContext(this.context);
amqpAdmin.afterPropertiesSet();
return this.hystrixConnectionFactory;
}
return this.primaryConnectionFactory;
}
private Jackson2JsonMessageConverter messageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
if (this.objectMapper != null) {
converter.setJsonObjectMapper(this.objectMapper);
}
return converter;
}
}
/*
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.hystrix.stream;
import org.springframework.cloud.netflix.hystrix.HystrixConstants;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
* @author Dave Syer
*
*/
public interface HystrixStreamClient extends HystrixConstants {
String OUTPUT = "hystrixStreamOutput";
@Output(OUTPUT)
MessageChannel hystrixStreamOutput();
}
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.hystrix.amqp;
package org.springframework.cloud.netflix.hystrix.stream;
import lombok.Data;
......@@ -23,9 +23,9 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author Spencer Gibb
*/
@ConfigurationProperties("hystrix.stream.amqp")
@ConfigurationProperties("hystrix.stream.queue")
@Data
public class HystrixStreamAmqpProperties {
public class HystrixStreamProperties {
private boolean enabled = true;
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.hystrix.amqp;
package org.springframework.cloud.netflix.hystrix.stream;
import java.io.IOException;
import java.io.StringWriter;
......@@ -28,8 +28,11 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled;
import com.fasterxml.jackson.core.JsonFactory;
......@@ -52,7 +55,8 @@ import com.netflix.hystrix.util.HystrixRollingNumberEvent;
public class HystrixStreamTask implements ApplicationContextAware {
@Autowired
private HystrixStreamChannel channel;
@Output(HystrixStreamClient.OUTPUT)
private MessageChannel outboundChannel;
@Autowired
private DiscoveryClient discoveryClient;
......@@ -60,7 +64,7 @@ public class HystrixStreamTask implements ApplicationContextAware {
private ApplicationContext context;
@Autowired
private HystrixStreamAmqpProperties properties;
private HystrixStreamProperties properties;
private final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<>(
1000);
......@@ -74,30 +78,30 @@ public class HystrixStreamTask implements ApplicationContextAware {
}
// TODO: use integration to split this up?
@Scheduled(fixedRateString = "${hystrix.stream.amqp.sendRate:500}")
@Scheduled(fixedRateString = "${hystrix.stream.queue.sendRate:500}")
public void sendMetrics() {
ArrayList<String> metrics = new ArrayList<>();
this.jsonMetrics.drainTo(metrics);
if (!metrics.isEmpty()) {
if (log.isTraceEnabled()) {
log.trace("sending amqp metrics size: " + metrics.size());
log.trace("sending stream metrics size: " + metrics.size());
}
for (String json : metrics) {
// TODO: batch all metrics to one message
try {
this.channel.send(json);
this.outboundChannel.send(MessageBuilder.withPayload(json).build());
}
catch (Exception ex) {
if (log.isTraceEnabled()) {
log.trace("failed sending amqp metrics: " + ex.getMessage());
log.trace("failed sending stream metrics: " + ex.getMessage());
}
}
}
}
}
@Scheduled(fixedRateString = "${hystrix.stream.amqp.gatherRate:500}")
@Scheduled(fixedRateString = "${hystrix.stream.queue.gatherRate:500}")
public void gatherMetrics() {
try {
// command metrics
......
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.amqp.HystrixStreamAutoConfiguration
org.springframework.cloud.netflix.hystrix.stream.HystrixStreamAutoConfiguration
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.netflix.hystrix.amqp;
package org.springframework.cloud.netflix.hystrix.stream;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -36,11 +36,11 @@ import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
* @author Spencer Gibb
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = HystrixAmqpTests.Application.class)
@SpringApplicationConfiguration(classes = HystrixStreamTests.Application.class)
@WebAppConfiguration
@IntegrationTest({ "server.port=0", "spring.jmx.enabled=true" })
@DirtiesContext
public class HystrixAmqpTests {
public class HystrixStreamTests {
@EnableAutoConfiguration
@EnableDiscoveryClient
......
......@@ -3,4 +3,4 @@ server:
logging:
level:
org.springframework.netflix.hystrix.amqp: TRACE
\ No newline at end of file
org.springframework.netflix.hystrix.stream: TRACE
\ No newline at end of file
/*
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.amqp;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.netflix.hystrix.HystrixConstants;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Autoconfiguration for a Spring Cloud Turbine on AMQP. Enabled by default if
* spring-rabbit is on the classpath, and can be switched off with
* <code>spring.cloud.bus.amqp.enabled</code>. If there is a single
* {@link ConnectionFactory} in the context it will be used, or if there is a one
* qualified as <code>@TurbineConnectionFactory</code> it will be preferred over others,
* otherwise the <code>@Primary</code> one will be used. If there are multiple unqualified
* connection factories there will be an autowiring error. Note that Spring Boot (as of
* 1.2.2) creates a ConnectionFactory that is <i>not</i> <code>@Primary</code>, so if you
* want to use one connection factory for the bus and another for business messages, you
* need to create both, and annotate them <code>@TurbineConnectionFactory</code> and
* <code>@Primary</code> respectively.
*
* @author Spencer Gibb
* @author Dave Syer
*/
@Configuration
@ConditionalOnClass(AmqpTemplate.class)
@ConditionalOnProperty(value = "turbine.amqp.enabled", matchIfMissing = true)
public class TurbineAmqpAutoConfiguration {
@Autowired
private ApplicationContext context;
@Autowired(required = false)
@TurbineConnectionFactory
private ConnectionFactory turbineConnectionFactory;
@Autowired(required = false)
private ConnectionFactory primaryConnectionFactory;
@Autowired(required = false)
private ObjectMapper objectMapper;
@Bean
public DirectExchange hystrixStreamExchange() {
DirectExchange exchange = new DirectExchange(HystrixConstants.HYSTRIX_STREAM_NAME);
return exchange;
}
@Bean
protected Binding localTurbineAmqpQueueBinding() {
return BindingBuilder.bind(hystrixStreamQueue()).to(hystrixStreamExchange())
.with("");
}
@Bean
public Queue hystrixStreamQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // TODO: configure TTL
Queue queue = new Queue(HystrixConstants.HYSTRIX_STREAM_NAME, false, false,
false, args);
return queue;
}
@Bean
public IntegrationFlow hystrixStreamAggregatorInboundFlow() {
return IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory(), hystrixStreamQueue())
.messageConverter(messageConverter()))
.channel("hystrixStreamAggregator").get();
}
@Bean
public Aggregator hystrixStreamAggregator() {
return new Aggregator();
}
private ConnectionFactory connectionFactory() {
if (this.turbineConnectionFactory != null) {
RabbitAdmin amqpAdmin = new RabbitAdmin(this.turbineConnectionFactory);
hystrixStreamExchange().setAdminsThatShouldDeclare(amqpAdmin);
localTurbineAmqpQueueBinding().setAdminsThatShouldDeclare(amqpAdmin);
hystrixStreamQueue().setAdminsThatShouldDeclare(amqpAdmin);
amqpAdmin.setApplicationContext(this.context);
amqpAdmin.afterPropertiesSet();
return this.turbineConnectionFactory;
}
return this.primaryConnectionFactory;
}
private Jackson2JsonMessageConverter messageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
if (this.objectMapper != null) {
converter.setJsonObjectMapper(this.objectMapper);
}
return converter;
}
}
......@@ -8,10 +8,10 @@
<version>1.1.0.BUILD-SNAPSHOT</version>
<relativePath>..</relativePath> <!-- lookup parent from repository -->
</parent>
<artifactId>spring-cloud-netflix-turbine-amqp</artifactId>
<artifactId>spring-cloud-netflix-turbine-stream</artifactId>
<packaging>jar</packaging>
<name>Spring Cloud Netflix Turbine AMQP</name>
<description>Spring Cloud Netflix Turbine AMQP</description>
<name>Spring Cloud Netflix Turbine Stream</name>
<description>Spring Cloud Netflix Turbine Stream</description>
<properties>
<main.basedir>${basedir}/..</main.basedir>
<turbine.version>2.0.0-DP.2</turbine.version>
......@@ -54,24 +54,16 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.turbine</groupId>
......@@ -105,6 +97,11 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
......@@ -25,14 +25,15 @@ import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
/**
* Run the RxNetty based Spring Cloud Turbine AMQP server. Based on Netflix Turbine 2
* Run the RxNetty based Spring Cloud Turbine Stream server.
* Based on Netflix Turbine 2 and Spring Cloud Stream
*
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TurbineAmqpConfiguration.class)
public @interface EnableTurbineAmqp {
@Import(TurbineStreamConfiguration.class)
public @interface EnableTurbineStream {
}
......@@ -14,15 +14,16 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import java.io.IOException;
import java.util.Map;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import rx.subjects.PublishSubject;
......@@ -32,9 +33,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
/**
* @author Spencer Gibb
*/
@MessageEndpoint
@CommonsLog
public class Aggregator {
@Component //needed for ServiceActivator to be picked up
public class HystrixStreamAggregator {
@Autowired
private ObjectMapper objectMapper;
......@@ -42,8 +43,8 @@ public class Aggregator {
@Autowired
private PublishSubject<Map<String, Object>> subject;
@ServiceActivator(inputChannel = "hystrixStreamAggregator")
public void handle(String payload) {
@ServiceActivator(inputChannel = TurbineStreamClient.INPUT)
public void sendToSubject(String payload) {
try {
@SuppressWarnings("unchecked")
Map<String, Object> map = this.objectMapper.readValue(payload, Map.class);
......
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
......@@ -6,7 +6,7 @@ import org.springframework.context.annotation.Configuration;
@Configuration
@EnableAutoConfiguration
@EnableTurbineAmqp
@EnableTurbineStream
public class TurbineApplication {
public static void main(String[] args) {
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
......
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import java.util.HashMap;
import java.util.Map;
......@@ -16,7 +16,7 @@ public class TurbinePortApplicationListener implements
Integer.class);
Integer managementPort = event.getEnvironment().getProperty("management.port",
Integer.class);
Integer turbinePort = event.getEnvironment().getProperty("turbine.amqp.port",
Integer turbinePort = event.getEnvironment().getProperty("turbine.stream.port",
Integer.class);
if (serverPort == null && managementPort == null) {
return;
......@@ -29,7 +29,7 @@ public class TurbinePortApplicationListener implements
ports.put("server.port", -1);
if (serverPort != null) {
// Turbine port defaults to server port value supplied by user
ports.put("turbine.amqp.port", serverPort);
ports.put("turbine.stream.port", serverPort);
}
}
else if (managementPort != null && managementPort != -1 && serverPort == null) {
......
/*
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.stream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.config.ChannelBindingProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.Collections;
import java.util.HashMap;
/**
* Autoconfiguration for a Spring Cloud Turbine using Spring Cloud Stream.
* Enabled by default if spring-cloud-stream is on the classpath, and can be
* switched off with <code>turbine.stream.enabled</code>.
*
* If there is a single
* {@link ConnectionFactory} in the context it will be used, or if there is a one
* qualified as <code>@TurbineConnectionFactory</code> it will be preferred over others,
* otherwise the <code>@Primary</code> one will be used. If there are multiple unqualified
* connection factories there will be an autowiring error. Note that Spring Boot (as of
* 1.2.2) creates a ConnectionFactory that is <i>not</i> <code>@Primary</code>, so if you
* want to use one connection factory for turbine and another for business messages, you
* need to create both, and annotate them <code>@TurbineConnectionFactory</code> and
* <code>@Primary</code> respectively.
*
* @author Spencer Gibb
* @author Dave Syer
*/
@Configuration
@ConditionalOnClass(EnableBinding.class)
@ConditionalOnProperty(value = "turbine.stream.enabled", matchIfMissing = true)
@EnableBinding(TurbineStreamClient.class)
public class TurbineStreamAutoConfiguration {
@Autowired
private ChannelBindingProperties bindings;
@PostConstruct
public void init() {
this.bindings.getBindings().put(TurbineStreamClient.INPUT,
new HashMap<>(Collections.singletonMap("destination",
TurbineStreamClient.HYSTRIX_STREAM_DESTINATION)));
}
@Bean
public HystrixStreamAggregator hystrixStreamAggregator() {
return new HystrixStreamAggregator();
}
}
......@@ -14,18 +14,20 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.hystrix.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.cloud.netflix.hystrix.HystrixConstants;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
/**
* @author Spencer Gibb
* @author Dave Syer
*
*/
@MessagingGateway
public interface HystrixStreamChannel {
public interface TurbineStreamClient extends HystrixConstants {
@Gateway(requestChannel = "hystrixStream")
public void send(String s);
String INPUT = "turbineStreamInput";
@Input(INPUT)
SubscribableChannel turbineStreamInput();
}
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
......@@ -24,6 +24,7 @@ import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.apachecommons.CommonsLog;
......@@ -48,13 +49,13 @@ import static io.reactivex.netty.pipeline.PipelineConfigurators.sseServerConfigu
*/
@Configuration
@CommonsLog
@EnableConfigurationProperties(TurbineAmqpProperties.class)
public class TurbineAmqpConfiguration implements SmartLifecycle {
@EnableConfigurationProperties(TurbineStreamProperties.class)
public class TurbineStreamConfiguration implements SmartLifecycle {
private boolean running = false;
private AtomicBoolean running = new AtomicBoolean(false);
@Autowired
private TurbineAmqpProperties turbine;
private TurbineStreamProperties properties;
private int turbinePort;
......@@ -64,6 +65,7 @@ public class TurbineAmqpConfiguration implements SmartLifecycle {
}
@Bean
@SuppressWarnings("deprecation")
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
// multicast so multiple concurrent subscribers get the same stream
Observable<Map<String, Object>> publishedStreams = StreamAggregator
......@@ -75,12 +77,11 @@ public class TurbineAmqpConfiguration implements SmartLifecycle {
.doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
.publish().refCount();
Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS)
.map(count -> {
return Collections.singletonMap("type", (Object) "Ping");
}).publish().refCount();
.map(count -> Collections.singletonMap("type", (Object) "Ping"))
.publish().refCount();
Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);
this.turbinePort = this.turbine.getPort();
this.turbinePort = this.properties.getPort();
if (this.turbinePort <= 0) {
this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
......@@ -113,23 +114,25 @@ public class TurbineAmqpConfiguration implements SmartLifecycle {
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
aggregatorServer().start();
}
}
@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
try {
aggregatorServer().shutdown();
}
catch (InterruptedException ex) {
} catch (InterruptedException ex) {
log.error("Error shutting down", ex);
}
this.running = false;
}
}
@Override
public boolean isRunning() {
return this.running;
return this.running.get();
}
@Override
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import lombok.Data;
......@@ -24,9 +24,9 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @author Dave Syer
*/
@ConfigurationProperties("turbine.amqp")
@ConfigurationProperties("turbine.stream")
@Data
public class TurbineAmqpProperties {
public class TurbineStreamProperties {
@Value("${server.port:8989}")
private int port = 8989;
......
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.turbine.amqp.TurbineAmqpAutoConfiguration
org.springframework.cloud.netflix.turbine.stream.TurbineStreamAutoConfiguration
org.springframework.context.ApplicationListener=\
org.springframework.cloud.netflix.turbine.amqp.TurbinePortApplicationListener
\ No newline at end of file
org.springframework.cloud.netflix.turbine.stream.TurbinePortApplicationListener
\ No newline at end of file
info:
component: Turbine AMQP
component: Turbine Stream
spring:
application:
name: turbine
jmx:
default_domain: cloud.turbine.amqp
default_domain: cloud.turbine.stream
server:
port: 8990
turbine:
amqp:
stream:
port: 8989
\ No newline at end of file
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import java.io.BufferedReader;
import java.io.InputStream;
......@@ -31,11 +31,11 @@ import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.aggregator.TypeAndNameKey;
import com.netflix.turbine.internal.JsonUtility;
import static org.springframework.cloud.netflix.turbine.amqp.Aggregator.getPayloadData;
import static org.springframework.cloud.netflix.turbine.stream.HystrixStreamAggregator.getPayloadData;
public class AggregatorTest {
public class HystrixStreamAggregatorTest {
public static final String STREAM_ALL = "hystrixamqp";
public static final String STREAM_ALL = "hystrixtest";
public static void main(String[] args) {
getHystrixStreamFromFile(STREAM_ALL, 1)
......@@ -49,9 +49,9 @@ public class AggregatorTest {
Observable<Map<String, Object>> objectObservable = Observable.create(sub -> {
try {
while (!sub.isUnsubscribed()) {
String packagePath = AggregatorTest.class.getPackage().getName()
String packagePath = HystrixStreamAggregatorTest.class.getPackage().getName()
.replace('.', '/');
InputStream file = AggregatorTest.class.getResourceAsStream("/"
InputStream file = HystrixStreamAggregatorTest.class.getResourceAsStream("/"
+ packagePath + "/" + stream + ".stream");
BufferedReader in = new BufferedReader(new InputStreamReader(file));
String line = null;
......
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import static org.junit.Assert.*;
......@@ -27,31 +27,31 @@ public class TurbinePortApplicationListenerTests {
EnvironmentTestUtils.addEnvironment(environment, "server.port=9999");
listener.onApplicationEvent(event);
assertEquals("-1", environment.resolvePlaceholders("${server.port}"));
assertEquals("9999", environment.resolvePlaceholders("${turbine.amqp.port}"));
assertEquals("9999", environment.resolvePlaceholders("${turbine.stream.port}"));
}
@Test
public void turbinePortOnly() {
EnvironmentTestUtils.addEnvironment(environment, "turbine.amqp.port=9999");
EnvironmentTestUtils.addEnvironment(environment, "turbine.stream.port=9999");
listener.onApplicationEvent(event);
assertEquals("9999", environment.resolvePlaceholders("${turbine.amqp.port}"));
assertEquals("9999", environment.resolvePlaceholders("${turbine.stream.port}"));
assertEquals("0", environment.resolvePlaceholders("${server.port:0}"));
}
@Test
public void turbineAndManagementPorts() {
EnvironmentTestUtils.addEnvironment(environment, "turbine.amqp.port=9999", "management.port=9000");
EnvironmentTestUtils.addEnvironment(environment, "turbine.stream.port=9999", "management.port=9000");
listener.onApplicationEvent(event);
assertEquals("9999", environment.resolvePlaceholders("${turbine.amqp.port}"));
assertEquals("9999", environment.resolvePlaceholders("${turbine.stream.port}"));
assertEquals("9000", environment.resolvePlaceholders("${server.port:0}"));
assertEquals("9000", environment.resolvePlaceholders("${management.port:0}"));
}
@Test
public void turbineAndServerPorts() {
EnvironmentTestUtils.addEnvironment(environment, "turbine.amqp.port=9999", "server.port=9000");
EnvironmentTestUtils.addEnvironment(environment, "turbine.stream.port=9999", "server.port=9000");
listener.onApplicationEvent(event);
assertEquals("9999", environment.resolvePlaceholders("${turbine.amqp.port}"));
assertEquals("9999", environment.resolvePlaceholders("${turbine.stream.port}"));
assertEquals("9000", environment.resolvePlaceholders("${server.port:0}"));
assertEquals("0", environment.resolvePlaceholders("${management.port:0}"));
}
......
......@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.amqp;
package org.springframework.cloud.netflix.turbine.stream;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -29,13 +29,13 @@ import org.springframework.test.context.web.WebAppConfiguration;
* @author Spencer Gibb
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = TurbineAmqpTests.Application.class)
@SpringApplicationConfiguration(classes = TurbineStreamTests.Application.class)
@WebAppConfiguration
@IntegrationTest({ "server.port=0", "turbine.amqp.port=0", "spring.jmx.enabled=true" })
public class TurbineAmqpTests {
@IntegrationTest({ "server.port=0", "turbine.stream.port=0", "spring.jmx.enabled=true" })
public class TurbineStreamTests {
@EnableAutoConfiguration
@EnableTurbineAmqp
@EnableTurbineStream
public static class Application {
public static void main(String[] args) {
new SpringApplicationBuilder().sources(Application.class).run(args);
......
......@@ -8,9 +8,9 @@
<version>1.1.0.BUILD-SNAPSHOT</version>
<relativePath>..</relativePath> <!-- lookup parent from repository -->
</parent>
<artifactId>spring-cloud-starter-turbine-amqp</artifactId>
<name>spring-cloud-starter-turbine-amqp</name>
<description>Spring Cloud Starter Turbine AMQP</description>
<artifactId>spring-cloud-starter-turbine-stream</artifactId>
<name>spring-cloud-starter-turbine-stream</name>
<description>Spring Cloud Starter Turbine Stream</description>
<url>http://projects.spring.io/spring-cloud</url>
<organization>
<name>Pivotal Software, Inc.</name>
......@@ -37,27 +37,11 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-turbine-amqp</artifactId>
<artifactId>spring-cloud-netflix-turbine-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-event</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment