Commit 38055b58 by Spencer Gibb

Allow flexibility with Turbine http.

Allows non-eureka based InstanceDiscovery. Allows custom InstanceDiscovery. fixes gh-566
parent 1e716ff2
......@@ -64,10 +64,12 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.netflix.eureka</groupId>
<artifactId>eureka-client</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
......@@ -85,8 +87,8 @@
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
......
/*
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.discovery.InstanceDiscovery;
import lombok.extern.apachecommons.CommonsLog;
/**
* Class that encapsulates an {@link InstanceDiscovery}
* implementation that uses Eureka (see https://github.com/Netflix/eureka) The plugin
* requires a list of applications configured. It then queries the set of instances for
* each application. Instance information retrieved from Eureka must be translated to
* something that Turbine can understand i.e the
* {@link Instance} class.
* <p>
* All the logic to perform this translation can be overriden here, so that you can
* provide your own implementation if needed.
*
* @author Spencer Gibb
*/
@CommonsLog
public class CommonsInstanceDiscovery implements InstanceDiscovery {
private static final String DEFAULT_CLUSTER_NAME_EXPRESSION = "serviceId";
private final Expression clusterNameExpression;
private DiscoveryClient discoveryClient;
private TurbineProperties turbineProperties;
public CommonsInstanceDiscovery(TurbineProperties turbineProperties, DiscoveryClient discoveryClient) {
this(turbineProperties, DEFAULT_CLUSTER_NAME_EXPRESSION);
this.discoveryClient = discoveryClient;
}
protected CommonsInstanceDiscovery(TurbineProperties turbineProperties, String defaultExpression) {
this.turbineProperties = turbineProperties;
SpelExpressionParser parser = new SpelExpressionParser();
String clusterNameExpression = turbineProperties
.getClusterNameExpression();
if (clusterNameExpression == null) {
clusterNameExpression = defaultExpression;
}
this.clusterNameExpression = parser.parseExpression(clusterNameExpression);
}
protected Expression getClusterNameExpression() {
return clusterNameExpression;
}
public TurbineProperties getTurbineProperties() {
return turbineProperties;
}
/**
* Method that queries Eureka service for a list of configured application names
* @return Collection<Instance>
*/
@Override
public Collection<Instance> getInstanceList() throws Exception {
List<Instance> instances = new ArrayList<>();
List<String> appNames = getTurbineProperties().getAppConfigList();
if (appNames == null || appNames.size() == 0) {
log.info("No apps configured, returning an empty instance list");
return instances;
}
log.info("Fetching instance list for apps: " + appNames);
for (String appName : appNames) {
try {
instances.addAll(getInstancesForApp(appName));
}
catch (Exception ex) {
log.error("Failed to fetch instances for app: " + appName
+ ", retrying once more", ex);
try {
instances.addAll(getInstancesForApp(appName));
}
catch (Exception retryException) {
log.error("Failed again to fetch instances for app: " + appName
+ ", giving up", ex);
}
}
}
return instances;
}
/**
* helper that fetches the Instances for each application from DiscoveryClient.
* @param serviceId
* @return List<Instance>
* @throws Exception
*/
protected List<Instance> getInstancesForApp(String serviceId) throws Exception {
List<Instance> instances = new ArrayList<>();
log.info("Fetching instances for app: " + serviceId);
List<ServiceInstance> serviceInstances = discoveryClient.getInstances(serviceId);
if (serviceInstances == null || serviceInstances.isEmpty()) {
log.warn("DiscoveryClient returned null or empty for service: " + serviceId);
return instances;
}
try {
log.info("Received instance list for service: " + serviceId + ", size="
+ serviceInstances.size());
for (ServiceInstance serviceInstance : serviceInstances) {
Instance instance = marshall(serviceInstance);
if (instance != null) {
instances.add(instance);
}
}
}
catch (Exception e) {
log.warn("Failed to retrieve instances from DiscoveryClient", e);
}
return instances;
}
/**
* Private helper that marshals the information from each instance into something that
* Turbine can understand. Override this method for your own implementation for
* parsing Eureka info.
* @param serviceInstance
* @return Instance
*/
private Instance marshall(ServiceInstance serviceInstance) {
String hostname = serviceInstance.getHost();
String cluster = getClusterName(serviceInstance);
Boolean status = Boolean.TRUE; //TODO: where to get?
if (hostname != null && cluster != null && status != null) {
Instance instance = new Instance(hostname, cluster, status);
// TODO: reimplement when metadata is in commons
// add metadata
/*Map<String, String> metadata = instanceInfo.getMetadata();
if (metadata != null) {
instance.getAttributes().putAll(metadata);
}*/
// add ports
instance.getAttributes().put("port", String.valueOf(serviceInstance.getPort()));
boolean securePortEnabled = serviceInstance.isSecure();
if (securePortEnabled) {
instance.getAttributes().put("securePort", String.valueOf(serviceInstance.getPort()));
}
return instance;
}
else {
return null;
}
}
/**
* Helper that fetches the cluster name. Cluster is a Turbine concept and not a Eureka
* concept. By default we choose the amazon asg name as the cluster. A custom
* implementation can be plugged in by overriding this method.
*/
protected String getClusterName(Object object) {
StandardEvaluationContext context = new StandardEvaluationContext(object);
Object value = this.clusterNameExpression.getValue(context);
if (value != null) {
return value.toString();
}
return null;
}
}
......@@ -30,7 +30,7 @@ import org.springframework.context.annotation.Import;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TurbineConfiguration.class)
@Import(TurbineHttpConfiguration.class)
public @interface EnableTurbine {
}
......@@ -17,27 +17,18 @@
package org.springframework.cloud.netflix.turbine;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import com.netflix.discovery.EurekaClient;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import com.netflix.appinfo.AmazonInfo;
import com.netflix.appinfo.DataCenterInfo;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.InstanceInfo.InstanceStatus;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Application;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.discovery.InstanceDiscovery;
import lombok.extern.apachecommons.CommonsLog;
/**
* Class that encapsulates an {@link com.netflix.turbine.discovery.InstanceDiscovery}
......@@ -53,74 +44,39 @@ import com.netflix.turbine.discovery.InstanceDiscovery;
* @author Spencer Gibb
*/
@CommonsLog
public class EurekaInstanceDiscovery implements InstanceDiscovery {
public class EurekaInstanceDiscovery extends CommonsInstanceDiscovery {
// Property the controls the list of applications that are enabled in Eureka
private static final DynamicStringProperty ApplicationList = DynamicPropertyFactory
.getInstance().getStringProperty("turbine.appConfig", "");
private static final String EUREKA_DEFAULT_CLUSTER_NAME_EXPRESSION = "appName";
private final EurekaClient eurekaClient;
private final Expression clusterNameExpression;
public EurekaInstanceDiscovery(TurbineProperties turbineProperties, EurekaClient eurekaClient) {
super(turbineProperties, EUREKA_DEFAULT_CLUSTER_NAME_EXPRESSION);
this.eurekaClient = eurekaClient;
SpelExpressionParser parser = new SpelExpressionParser();
this.clusterNameExpression = parser.parseExpression(turbineProperties
.getClusterNameExpression());
}
/**
* Method that queries Eureka service for a list of configured application names
* @return Collection<Instance>
*/
@Override
public Collection<Instance> getInstanceList() throws Exception {
List<Instance> instances = new ArrayList<>();
List<String> appNames = parseApps();
if (appNames == null || appNames.size() == 0) {
log.info("No apps configured, returning an empty instance list");
return instances;
}
log.info("Fetching instance list for apps: " + appNames);
for (String appName : appNames) {
try {
instances.addAll(getInstancesForApp(appName));
}
catch (Exception ex) {
log.error("Failed to fetch instances for app: " + appName
+ ", retrying once more", ex);
try {
instances.addAll(getInstancesForApp(appName));
}
catch (Exception retryException) {
log.error("Failed again to fetch instances for app: " + appName
+ ", giving up", ex);
}
}
}
return instances;
}
/**
* Private helper that fetches the Instances for each application.
* @param appName
* @param serviceId
* @return List<Instance>
* @throws Exception
*/
private List<Instance> getInstancesForApp(String appName) throws Exception {
@Override
protected List<Instance> getInstancesForApp(String serviceId) throws Exception {
List<Instance> instances = new ArrayList<>();
log.info("Fetching instances for app: " + appName);
Application app = eurekaClient.getApplication(appName);
log.info("Fetching instances for app: " + serviceId);
Application app = eurekaClient.getApplication(serviceId);
if (app == null) {
log.warn("Eureka returned null for app: " + appName);
log.warn("Eureka returned null for app: " + serviceId);
return instances;
}
try {
List<InstanceInfo> instancesForApp = app.getInstances();
if (instancesForApp != null) {
log.info("Received instance list for app: " + appName + ", size="
log.info("Received instance list for app: " + serviceId + ", size="
+ instancesForApp.size());
for (InstanceInfo iInfo : instancesForApp) {
Instance instance = marshallInstanceInfo(iInfo);
Instance instance = marshall(iInfo);
if (instance != null) {
instances.add(instance);
}
......@@ -140,7 +96,7 @@ public class EurekaInstanceDiscovery implements InstanceDiscovery {
* @param instanceInfo
* @return Instance
*/
protected Instance marshallInstanceInfo(InstanceInfo instanceInfo) {
Instance marshall(InstanceInfo instanceInfo) {
String hostname = instanceInfo.getHostName();
String cluster = getClusterName(instanceInfo);
Boolean status = parseInstanceStatus(instanceInfo.getStatus());
......@@ -187,36 +143,5 @@ public class EurekaInstanceDiscovery implements InstanceDiscovery {
return status == InstanceStatus.UP;
}
/**
* Helper that fetches the cluster name. Cluster is a Turbine concept and not a Eureka
* concept. By default we choose the amazon asg name as the cluster. A custom
* implementation can be plugged in by overriding this method.
*/
protected String getClusterName(InstanceInfo iInfo) {
StandardEvaluationContext context = new StandardEvaluationContext(iInfo);
Object value = this.clusterNameExpression.getValue(context);
if (value != null) {
return value.toString();
}
return null;
}
private List<String> parseApps() {
// TODO: move to ConfigurationProperties Private helper that parses the list of
// application names.
String appList = ApplicationList.get();
if (appList == null) {
return null;
}
appList = appList.trim();
if (appList.length() == 0) {
return null;
}
String[] parts = appList.split(",");
if (parts != null && parts.length > 0) {
return Arrays.asList(parts);
}
return null;
}
}
......@@ -20,8 +20,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import lombok.extern.apachecommons.CommonsLog;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
import com.netflix.turbine.data.AggDataFromCluster;
......@@ -33,6 +31,8 @@ import com.netflix.turbine.monitor.cluster.AggregateClusterMonitor;
import com.netflix.turbine.monitor.cluster.ClusterMonitor;
import com.netflix.turbine.monitor.cluster.ClusterMonitorFactory;
import lombok.extern.apachecommons.CommonsLog;
import static com.netflix.turbine.monitor.cluster.AggregateClusterMonitor.AggregatorClusterMonitorConsole;
/**
......
/*
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.boot.context.embedded.ServletRegistrationBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.actuator.HasFeatures;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.netflix.discovery.EurekaClient;
import com.netflix.turbine.discovery.InstanceDiscovery;
import com.netflix.turbine.streaming.servlet.TurbineStreamServlet;
/**
* @author Spencer Gibb
*/
@Configuration
@EnableConfigurationProperties
@EnableDiscoveryClient
public class TurbineHttpConfiguration {
@Bean
public HasFeatures Feature() {
return HasFeatures.namedFeature("Turbine (HTTP)", TurbineHttpConfiguration.class);
}
@Bean
public ServletRegistrationBean turbineStreamServlet() {
return new ServletRegistrationBean(new TurbineStreamServlet(), "/turbine.stream");
}
@Bean
public TurbineProperties turbineProperties() {
return new TurbineProperties();
}
@Bean
public TurbineLifecycle turbineLifecycle(InstanceDiscovery instanceDiscovery) {
return new TurbineLifecycle(instanceDiscovery);
}
@Configuration
@ConditionalOnClass(EurekaClient.class)
protected static class EurekaTurbineConfiguration {
@Bean
@ConditionalOnMissingBean
public InstanceDiscovery instanceDiscovery(TurbineProperties turbineProperties, EurekaClient eurekaClient) {
return new EurekaInstanceDiscovery(turbineProperties, eurekaClient);
}
}
@Configuration
@ConditionalOnMissingClass("com.netflix.discovery.EurekaClient")
protected static class DiscoveryClientTurbineConfiguration {
@Bean
@ConditionalOnMissingBean
public InstanceDiscovery instanceDiscovery(TurbineProperties turbineProperties, DiscoveryClient discoveryClient) {
return new CommonsInstanceDiscovery(turbineProperties, discoveryClient);
}
}
}
......@@ -16,55 +16,26 @@
package org.springframework.cloud.netflix.turbine;
import com.netflix.discovery.EurekaClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.embedded.ServletRegistrationBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.actuator.HasFeatures;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import com.netflix.turbine.discovery.InstanceDiscovery;
import com.netflix.turbine.init.TurbineInit;
import com.netflix.turbine.plugins.PluginsFactory;
import com.netflix.turbine.streaming.servlet.TurbineStreamServlet;
/**
* @author Spencer Gibb
*/
@Configuration
@EnableConfigurationProperties
@EnableDiscoveryClient
public class TurbineConfiguration implements SmartLifecycle, Ordered {
public class TurbineLifecycle implements SmartLifecycle, Ordered {
@Autowired
private EurekaClient eurekaClient;
private final InstanceDiscovery instanceDiscovery;
@Bean
public HasFeatures Feature() {
return HasFeatures.namedFeature("Turbine (HTTP)", TurbineConfiguration.class);
}
@Bean
public ServletRegistrationBean turbineStreamServlet() {
return new ServletRegistrationBean(new TurbineStreamServlet(), "/turbine.stream");
}
@Bean
public TurbineProperties turbineProperties() {
return new TurbineProperties();
}
private boolean running;
@Bean
public InstanceDiscovery instanceDiscovery() {
return new EurekaInstanceDiscovery(turbineProperties(), eurekaClient);
public TurbineLifecycle(InstanceDiscovery instanceDiscovery) {
this.instanceDiscovery = instanceDiscovery;
}
private boolean running;
@Override
public boolean isAutoStartup() {
return true;
......@@ -78,7 +49,7 @@ public class TurbineConfiguration implements SmartLifecycle, Ordered {
@Override
public void start() {
PluginsFactory.setClusterMonitorFactory(new SpringAggregatorFactory());
PluginsFactory.setInstanceDiscovery(instanceDiscovery());
PluginsFactory.setInstanceDiscovery(instanceDiscovery);
TurbineInit.init();
}
......
......@@ -16,9 +16,13 @@
package org.springframework.cloud.netflix.turbine;
import java.util.Arrays;
import java.util.List;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.util.StringUtils;
/**
* @author Spencer Gibb
......@@ -27,6 +31,18 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties("turbine")
public class TurbineProperties {
private String clusterNameExpression = "appName";
private String clusterNameExpression;
private String appConfig;
public List<String> getAppConfigList() {
if (!StringUtils.hasText(appConfig)) {
return null;
}
String[] parts = appConfig.trim().split(",");
if (parts != null && parts.length > 0) {
return Arrays.asList(parts);
}
return null;
}
}
......@@ -16,16 +16,15 @@
package org.springframework.cloud.netflix.turbine;
import static org.mockito.Mockito.*;
import com.netflix.discovery.EurekaClient;
import com.netflix.turbine.discovery.Instance;
import org.junit.Before;
import org.junit.Test;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.EurekaClient;
import com.netflix.turbine.discovery.Instance;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
/**
* @author Spencer Gibb
......@@ -65,7 +64,7 @@ public class EurekaInstanceDiscoveryTest {
.setHostName(hostName)
.setPort(port)
.build();
Instance instance = discovery.marshallInstanceInfo(instanceInfo);
Instance instance = discovery.marshall(instanceInfo);
assertEquals("port is wrong", String.valueOf(port), instance.getAttributes().get("port"));
String urlPath = SpringClusterMonitor.ClusterConfigBasedUrlClosure.getUrlPath(instance);
......@@ -86,7 +85,7 @@ public class EurekaInstanceDiscoveryTest {
.setSecurePort(securePort)
.enablePort(InstanceInfo.PortType.SECURE, true)
.build();
Instance instance = discovery.marshallInstanceInfo(instanceInfo);
Instance instance = discovery.marshall(instanceInfo);
assertEquals("port is wrong", String.valueOf(port), instance.getAttributes().get("port"));
assertEquals("securePort is wrong", String.valueOf(securePort), instance.getAttributes().get("securePort"));
......
/*
* Copyright 2013-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.SpringApplicationConfiguration;
import org.springframework.boot.test.WebIntegrationTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Spencer Gibb
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = TurbineHttpTests.TurbineHttpSampleApplication.class)
@WebIntegrationTest(randomPort = true)
public class TurbineHttpTests {
@EnableAutoConfiguration
@EnableTurbine
public static class TurbineHttpSampleApplication {
public static void main(String[] args) {
new SpringApplicationBuilder().sources(TurbineHttpSampleApplication.class).run(args);
}
}
@Test
public void contextLoads() {
}
}
......@@ -27,6 +27,10 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-turbine</artifactId>
</dependency>
<dependency>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment