Commit b03b2f56 by Dave Syer

Add ping stream to turbine AMQP

This change adds more data to the stream with {"type":"Ping"} so (I believe) the existing dashboard client will discard it, but stay alive. Fixes gh-264
parent 72a182c9
......@@ -21,9 +21,12 @@ import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.SmartLifecycle;
......@@ -71,6 +74,11 @@ public class TurbineAmqpConfiguration implements SmartLifecycle {
.doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
.doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
.publish().refCount();
Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS)
.map(count -> {
return Collections.singletonMap("type", (Object) "Ping");
}).publish().refCount();
Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);
this.turbinePort = this.turbine.getPort();
......@@ -83,7 +91,7 @@ public class TurbineAmqpConfiguration implements SmartLifecycle {
(request, response) -> {
log.info("SSE Request Received");
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return publishedStreams.doOnUnsubscribe(
return output.doOnUnsubscribe(
() -> log.info("Unsubscribing RxNetty server connection"))
.flatMap(
data -> response.writeAndFlush(new ServerSentEvent(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment