Commit f966b452 by Spencer Gibb

use eureka to lookup hystrix streams in turbine

parent 80688beb
......@@ -58,4 +58,5 @@ The default application name, virtual host and non-secure port are taken from th
- [ ] Better observable example
- [ ] Distributed refresh environment via platform bus
- [x] Metrics aggregation (turbine)
- [ ] Use Eureka for instance discovery rather than static list see https://github.com/Netflix/Turbine/blob/master/turbine-contrib/src/main/java/com/netflix/turbine/discovery/EurekaInstanceDiscovery.java
- [x] Use Eureka for instance discovery rather than static list see https://github.com/Netflix/Turbine/blob/master/turbine-contrib/src/main/java/com/netflix/turbine/discovery/EurekaInstanceDiscovery.java
- [ ] Configure InstanceDiscovery.impl using auto config/config props
......@@ -61,7 +61,7 @@
## Netflix Turbine
`spring-platform-netflix-turbine$ java -jar target/spring-platform-netflix-turbine-1.0.0.BUILD-SNAPSHOT.war --turbine.aggregator.clusterConfig=sampleApps --turbine.ConfigPropertyBasedDiscovery.sampleApps.instances=localhost:9080 --turbine.instanceUrlSuffix=/hystrix.stream`
`spring-platform-netflix-turbine$ java -jar target/spring-platform-netflix-turbine-1.0.0.BUILD-SNAPSHOT.war --InstanceDiscovery.impl=io.spring.platform.netflix.turbine.EurekaInstanceDiscovery --turbine.appConfig=samplefrontendservice --turbine.aggregator.clusterConfig=sampleApps --turbine.instanceUrlSuffix=/hystrix.stream --turbine.instanceInsertPort=true`
## Sandbox Sample Backend
......
......@@ -47,6 +47,12 @@
<groupId>com.netflix.eureka</groupId>
<artifactId>eureka-client</artifactId>
<version>${eureka.version}</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.netflix.eureka</groupId>
......
......@@ -54,6 +54,10 @@
<artifactId>spring-platform-config-client</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.eureka</groupId>
<artifactId>eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.netflix.turbine</groupId>
<artifactId>turbine-core</artifactId>
</dependency>
......
package io.spring.platform.netflix.turbine;
import com.netflix.turbine.init.TurbineInit;
import com.netflix.turbine.plugins.PluginsFactory;
import com.netflix.turbine.streaming.servlet.TurbineStreamServlet;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
......@@ -47,6 +48,7 @@ public class Application extends SpringBootServletInitializer implements SmartLi
@Override
public void start() {
PluginsFactory.setClusterMonitorFactory(new SpringAggregatorFactory());
TurbineInit.init();
}
......
package io.spring.platform.netflix.turbine;
import com.netflix.appinfo.AmazonInfo;
import com.netflix.appinfo.DataCenterInfo;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
import com.netflix.discovery.DiscoveryManager;
import com.netflix.discovery.shared.Application;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.discovery.InstanceDiscovery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* Class that encapsulates an {@link InstanceDiscovery} implementation that uses Eureka (see https://github.com/Netflix/eureka)
* The plugin requires a list of applications configured. It then queries the set of instances for each application.
* Instance information retrieved from Eureka must be translated to something that Turbine can understand i.e the {@link Instance} class.
*
* All the logic to perform this translation can be overriden here, so that you can provide your own implementation if needed.
*/
public class EurekaInstanceDiscovery implements InstanceDiscovery {
private static final Logger logger = LoggerFactory.getLogger(EurekaInstanceDiscovery.class);
// Property the controls the list of applications that are enabled in Eureka
private static final DynamicStringProperty ApplicationList = DynamicPropertyFactory.getInstance().getStringProperty("turbine.appConfig", "");
public EurekaInstanceDiscovery() {
// Eureka client should already be configured by spring-platform-netflix-core
// initialize eureka client. make sure eureka properties are properly configured in config.properties
//DiscoveryManager.getInstance().initComponent(new MyDataCenterInstanceConfig(), new DefaultEurekaClientConfig());
}
/**
* Method that queries Eureka service for a list of configured application names
* @return Collection<Instance>
*/
@Override
public Collection<Instance> getInstanceList() throws Exception {
List<Instance> instances = new ArrayList<>();
List<String> appNames = parseApps();
if (appNames == null || appNames.size() == 0) {
logger.info("No apps configured, returning an empty instance list");
return instances;
}
logger.info("Fetching instance list for apps: " + appNames);
for (String appName : appNames) {
try {
instances.addAll(getInstancesForApp(appName));
} catch (Exception e) {
logger.error("Failed to fetch instances for app: " + appName + ", retrying once more", e);
try {
instances.addAll(getInstancesForApp(appName));
} catch (Exception e1) {
logger.error("Failed again to fetch instances for app: " + appName + ", giving up", e);
}
}
}
return instances;
}
/**
* Private helper that fetches the Instances for each application.
* @param appName
* @return List<Instance>
* @throws Exception
*/
private List<Instance> getInstancesForApp(String appName) throws Exception {
List<Instance> instances = new ArrayList<>();
logger.info("Fetching instances for app: {}", appName);
Application app = DiscoveryManager.getInstance().getDiscoveryClient().getApplication(appName);
if (app == null) {
logger.warn("Eureka returned null for app: {}", appName);
}
List<InstanceInfo> instancesForApp = app.getInstances();
if (instancesForApp != null) {
logger.info("Received instance list for app: {} = {}", appName, instancesForApp.size());
for (InstanceInfo iInfo : instancesForApp) {
Instance instance = marshallInstanceInfo(iInfo);
if (instance != null) {
instances.add(instance);
}
}
}
return instances;
}
/**
* Private helper that marshals the information from each instance into something that Turbine can understand.
* Override this method for your own implementation for parsing Eureka info.
*
* @param iInfo
* @return Instance
*/
protected Instance marshallInstanceInfo(InstanceInfo iInfo) {
String hostname = iInfo.getHostName();
String cluster = getClusterName(iInfo);
Boolean status = parseInstanceStatus(iInfo.getStatus());
if (hostname != null && cluster != null && status != null) {
Instance instance = new Instance(hostname, cluster, status);
Map<String, String> metadata = iInfo.getMetadata();
if (metadata != null) {
instance.getAttributes().putAll(metadata);
}
String asgName = iInfo.getASGName();
if (asgName != null) {
instance.getAttributes().put("asg", asgName);
}
instance.getAttributes().put("port", String.valueOf(iInfo.getPort()));
DataCenterInfo dcInfo = iInfo.getDataCenterInfo();
if (dcInfo != null && dcInfo.getName().equals(DataCenterInfo.Name.Amazon)) {
AmazonInfo amznInfo = (AmazonInfo) dcInfo;
instance.getAttributes().putAll(amznInfo.getMetadata());
}
return instance;
} else {
return null;
}
}
/**
* Helper that returns whether the instance is Up of Down
* @param status
* @return
*/
protected Boolean parseInstanceStatus(InstanceStatus status) {
if (status != null) {
if (status == InstanceStatus.UP) {
return Boolean.TRUE;
} else {
return Boolean.FALSE;
}
} else {
return null;
}
}
/**
* Helper that fetches the cluster name. Cluster is a Turbine concept and not a Eureka concept.
* By default we choose the amazon asg name as the cluster. A custom implementation can be plugged in by overriding this method.
*
* @param iInfo
* @return
*/
protected String getClusterName(InstanceInfo iInfo) {
return iInfo.getASGName();
}
/**
* TODO: move to ConfigurationProperties
* Private helper that parses the list of application names.
*
* @return List<String>
*/
private List<String> parseApps() {
String appList = ApplicationList.get();
if (appList == null) {
return null;
}
appList = appList.trim();
if (appList.length() == 0) {
return null;
}
String[] parts = appList.split(",");
if (parts != null && parts.length > 0) {
return Arrays.asList(parts);
}
return null;
}
}
\ No newline at end of file
package io.spring.platform.netflix.turbine;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
import com.netflix.turbine.data.AggDataFromCluster;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.handler.PerformanceCriteria;
import com.netflix.turbine.handler.TurbineDataHandler;
import com.netflix.turbine.monitor.TurbineDataMonitor;
import com.netflix.turbine.monitor.cluster.AggregateClusterMonitor;
import com.netflix.turbine.monitor.cluster.ClusterMonitor;
import com.netflix.turbine.monitor.cluster.ClusterMonitorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static com.netflix.turbine.monitor.cluster.AggregateClusterMonitor.AggregatorClusterMonitorConsole;
/**
* Created by sgibb on 7/14/14.
*/
public class SpringAggregatorFactory implements ClusterMonitorFactory<AggDataFromCluster> {
private static final Logger logger = LoggerFactory.getLogger(SpringAggregatorFactory.class);
private static final DynamicStringProperty aggClusters = DynamicPropertyFactory.getInstance().getStringProperty("turbine.aggregator.clusterConfig", null);
/**
* @return {@link ClusterMonitor}<{@link AggDataFromCluster}>
*/
@Override
public ClusterMonitor<AggDataFromCluster> getClusterMonitor(String name) {
TurbineDataMonitor<AggDataFromCluster> clusterMonitor = AggregateClusterMonitor.AggregatorClusterMonitorConsole.findMonitor(name + "_agg");
return (ClusterMonitor<AggDataFromCluster>) clusterMonitor;
}
public static TurbineDataMonitor<AggDataFromCluster> findOrRegisterAggregateMonitor(String clusterName) {
TurbineDataMonitor<AggDataFromCluster> clusterMonitor = AggregatorClusterMonitorConsole.findMonitor(clusterName + "_agg");
if (clusterMonitor == null) {
logger.info("Could not find monitors: " + AggregatorClusterMonitorConsole.toString());
clusterMonitor = new SpringClusterMonitor(clusterName + "_agg", clusterName);
clusterMonitor = AggregatorClusterMonitorConsole.findOrRegisterMonitor(clusterMonitor);
}
return clusterMonitor;
}
@Override
public void initClusterMonitors() {
for(String clusterName : getClusterNames()) {
ClusterMonitor<AggDataFromCluster> clusterMonitor = (ClusterMonitor<AggDataFromCluster>) findOrRegisterAggregateMonitor(clusterName);
clusterMonitor.registerListenertoClusterMonitor(StaticListener);
try {
clusterMonitor.startMonitor();
} catch (Exception e) {
logger.warn("Could not init cluster monitor for: " + clusterName);
clusterMonitor.stopMonitor();
clusterMonitor.getDispatcher().stopDispatcher();
}
}
}
private List<String> getClusterNames() {
List<String> clusters = new ArrayList<String>();
String clusterNames = aggClusters.get();
if (clusterNames == null || clusterNames.trim().length() == 0) {
clusters.add("default");
} else {
String[] parts = aggClusters.get().split(",");
for (String s : parts) {
clusters.add(s);
}
}
return clusters;
}
private TurbineDataHandler<AggDataFromCluster> StaticListener = new TurbineDataHandler<AggDataFromCluster>() {
@Override
public String getName() {
return "StaticListener_For_Aggregator";
}
@Override
public void handleData(Collection<AggDataFromCluster> stats) {
}
@Override
public void handleHostLost(Instance host) {
}
@Override
public PerformanceCriteria getCriteria() {
return NonCriticalCriteria;
}
};
private PerformanceCriteria NonCriticalCriteria = new PerformanceCriteria() {
@Override
public boolean isCritical() {
return false;
}
@Override
public int getMaxQueueSize() {
return 0;
}
@Override
public int numThreads() {
return 0;
}
};
}
package io.spring.platform.netflix.turbine;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
import com.netflix.turbine.data.DataFromSingleInstance;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.handler.PerformanceCriteria;
import com.netflix.turbine.monitor.MonitorConsole;
import com.netflix.turbine.monitor.cluster.AggregateClusterMonitor;
import com.netflix.turbine.monitor.cluster.ObservationCriteria;
import com.netflix.turbine.monitor.instance.InstanceUrlClosure;
/**
* Created by sgibb on 7/14/14.
*/
public class SpringClusterMonitor extends AggregateClusterMonitor {
public SpringClusterMonitor(String name, String clusterName) {
super(name,
new ObservationCriteria.ClusterBasedObservationCriteria(clusterName),
new PerformanceCriteria.AggClusterPerformanceCriteria(clusterName),
new MonitorConsole<DataFromSingleInstance>(),
InstanceMonitorDispatcher,
SpringClusterMonitor.ClusterConfigBasedUrlClosure);
}
/**
* TODO: make this a template of some kind (secure, management port, etc...)
* Helper class that decides how to connect to a server based on injected config.
* Note that the cluster name must be provided here since one can have different configs for different clusters
*/
public static InstanceUrlClosure ClusterConfigBasedUrlClosure = new InstanceUrlClosure() {
private final DynamicStringProperty defaultUrlClosureConfig = DynamicPropertyFactory.getInstance().getStringProperty("turbine.instanceUrlSuffix", null);
private final DynamicBooleanProperty instanceInsertPort = DynamicPropertyFactory.getInstance().getBooleanProperty("turbine.instanceInsertPort", false);
@Override
public String getUrlPath(Instance host) {
if (host.getCluster() == null) {
throw new RuntimeException("Host must have cluster name in order to use ClusterConfigBasedUrlClosure");
}
String key = "turbine.instanceUrlSuffix." + host.getCluster();
DynamicStringProperty urlClosureConfig = DynamicPropertyFactory.getInstance().getStringProperty(key, null);
String url = urlClosureConfig.get();
if (url == null) {
url = defaultUrlClosureConfig.get();
}
if (url == null) {
throw new RuntimeException("Config property: " + urlClosureConfig.getName() + " or " +
defaultUrlClosureConfig.getName() + " must be set");
}
String insertPortKey = "turbine.instanceInsertPort." + host.getCluster();
DynamicStringProperty insertPortProp = DynamicPropertyFactory.getInstance().getStringProperty(insertPortKey, null);
boolean insertPort;
if (insertPortProp.get() == null) {
insertPort = instanceInsertPort.get();
} else {
insertPort = Boolean.parseBoolean(insertPortProp.get());
}
if (insertPort) {
if (url.startsWith("/")) {
url = url.substring(1);
}
if (!host.getAttributes().containsKey("port")) {
throw new RuntimeException("Configured to use port, but port is not in host attributes");
}
return String.format("http://%s:%s/%s", host.getHostname(), host.getAttributes().get("port"), url);
}
return "http://" + host.getHostname() + url;
}
};
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment