Commit ffc43175 by Nastya Smirnova Committed by Spencer Gibb

Create TurbineClustersProvider

Extract separate class TurbineClustersProvider to be able to provide custom solution for retrieving configured clusters (#2219)
parent 28de82e6
......@@ -688,6 +688,9 @@ turbine:
appConfig: customers
----
If you need to customize which cluster names should be used by Turbine (you don't want to store cluster names in
`turbine.aggregator.clusterConfig` configuration) provide a bean of type `TurbineClustersProvider`.
The `clusterName` can be customized by a SPEL expression in `turbine.clusterNameExpression` with root an instance of `InstanceInfo`. The default value is `appName`, which means that the Eureka serviceId ends up as the cluster key (i.e. the `InstanceInfo` for customers has an `appName` of "CUSTOMERS"). A different example would be `turbine.clusterNameExpression=aSGName`, which would get the cluster name from the AWS ASG name. Another example:
----
......
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.List;
/**
* Provides clusters names for Turbine based on configuration value.
*
* @author Anastasiia Smirnova
*/
public class ConfigurationBasedTurbineClustersProvider implements TurbineClustersProvider {
private static final Log log = LogFactory.getLog(ConfigurationBasedTurbineClustersProvider.class);
private final TurbineAggregatorProperties properties;
public ConfigurationBasedTurbineClustersProvider(TurbineAggregatorProperties turbineAggregatorProperties) {
this.properties = turbineAggregatorProperties;
}
@Override
public List<String> getClusterNames() {
List<String> clusterNames = properties.getClusterConfig();
log.trace("Using clusters names: " + clusterNames);
return clusterNames;
}
}
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.List;
/**
* Provides clusters names for Turbine based on applications names registered in Eureka.
*
* @author Anastasiia Smirnova
*/
public class EurekaBasedTurbineClustersProvider implements TurbineClustersProvider {
private static final Log log = LogFactory.getLog(EurekaBasedTurbineClustersProvider.class);
private final EurekaClient eurekaClient;
public EurekaBasedTurbineClustersProvider(EurekaClient eurekaClient) {
this.eurekaClient = eurekaClient;
}
@Override
public List<String> getClusterNames() {
Applications applications = eurekaClient.getApplications();
List<Application> registeredApplications = applications.getRegisteredApplications();
List<String> appNames = new ArrayList<>(registeredApplications.size());
for (Application application : registeredApplications) {
appNames.add(application.getName());
}
log.trace("Using clusters names: " + appNames);
return appNames;
}
}
......@@ -16,12 +16,8 @@
package org.springframework.cloud.netflix.turbine;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
import com.netflix.turbine.data.AggDataFromCluster;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.handler.PerformanceCriteria;
......@@ -43,8 +39,11 @@ public class SpringAggregatorFactory implements ClusterMonitorFactory<AggDataFro
private static final Log log = LogFactory.getLog(SpringAggregatorFactory.class);
private static final DynamicStringProperty aggClusters = DynamicPropertyFactory
.getInstance().getStringProperty("turbine.aggregator.clusterConfig", null);
private final TurbineClustersProvider clustersProvider;
public SpringAggregatorFactory(TurbineClustersProvider clustersProvider) {
this.clustersProvider = clustersProvider;
}
/**
* @return {@link com.netflix.turbine.monitor.cluster.ClusterMonitor}<
......@@ -73,7 +72,7 @@ public class SpringAggregatorFactory implements ClusterMonitorFactory<AggDataFro
@Override
public void initClusterMonitors() {
for (String clusterName : getClusterNames()) {
for (String clusterName : clustersProvider.getClusterNames()) {
ClusterMonitor<AggDataFromCluster> clusterMonitor = (ClusterMonitor<AggDataFromCluster>) findOrRegisterAggregateMonitor(clusterName);
clusterMonitor.registerListenertoClusterMonitor(this.StaticListener);
try {
......@@ -87,27 +86,12 @@ public class SpringAggregatorFactory implements ClusterMonitorFactory<AggDataFro
}
}
private List<String> getClusterNames() {
List<String> clusters = new ArrayList<String>();
String clusterNames = aggClusters.get();
if (clusterNames == null || clusterNames.trim().length() == 0) {
clusters.add("default");
}
else {
String[] parts = aggClusters.get().split(",");
for (String s : parts) {
clusters.add(s);
}
}
return clusters;
}
/**
* shutdown all configured cluster monitors
*/
@Override
public void shutdownClusterMonitors() {
for (String clusterName : getClusterNames()) {
for (String clusterName : clustersProvider.getClusterNames()) {
ClusterMonitor<AggDataFromCluster> clusterMonitor = (ClusterMonitor<AggDataFromCluster>) AggregateClusterMonitor
.findOrRegisterAggregateMonitor(clusterName);
clusterMonitor.stopMonitor();
......
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* @author Anastasiia Smirnova
*/
@ConfigurationProperties("turbine.aggregator")
public class TurbineAggregatorProperties {
private static final String DEFAULT = "default";
/**
* The list of cluster names.
*/
private List<String> clusterConfig = Collections.singletonList(DEFAULT);
public List<String> getClusterConfig() {
return clusterConfig;
}
public void setClusterConfig(List<String> clusterConfig) {
this.clusterConfig = clusterConfig;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TurbineAggregatorProperties that = (TurbineAggregatorProperties) o;
return Objects.equals(clusterConfig, that.clusterConfig);
}
@Override
public int hashCode() {
return Objects.hash(clusterConfig);
}
@Override
public String toString() {
return "TurbineAggregatorProperties{" + "clusterConfig='" + clusterConfig + '\''
+ '}';
}
}
/*
* Copyright 2013-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.netflix.turbine;
import java.util.List;
/**
* Interface that gives possibility to customize which clusters names Turbine will use.
*
* @author Anastasiia Smirnova
*/
public interface TurbineClustersProvider {
List<String> getClusterNames();
}
......@@ -16,6 +16,7 @@
package org.springframework.cloud.netflix.turbine;
import com.netflix.turbine.monitor.cluster.ClusterMonitorFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
......@@ -43,18 +44,40 @@ public class TurbineHttpConfiguration {
}
@Bean
@ConditionalOnMissingBean(name = "turbineStreamServlet")
public ServletRegistrationBean turbineStreamServlet() {
return new ServletRegistrationBean(new TurbineStreamServlet(), "/turbine.stream");
}
@Bean
@ConditionalOnMissingBean
public TurbineProperties turbineProperties() {
return new TurbineProperties();
}
@Bean
public TurbineLifecycle turbineLifecycle(InstanceDiscovery instanceDiscovery) {
return new TurbineLifecycle(instanceDiscovery);
@ConditionalOnMissingBean
public TurbineAggregatorProperties turbineAggregatorProperties() {
return new TurbineAggregatorProperties();
}
@Bean
@ConditionalOnMissingBean
public TurbineLifecycle turbineLifecycle(InstanceDiscovery instanceDiscovery,
ClusterMonitorFactory<?> factory) {
return new TurbineLifecycle(instanceDiscovery, factory);
}
@Bean
@ConditionalOnMissingBean
public ClusterMonitorFactory clusterMonitorFactory(TurbineClustersProvider clustersProvider) {
return new SpringAggregatorFactory(clustersProvider);
}
@Bean
@ConditionalOnMissingBean
public TurbineClustersProvider clustersProvider(TurbineAggregatorProperties turbineAggregatorProperties) {
return new ConfigurationBasedTurbineClustersProvider(turbineAggregatorProperties);
}
@Configuration
......
......@@ -16,6 +16,7 @@
package org.springframework.cloud.netflix.turbine;
import com.netflix.turbine.monitor.cluster.ClusterMonitorFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.Ordered;
......@@ -29,11 +30,13 @@ import com.netflix.turbine.plugins.PluginsFactory;
public class TurbineLifecycle implements SmartLifecycle, Ordered {
private final InstanceDiscovery instanceDiscovery;
private final ClusterMonitorFactory<?> factory;
private boolean running;
private volatile boolean running;
public TurbineLifecycle(InstanceDiscovery instanceDiscovery) {
public TurbineLifecycle(InstanceDiscovery instanceDiscovery, ClusterMonitorFactory<?> factory) {
this.instanceDiscovery = instanceDiscovery;
this.factory = factory;
}
@Override
......@@ -48,7 +51,7 @@ public class TurbineLifecycle implements SmartLifecycle, Ordered {
@Override
public void start() {
PluginsFactory.setClusterMonitorFactory(new SpringAggregatorFactory());
PluginsFactory.setClusterMonitorFactory(factory);
PluginsFactory.setInstanceDiscovery(instanceDiscovery);
TurbineInit.init();
}
......
package org.springframework.cloud.netflix.turbine;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class ConfigurationBasedTurbineClustersProviderTest {
@Test
public void shouldReturnDefaultClusterIfConfigurationIsEmpty() throws Exception {
TurbineAggregatorProperties properties = new TurbineAggregatorProperties();
TurbineClustersProvider provider = new ConfigurationBasedTurbineClustersProvider(
properties);
List<String> clusterNames = provider.getClusterNames();
assertThat(clusterNames).containsOnly("default");
}
@Test
public void shouldReturnConfiguredClusters() throws Exception {
TurbineAggregatorProperties properties = new TurbineAggregatorProperties();
properties.setClusterConfig(Arrays.asList("cluster1", "cluster2", "cluster3"));
TurbineClustersProvider provider = new ConfigurationBasedTurbineClustersProvider(
properties);
List<String> clusterNames = provider.getClusterNames();
assertThat(clusterNames).containsOnly("cluster1", "cluster2", "cluster3");
}
}
\ No newline at end of file
package org.springframework.cloud.netflix.turbine;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;
import org.junit.Test;
import java.util.List;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class EurekaBasedTurbineClustersProviderTest {
EurekaClient eurekaClient = mock(EurekaClient.class);
TurbineClustersProvider provider = new EurekaBasedTurbineClustersProvider(eurekaClient);
@Test
public void shouldProvideAllClustersNames() throws Exception {
Applications applications = registeredApplications(asList(application("service1"),
application("service2"), application("service3")));
when(eurekaClient.getApplications()).thenReturn(applications);
List<String> clusterNames = provider.getClusterNames();
assertThat(clusterNames).containsOnly("service1", "service2", "service3");
}
private Applications registeredApplications(List<Application> registered) {
Applications applications = mock(Applications.class);
when(applications.getRegisteredApplications()).thenReturn(registered);
return applications;
}
private Application application(String name) {
Application application = mock(Application.class);
when(application.getName()).thenReturn(name);
return application;
}
}
\ No newline at end of file
package org.springframework.cloud.netflix.turbine;
import org.junit.After;
import org.junit.Test;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.util.EnvironmentTestUtils.addEnvironment;
public class TurbineAggregatorPropertiesTest {
private AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
@After
public void clear() {
if (this.context != null) {
this.context.close();
}
}
@Test
public void shouldHaveDefaultConfiguration() throws Exception {
setupContext();
TurbineAggregatorProperties actual = getProperties();
assertThat(actual.getClusterConfig()).containsOnly("default");
}
@Test
public void shouldLoadCustomProperties() {
addEnvironment(this.context,
"turbine.aggregator.clusterConfig=cluster1, cluster2, cluster3");
setupContext();
TurbineAggregatorProperties actual = getProperties();
assertThat(actual.getClusterConfig()).containsOnly("cluster1", "cluster2",
"cluster3");
}
private void setupContext() {
this.context.register(TestConfiguration.class);
this.context.refresh();
}
private TurbineAggregatorProperties getProperties() {
return this.context.getBean(TurbineAggregatorProperties.class);
}
@Configuration
@EnableConfigurationProperties(TurbineAggregatorProperties.class)
static class TestConfiguration {
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment