Commit 85a3e82d by Spencer Gibb

remove references to bus

parent 4dc7972f
......@@ -26,7 +26,7 @@ public class HystrixStreamAutoConfiguration {
@ConditionalOnExpression("${hystrix.stream.amqp.enabled:true}")
@IntegrationComponentScan(basePackageClasses = HystrixStreamChannel.class)
@EnableScheduling
protected static class HystrixStreamBusAutoConfiguration {
protected static class HystrixStreamAmqpAutoConfiguration {
@Autowired
private AmqpTemplate amqpTemplate;
......
......@@ -5,7 +5,7 @@ import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
/**
* Run the RxNetty based Spring Cloud Bus Turbine server.
* Run the RxNetty based Spring Cloud Turbine AMQP server.
* Based on Netflix Turbine 2
*
* @author Spencer Gibb
......
......@@ -27,7 +27,7 @@ import org.springframework.integration.dsl.amqp.Amqp;
public class TurbineAmqpAutoConfiguration {
@Configuration
@ConditionalOnExpression("${hystrix.stream.bus.turbine.enabled:true}")
@ConditionalOnExpression("${turbine.amqp.enabled:true}")
protected static class HystrixStreamAggregatorAutoConfiguration {
@Autowired
......@@ -41,7 +41,7 @@ public class TurbineAmqpAutoConfiguration {
}
@Bean
protected Binding localCloudBusQueueBinding() {
protected Binding localTurbineAmqpQueueBinding() {
return BindingBuilder.bind(hystrixStreamQueue()).to(hystrixStreamExchange()).with("");
}
......
......@@ -24,7 +24,7 @@ import static io.reactivex.netty.pipeline.PipelineConfigurators.sseServerConfigu
*/
@Configuration
@Slf4j
@ConfigurationProperties("bus.turbine")
@ConfigurationProperties("turbine.amqp")
public class TurbineAmqpConfiguration implements SmartLifecycle {
private boolean running = false;
......@@ -41,15 +41,15 @@ public class TurbineAmqpConfiguration implements SmartLifecycle {
// multicast so multiple concurrent subscribers get the same stream
Observable<Map<String, Object>> publishedStreams = StreamAggregator.aggregateGroupedStreams(hystrixSubject()
.groupBy(data -> InstanceKey.create((String) data.get("instanceId"))))
.doOnUnsubscribe(() -> log.info("BusTurbine => Unsubscribing aggregation."))
.doOnSubscribe(() -> log.info("BusTurbine => Starting aggregation"))
.doOnUnsubscribe(() -> log.info("AmqpTurbine => Unsubscribing aggregation."))
.doOnSubscribe(() -> log.info("AmqpTurbine => Starting aggregation"))
.flatMap(o -> o).publish().refCount();
HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty.createHttpServer(port, (request, response) -> {
log.info("BusTurbine => SSE Request Received");
log.info("AmqpTurbine => SSE Request Received");
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return publishedStreams
.doOnUnsubscribe(() -> log.info("BusTurbine => Unsubscribing RxNetty server connection"))
.doOnUnsubscribe(() -> log.info("AmqpTurbine => Unsubscribing RxNetty server connection"))
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(null, null, JsonUtility.mapToJson(data))));
}, sseServerConfigurator());
return httpServer;
......
......@@ -17,7 +17,7 @@ import static org.springframework.netflix.turbine.amqp.Aggregator.getPayloadData
public class AggregatorTest {
public static final String STREAM_ALL = "hystrixbus";
public static final String STREAM_ALL = "hystrixamqp";
public static void main(String[] args) {
getHystrixStreamFromFile(STREAM_ALL, 1).flatMap(commandGroup -> commandGroup.take(50)).take(50).toBlocking().forEach(s -> System.out.println("s: " + s));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment