Commit 8c7116b2 by Yongsung Yoon Committed by Ryan Baxter

Add support cluster query param in TurbineStream (#3001)

parent 2df81038
......@@ -726,6 +726,24 @@ You can then point the Hystrix Dashboard to the Turbine Stream Server instead of
Spring Cloud provides a `spring-cloud-starter-netflix-turbine-stream` that has all the dependencies you need to get a Turbine Stream server running - just add the Stream binder of your choice, e.g. `spring-cloud-starter-stream-rabbit`. You need Java 8 to run the app because it is Netty-based.
Turbine Stream server also supports the `cluster` parameter.
Unlike Turbine server, Turbine Stream uses eureka serviceIds as cluster names and these are not configurable.
If Turbine Stream server is running on port 8989 on `my.turbine.server` and you have two eureka serviceIds `customers` and `products` in your environment, the following URLs will be available on your Turbine Stream server. `default` and empty cluster name will provide all metrics that Turbine Stream server receives.
----
http://my.turbine.sever:8989/turbine.stream?cluster=customers
http://my.turbine.sever:8989/turbine.stream?cluster=products
http://my.turbine.sever:8989/turbine.stream?cluster=default
http://my.turbine.sever:8989/turbine.stream
----
So, you can use eureka serviceIds as cluster names for your Turbine dashboard (or any compatible dashboard).
You dont need to configure any properties like `turbine.appConfig`, `turbine.clusterNameExpression` and `turbine.aggregator.clusterConfig` for your Turbine Stream server.
NOTE: Turbine Stream server gathers all metrics from the configured input channel with Spring Cloud Stream. It means that it doesnt gather Hystrix metrics actively from each instance. It just can provide metrics that were already gathered into the input channel by each instance.
[[spring-cloud-ribbon]]
== Client Side Load Balancer: Ribbon
......
......@@ -18,6 +18,7 @@ package org.springframework.cloud.netflix.turbine.stream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -46,6 +47,7 @@ import org.springframework.util.SocketUtils;
import static io.reactivex.netty.pipeline.PipelineConfigurators.serveSseConfigurator;
import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;
/**
......@@ -58,6 +60,10 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);
private static final String CLUSTER_PARAM = "cluster";
private static final String DEFAULT_CLUSTER = "default";
private static final String INSTANCE_ID_KEY = "instanceId";
private AtomicBoolean running = new AtomicBoolean(false);
@Autowired
......@@ -103,6 +109,7 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return output.doOnUnsubscribe(
() -> log.info("Unsubscribing RxNetty server connection"))
.filter(createClusterPredicate(request.getQueryParameters()))
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(
null,
Unpooled.copiedBuffer("message",
......@@ -113,6 +120,23 @@ public class TurbineStreamConfiguration implements SmartLifecycle {
return httpServer;
}
Func1<Map<String, Object>, Boolean> createClusterPredicate(Map<String, List<String>> queryParameters) {
List<String> clusterNames = queryParameters.get(CLUSTER_PARAM);
if ((clusterNames == null) || (clusterNames.isEmpty()) || (clusterNames.contains(DEFAULT_CLUSTER))) {
return (data) -> true; // always true
}
return (data) -> {
String instanceId = (String) data.get(INSTANCE_ID_KEY);
if (instanceId == null) {
return true; // ping or unknown metric data. should be passed
}
return clusterNames.stream()
.anyMatch(clusterName -> instanceId.toLowerCase().startsWith(clusterName.toLowerCase() + ":"));
};
}
@Override
public boolean isAutoStartup() {
return true;
......
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine.stream;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.functions.Func1;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import static org.junit.Assert.assertEquals;
/**
* @author Yongsung Yoon
*/
public class TurbineStreamConfigurationTest {
TurbineStreamConfiguration turbineStreamConfiguration;
List<Map<String, Object>> testMetricList;
@Before
public void setUp() {
turbineStreamConfiguration = new TurbineStreamConfiguration();
testMetricList = createBasicTestMetricList();
}
private List<Map<String,Object>> createBasicTestMetricList() {
List<Map<String, Object>> testDataList = new ArrayList<>();
HashMap<String, Object> map = new HashMap<>();
map.put("instanceId", "abc:127.0.0.1:8080");
map.put("type", "HystrixCommand");
testDataList.add(map);
map = new HashMap<>();
map.put("instanceId", "def:127.0.0.1:8080");
map.put("type", "HystrixCommand");
testDataList.add(map);
map = new HashMap<>();
map.put("instanceId", "xyz:127.0.0.1:8080");
map.put("type", "HystrixThreadPool");
testDataList.add(map);
map = new HashMap<>();
map.put("type", "ping");
testDataList.add(map);
map = new HashMap<>();
map.put("dummy", "data");
testDataList.add(map);
return testDataList;
}
@Test
public void shouldReturnAlwaysTruePredicateWithEmptyQueryParam() {
Func1<Map<String, Object>, Boolean> clusterPredicate = turbineStreamConfiguration.createClusterPredicate(Collections.emptyMap());
assertThatGivenPredicateReturnsTrueAsExpectedCount(5, clusterPredicate); // all
}
@Test
public void shouldReturnAlwaysTruePredicateIfQueryParamsContainDefault() {
Map<String, List<String>> queryMap = new HashMap<>();
queryMap.put("cluster", Arrays.asList("default", "garbage"));
Func1<Map<String, Object>, Boolean> clusterPredicate = turbineStreamConfiguration.createClusterPredicate(queryMap);
assertThatGivenPredicateReturnsTrueAsExpectedCount(5, clusterPredicate); // all
}
@Test
public void shouldReturnPredicateForGivenClusterName() {
Map<String, List<String>> queryMap = new HashMap<>();
queryMap.put("cluster", Arrays.asList("abc"));
Func1<Map<String, Object>, Boolean> clusterPredicate = turbineStreamConfiguration.createClusterPredicate(queryMap);
assertThatGivenPredicateReturnsTrueAsExpectedCount(3, clusterPredicate); // abc + ping + dummy
}
@Test
public void shouldReturnPredicateForGivenMultipleClusterNames() {
Map<String, List<String>> queryMap = new HashMap<>();
queryMap.put("cluster", Arrays.asList("abc", "xyz"));
Func1<Map<String, Object>, Boolean> clusterPredicate = turbineStreamConfiguration.createClusterPredicate(queryMap);
assertThatGivenPredicateReturnsTrueAsExpectedCount(4, clusterPredicate); // abc + xyz + ping + dummy
}
@Test
public void shouldReturnPredicateForUnknownClusterName() {
Map<String, List<String>> queryMap = new HashMap<>();
queryMap.put("cluster", Arrays.asList("ttt", "eee"));
Func1<Map<String, Object>, Boolean> clusterPredicate = turbineStreamConfiguration.createClusterPredicate(queryMap);
assertThatGivenPredicateReturnsTrueAsExpectedCount(2, clusterPredicate); // ping + dummy
}
void assertThatGivenPredicateReturnsTrueAsExpectedCount(int expectedCount, Func1<Map<String, Object>, Boolean> predicate) {
assertEquals(expectedCount, countEmittedMetricsWithPredicate(predicate));
}
int countEmittedMetricsWithPredicate(Func1<Map<String, Object>, Boolean> predicate) {
try {
return Observable.from(this.testMetricList)
.filter(predicate)
.count()
.toBlocking()
.single();
} catch (NoSuchElementException ex) {
return 0;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment