Commit 0b638884 by Dave Syer

Fix merge in Turbine Stream tests

parent b704dc50
......@@ -55,7 +55,6 @@ import rx.subjects.PublishSubject;
@EnableConfigurationProperties(TurbineStreamProperties.class)
public class TurbineStreamConfiguration implements SmartLifecycle {
<<<<<<< HEAD
private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);
private AtomicBoolean running = new AtomicBoolean(false);
......@@ -77,7 +76,6 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
}
@Bean
//TODO: migrate to WebFlux?
@SuppressWarnings("deprecation")
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
// multicast so multiple concurrent subscribers get the same stream
......@@ -102,12 +100,12 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
.createHttpServer(this.turbinePort, (request, response) -> {
log.info("SSE Request Received");
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return output
.doOnUnsubscribe(() -> log
.info("Unsubscribing RxNetty server connection"))
return output.doOnUnsubscribe(
() -> log.info("Unsubscribing RxNetty server connection"))
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(
null, null, JsonUtility.mapToJson(data))));
}, sseServerConfigurator());
Unpooled.copiedBuffer(JsonUtility.mapToJson(data),
StandardCharsets.UTF_8))));
}, serveSseConfigurator());
return httpServer;
}
......@@ -154,104 +152,5 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
public int getTurbinePort() {
return this.turbinePort;
}
=======
private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);
private AtomicBoolean running = new AtomicBoolean(false);
@Autowired
private TurbineStreamProperties properties;
private int turbinePort;
@Bean
public HasFeatures Feature() {
return HasFeatures.namedFeature("Turbine (Stream)",
TurbineStreamProperties.class);
}
@Bean
public PublishSubject<Map<String, Object>> hystrixSubject() {
return PublishSubject.create();
}
@Bean
@SuppressWarnings("deprecation")
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
// multicast so multiple concurrent subscribers get the same stream
Observable<Map<String, Object>> publishedStreams = StreamAggregator
.aggregateGroupedStreams(hystrixSubject().groupBy(
data -> InstanceKey.create((String) data.get("instanceId"))))
.doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
.doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
.publish().refCount();
Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS)
.map(count -> Collections.singletonMap("type", (Object) "Ping")).publish()
.refCount();
Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);
this.turbinePort = this.properties.getPort();
if (this.turbinePort <= 0) {
this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
}
HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty
.createHttpServer(this.turbinePort, (request, response) -> {
log.info("SSE Request Received");
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return output.doOnUnsubscribe(
() -> log.info("Unsubscribing RxNetty server connection"))
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(
Unpooled.copiedBuffer(JsonUtility.mapToJson(data),
StandardCharsets.UTF_8))));
}, serveSseConfigurator());
return httpServer;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
aggregatorServer().start();
}
}
@Override
public void stop() {
if (this.running.compareAndSet(true, false)) {
try {
aggregatorServer().shutdown();
}
catch (InterruptedException ex) {
log.error("Error shutting down", ex);
}
}
}
@Override
public boolean isRunning() {
return this.running.get();
}
@Override
public int getPhase() {
return 0;
}
public int getTurbinePort() {
return this.turbinePort;
}
>>>>>>> feature/eureka-stubs
}
......@@ -16,7 +16,6 @@
package org.springframework.cloud.netflix.turbine.stream;
import org.junit.Ignore;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
......@@ -27,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -59,7 +59,8 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = TurbineStreamTests.Application.class, webEnvironment = WebEnvironment.NONE, value = {
"turbine.stream.port=0", "spring.jmx.enabled=true", "spring.main.web-application-type=servlet"
"turbine.stream.port=0", "spring.jmx.enabled=true",
"spring.main.web-application-type=servlet",
// TODO: we don't need this if we harmonize the turbine and hystrix destinations
// https://github.com/spring-cloud/spring-cloud-netflix/issues/1948
"spring.cloud.stream.bindings.turbineStreamInput.destination=hystrixStreamOutput",
......@@ -89,10 +90,11 @@ public class TurbineStreamTests {
@EnableAutoConfiguration
@EnableTurbineStream
public static class Application { }
public static class Application {
}
@Test
@Ignore //FIXME 2.0.0 Elmurst stream missing class @Controller?
@Ignore // FIXME 2.0.0 Elmurst stream missing class @Controller?
public void contextLoads() throws Exception {
rest.getInterceptors().add(new NonClosingInterceptor());
int count = ((MessageChannelMetrics) input).getSendCount();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment