Updates turbine-stream to use webflux

Rather than custom netty server. Harmonizes spring cloud contract versions to 1.2.4.RELEASE fixes gh-2319
parent 2e12d9b4
......@@ -27,7 +27,7 @@
<spring-cloud-config.version>2.0.0.BUILD-SNAPSHOT</spring-cloud-config.version>
<spring-cloud-stream.version>Elmhurst.BUILD-SNAPSHOT</spring-cloud-stream.version>
<!-- Has to be a stable version (not one that depends on this version of netflix): -->
<donotreplacespring-cloud-contract.version>1.2.0.RELEASE</donotreplacespring-cloud-contract.version>
<donotreplacespring-cloud-contract.version>1.2.4.RELEASE</donotreplacespring-cloud-contract.version>
<!-- Plugin versions -->
<maven-compiler-plugin.version>3.6.1</maven-compiler-plugin.version>
......
......@@ -17,6 +17,8 @@ package org.springframework.cloud.netflix.eureka.server.doc;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
......@@ -324,4 +326,18 @@ class WireMockRestAssuredRequestAdapter implements Request {
return false;
}
@Override
public boolean isMultipart() {
return false;
}
@Override
public Collection<Part> getParts() {
return Collections.emptyList();
}
@Override
public Part getPart(String s) {
return null;
}
}
......@@ -15,7 +15,7 @@
<description>Spring Cloud Netflix Hystrix Contract</description>
<properties>
<main.basedir>${basedir}/..</main.basedir>
<donotreplacespring-cloud-contract.version>1.1.2.RELEASE</donotreplacespring-cloud-contract.version>
<donotreplacespring-cloud-contract.version>1.2.4.RELEASE</donotreplacespring-cloud-contract.version>
</properties>
<dependencies>
<dependency>
......
......@@ -79,6 +79,12 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.netflix.turbine</groupId>
......@@ -89,8 +95,13 @@
<artifactId>rxjava</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<artifactId>spring-boot-starter-webflux</artifactId>
<optional>true</optional>
</dependency>
<dependency>
......@@ -104,6 +115,11 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-contract-stub-runner</artifactId>
<scope>test</scope>
......
......@@ -54,6 +54,9 @@ public class HystrixStreamAggregator {
@ServiceActivator(inputChannel = TurbineStreamClient.INPUT)
public void sendToSubject(@Payload String payload) {
if (log.isTraceEnabled()) {
log.trace("Received hystrix stream payload string: " + payload);
}
if (payload.startsWith("\"")) {
// Legacy payload from an Angel client
payload = payload.substring(1, payload.length() - 1);
......
/*
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.stream;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.subjects.PublishSubject;
@RestController
public class TurbineController {
private static final Log log = LogFactory.getLog(TurbineController.class);
private final Flux<Map<String, Object>> flux;
public TurbineController(PublishSubject<Map<String, Object>> hystrixSubject) {
Observable<Map<String, Object>> stream = StreamAggregator.aggregateGroupedStreams(hystrixSubject.groupBy(
data -> InstanceKey.create((String) data.get("instanceId"))))
.doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
.doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o);
Flux<Map<String, Object>> ping = Flux.interval(Duration.ofSeconds(5), Duration.ofSeconds(10))
.map(l -> Collections.singletonMap("type", (Object) "ping"))
.share();
flux = Flux.merge(RxReactiveStreams.toPublisher(stream), ping)
.share();
}
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<Map<String, Object>> stream() {
return this.flux;
}
}
......@@ -22,29 +22,25 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.actuator.HasFeatures;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.SocketUtils;
import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import static io.reactivex.netty.pipeline.PipelineConfigurators.serveSseConfigurator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import rx.Observable;
import rx.subjects.PublishSubject;
......@@ -54,7 +50,7 @@ import rx.subjects.PublishSubject;
*/
@Configuration
@EnableConfigurationProperties(TurbineStreamProperties.class)
public class TurbineStreamConfiguration implements SmartLifecycle {
public class TurbineStreamConfiguration /*implements SmartLifecycle*/ {
private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);
......@@ -77,6 +73,11 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
}
@Bean
public TurbineController turbineController(PublishSubject<Map<String, Object>> hystrixSubject) {
return new TurbineController(hystrixSubject);
}
// @Bean
@SuppressWarnings("deprecation")
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
// multicast so multiple concurrent subscribers get the same stream
......@@ -91,11 +92,11 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
.refCount();
Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);
this.turbinePort = this.properties.getPort();
// this.turbinePort = this.properties.getPort();
if (this.turbinePort <= 0) {
this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
}
// if (this.turbinePort <= 0) {
// this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
// }
HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty
.createHttpServer(this.turbinePort, (request, response) -> {
......@@ -112,6 +113,7 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
}, serveSseConfigurator());
return httpServer;
}
/*
@Override
public boolean isAutoStartup() {
......@@ -152,6 +154,7 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
public int getPhase() {
return 0;
}
*/
public int getTurbinePort() {
return this.turbinePort;
......
......@@ -30,21 +30,10 @@ import org.springframework.http.MediaType;
@ConfigurationProperties("turbine.stream")
public class TurbineStreamProperties {
@Value("${server.port:8989}")
private int port = 8989;
private String destination = HystrixConstants.HYSTRIX_STREAM_DESTINATION;
private String contentType = MediaType.APPLICATION_JSON_VALUE;
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getDestination() {
return destination;
}
......@@ -68,18 +57,18 @@ public class TurbineStreamProperties {
if (o == null || getClass() != o.getClass())
return false;
TurbineStreamProperties that = (TurbineStreamProperties) o;
return port == that.port && Objects.equals(destination, that.destination)
return Objects.equals(destination, that.destination)
&& Objects.equals(contentType, that.contentType);
}
@Override
public int hashCode() {
return Objects.hash(port, destination, contentType);
return Objects.hash(destination, contentType);
}
@Override
public String toString() {
return new StringBuilder("TurbineStreamProperties{").append("port=").append(port)
return new StringBuilder("TurbineStreamProperties{")
.append(", ").append("destination='").append(destination).append("', ")
.append("contentType='").append(contentType).append("'}").toString();
}
......
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.turbine.stream.TurbineStreamAutoConfiguration
org.springframework.context.ApplicationListener=\
org.springframework.cloud.netflix.turbine.stream.TurbinePortApplicationListener
\ No newline at end of file
#org.springframework.context.ApplicationListener=\
#org.springframework.cloud.netflix.turbine.stream.TurbinePortApplicationListener
\ No newline at end of file
......@@ -7,7 +7,4 @@ spring:
default_domain: cloud.turbine.stream
server:
port: 8990
turbine:
stream:
port: 8989
......@@ -24,8 +24,6 @@ import java.net.URI;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -33,7 +31,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.cloud.contract.stubrunner.StubTrigger;
import org.springframework.cloud.contract.stubrunner.spring.AutoConfigureStubRunner;
import org.springframework.http.HttpHeaders;
......@@ -51,15 +49,14 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.web.client.RestTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT;
/**
* @author Spencer Gibb
* @author Daniel Lavoie
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = TurbineStreamTests.Application.class, webEnvironment = WebEnvironment.NONE, value = {
"turbine.stream.port=0", "spring.jmx.enabled=true",
"spring.main.web-application-type=servlet",
@SpringBootTest(classes = TurbineStreamTests.Application.class, webEnvironment = RANDOM_PORT, properties = {
// TODO: we don't need this if we harmonize the turbine and hystrix destinations
// https://github.com/spring-cloud/spring-cloud-netflix/issues/1948
"spring.cloud.stream.bindings.turbineStreamInput.destination=hystrixStreamOutput",
......@@ -82,21 +79,23 @@ public class TurbineStreamTests {
@Autowired
TurbineStreamConfiguration turbine;
@LocalServerPort
int port;
@EnableAutoConfiguration
@EnableTurbineStream
public static class Application {
}
@Test
@Ignore // FIXME 2.0.0 Elmurst stream missing class @Controller?
public void contextLoads() throws Exception {
rest.getInterceptors().add(new NonClosingInterceptor());
int count = ((MessageChannelMetrics) input).getSendCount();
ResponseEntity<String> response = rest.execute(
new URI("http://localhost:" + turbine.getTurbinePort() + "/"),
new URI("http://localhost:" + port + "/"),
HttpMethod.GET, null, this::extract);
assertThat(response.getHeaders().getContentType())
.isEqualTo(MediaType.TEXT_EVENT_STREAM);
assertThat(response.getHeaders().getContentType().isCompatibleWith(MediaType.TEXT_EVENT_STREAM))
.isTrue();
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
Map<String, Object> metrics = extractMetrics(response.getBody());
assertThat(metrics).containsEntry("type", "HystrixCommand");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment