Commit 191bad05 by Dave Syer

Ensure Eureka server gets iniialized with non-zero count

The problem is pretty deep here so I hope I did something sane. Basically if PeerAwareInstanceRegistry.openForTraffic(int) ever gets called with a 0 argument it resets the threshold for numberOfRenewsPerMinThreshold to 0 from which it only ever recovers if the same method is called again with a non-zero argument. For a standalone server this means that it will never automatically expire any leases (only when it chats to its neighbours and finds out they have registered services will it ever switch the threshold back on). Another problem (unsolved) is that the reset mechanism will kick back in occasionally (resetting to 1 with this change, but that's probably also a bad thing). And when there *are* actually peers to talk to unless they are in AWS they will never communicate correctly about the actual number of local services (the criteria for that are deeply connected with AWS and EIP). Fixes gh-6
parent 91c9ab1d
...@@ -21,6 +21,10 @@ import java.lang.reflect.Modifier; ...@@ -21,6 +21,10 @@ import java.lang.reflect.Modifier;
import javax.servlet.ServletContext; import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextEvent;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactory; import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
...@@ -31,10 +35,11 @@ import org.springframework.cloud.netflix.eureka.event.EurekaRegistryAvailableEve ...@@ -31,10 +35,11 @@ import org.springframework.cloud.netflix.eureka.event.EurekaRegistryAvailableEve
import org.springframework.cloud.netflix.eureka.event.EurekaServerStartedEvent; import org.springframework.cloud.netflix.eureka.event.EurekaServerStartedEvent;
import org.springframework.cloud.netflix.eureka.event.LeaseManagerMessageBroker; import org.springframework.cloud.netflix.eureka.event.LeaseManagerMessageBroker;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationEvent;
import org.springframework.context.SmartLifecycle; import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.core.Ordered; import org.springframework.core.Ordered;
import org.springframework.util.ReflectionUtils; import org.springframework.util.ReflectionUtils;
import org.springframework.web.context.ServletContextAware; import org.springframework.web.context.ServletContextAware;
...@@ -55,6 +60,9 @@ import com.netflix.eureka.lease.LeaseManager; ...@@ -55,6 +60,9 @@ import com.netflix.eureka.lease.LeaseManager;
public class EurekaServerInitializerConfiguration implements ServletContextAware, public class EurekaServerInitializerConfiguration implements ServletContextAware,
SmartLifecycle, Ordered { SmartLifecycle, Ordered {
private static Log logger = LogFactory
.getLog(EurekaServerInitializerConfiguration.class);
@Autowired @Autowired
private EurekaServerConfig eurekaServerConfig; private EurekaServerConfig eurekaServerConfig;
...@@ -77,18 +85,27 @@ public class EurekaServerInitializerConfiguration implements ServletContextAware ...@@ -77,18 +85,27 @@ public class EurekaServerInitializerConfiguration implements ServletContextAware
new Thread(new Runnable() { new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
try {
new EurekaBootStrap() { new EurekaBootStrap() {
@Override @Override
protected void initEurekaEnvironment() { protected void initEurekaEnvironment() {
LoggingConfiguration.getInstance().configure(); LoggingConfiguration.getInstance().configure();
EurekaServerConfigurationManager.getInstance() EurekaServerConfigurationManager.getInstance()
.setConfiguration(eurekaServerConfig); .setConfiguration(eurekaServerConfig);
//PeerAwareInstanceRegistry.getInstance(); // PeerAwareInstanceRegistry.getInstance();
applicationContext.publishEvent(new EurekaRegistryAvailableEvent(eurekaServerConfig)); applicationContext
.publishEvent(new EurekaRegistryAvailableEvent(
eurekaServerConfig));
} }
}.contextInitialized(new ServletContextEvent(servletContext)); }.contextInitialized(new ServletContextEvent(servletContext));
running = true; running = true;
applicationContext.publishEvent(new EurekaServerStartedEvent(eurekaServerConfig)); applicationContext.publishEvent(new EurekaServerStartedEvent(
eurekaServerConfig));
}
catch (Exception e) {
// Help!
logger.error("Could not initialize Eureka servlet context", e);
}
} }
}).start(); }).start();
} }
...@@ -125,39 +142,128 @@ public class EurekaServerInitializerConfiguration implements ServletContextAware ...@@ -125,39 +142,128 @@ public class EurekaServerInitializerConfiguration implements ServletContextAware
@Configuration @Configuration
@ConditionalOnClass(PeerAwareInstanceRegistry.class) @ConditionalOnClass(PeerAwareInstanceRegistry.class)
protected static class Initializer implements protected static class RegistryInstanceProxyInitializer implements
ApplicationListener<EurekaRegistryAvailableEvent> { SmartApplicationListener {
@Autowired @Autowired
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
private PeerAwareInstanceRegistry instance;
@Bean @Bean
public LeaseManagerMessageBroker leaseManagerMessageBroker() { public LeaseManagerMessageBroker leaseManagerMessageBroker() {
return new LeaseManagerMessageBroker(); return new LeaseManagerMessageBroker();
} }
@Override @Override
public void onApplicationEvent(EurekaRegistryAvailableEvent event) { public void onApplicationEvent(ApplicationEvent event) {
//wrap the instance registry... if (instance == null) {
ProxyFactory factory = new ProxyFactory(PeerAwareInstanceRegistry.getInstance()); instance = PeerAwareInstanceRegistry.getInstance();
//...with the LeaseManagerMessageBroker // Our instance is the un-proxied version so we can hack it
factory.addAdvice(new PiggybackMethodInterceptor(leaseManagerMessageBroker(), LeaseManager.class)); expectRegistrations(1);
factory.setProxyTargetClass(true); }
if (event instanceof EurekaServerStartedEvent) {
// Do it again in case this message came in late
expectRegistrations(1);
}
else {
replaceInstance(getProxyForInstance());
}
}
//Now replace the PeerAwareInstanceRegistry with our wrapped version private void replaceInstance(Object proxy) {
Field field = ReflectionUtils.findField(PeerAwareInstanceRegistry.class, "instance"); Field field = ReflectionUtils.findField(PeerAwareInstanceRegistry.class,
"instance");
try { try {
// Awful ugly hack to work around lack of DI in eureka // Awful ugly hack to work around lack of DI in eureka
field.setAccessible(true); field.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers"); Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true); modifiersField.setAccessible(true);
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
ReflectionUtils.setField(field, null, factory.getProxy()); ReflectionUtils.setField(field, null, proxy);
} }
catch (Exception e) { catch (Exception e) {
throw new IllegalStateException("Cannot modify instance registry", e); throw new IllegalStateException("Cannot modify instance registry", e);
} }
} }
private Object getProxyForInstance() {
// Wrap the instance registry...
ProxyFactory factory = new ProxyFactory(instance);
// ...with the LeaseManagerMessageBroker
factory.addAdvice(new PiggybackMethodInterceptor(leaseManagerMessageBroker(),
LeaseManager.class));
factory.addAdvice(new TrafficOpener());
factory.setProxyTargetClass(true);
return factory.getProxy();
}
private void expectRegistrations(int count) {
/*
* Setting expectedNumberOfRenewsPerMin to non-zero to ensure that even an
* isolated server can adjust its eviction policy to the number of
* registrations (when it's zero, even a successful registration won't reset
* the rate threshold in InstanceRegistry.register()).
*/
Field field = ReflectionUtils.findField(PeerAwareInstanceRegistry.class,
"expectedNumberOfRenewsPerMin");
try {
// Awful ugly hack to work around lack of DI in eureka
field.setAccessible(true);
int value = (int) ReflectionUtils.getField(field, instance);
if (value == 0 && count > 0) {
ReflectionUtils.setField(field, instance, count);
}
}
catch (Exception e) {
throw new IllegalStateException(
"Cannot modify instance registry expected renews", e);
}
}
@Override
public int getOrder() {
return 0;
}
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return eventType.isAssignableFrom(EurekaServerStartedEvent.class)
|| eventType.isAssignableFrom(EurekaRegistryAvailableEvent.class);
}
@Override
public boolean supportsSourceType(Class<?> sourceType) {
return true;
}
}
/**
* Additional aspect for intercepting method invocations on PeerAwareInstanceRegistry.
* If {@link PeerAwareInstanceRegistry#openForTraffic(int)} is called with a zero
* argument, it means that leases are not automatically cancelled if the instance
* hasn't sent any renewals recently. This happens for a standalone server. It seems
* like a bad default, so we set it to the smallest non-zero value we can, so that any
* instances that subsequently register can bump up the threshold.
*
* @author Dave Syer
*
*/
protected static class TrafficOpener implements MethodInterceptor {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
if ("openForTraffic".equals(invocation.getMethod().getName())) {
int count = (int) invocation.getArguments()[0];
if (count == 0) {
ReflectionUtils.invokeMethod(invocation.getMethod(),
invocation.getThis(), 1);
return null;
}
}
return invocation.proceed();
}
} }
} }
...@@ -22,7 +22,7 @@ import org.springframework.test.context.web.WebAppConfiguration; ...@@ -22,7 +22,7 @@ import org.springframework.test.context.web.WebAppConfiguration;
@RunWith(SpringJUnit4ClassRunner.class) @RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = Application.class) @SpringApplicationConfiguration(classes = Application.class)
@WebAppConfiguration @WebAppConfiguration
@IntegrationTest({ "server.port=0", "spring.application.name=eureka", "management.contextPath=/admin" }) @IntegrationTest({ "server.port=0", "spring.application.name=eureka", "management.contextPath=/admin", "spring.jmx.enabled=true" })
public class ApplicationTests { public class ApplicationTests {
@Value("${local.server.port}") @Value("${local.server.port}")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment