Commit 802482ff by Spencer Gibb

Allow CommonsInstanceDiscovery to resolve multiple services on a single host.

fixes gh-897
parent 371edbf8
/* /*
* Copyright 2013-2015 the original author or authors. * Copyright 2013-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -19,6 +19,7 @@ package org.springframework.cloud.netflix.turbine; ...@@ -19,6 +19,7 @@ package org.springframework.cloud.netflix.turbine;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient; import org.springframework.cloud.client.discovery.DiscoveryClient;
...@@ -33,10 +34,10 @@ import lombok.extern.apachecommons.CommonsLog; ...@@ -33,10 +34,10 @@ import lombok.extern.apachecommons.CommonsLog;
/** /**
* Class that encapsulates an {@link InstanceDiscovery} * Class that encapsulates an {@link InstanceDiscovery}
* implementation that uses Eureka (see https://github.com/Netflix/eureka) The plugin * implementation that uses Spring Cloud Commons (see https://github.com/spring-cloud/spring-cloud-commons)
* requires a list of applications configured. It then queries the set of instances for * The plugin requires a list of applications configured. It then queries the set of
* each application. Instance information retrieved from Eureka must be translated to * instances for * each application. Instance information retrieved from the {@link DiscoveryClient}
* something that Turbine can understand i.e the * must be translated to * something that Turbine can understand i.e the
* {@link Instance} class. * {@link Instance} class.
* <p> * <p>
* All the logic to perform this translation can be overriden here, so that you can * All the logic to perform this translation can be overriden here, so that you can
...@@ -48,10 +49,14 @@ import lombok.extern.apachecommons.CommonsLog; ...@@ -48,10 +49,14 @@ import lombok.extern.apachecommons.CommonsLog;
public class CommonsInstanceDiscovery implements InstanceDiscovery { public class CommonsInstanceDiscovery implements InstanceDiscovery {
private static final String DEFAULT_CLUSTER_NAME_EXPRESSION = "serviceId"; private static final String DEFAULT_CLUSTER_NAME_EXPRESSION = "serviceId";
protected static final String PORT_KEY = "port";
protected static final String SECURE_PORT_KEY = "securePort";
protected static final String FUSED_HOST_PORT_KEY = "fusedHostPort";
private final Expression clusterNameExpression; private final Expression clusterNameExpression;
private DiscoveryClient discoveryClient; private DiscoveryClient discoveryClient;
private TurbineProperties turbineProperties; private TurbineProperties turbineProperties;
private final boolean combineHostPort;
public CommonsInstanceDiscovery(TurbineProperties turbineProperties, DiscoveryClient discoveryClient) { public CommonsInstanceDiscovery(TurbineProperties turbineProperties, DiscoveryClient discoveryClient) {
this(turbineProperties, DEFAULT_CLUSTER_NAME_EXPRESSION); this(turbineProperties, DEFAULT_CLUSTER_NAME_EXPRESSION);
...@@ -67,6 +72,7 @@ public class CommonsInstanceDiscovery implements InstanceDiscovery { ...@@ -67,6 +72,7 @@ public class CommonsInstanceDiscovery implements InstanceDiscovery {
clusterNameExpression = defaultExpression; clusterNameExpression = defaultExpression;
} }
this.clusterNameExpression = parser.parseExpression(clusterNameExpression); this.clusterNameExpression = parser.parseExpression(clusterNameExpression);
this.combineHostPort = turbineProperties.isCombineHostPort();
} }
protected Expression getClusterNameExpression() { protected Expression getClusterNameExpression() {
...@@ -77,8 +83,12 @@ public class CommonsInstanceDiscovery implements InstanceDiscovery { ...@@ -77,8 +83,12 @@ public class CommonsInstanceDiscovery implements InstanceDiscovery {
return turbineProperties; return turbineProperties;
} }
protected boolean isCombineHostPort() {
return combineHostPort;
}
/** /**
* Method that queries Eureka service for a list of configured application names * Method that queries DiscoveryClient for a list of configured application names
* @return Collection<Instance> * @return Collection<Instance>
*/ */
@Override @Override
...@@ -141,31 +151,23 @@ public class CommonsInstanceDiscovery implements InstanceDiscovery { ...@@ -141,31 +151,23 @@ public class CommonsInstanceDiscovery implements InstanceDiscovery {
/** /**
* Private helper that marshals the information from each instance into something that * Private helper that marshals the information from each instance into something that
* Turbine can understand. Override this method for your own implementation for * Turbine can understand. Override this method for your own implementation.
* parsing Eureka info.
* @param serviceInstance * @param serviceInstance
* @return Instance * @return Instance
*/ */
private Instance marshall(ServiceInstance serviceInstance) { Instance marshall(ServiceInstance serviceInstance) {
String hostname = serviceInstance.getHost(); String hostname = serviceInstance.getHost();
String port = String.valueOf(serviceInstance.getPort());
String cluster = getClusterName(serviceInstance); String cluster = getClusterName(serviceInstance);
Boolean status = Boolean.TRUE; //TODO: where to get? Boolean status = Boolean.TRUE; //TODO: where to get?
if (hostname != null && cluster != null && status != null) { if (hostname != null && cluster != null && status != null) {
Instance instance = new Instance(hostname, cluster, status); Instance instance = getInstance(hostname, port, cluster, status);
// TODO: reimplement when metadata is in commons Map<String, String> metadata = serviceInstance.getMetadata();
// add metadata
/*Map<String, String> metadata = instanceInfo.getMetadata();
if (metadata != null) {
instance.getAttributes().putAll(metadata);
}*/
// add ports
instance.getAttributes().put("port", String.valueOf(serviceInstance.getPort()));
boolean securePortEnabled = serviceInstance.isSecure(); boolean securePortEnabled = serviceInstance.isSecure();
if (securePortEnabled) {
instance.getAttributes().put("securePort", String.valueOf(serviceInstance.getPort())); addMetadata(instance, hostname, port, securePortEnabled, port, metadata);
}
return instance; return instance;
} }
else { else {
...@@ -173,9 +175,31 @@ public class CommonsInstanceDiscovery implements InstanceDiscovery { ...@@ -173,9 +175,31 @@ public class CommonsInstanceDiscovery implements InstanceDiscovery {
} }
} }
protected void addMetadata(Instance instance, String hostname, String port, boolean securePortEnabled, String securePort, Map<String, String> metadata) {
// add metadata
if (metadata != null) {
instance.getAttributes().putAll(metadata);
}
// add ports
instance.getAttributes().put(PORT_KEY, port);
if (securePortEnabled) {
instance.getAttributes().put(SECURE_PORT_KEY, securePort);
}
if (this.isCombineHostPort()) {
String fusedHostPort = securePortEnabled ? hostname+":"+securePort : instance.getHostname() ;
instance.getAttributes().put(FUSED_HOST_PORT_KEY, fusedHostPort);
}
}
protected Instance getInstance(String hostname, String port, String cluster, Boolean status) {
String hostPart = this.isCombineHostPort() ? hostname+":"+port : hostname;
return new Instance(hostPart, cluster, status);
}
/** /**
* Helper that fetches the cluster name. Cluster is a Turbine concept and not a Eureka * Helper that fetches the cluster name. Cluster is a Turbine concept and not a commons
* concept. By default we choose the amazon asg name as the cluster. A custom * concept. By default we choose the amazon serviceId as the cluster. A custom
* implementation can be plugged in by overriding this method. * implementation can be plugged in by overriding this method.
*/ */
protected String getClusterName(Object object) { protected String getClusterName(Object object) {
......
/* /*
* Copyright 2013-2015 the original author or authors. * Copyright 2013-2016 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -47,14 +47,14 @@ import lombok.extern.apachecommons.CommonsLog; ...@@ -47,14 +47,14 @@ import lombok.extern.apachecommons.CommonsLog;
public class EurekaInstanceDiscovery extends CommonsInstanceDiscovery { public class EurekaInstanceDiscovery extends CommonsInstanceDiscovery {
private static final String EUREKA_DEFAULT_CLUSTER_NAME_EXPRESSION = "appName"; private static final String EUREKA_DEFAULT_CLUSTER_NAME_EXPRESSION = "appName";
private static final String ASG_KEY = "asg";
private final EurekaClient eurekaClient; private final EurekaClient eurekaClient;
private final boolean combineHostPort;
public EurekaInstanceDiscovery(TurbineProperties turbineProperties, EurekaClient eurekaClient) { public EurekaInstanceDiscovery(TurbineProperties turbineProperties, EurekaClient eurekaClient) {
super(turbineProperties, EUREKA_DEFAULT_CLUSTER_NAME_EXPRESSION); super(turbineProperties, EUREKA_DEFAULT_CLUSTER_NAME_EXPRESSION);
this.eurekaClient = eurekaClient; this.eurekaClient = eurekaClient;
this.combineHostPort = turbineProperties.isCombineHostPort();
} }
/** /**
...@@ -104,37 +104,26 @@ public class EurekaInstanceDiscovery extends CommonsInstanceDiscovery { ...@@ -104,37 +104,26 @@ public class EurekaInstanceDiscovery extends CommonsInstanceDiscovery {
String cluster = getClusterName(instanceInfo); String cluster = getClusterName(instanceInfo);
Boolean status = parseInstanceStatus(instanceInfo.getStatus()); Boolean status = parseInstanceStatus(instanceInfo.getStatus());
if (hostname != null && cluster != null && status != null) { if (hostname != null && cluster != null && status != null) {
String hostPart = combineHostPort ? hostname+":"+port : hostname; Instance instance = getInstance(hostname, port, cluster, status);
Instance instance = new Instance(hostPart, cluster, status);
// add metadata
Map<String, String> metadata = instanceInfo.getMetadata(); Map<String, String> metadata = instanceInfo.getMetadata();
if (metadata != null) { boolean securePortEnabled = instanceInfo.isPortEnabled(InstanceInfo.PortType.SECURE);
instance.getAttributes().putAll(metadata); String securePort = String.valueOf(instanceInfo.getSecurePort());
}
addMetadata(instance, hostname, port, securePortEnabled, securePort, metadata);
// add amazon metadata // add amazon metadata
String asgName = instanceInfo.getASGName(); String asgName = instanceInfo.getASGName();
if (asgName != null) { if (asgName != null) {
instance.getAttributes().put("asg", asgName); instance.getAttributes().put(ASG_KEY, asgName);
} }
DataCenterInfo dcInfo = instanceInfo.getDataCenterInfo(); DataCenterInfo dcInfo = instanceInfo.getDataCenterInfo();
if (dcInfo != null && dcInfo.getName().equals(DataCenterInfo.Name.Amazon)) { if (dcInfo != null && dcInfo.getName().equals(DataCenterInfo.Name.Amazon)) {
AmazonInfo amznInfo = (AmazonInfo) dcInfo; AmazonInfo amznInfo = (AmazonInfo) dcInfo;
instance.getAttributes().putAll(amznInfo.getMetadata()); instance.getAttributes().putAll(amznInfo.getMetadata());
} }
// add ports
instance.getAttributes().put("port", String.valueOf(instanceInfo.getPort()));
boolean securePortEnabled = instanceInfo.isPortEnabled(InstanceInfo.PortType.SECURE);
if (securePortEnabled) {
instance.getAttributes().put("securePort", String.valueOf(instanceInfo.getSecurePort()));
}
if (combineHostPort) {
String fusedHostPort = securePortEnabled ? hostname+":"+String.valueOf(instanceInfo.getSecurePort()) : hostPart ;
instance.getAttributes().put("fusedHostPort", fusedHostPort);
}
return instance; return instance;
} }
else { else {
......
/*
* Copyright 2013-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import com.netflix.turbine.discovery.Instance;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
/**
* @author Spencer Gibb
*/
public class CommonsInstanceDiscoveryTest {
private DiscoveryClient discoveryClient;
private TurbineProperties turbineProperties;
@Before
public void setUp() throws Exception {
this.discoveryClient = mock(DiscoveryClient.class);
this.turbineProperties = new TurbineProperties();
}
@Test
public void testSecureCombineHostPort() {
turbineProperties.setCombineHostPort(true);
CommonsInstanceDiscovery discovery = createDiscovery();
String appName = "testAppName";
int port = 8443;
String hostName = "myhost";
DefaultServiceInstance serviceInstance = new DefaultServiceInstance(appName, hostName, port, true);
Instance instance = discovery.marshall(serviceInstance);
assertEquals("port is wrong", String.valueOf(port), instance.getAttributes().get("port"));
assertEquals("securePort is wrong", String.valueOf(port), instance.getAttributes().get("securePort"));
String urlPath = SpringClusterMonitor.ClusterConfigBasedUrlClosure.getUrlPath(instance);
assertEquals("url is wrong", "https://"+hostName+":"+port+"/hystrix.stream", urlPath);
}
@Test
public void testCombineHostPort() {
turbineProperties.setCombineHostPort(true);
CommonsInstanceDiscovery discovery = createDiscovery();
String appName = "testAppName";
int port = 8080;
String hostName = "myhost";
DefaultServiceInstance serviceInstance = new DefaultServiceInstance(appName, hostName, port, false);
Instance instance = discovery.marshall(serviceInstance);
assertEquals("hostname is wrong", hostName+":"+port, instance.getHostname());
assertEquals("port is wrong", String.valueOf(port), instance.getAttributes().get("port"));
String urlPath = SpringClusterMonitor.ClusterConfigBasedUrlClosure.getUrlPath(instance);
assertEquals("url is wrong", "http://"+hostName+":"+port+"/hystrix.stream", urlPath);
String clusterName = discovery.getClusterName(serviceInstance);
assertEquals("clusterName is wrong", appName, clusterName);
}
@Test
public void testGetClusterName() {
CommonsInstanceDiscovery discovery = createDiscovery();
String appName = "testAppName";
DefaultServiceInstance serviceInstance = new DefaultServiceInstance(appName, "myhost", 8080, false);
String clusterName = discovery.getClusterName(serviceInstance);
assertEquals("clusterName is wrong", appName, clusterName);
}
@Test
public void testGetPort() {
CommonsInstanceDiscovery discovery = createDiscovery();
String appName = "testAppName";
int port = 8080;
String hostName = "myhost";
DefaultServiceInstance serviceInstance = new DefaultServiceInstance(appName, hostName, port, false);
Instance instance = discovery.marshall(serviceInstance);
assertEquals("port is wrong", String.valueOf(port), instance.getAttributes().get("port"));
String urlPath = SpringClusterMonitor.ClusterConfigBasedUrlClosure.getUrlPath(instance);
assertEquals("url is wrong", "http://"+hostName+":"+port+"/hystrix.stream", urlPath);
}
@Test
public void testGetSecurePort() {
CommonsInstanceDiscovery discovery = createDiscovery();
String appName = "testAppName";
//int port = 8080;
int port = 8443;
String hostName = "myhost";
DefaultServiceInstance serviceInstance = new DefaultServiceInstance(appName, hostName, port, true);
Instance instance = discovery.marshall(serviceInstance);
assertEquals("port is wrong", String.valueOf(port), instance.getAttributes().get("port"));
assertEquals("securePort is wrong", String.valueOf(port), instance.getAttributes().get("securePort"));
String urlPath = SpringClusterMonitor.ClusterConfigBasedUrlClosure.getUrlPath(instance);
assertEquals("url is wrong", "https://"+hostName+":"+port+"/hystrix.stream", urlPath);
}
@Test
public void testGetClusterNameCustomExpression() {
turbineProperties.setClusterNameExpression("host");
CommonsInstanceDiscovery discovery = createDiscovery();
String appName = "testAppName";
String hostName = "myhost";
DefaultServiceInstance serviceInstance = new DefaultServiceInstance(appName, hostName, 8080, true);
String clusterName = discovery.getClusterName(serviceInstance);
assertEquals("clusterName is wrong", hostName, clusterName);
}
@Test
public void testGetClusterNameInstanceMetadataMapExpression() {
turbineProperties.setClusterNameExpression("metadata['cluster']");
CommonsInstanceDiscovery discovery = createDiscovery();
String metadataProperty = "myCluster";
String appName = "testAppName";
String hostName = "myhost";
DefaultServiceInstance serviceInstance = new DefaultServiceInstance(appName, hostName, 8080, true, Collections.singletonMap("cluster", metadataProperty));
String clusterName = discovery.getClusterName(serviceInstance);
assertEquals("clusterName is wrong", metadataProperty, clusterName);
}
private CommonsInstanceDiscovery createDiscovery() {
return new CommonsInstanceDiscovery(turbineProperties, discoveryClient);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment