Add hystrix.stream.queue.size to customize queue size.

Also added descriptions to properties. fixes gh-1131
parent 6877f089
......@@ -23,11 +23,14 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.actuator.HasFeatures;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.ChannelBindingServiceProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.annotation.EnableScheduling;
import com.netflix.hystrix.HystrixCircuitBreaker;
......@@ -57,6 +60,10 @@ public class HystrixStreamAutoConfiguration {
@Autowired
private HystrixStreamProperties properties;
@Autowired
@Output(HystrixStreamClient.OUTPUT)
private MessageChannel outboundChannel;
@Bean
public HasFeatures hystrixStreamQueueFeature() {
return HasFeatures.namedFeature("Hystrix Stream (Queue)",
......@@ -87,8 +94,8 @@ public class HystrixStreamAutoConfiguration {
}
@Bean
public HystrixStreamTask hystrixStreamTask() {
return new HystrixStreamTask();
public HystrixStreamTask hystrixStreamTask(DiscoveryClient discoveryClient) {
return new HystrixStreamTask(this.outboundChannel, discoveryClient, this.properties);
}
}
......@@ -28,14 +28,28 @@ import lombok.Data;
@Data
public class HystrixStreamProperties {
/** Flag to indicate that Hystrix Stream is enabled. Default is true. */
private boolean enabled = true;
/** Flag to indicate to prefix metric names with serviceId. Default is true. */
private boolean prefixMetricName = true;
/** Flag to indicate to send the id field in the metrics. Default is true */
private boolean sendId = true;
/** The destination of the stream. Destination as defined by Spring Cloud Stream. Defaults to springCloudHystrixStream */
private String destination = HystrixConstants.HYSTRIX_STREAM_DESTINATION;
/** The content type of the messages. Defaults to application/json */
private String contentType = "application/json";
/** How often (in ms) to send messages to the stream. Defaults to 500. */
private long sendRate = 500;
/** How often to put messages in the queue. This queue drains to the stream. Defaults to 500. */
private long gatherRate = 500;
/** The size of the metrics queue. This queue drains to the stream. Defaults to 1000. */
private int size = 1000;
}
......@@ -23,10 +23,8 @@ import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.messaging.MessageChannel;
......@@ -55,25 +53,26 @@ import lombok.extern.apachecommons.CommonsLog;
@CommonsLog
public class HystrixStreamTask implements ApplicationContextAware {
@Autowired
@Output(HystrixStreamClient.OUTPUT)
private MessageChannel outboundChannel;
@Autowired
private DiscoveryClient discoveryClient;
private ApplicationContext context;
@Autowired
private HystrixStreamProperties properties;
private ApplicationContext context;
// Visible for testing
// TODO: configurable size.
final LinkedBlockingQueue<String> jsonMetrics = new LinkedBlockingQueue<>(
1000);
final LinkedBlockingQueue<String> jsonMetrics;
private final JsonFactory jsonFactory = new JsonFactory();
public HystrixStreamTask(MessageChannel outboundChannel, DiscoveryClient discoveryClient, HystrixStreamProperties properties) {
this.outboundChannel = outboundChannel;
this.discoveryClient = discoveryClient;
this.properties = properties;
this.jsonMetrics = new LinkedBlockingQueue<>(properties.getSize());
}
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
......@@ -96,7 +95,7 @@ public class HystrixStreamTask implements ApplicationContextAware {
// TODO: remove the explicit content type when s-c-stream can handle that for us
this.outboundChannel.send(MessageBuilder.withPayload(json)
.setHeader(MessageHeaders.CONTENT_TYPE,
"application/json")
this.properties.getContentType())
.build());
}
catch (Exception ex) {
......
......@@ -61,6 +61,7 @@ public class HystrixStreamTaskTests {
new HystrixPropertiesCommandDefault(hystrixCommandKey, HystrixCommandProperties.defaultSetter()));
given(this.discoveryClient.getLocalServiceInstance()).willReturn(this.serviceInstance);
this.hystrixStreamTask.setApplicationContext(this.context);
this.hystrixStreamTask.gatherMetrics();
assertThat(this.hystrixStreamTask.jsonMetrics.isEmpty(), is(false));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment