Commit bda11adf by Dave Syer

Ensure content type is application/json

For compatiblity with Angel streams we only need to set the content type explicitly. Fixes gh-873
parent 000fee96
...@@ -33,17 +33,12 @@ import org.springframework.scheduling.annotation.EnableScheduling; ...@@ -33,17 +33,12 @@ import org.springframework.scheduling.annotation.EnableScheduling;
import com.netflix.hystrix.HystrixCircuitBreaker; import com.netflix.hystrix.HystrixCircuitBreaker;
/** /**
* Autoconfiguration for a Spring Cloud Hystrix on AMQP. Enabled by default if * Autoconfiguration for a Spring Cloud Hystrix on Spring Cloud Stream. Enabled by default
* spring-rabbit is on the classpath, and can be switched off with * if spring-cloud-stream is on the classpath, and can be switched off with
* <code>spring.cloud.bus.amqp.enabled</code>. If there is a single * <code>hystrix.stream.queue.enabled</code>. There are some high level configuration
* {@link ConnectionFactory} in the context it will be used, or if there is a one * options in {@link HystrixStreamProperties}. The binding name for Spring Cloud Stream is
* qualified as <code>@HystrixConnectionFactory</code> it will be preferred over others, * {@link HystrixStreamClient#OUTPUT} so you can configure stream other properties through
* otherwise the <code>@Primary</code> one will be used. If there are multiple unqualified * that.
* connection factories there will be an autowiring error. Note that Spring Boot (as of
* 1.2.2) creates a ConnectionFactory that is <i>not</i> <code>@Primary</code>, so if you
* want to use one connection factory for the bus and another for business messages, you
* need to create both, and annotate them <code>@HystrixConnectionFactory</code> and
* <code>@Primary</code> respectively.
* *
* @author Spencer Gibb * @author Spencer Gibb
* @author Dave Syer * @author Dave Syer
...@@ -64,20 +59,26 @@ public class HystrixStreamAutoConfiguration { ...@@ -64,20 +59,26 @@ public class HystrixStreamAutoConfiguration {
@Bean @Bean
public HasFeatures hystrixStreamQueueFeature() { public HasFeatures hystrixStreamQueueFeature() {
return HasFeatures.namedFeature("Hystrix Stream (Queue)", HystrixStreamAutoConfiguration.class); return HasFeatures.namedFeature("Hystrix Stream (Queue)",
HystrixStreamAutoConfiguration.class);
} }
@PostConstruct @PostConstruct
public void init() { public void init() {
BindingProperties outputBinding = this.bindings.getBindings().get(HystrixStreamClient.OUTPUT); BindingProperties outputBinding = this.bindings.getBindings()
.get(HystrixStreamClient.OUTPUT);
if (outputBinding == null) { if (outputBinding == null) {
this.bindings.getBindings().put(HystrixStreamClient.OUTPUT, this.bindings.getBindings().put(HystrixStreamClient.OUTPUT,
new BindingProperties()); new BindingProperties());
} }
BindingProperties output = this.bindings.getBindings().get(HystrixStreamClient.OUTPUT); BindingProperties output = this.bindings.getBindings()
.get(HystrixStreamClient.OUTPUT);
if (output.getDestination() == null) { if (output.getDestination() == null) {
output.setDestination(this.properties.getDestination()); output.setDestination(this.properties.getDestination());
} }
if (output.getContentType() == null) {
output.setContentType(this.properties.getContentType());
}
} }
@Bean @Bean
......
...@@ -16,11 +16,11 @@ ...@@ -16,11 +16,11 @@
package org.springframework.cloud.netflix.hystrix.stream; package org.springframework.cloud.netflix.hystrix.stream;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.netflix.hystrix.HystrixConstants; import org.springframework.cloud.netflix.hystrix.HystrixConstants;
import lombok.Data;
/** /**
* @author Spencer Gibb * @author Spencer Gibb
*/ */
...@@ -36,4 +36,6 @@ public class HystrixStreamProperties { ...@@ -36,4 +36,6 @@ public class HystrixStreamProperties {
private String destination = HystrixConstants.HYSTRIX_STREAM_DESTINATION; private String destination = HystrixConstants.HYSTRIX_STREAM_DESTINATION;
private String contentType = "application/json";
} }
...@@ -22,8 +22,6 @@ import java.util.ArrayList; ...@@ -22,8 +22,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.BeansException; import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
...@@ -32,6 +30,7 @@ import org.springframework.cloud.stream.annotation.Output; ...@@ -32,6 +30,7 @@ import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
...@@ -45,6 +44,8 @@ import com.netflix.hystrix.HystrixThreadPoolKey; ...@@ -45,6 +44,8 @@ import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolMetrics; import com.netflix.hystrix.HystrixThreadPoolMetrics;
import com.netflix.hystrix.util.HystrixRollingNumberEvent; import com.netflix.hystrix.util.HystrixRollingNumberEvent;
import lombok.extern.apachecommons.CommonsLog;
/** /**
* @author Spencer Gibb * @author Spencer Gibb
* *
...@@ -90,7 +91,11 @@ public class HystrixStreamTask implements ApplicationContextAware { ...@@ -90,7 +91,11 @@ public class HystrixStreamTask implements ApplicationContextAware {
for (String json : metrics) { for (String json : metrics) {
// TODO: batch all metrics to one message // TODO: batch all metrics to one message
try { try {
this.outboundChannel.send(MessageBuilder.withPayload(json).build()); // TODO: remove the explicit content type when s-c-stream can handle that for us
this.outboundChannel.send(MessageBuilder.withPayload(json)
.setHeader(MessageHeaders.CONTENT_TYPE,
"application/json")
.build());
} }
catch (Exception ex) { catch (Exception ex) {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
...@@ -157,8 +162,8 @@ public class HystrixStreamTask implements ApplicationContextAware { ...@@ -157,8 +162,8 @@ public class HystrixStreamTask implements ApplicationContextAware {
.getRollingCount(HystrixRollingNumberEvent.COLLAPSED)); .getRollingCount(HystrixRollingNumberEvent.COLLAPSED));
json.writeNumberField("rollingCountExceptionsThrown", commandMetrics json.writeNumberField("rollingCountExceptionsThrown", commandMetrics
.getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN)); .getRollingCount(HystrixRollingNumberEvent.EXCEPTION_THROWN));
json.writeNumberField("rollingCountFailure", json.writeNumberField("rollingCountFailure", commandMetrics
commandMetrics.getRollingCount(HystrixRollingNumberEvent.FAILURE)); .getRollingCount(HystrixRollingNumberEvent.FAILURE));
json.writeNumberField("rollingCountFallbackFailure", commandMetrics json.writeNumberField("rollingCountFallbackFailure", commandMetrics
.getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE)); .getRollingCount(HystrixRollingNumberEvent.FALLBACK_FAILURE));
json.writeNumberField("rollingCountFallbackRejection", commandMetrics json.writeNumberField("rollingCountFallbackRejection", commandMetrics
...@@ -171,12 +176,12 @@ public class HystrixStreamTask implements ApplicationContextAware { ...@@ -171,12 +176,12 @@ public class HystrixStreamTask implements ApplicationContextAware {
.getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED)); .getRollingCount(HystrixRollingNumberEvent.SEMAPHORE_REJECTED));
json.writeNumberField("rollingCountShortCircuited", commandMetrics json.writeNumberField("rollingCountShortCircuited", commandMetrics
.getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED)); .getRollingCount(HystrixRollingNumberEvent.SHORT_CIRCUITED));
json.writeNumberField("rollingCountSuccess", json.writeNumberField("rollingCountSuccess", commandMetrics
commandMetrics.getRollingCount(HystrixRollingNumberEvent.SUCCESS)); .getRollingCount(HystrixRollingNumberEvent.SUCCESS));
json.writeNumberField("rollingCountThreadPoolRejected", commandMetrics json.writeNumberField("rollingCountThreadPoolRejected", commandMetrics
.getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED)); .getRollingCount(HystrixRollingNumberEvent.THREAD_POOL_REJECTED));
json.writeNumberField("rollingCountTimeout", json.writeNumberField("rollingCountTimeout", commandMetrics
commandMetrics.getRollingCount(HystrixRollingNumberEvent.TIMEOUT)); .getRollingCount(HystrixRollingNumberEvent.TIMEOUT));
json.writeNumberField("currentConcurrentExecutionCount", json.writeNumberField("currentConcurrentExecutionCount",
commandMetrics.getCurrentConcurrentExecutionCount()); commandMetrics.getCurrentConcurrentExecutionCount());
...@@ -186,12 +191,18 @@ public class HystrixStreamTask implements ApplicationContextAware { ...@@ -186,12 +191,18 @@ public class HystrixStreamTask implements ApplicationContextAware {
commandMetrics.getExecutionTimeMean()); commandMetrics.getExecutionTimeMean());
json.writeObjectFieldStart("latencyExecute"); json.writeObjectFieldStart("latencyExecute");
json.writeNumberField("0", commandMetrics.getExecutionTimePercentile(0)); json.writeNumberField("0", commandMetrics.getExecutionTimePercentile(0));
json.writeNumberField("25", commandMetrics.getExecutionTimePercentile(25)); json.writeNumberField("25",
json.writeNumberField("50", commandMetrics.getExecutionTimePercentile(50)); commandMetrics.getExecutionTimePercentile(25));
json.writeNumberField("75", commandMetrics.getExecutionTimePercentile(75)); json.writeNumberField("50",
json.writeNumberField("90", commandMetrics.getExecutionTimePercentile(90)); commandMetrics.getExecutionTimePercentile(50));
json.writeNumberField("95", commandMetrics.getExecutionTimePercentile(95)); json.writeNumberField("75",
json.writeNumberField("99", commandMetrics.getExecutionTimePercentile(99)); commandMetrics.getExecutionTimePercentile(75));
json.writeNumberField("90",
commandMetrics.getExecutionTimePercentile(90));
json.writeNumberField("95",
commandMetrics.getExecutionTimePercentile(95));
json.writeNumberField("99",
commandMetrics.getExecutionTimePercentile(99));
json.writeNumberField("99.5", json.writeNumberField("99.5",
commandMetrics.getExecutionTimePercentile(99.5)); commandMetrics.getExecutionTimePercentile(99.5));
json.writeNumberField("100", json.writeNumberField("100",
...@@ -208,7 +219,8 @@ public class HystrixStreamTask implements ApplicationContextAware { ...@@ -208,7 +219,8 @@ public class HystrixStreamTask implements ApplicationContextAware {
json.writeNumberField("90", commandMetrics.getTotalTimePercentile(90)); json.writeNumberField("90", commandMetrics.getTotalTimePercentile(90));
json.writeNumberField("95", commandMetrics.getTotalTimePercentile(95)); json.writeNumberField("95", commandMetrics.getTotalTimePercentile(95));
json.writeNumberField("99", commandMetrics.getTotalTimePercentile(99)); json.writeNumberField("99", commandMetrics.getTotalTimePercentile(99));
json.writeNumberField("99.5", commandMetrics.getTotalTimePercentile(99.5)); json.writeNumberField("99.5",
commandMetrics.getTotalTimePercentile(99.5));
json.writeNumberField("100", commandMetrics.getTotalTimePercentile(100)); json.writeNumberField("100", commandMetrics.getTotalTimePercentile(100));
json.writeEndObject(); json.writeEndObject();
...@@ -222,7 +234,8 @@ public class HystrixStreamTask implements ApplicationContextAware { ...@@ -222,7 +234,8 @@ public class HystrixStreamTask implements ApplicationContextAware {
commandProperties.circuitBreakerRequestVolumeThreshold().get()); commandProperties.circuitBreakerRequestVolumeThreshold().get());
json.writeNumberField( json.writeNumberField(
"propertyValue_circuitBreakerSleepWindowInMilliseconds", "propertyValue_circuitBreakerSleepWindowInMilliseconds",
commandProperties.circuitBreakerSleepWindowInMilliseconds().get()); commandProperties.circuitBreakerSleepWindowInMilliseconds()
.get());
json.writeNumberField( json.writeNumberField(
"propertyValue_circuitBreakerErrorThresholdPercentage", "propertyValue_circuitBreakerErrorThresholdPercentage",
commandProperties.circuitBreakerErrorThresholdPercentage().get()); commandProperties.circuitBreakerErrorThresholdPercentage().get());
...@@ -245,11 +258,13 @@ public class HystrixStreamTask implements ApplicationContextAware { ...@@ -245,11 +258,13 @@ public class HystrixStreamTask implements ApplicationContextAware {
.get()); .get());
json.writeStringField( json.writeStringField(
"propertyValue_executionIsolationThreadPoolKeyOverride", "propertyValue_executionIsolationThreadPoolKeyOverride",
commandProperties.executionIsolationThreadPoolKeyOverride().get()); commandProperties.executionIsolationThreadPoolKeyOverride()
.get());
json.writeNumberField( json.writeNumberField(
"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests", "propertyValue_executionIsolationSemaphoreMaxConcurrentRequests",
commandProperties commandProperties
.executionIsolationSemaphoreMaxConcurrentRequests().get()); .executionIsolationSemaphoreMaxConcurrentRequests()
.get());
json.writeNumberField( json.writeNumberField(
"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests", "propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests",
commandProperties commandProperties
...@@ -309,22 +324,22 @@ public class HystrixStreamTask implements ApplicationContextAware { ...@@ -309,22 +324,22 @@ public class HystrixStreamTask implements ApplicationContextAware {
json.writeStringField("name", key.name()); json.writeStringField("name", key.name());
json.writeNumberField("currentTime", System.currentTimeMillis()); json.writeNumberField("currentTime", System.currentTimeMillis());
json.writeNumberField("currentActiveCount", threadPoolMetrics json.writeNumberField("currentActiveCount",
.getCurrentActiveCount().intValue()); threadPoolMetrics.getCurrentActiveCount().intValue());
json.writeNumberField("currentCompletedTaskCount", threadPoolMetrics json.writeNumberField("currentCompletedTaskCount",
.getCurrentCompletedTaskCount().longValue()); threadPoolMetrics.getCurrentCompletedTaskCount().longValue());
json.writeNumberField("currentCorePoolSize", threadPoolMetrics json.writeNumberField("currentCorePoolSize",
.getCurrentCorePoolSize().intValue()); threadPoolMetrics.getCurrentCorePoolSize().intValue());
json.writeNumberField("currentLargestPoolSize", threadPoolMetrics json.writeNumberField("currentLargestPoolSize",
.getCurrentLargestPoolSize().intValue()); threadPoolMetrics.getCurrentLargestPoolSize().intValue());
json.writeNumberField("currentMaximumPoolSize", threadPoolMetrics json.writeNumberField("currentMaximumPoolSize",
.getCurrentMaximumPoolSize().intValue()); threadPoolMetrics.getCurrentMaximumPoolSize().intValue());
json.writeNumberField("currentPoolSize", threadPoolMetrics json.writeNumberField("currentPoolSize",
.getCurrentPoolSize().intValue()); threadPoolMetrics.getCurrentPoolSize().intValue());
json.writeNumberField("currentQueueSize", threadPoolMetrics json.writeNumberField("currentQueueSize",
.getCurrentQueueSize().intValue()); threadPoolMetrics.getCurrentQueueSize().intValue());
json.writeNumberField("currentTaskCount", threadPoolMetrics json.writeNumberField("currentTaskCount",
.getCurrentTaskCount().longValue()); threadPoolMetrics.getCurrentTaskCount().longValue());
json.writeNumberField("rollingCountThreadsExecuted", json.writeNumberField("rollingCountThreadsExecuted",
threadPoolMetrics.getRollingCountThreadsExecuted()); threadPoolMetrics.getRollingCountThreadsExecuted());
json.writeNumberField("rollingMaxActiveThreads", json.writeNumberField("rollingMaxActiveThreads",
......
...@@ -28,19 +28,18 @@ import org.springframework.context.annotation.Bean; ...@@ -28,19 +28,18 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
/** /**
* Autoconfiguration for a Spring Cloud Turbine using Spring Cloud Stream. * Autoconfiguration for a Spring Cloud Turbine using Spring Cloud Stream. Enabled by
* Enabled by default if spring-cloud-stream is on the classpath, and can be * default if spring-cloud-stream is on the classpath, and can be switched off with
* switched off with <code>turbine.stream.enabled</code>. * <code>turbine.stream.enabled</code>.
* *
* If there is a single * If there is a single {@link ConnectionFactory} in the context it will be used, or if
* {@link ConnectionFactory} in the context it will be used, or if there is a one * there is a one qualified as <code>@TurbineConnectionFactory</code> it will be preferred
* qualified as <code>@TurbineConnectionFactory</code> it will be preferred over others, * over others, otherwise the <code>@Primary</code> one will be used. If there are
* otherwise the <code>@Primary</code> one will be used. If there are multiple unqualified * multiple unqualified connection factories there will be an autowiring error. Note that
* connection factories there will be an autowiring error. Note that Spring Boot (as of * Spring Boot (as of 1.2.2) creates a ConnectionFactory that is <i>not</i>
* 1.2.2) creates a ConnectionFactory that is <i>not</i> <code>@Primary</code>, so if you * <code>@Primary</code>, so if you want to use one connection factory for turbine and
* want to use one connection factory for turbine and another for business messages, you * another for business messages, you need to create both, and annotate them
* need to create both, and annotate them <code>@TurbineConnectionFactory</code> and * <code>@TurbineConnectionFactory</code> and <code>@Primary</code> respectively.
* <code>@Primary</code> respectively.
* *
* @author Spencer Gibb * @author Spencer Gibb
* @author Dave Syer * @author Dave Syer
...@@ -59,14 +58,19 @@ public class TurbineStreamAutoConfiguration { ...@@ -59,14 +58,19 @@ public class TurbineStreamAutoConfiguration {
@PostConstruct @PostConstruct
public void init() { public void init() {
BindingProperties inputBinding = this.bindings.getBindings().get(TurbineStreamClient.INPUT); BindingProperties inputBinding = this.bindings.getBindings()
.get(TurbineStreamClient.INPUT);
if (inputBinding == null) { if (inputBinding == null) {
this.bindings.getBindings().put(TurbineStreamClient.INPUT, this.bindings.getBindings().put(TurbineStreamClient.INPUT,
new BindingProperties()); new BindingProperties());
} }
BindingProperties input = this.bindings.getBindings().get(TurbineStreamClient.INPUT); BindingProperties input = this.bindings.getBindings()
.get(TurbineStreamClient.INPUT);
if (input.getDestination() == null) { if (input.getDestination() == null) {
input.setDestination(properties.getDestination()); input.setDestination(this.properties.getDestination());
}
if (input.getContentType() == null) {
input.setContentType(this.properties.getContentType());
} }
} }
......
...@@ -16,11 +16,12 @@ ...@@ -16,11 +16,12 @@
package org.springframework.cloud.netflix.turbine.stream; package org.springframework.cloud.netflix.turbine.stream;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.netflix.hystrix.HystrixConstants; import org.springframework.cloud.netflix.hystrix.HystrixConstants;
import org.springframework.http.MediaType;
import lombok.Data;
/** /**
* @author Dave Syer * @author Dave Syer
...@@ -33,4 +34,6 @@ public class TurbineStreamProperties { ...@@ -33,4 +34,6 @@ public class TurbineStreamProperties {
private int port = 8989; private int port = 8989;
private String destination = HystrixConstants.HYSTRIX_STREAM_DESTINATION; private String destination = HystrixConstants.HYSTRIX_STREAM_DESTINATION;
private String contentType = MediaType.APPLICATION_JSON_VALUE;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment