Commit 4d9281c2 by Spencer Gibb

reset zuul routes when eureka sends catalog delta.

fixes gh-76
parent 7423decc
...@@ -16,20 +16,24 @@ ...@@ -16,20 +16,24 @@
package org.springframework.cloud.netflix.eureka; package org.springframework.cloud.netflix.eureka;
import com.netflix.appinfo.DataCenterInfo; import com.netflix.appinfo.DataCenterInfo;
import com.netflix.appinfo.UniqueIdentifier;
import com.netflix.appinfo.DataCenterInfo.Name; import com.netflix.appinfo.DataCenterInfo.Name;
import com.netflix.appinfo.InstanceInfo; import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.UniqueIdentifier;
import com.netflix.discovery.converters.Converters.ApplicationsConverter;
import com.netflix.discovery.converters.Converters.InstanceInfoConverter; import com.netflix.discovery.converters.Converters.InstanceInfoConverter;
import com.netflix.discovery.shared.Applications;
import com.thoughtworks.xstream.MarshallingStrategy; import com.thoughtworks.xstream.MarshallingStrategy;
import com.thoughtworks.xstream.converters.Converter; import com.thoughtworks.xstream.converters.*;
import com.thoughtworks.xstream.converters.ConverterLookup;
import com.thoughtworks.xstream.converters.DataHolder;
import com.thoughtworks.xstream.converters.MarshallingContext;
import com.thoughtworks.xstream.converters.UnmarshallingContext;
import com.thoughtworks.xstream.core.TreeMarshallingStrategy; import com.thoughtworks.xstream.core.TreeMarshallingStrategy;
import com.thoughtworks.xstream.io.HierarchicalStreamReader; import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.HierarchicalStreamWriter; import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
import com.thoughtworks.xstream.mapper.Mapper; import com.thoughtworks.xstream.mapper.Mapper;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.cloud.client.discovery.DiscoveryHeartbeatEvent;
import org.springframework.context.ApplicationContext;
/** /**
* A special purpose wrapper for an XStream TreeMarshallingStrategy that is aware of the * A special purpose wrapper for an XStream TreeMarshallingStrategy that is aware of the
...@@ -47,18 +51,23 @@ import com.thoughtworks.xstream.mapper.Mapper; ...@@ -47,18 +51,23 @@ import com.thoughtworks.xstream.mapper.Mapper;
public class DataCenterAwareMarshallingStrategy implements MarshallingStrategy { public class DataCenterAwareMarshallingStrategy implements MarshallingStrategy {
private TreeMarshallingStrategy delegate = new TreeMarshallingStrategy(); private TreeMarshallingStrategy delegate = new TreeMarshallingStrategy();
private ApplicationContext context;
public DataCenterAwareMarshallingStrategy(ApplicationContext context) {
this.context = context;
}
@Override @Override
public Object unmarshal(Object root, HierarchicalStreamReader reader, public Object unmarshal(Object root, HierarchicalStreamReader reader,
DataHolder dataHolder, ConverterLookup converterLookup, Mapper mapper) { DataHolder dataHolder, ConverterLookup converterLookup, Mapper mapper) {
ConverterLookup wrapped = new DataCenterAwareConverterLookup(converterLookup); ConverterLookup wrapped = new DataCenterAwareConverterLookup(converterLookup, context);
return delegate.unmarshal(root, reader, dataHolder, wrapped, mapper); return delegate.unmarshal(root, reader, dataHolder, wrapped, mapper);
} }
@Override @Override
public void marshal(HierarchicalStreamWriter writer, Object obj, public void marshal(HierarchicalStreamWriter writer, Object obj,
ConverterLookup converterLookup, Mapper mapper, DataHolder dataHolder) { ConverterLookup converterLookup, Mapper mapper, DataHolder dataHolder) {
ConverterLookup wrapped = new DataCenterAwareConverterLookup(converterLookup); ConverterLookup wrapped = new DataCenterAwareConverterLookup(converterLookup, context);
delegate.marshal(writer, obj, wrapped, mapper, dataHolder); delegate.marshal(writer, obj, wrapped, mapper, dataHolder);
} }
...@@ -86,9 +95,11 @@ public class DataCenterAwareMarshallingStrategy implements MarshallingStrategy { ...@@ -86,9 +95,11 @@ public class DataCenterAwareMarshallingStrategy implements MarshallingStrategy {
private static class DataCenterAwareConverterLookup implements ConverterLookup { private static class DataCenterAwareConverterLookup implements ConverterLookup {
private ConverterLookup delegate; private ConverterLookup delegate;
private ApplicationContext context;
public DataCenterAwareConverterLookup(ConverterLookup delegate) { public DataCenterAwareConverterLookup(ConverterLookup delegate, ApplicationContext context) {
this.delegate = delegate; this.delegate = delegate;
this.context = context;
} }
@Override @Override
...@@ -96,12 +107,52 @@ public class DataCenterAwareMarshallingStrategy implements MarshallingStrategy { ...@@ -96,12 +107,52 @@ public class DataCenterAwareMarshallingStrategy implements MarshallingStrategy {
Converter converter = delegate.lookupConverterForType(type); Converter converter = delegate.lookupConverterForType(type);
if (InstanceInfo.class == type) { if (InstanceInfo.class == type) {
return new DataCenterAwareConverter(); return new DataCenterAwareConverter();
} else if (Applications.class == type) {
return new PublishingApplicationsConverter(context);
} }
return converter; return converter;
} }
} }
private static class PublishingApplicationsConverter extends ApplicationsConverter {
private ApplicationContext context;
public PublishingApplicationsConverter(ApplicationContext context) {
this.context = context;
}
@Override
public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext unmarshallingContext) {
Object obj = super.unmarshal(reader, unmarshallingContext);
ProxyFactory factory = new ProxyFactory(obj);
factory.addAdvice(new SetVersionInterceptor(this.context));
return factory.getProxy();
}
}
@Slf4j
private static class SetVersionInterceptor implements MethodInterceptor {
private ApplicationContext context;
public SetVersionInterceptor(ApplicationContext context) {
this.context = context;
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Object ret = invocation.proceed();
if ("setVersion".equals(invocation.getMethod().getName())) {
Long version = Long.class.cast(invocation.getArguments()[0]);
log.debug("Applications.setVersion() called with version: " + version);
context.publishEvent(new DiscoveryHeartbeatEvent(invocation.getThis(), version));
}
return ret;
}
}
private static class DataCenterAwareConverter extends InstanceInfoConverter { private static class DataCenterAwareConverter extends InstanceInfoConverter {
@Override @Override
......
...@@ -17,10 +17,12 @@ package org.springframework.cloud.netflix.eureka; ...@@ -17,10 +17,12 @@ package org.springframework.cloud.netflix.eureka;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
...@@ -39,12 +41,15 @@ import com.netflix.discovery.converters.XmlXStream; ...@@ -39,12 +41,15 @@ import com.netflix.discovery.converters.XmlXStream;
@ConditionalOnExpression("${eureka.client.enabled:true}") @ConditionalOnExpression("${eureka.client.enabled:true}")
public class EurekaClientAutoConfiguration { public class EurekaClientAutoConfiguration {
@Autowired
ApplicationContext context;
@PostConstruct @PostConstruct
public void init() { public void init() {
XmlXStream.getInstance().setMarshallingStrategy( XmlXStream.getInstance().setMarshallingStrategy(
new DataCenterAwareMarshallingStrategy()); new DataCenterAwareMarshallingStrategy(context));
JsonXStream.getInstance().setMarshallingStrategy( JsonXStream.getInstance().setMarshallingStrategy(
new DataCenterAwareMarshallingStrategy()); new DataCenterAwareMarshallingStrategy(context));
} }
@Bean @Bean
......
package org.springframework.cloud.netflix.eureka; package org.springframework.cloud.netflix.eureka;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.netflix.appinfo.InstanceInfo; import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.shared.Application; import com.netflix.discovery.shared.Application;
...@@ -95,13 +96,16 @@ public class EurekaDiscoveryClient implements DiscoveryClient { ...@@ -95,13 +96,16 @@ public class EurekaDiscoveryClient implements DiscoveryClient {
if (applications == null) { if (applications == null) {
return Collections.emptyList(); return Collections.emptyList();
} }
return Lists.newArrayList(transform(applications.getRegisteredApplications(), new Function<Application, String>() { return Lists.newArrayList(filter(transform(applications.getRegisteredApplications(), new Function<Application, String>() {
@Nullable @Nullable
@Override @Override
public String apply(@Nullable Application app) { public String apply(@Nullable Application app) {
if (app.getInstances().isEmpty()) {
return null;
}
return app.getName().toLowerCase(); return app.getName().toLowerCase();
} }
})); }), Predicates.notNull()));
} }
@Override @Override
......
package org.springframework.cloud.netflix.zuul; package org.springframework.cloud.netflix.zuul;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.discovery.DiscoveryHeartbeatEvent;
import org.springframework.cloud.client.discovery.InstanceRegisteredEvent; import org.springframework.cloud.client.discovery.InstanceRegisteredEvent;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent; import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
...@@ -12,6 +13,7 @@ import org.springframework.web.servlet.handler.AbstractUrlHandlerMapping; ...@@ -12,6 +13,7 @@ import org.springframework.web.servlet.handler.AbstractUrlHandlerMapping;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/** /**
* MVC HandlerMapping that maps incoming request paths to remote services. * MVC HandlerMapping that maps incoming request paths to remote services.
...@@ -27,6 +29,8 @@ public class ZuulHandlerMapping extends AbstractUrlHandlerMapping implements ...@@ -27,6 +29,8 @@ public class ZuulHandlerMapping extends AbstractUrlHandlerMapping implements
private ZuulController zuul; private ZuulController zuul;
private AtomicReference<Object> latestHeartbeat = new AtomicReference<>();
@Autowired @Autowired
public ZuulHandlerMapping(ProxyRouteLocator routeLocator, ZuulController zuul) { public ZuulHandlerMapping(ProxyRouteLocator routeLocator, ZuulController zuul) {
this.routeLocator = routeLocator; this.routeLocator = routeLocator;
...@@ -39,6 +43,12 @@ public class ZuulHandlerMapping extends AbstractUrlHandlerMapping implements ...@@ -39,6 +43,12 @@ public class ZuulHandlerMapping extends AbstractUrlHandlerMapping implements
if (event instanceof InstanceRegisteredEvent if (event instanceof InstanceRegisteredEvent
|| event instanceof RefreshScopeRefreshedEvent) { || event instanceof RefreshScopeRefreshedEvent) {
reset(); reset();
} else if (event instanceof DiscoveryHeartbeatEvent) {
DiscoveryHeartbeatEvent e = (DiscoveryHeartbeatEvent) event;
if (latestHeartbeat.get() == null || !latestHeartbeat.get().equals(e.getValue())) {
latestHeartbeat.set(e.getValue());
reset();
}
} }
} }
......
...@@ -120,9 +120,9 @@ public class EurekaServerInitializerConfiguration implements ServletContextAware ...@@ -120,9 +120,9 @@ public class EurekaServerInitializerConfiguration implements ServletContextAware
EurekaServerConfigurationManager.getInstance() EurekaServerConfigurationManager.getInstance()
.setConfiguration(eurekaServerConfig); .setConfiguration(eurekaServerConfig);
XmlXStream.getInstance().setMarshallingStrategy( XmlXStream.getInstance().setMarshallingStrategy(
new DataCenterAwareMarshallingStrategy()); new DataCenterAwareMarshallingStrategy(applicationContext));
JsonXStream.getInstance().setMarshallingStrategy( JsonXStream.getInstance().setMarshallingStrategy(
new DataCenterAwareMarshallingStrategy()); new DataCenterAwareMarshallingStrategy(applicationContext));
// PeerAwareInstanceRegistry.getInstance(); // PeerAwareInstanceRegistry.getInstance();
applicationContext applicationContext
.publishEvent(new EurekaRegistryAvailableEvent( .publishEvent(new EurekaRegistryAvailableEvent(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment