Commit 47d1940b by Jason Song

multiple namespaces reuse the same long poll connection

parent 90d635ec
......@@ -5,14 +5,19 @@ import org.springframework.data.repository.PagingAndSortingRepository;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import java.util.List;
import java.util.Set;
public interface AppNamespaceRepository extends PagingAndSortingRepository<AppNamespace, Long>{
AppNamespace findByAppIdAndName(String appId, String namespaceName);
List<AppNamespace> findByAppIdAndNameIn(String appId, Set<String> namespaceNames);
AppNamespace findByNameAndIsPublicTrue(String namespaceName);
List<AppNamespace> findByNameInAndIsPublicTrue(Set<String> namespaceNames);
List<AppNamespace> findByAppIdAndIsPublic(String appId, boolean isPublic);
}
......@@ -2,7 +2,9 @@ package com.ctrip.framework.apollo.biz.repository;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.data.repository.query.Param;
import java.util.Collection;
import java.util.List;
......@@ -16,4 +18,7 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel
ReleaseMessage findTopByOrderByIdDesc();
ReleaseMessage findTopByMessageInOrderByIdDesc(Collection<String> messages);
@Query("select message, max(id) as id from ReleaseMessage where message in :messages group by message")
List<Object[]> findLatestReleaseMessagesGroupByMessages(@Param("messages") Collection<String> messages);
}
package com.ctrip.framework.apollo.biz.service;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.List;
import java.util.Objects;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.ctrip.framework.apollo.biz.entity.Audit;
import com.ctrip.framework.apollo.biz.entity.Cluster;
import com.ctrip.framework.apollo.biz.entity.Namespace;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.biz.entity.Audit;
import com.ctrip.framework.apollo.biz.repository.AppNamespaceRepository;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.common.utils.BeanUtils;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.core.exception.ServiceException;
import com.ctrip.framework.apollo.core.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@Service
public class AppNamespaceService {
......@@ -45,6 +48,14 @@ public class AppNamespaceService {
return appNamespaceRepository.findByNameAndIsPublicTrue(namespaceName);
}
public List<AppNamespace> findPublicNamespacesByNames(Set<String> namespaceNames) {
if (namespaceNames == null || namespaceNames.isEmpty()) {
return Collections.EMPTY_LIST;
}
return appNamespaceRepository.findByNameInAndIsPublicTrue(namespaceNames);
}
public List<AppNamespace> findPrivateAppNamespace(String appId){
return appNamespaceRepository.findByAppIdAndIsPublic(appId, false);
}
......@@ -54,6 +65,14 @@ public class AppNamespaceService {
return appNamespaceRepository.findByAppIdAndName(appId, namespaceName);
}
public List<AppNamespace> findByAppIdAndNamespaces(String appId, Set<String> namespaceNames) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(appId), "appId must not be null");
if (namespaceNames == null || namespaceNames.isEmpty()) {
return Collections.EMPTY_LIST;
}
return appNamespaceRepository.findByAppIdAndNameIn(appId, namespaceNames);
}
@Transactional
public void createDefaultAppNamespace(String appId, String createBy) {
if (!isAppNamespaceNameUnique(appId, ConfigConsts.NAMESPACE_APPLICATION)) {
......
package com.ctrip.framework.apollo.biz.service;
import com.google.common.collect.Lists;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import com.dianping.cat.Cat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* @author Jason Song(song_s@ctrip.com)
......@@ -19,4 +24,20 @@ public class ReleaseMessageService {
public ReleaseMessage findLatestReleaseMessageForMessages(Collection<String> messages) {
return releaseMessageRepository.findTopByMessageInOrderByIdDesc(messages);
}
public List<ReleaseMessage> findLatestReleaseMessagesGroupByMessages(Collection<String> messages) {
List<Object[]> result =
releaseMessageRepository.findLatestReleaseMessagesGroupByMessages(messages);
List<ReleaseMessage> releaseMessages = Lists.newArrayList();
for (Object[] o : result) {
try {
ReleaseMessage releaseMessage = new ReleaseMessage((String) o[0]);
releaseMessage.setId((Long) o[1]);
releaseMessages.add(releaseMessage);
} catch (Exception ex) {
Cat.logError("Parsing LatestReleaseMessagesGroupByMessages failed", ex);
}
}
return releaseMessages;
}
}
......@@ -64,6 +64,11 @@
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>test</scope>
</dependency>
<!-- end of test -->
<!-- dal-jdbc -->
<dependency>
......
......@@ -2,6 +2,7 @@ package com.ctrip.framework.apollo.build;
import com.ctrip.framework.apollo.internals.ConfigServiceLocator;
import com.ctrip.framework.apollo.internals.DefaultConfigManager;
import com.ctrip.framework.apollo.internals.RemoteConfigLongPollService;
import com.ctrip.framework.apollo.spi.DefaultConfigFactory;
import com.ctrip.framework.apollo.spi.DefaultConfigFactoryManager;
import com.ctrip.framework.apollo.spi.DefaultConfigRegistry;
......@@ -33,6 +34,7 @@ public class ComponentConfigurator extends AbstractResourceConfigurator {
all.add(A(ConfigUtil.class));
all.add(A(HttpUtil.class));
all.add(A(ConfigServiceLocator.class));
all.add(A(RemoteConfigLongPollService.class));
return all;
}
......
......@@ -11,14 +11,10 @@ import com.google.common.util.concurrent.RateLimiter;
import com.ctrip.framework.apollo.Apollo;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfig;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.framework.apollo.core.dto.ServiceDTO;
import com.ctrip.framework.apollo.core.schedule.ExponentialSchedulePolicy;
import com.ctrip.framework.apollo.core.schedule.SchedulePolicy;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.exceptions.ApolloConfigException;
import com.ctrip.framework.apollo.util.ConfigUtil;
import com.ctrip.framework.apollo.util.ExceptionUtil;
import com.ctrip.framework.apollo.util.http.HttpRequest;
import com.ctrip.framework.apollo.util.http.HttpResponse;
import com.ctrip.framework.apollo.util.http.HttpUtil;
......@@ -36,12 +32,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
......@@ -55,15 +48,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private final ConfigServiceLocator m_serviceLocator;
private final HttpUtil m_httpUtil;
private final ConfigUtil m_configUtil;
private final RemoteConfigLongPollService remoteConfigLongPollService;
private volatile AtomicReference<ApolloConfig> m_configCache;
private final String m_namespace;
private final static ScheduledExecutorService m_executorService;
private final ExecutorService m_longPollingService;
private final AtomicBoolean m_longPollingStopped;
private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
private AtomicReference<ServiceDTO> m_longPollServiceDto;
private AtomicReference<ApolloConfigNotification> m_longPollResult;
private RateLimiter m_longPollRateLimiter;
private RateLimiter m_loadConfigRateLimiter;
private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper();
private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();
......@@ -86,17 +75,12 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
m_configUtil = m_container.lookup(ConfigUtil.class);
m_httpUtil = m_container.lookup(HttpUtil.class);
m_serviceLocator = m_container.lookup(ConfigServiceLocator.class);
remoteConfigLongPollService = m_container.lookup(RemoteConfigLongPollService.class);
} catch (ComponentLookupException ex) {
Cat.logError(ex);
throw new ApolloConfigException("Unable to load component!", ex);
}
m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
m_longPollingStopped = new AtomicBoolean(false);
m_longPollingService = Executors.newSingleThreadExecutor(
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
m_longPollServiceDto = new AtomicReference<>();
m_longPollResult = new AtomicReference<>();
m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
this.trySync();
this.schedulePeriodicRefresh();
......@@ -119,7 +103,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
this.m_executorService.scheduleAtFixedRate(
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
......@@ -271,63 +255,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
}
private void scheduleLongPollingRefresh() {
try {
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
doLongPollingRefresh(appId, cluster, dataCenter);
}
});
} catch (Throwable ex) {
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
Cat.logError(exception);
logger.warn(ExceptionUtil.getDetailMessage(exception));
}
}
private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
remoteConfigLongPollService.submit(m_namespace, this);
}
String url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster,
m_namespace, dataCenter, m_longPollResult.get());
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
//longer timeout for read - 1 minute
request.setReadTimeout(60000);
transaction.addData("Url", url);
HttpResponse<ApolloConfigNotification> response =
m_httpUtil.doGet(request, ApolloConfigNotification.class);
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200) {
m_longPollServiceDto.set(lastServiceDto);
if (response.getBody() != null) {
m_longPollResult.set(response.getBody());
transaction.addData("Result", response.getBody().toString());
}
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto) {
m_longPollServiceDto.set(longPollNotifiedServiceDto);
m_executorService.submit(new Runnable() {
@Override
public void run() {
......@@ -336,63 +268,6 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
});
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) {
lastServiceDto = null;
Cat.logError(ex);
transaction.setStatus(ex);
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespace: {}, reason: {}",
sleepTimeInSecond, appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}
String assembleLongPollRefreshUrl(String uri, String appId, String cluster,
String namespace, String dataCenter,
ApolloConfigNotification previousResult) {
Map<String, String> queryParams = Maps.newHashMap();
queryParams.put("appId", queryParamEscaper.escape(appId));
queryParams.put("cluster", queryParamEscaper.escape(cluster));
if (!Strings.isNullOrEmpty(namespace)) {
queryParams.put("namespace", queryParamEscaper.escape(namespace));
}
if (!Strings.isNullOrEmpty(dataCenter)) {
queryParams.put("dataCenter", queryParamEscaper.escape(dataCenter));
}
String localIp = m_configUtil.getLocalIp();
if (!Strings.isNullOrEmpty(localIp)) {
queryParams.put("ip", queryParamEscaper.escape(localIp));
}
if (previousResult != null) {
//number doesn't need encode
queryParams.put("notificationId", String.valueOf(previousResult.getNotificationId()));
}
String params = MAP_JOINER.join(queryParams);
if (!uri.endsWith("/")) {
uri += "/";
}
return uri + "notifications?" + params;
}
void stopLongPollingRefresh() {
this.m_longPollingStopped.compareAndSet(false, true);
}
private List<ServiceDTO> getConfigServices() {
List<ServiceDTO> services = m_serviceLocator.getConfigServices();
if (services.size() == 0) {
......
......@@ -96,7 +96,7 @@ public class ConfigUtil {
}
public String getLocalIp() {
return Networks.forIp().getLocalHostAddress();
return Foundation.net().getHostAddress();
}
public String getMetaServerDomainName() {
......
......@@ -31,6 +31,15 @@
<implementation>com.ctrip.framework.apollo.util.ConfigUtil</implementation>
</component>
<component>
<role>com.ctrip.framework.apollo.util.http.HttpUtil</role>
<implementation>com.ctrip.framework.apollo.util.http.HttpUtil</implementation>
<requirements>
<requirement>
<role>com.ctrip.framework.apollo.util.ConfigUtil</role>
</requirement>
</requirements>
</component>
<component>
<role>com.ctrip.framework.apollo.internals.ConfigServiceLocator</role>
<implementation>com.ctrip.framework.apollo.internals.ConfigServiceLocator</implementation>
<requirements>
......@@ -43,12 +52,18 @@
</requirements>
</component>
<component>
<role>com.ctrip.framework.apollo.util.http.HttpUtil</role>
<implementation>com.ctrip.framework.apollo.util.http.HttpUtil</implementation>
<role>com.ctrip.framework.apollo.internals.RemoteConfigLongPollService</role>
<implementation>com.ctrip.framework.apollo.internals.RemoteConfigLongPollService</implementation>
<requirements>
<requirement>
<role>com.ctrip.framework.apollo.util.ConfigUtil</role>
</requirement>
<requirement>
<role>com.ctrip.framework.apollo.util.http.HttpUtil</role>
</requirement>
<requirement>
<role>com.ctrip.framework.apollo.internals.ConfigServiceLocator</role>
</requirement>
</requirements>
</component>
</components>
......
......@@ -6,6 +6,7 @@ import com.ctrip.framework.apollo.internals.DefaultConfigManagerTest;
import com.ctrip.framework.apollo.internals.DefaultConfigTest;
import com.ctrip.framework.apollo.internals.LocalFileConfigRepositoryTest;
import com.ctrip.framework.apollo.internals.PropertiesConfigFileTest;
import com.ctrip.framework.apollo.internals.RemoteConfigLongPollServiceTest;
import com.ctrip.framework.apollo.internals.RemoteConfigRepositoryTest;
import com.ctrip.framework.apollo.internals.SimpleConfigTest;
import com.ctrip.framework.apollo.internals.XmlConfigFileTest;
......@@ -23,7 +24,8 @@ import org.junit.runners.Suite.SuiteClasses;
ConfigServiceTest.class, DefaultConfigRegistryTest.class, DefaultConfigFactoryManagerTest.class,
DefaultConfigManagerTest.class, DefaultConfigTest.class, LocalFileConfigRepositoryTest.class,
RemoteConfigRepositoryTest.class, SimpleConfigTest.class, DefaultConfigFactoryTest.class,
ConfigIntegrationTest.class, ExceptionUtilTest.class, XmlConfigFileTest.class, PropertiesConfigFileTest.class
ConfigIntegrationTest.class, ExceptionUtilTest.class, XmlConfigFileTest.class,
PropertiesConfigFileTest.class, RemoteConfigLongPollServiceTest.class
})
public class AllTests {
......
......@@ -49,12 +49,14 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
private String someReleaseKey;
private File configDir;
private String defaultNamespace;
private String someOtherNamespace;
@Before
public void setUp() throws Exception {
super.setUp();
defaultNamespace = ConfigConsts.NAMESPACE_APPLICATION;
someOtherNamespace = "someOtherNamespace";
someReleaseKey = "1";
configDir = new File(ClassLoaderUtil.getClassPath() + "config-cache");
if (configDir.exists()) {
......@@ -246,7 +248,9 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler =
mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK,
new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId), false);
Lists.newArrayList(
new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId)),
false);
startServerWithHandlers(configHandler, pollHandler);
......@@ -267,14 +271,109 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
longPollFinished.get(pollTimeoutInMS * 20, TimeUnit.MILLISECONDS);
assertEquals(anotherValue, config.getProperty(someKey, null));
}
@Test
public void testLongPollRefreshWithMultipleNamespacesAndOnlyOneNamespaceNotified() throws Exception {
final String someKey = "someKey";
final String someValue = "someValue";
final String anotherValue = "anotherValue";
long someNotificationId = 1;
long pollTimeoutInMS = 50;
Map<String, String> configurations = Maps.newHashMap();
configurations.put(someKey, someValue);
ApolloConfig apolloConfig = assembleApolloConfig(configurations);
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler =
mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK,
Lists.newArrayList(
new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId)),
false);
startServerWithHandlers(configHandler, pollHandler);
Config someOtherConfig = ConfigService.getConfig(someOtherNamespace);
Config config = ConfigService.getAppConfig();
assertEquals(someValue, config.getProperty(someKey, null));
assertEquals(someValue, someOtherConfig.getProperty(someKey, null));
final SettableFuture<Boolean> longPollFinished = SettableFuture.create();
config.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
longPollFinished.set(true);
}
});
apolloConfig.getConfigurations().put(someKey, anotherValue);
longPollFinished.get(pollTimeoutInMS * 50, TimeUnit.MILLISECONDS);
assertEquals(anotherValue, config.getProperty(someKey, null));
TimeUnit.MILLISECONDS.sleep(pollTimeoutInMS * 10);
assertEquals(someValue, someOtherConfig.getProperty(someKey, null));
}
@Test
public void testLongPollRefreshWithMultipleNamespacesAndMultipleNamespaceNotified() throws Exception {
final String someKey = "someKey";
final String someValue = "someValue";
final String anotherValue = "anotherValue";
long someNotificationId = 1;
long pollTimeoutInMS = 50;
Map<String, String> configurations = Maps.newHashMap();
configurations.put(someKey, someValue);
ApolloConfig apolloConfig = assembleApolloConfig(configurations);
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler =
mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK,
Lists.newArrayList(
new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId),
new ApolloConfigNotification(someOtherNamespace, someNotificationId)),
false);
startServerWithHandlers(configHandler, pollHandler);
Config config = ConfigService.getAppConfig();
Config someOtherConfig = ConfigService.getConfig(someOtherNamespace);
assertEquals(someValue, config.getProperty(someKey, null));
assertEquals(someValue, someOtherConfig.getProperty(someKey, null));
final SettableFuture<Boolean> longPollFinished = SettableFuture.create();
final SettableFuture<Boolean> someOtherNamespacelongPollFinished = SettableFuture.create();
config.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
longPollFinished.set(true);
}
});
someOtherConfig.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
someOtherNamespacelongPollFinished.set(true);
}
});
apolloConfig.getConfigurations().put(someKey, anotherValue);
longPollFinished.get(pollTimeoutInMS * 20, TimeUnit.MILLISECONDS);
someOtherNamespacelongPollFinished.get(pollTimeoutInMS * 20, TimeUnit.MILLISECONDS);
assertEquals(anotherValue, config.getProperty(someKey, null));
assertEquals(anotherValue, someOtherConfig.getProperty(someKey, null));
}
private ContextHandler mockPollNotificationHandler(final long pollResultTimeOutInMS,
final int statusCode,
final ApolloConfigNotification result,
final List<ApolloConfigNotification> result,
final boolean failedAtFirstTime) {
ContextHandler context = new ContextHandler("/notifications");
ContextHandler context = new ContextHandler("/notifications/v2");
context.setHandler(new AbstractHandler() {
AtomicInteger counter = new AtomicInteger(0);
......
......@@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture;
import com.ctrip.framework.apollo.core.dto.ApolloConfig;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.framework.apollo.core.dto.ServiceDTO;
......@@ -24,6 +25,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.unidal.lookup.ComponentTestCase;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Properties;
......@@ -34,12 +36,12 @@ import javax.servlet.http.HttpServletResponse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
/**
* Created by Jason on 4/9/16.
......@@ -52,9 +54,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
@Mock
private static HttpResponse<ApolloConfig> someResponse;
@Mock
private static HttpResponse<ApolloConfigNotification> pollResponse;
@Mock
private ConfigUtil someConfigUtil;
private static HttpResponse<List<ApolloConfigNotification>> pollResponse;
private RemoteConfigLongPollService remoteConfigLongPollService;
@Before
public void setUp() throws Exception {
......@@ -66,6 +67,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
defineComponent(ConfigUtil.class, MockConfigUtil.class);
defineComponent(ConfigServiceLocator.class, MockConfigServiceLocator.class);
defineComponent(HttpUtil.class, MockHttpUtil.class);
remoteConfigLongPollService = lookup(RemoteConfigLongPollService.class);
}
@Test
......@@ -84,7 +87,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
Properties config = remoteConfigRepository.getConfig();
assertEquals(configurations, config);
remoteConfigRepository.stopLongPollingRefresh();
remoteConfigLongPollService.stopLongPollingRefresh();
}
@Test(expected = ApolloConfigException.class)
......@@ -95,7 +98,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace);
//must stop the long polling before exception occurred
remoteConfigRepository.stopLongPollingRefresh();
remoteConfigLongPollService.stopLongPollingRefresh();
remoteConfigRepository.getConfig();
}
......@@ -124,7 +127,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
assertEquals(newConfigurations, captor.getValue());
remoteConfigRepository.stopLongPollingRefresh();
remoteConfigLongPollService.stopLongPollingRefresh();
}
@Test
......@@ -137,7 +140,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
final SettableFuture<Boolean> longPollFinished = SettableFuture.create();
RepositoryChangeListener someListener = mock(RepositoryChangeListener.class);
doAnswer(new Answer<Void>(){
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
......@@ -153,38 +156,22 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
Map<String, String> newConfigurations = ImmutableMap.of("someKey", "anotherValue");
ApolloConfig newApolloConfig = assembleApolloConfig(newConfigurations);
ApolloConfigNotification someNotification = mock(ApolloConfigNotification.class);
when(someNotification.getNamespaceName()).thenReturn(someNamespace);
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_OK);
when(pollResponse.getBody()).thenReturn(Lists.newArrayList(someNotification));
when(someResponse.getBody()).thenReturn(newApolloConfig);
longPollFinished.get(500, TimeUnit.MILLISECONDS);
remoteConfigRepository.stopLongPollingRefresh();
remoteConfigLongPollService.stopLongPollingRefresh();
verify(someListener, times(1)).onRepositoryChange(eq(someNamespace), captor.capture());
assertEquals(newConfigurations, captor.getValue());
}
@Test
public void testAssembleLongPollRefreshUrl() throws Exception {
String someUri = "http://someServer";
String someAppId = "someAppId";
String someCluster = "someCluster+ &.-_someSign";
RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace);
String longPollRefreshUrl =
remoteConfigRepository
.assembleLongPollRefreshUrl(someUri, someAppId, someCluster, someNamespace, null, null);
assertTrue(longPollRefreshUrl.contains("http://someServer/notifications?"));
assertTrue(longPollRefreshUrl.contains("appId=someAppId"));
assertTrue(longPollRefreshUrl.contains("cluster=someCluster%2B+%26.-_someSign"));
assertTrue(longPollRefreshUrl.contains("namespace=" + someNamespace));
}
@Test
public void testAssembleQueryConfigUrl() throws Exception {
String someUri = "http://someServer";
String someAppId = "someAppId";
......@@ -200,7 +187,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
someApolloConfig);
assertTrue(queryConfigUrl
.contains("http://someServer/configs/someAppId/someCluster+%20&.-_someSign/" + someNamespace));
.contains(
"http://someServer/configs/someAppId/someCluster+%20&.-_someSign/" + someNamespace));
assertTrue(queryConfigUrl
.contains("releaseKey=20160705193346-583078ef5716c055%2B20160705193308-31c471ddf9087c3f"));
......@@ -265,15 +253,17 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
public static class MockHttpUtil extends HttpUtil {
@Override
public <T> HttpResponse<T> doGet(HttpRequest httpRequest, Class<T> responseType) {
if (httpRequest.getUrl().contains("notifications?")) {
return (HttpResponse<T>) someResponse;
}
@Override
public <T> HttpResponse<T> doGet(HttpRequest httpRequest, Type responseType) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
}
return (HttpResponse<T>) pollResponse;
}
return (HttpResponse<T>) someResponse;
}
}
}
......@@ -3,6 +3,7 @@ package com.ctrip.framework.apollo.configservice;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageScanner;
import com.ctrip.framework.apollo.configservice.controller.ConfigFileController;
import com.ctrip.framework.apollo.configservice.controller.NotificationController;
import com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
......@@ -17,12 +18,15 @@ public class ConfigServiceAutoConfiguration {
private NotificationController notificationController;
@Autowired
private ConfigFileController configFileController;
@Autowired
private NotificationControllerV2 notificationControllerV2;
@Bean
public ReleaseMessageScanner releaseMessageScanner() {
ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
//handle server cache first
releaseMessageScanner.addMessageListener(configFileController);
releaseMessageScanner.addMessageListener(notificationControllerV2);
releaseMessageScanner.addMessageListener(notificationController);
return releaseMessageScanner;
}
......
......@@ -59,6 +59,17 @@ public class NotificationController implements ReleaseMessageListener {
@Autowired
private NamespaceUtil namespaceUtil;
/**
* For single namespace notification, reserved for older version of apollo clients
*
* @param appId the appId
* @param cluster the cluster
* @param namespace the namespace name
* @param dataCenter the datacenter
* @param notificationId the notification id for the namespace
* @param clientIp the client side ip
* @return a deferred result
*/
@RequestMapping(method = RequestMethod.GET)
public DeferredResult<ResponseEntity<ApolloConfigNotification>> pollNotification(
@RequestParam(value = "appId") String appId,
......
......@@ -2,6 +2,9 @@ package com.ctrip.framework.apollo.configservice.util;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
......@@ -11,6 +14,8 @@ import com.ctrip.framework.apollo.core.ConfigConsts;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
......@@ -23,32 +28,61 @@ public class WatchKeysUtil {
@Autowired
private AppNamespaceService appNamespaceService;
/**
* Assemble watch keys for the given appId, cluster, namespace, dataCenter combination
*/
public Set<String> assembleAllWatchKeys(String appId, String clusterName, String namespace,
String dataCenter) {
Set<String> watchedKeys = assembleWatchKeys(appId, clusterName, namespace, dataCenter);
Multimap<String, String> watchedKeysMap =
assembleAllWatchKeys(appId, clusterName, Sets.newHashSet(namespace), dataCenter);
return Sets.newHashSet(watchedKeysMap.get(namespace));
}
/**
* Assemble watch keys for the given appId, cluster, namespaces, dataCenter combination
* @return a multimap with namespace as the key and watch keys as the value
*/
public Multimap<String, String> assembleAllWatchKeys(String appId, String clusterName,
Set<String> namespaces,
String dataCenter) {
Multimap<String, String> watchedKeysMap =
assembleWatchKeys(appId, clusterName, namespaces, dataCenter);
//Every app has an 'application' namespace
if (!(namespaces.size() == 1 && namespaces.contains(ConfigConsts.NAMESPACE_APPLICATION))) {
Set<String> namespacesBelongToAppId = namespacesBelongToAppId(appId, namespaces);
Set<String> publicNamespaces = Sets.difference(namespaces, namespacesBelongToAppId);
//Listen on more namespaces if it's a public namespace
if (!namespaceBelongsToAppId(appId, namespace)) {
watchedKeys.addAll(this.findPublicConfigWatchKey(appId, clusterName, namespace, dataCenter));
if (!publicNamespaces.isEmpty()) {
watchedKeysMap
.putAll(findPublicConfigWatchKeys(appId, clusterName, publicNamespaces, dataCenter));
}
}
return watchedKeys;
return watchedKeysMap;
}
private Set<String> findPublicConfigWatchKey(String applicationId, String clusterName,
String namespace,
private Multimap<String, String> findPublicConfigWatchKeys(String applicationId,
String clusterName,
Set<String> namespaces,
String dataCenter) {
AppNamespace appNamespace = appNamespaceService.findPublicNamespaceByName(namespace);
Multimap<String, String> watchedKeysMap = HashMultimap.create();
List<AppNamespace> appNamespaces = appNamespaceService.findPublicNamespacesByNames(namespaces);
for (AppNamespace appNamespace : appNamespaces) {
//check whether the namespace's appId equals to current one
if (Objects.isNull(appNamespace) || Objects.equals(applicationId, appNamespace.getAppId())) {
return Sets.newHashSet();
if (Objects.equals(applicationId, appNamespace.getAppId())) {
continue;
}
String publicConfigAppId = appNamespace.getAppId();
return assembleWatchKeys(publicConfigAppId, clusterName, namespace, dataCenter);
watchedKeysMap.putAll(appNamespace.getName(),
assembleWatchKeys(publicConfigAppId, clusterName, appNamespace.getName(), dataCenter));
}
return watchedKeysMap;
}
private String assembleKey(String appId, String cluster, String namespace) {
......@@ -57,6 +91,7 @@ public class WatchKeysUtil {
private Set<String> assembleWatchKeys(String appId, String clusterName, String namespace,
String dataCenter) {
Set<String> watchedKeys = Sets.newHashSet();
//watch specified cluster config change
......@@ -75,14 +110,27 @@ public class WatchKeysUtil {
return watchedKeys;
}
private boolean namespaceBelongsToAppId(String appId, String namespaceName) {
//Every app has an 'application' namespace
if (Objects.equals(ConfigConsts.NAMESPACE_APPLICATION, namespaceName)) {
return true;
private Multimap<String, String> assembleWatchKeys(String appId, String clusterName,
Set<String> namespaces,
String dataCenter) {
Multimap<String, String> watchedKeysMap = HashMultimap.create();
for (String namespace : namespaces) {
watchedKeysMap
.putAll(namespace, assembleWatchKeys(appId, clusterName, namespace, dataCenter));
}
return watchedKeysMap;
}
AppNamespace appNamespace = appNamespaceService.findOne(appId, namespaceName);
private Set<String> namespacesBelongToAppId(String appId, Set<String> namespaces) {
List<AppNamespace> appNamespaces =
appNamespaceService.findByAppIdAndNamespaces(appId, namespaces);
if (appNamespaces == null || appNamespaces.isEmpty()) {
return Collections.emptySet();
}
return appNamespace != null;
return FluentIterable.from(appNamespaces).transform(AppNamespace::getName).toSet();
}
}
......@@ -3,10 +3,13 @@ package com.ctrip.framework.apollo.configservice;
import com.ctrip.framework.apollo.configservice.controller.ConfigControllerTest;
import com.ctrip.framework.apollo.configservice.controller.ConfigFileControllerTest;
import com.ctrip.framework.apollo.configservice.controller.NotificationControllerTest;
import com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2Test;
import com.ctrip.framework.apollo.configservice.integration.ConfigControllerIntegrationTest;
import com.ctrip.framework.apollo.configservice.integration.ConfigFileControllerIntegrationTest;
import com.ctrip.framework.apollo.configservice.integration.NotificationControllerIntegrationTest;
import com.ctrip.framework.apollo.configservice.integration.NotificationControllerV2IntegrationTest;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtilTest;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtilTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
......@@ -16,7 +19,9 @@ import org.junit.runners.Suite.SuiteClasses;
@SuiteClasses({ConfigControllerTest.class, NotificationControllerTest.class,
ConfigControllerIntegrationTest.class, NotificationControllerIntegrationTest.class,
NamespaceUtilTest.class, ConfigFileControllerTest.class,
ConfigFileControllerIntegrationTest.class})
ConfigFileControllerIntegrationTest.class, WatchKeysUtilTest.class,
NotificationControllerV2Test.class, NotificationControllerV2IntegrationTest.class
})
public class AllTests {
}
......@@ -24,6 +24,9 @@ import org.springframework.web.client.RestTemplate;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
......@@ -83,4 +86,23 @@ public abstract class AbstractBaseIntegrationTest {
return release;
}
protected void periodicSendMessage(ExecutorService executorService, String message, AtomicBoolean stop) {
executorService.submit((Runnable) () -> {
//wait for the request connected to server
while (!stop.get() && !Thread.currentThread().isInterrupted()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
//double check
if (stop.get()) {
break;
}
sendReleaseMessage(message);
}
});
}
}
......@@ -2,22 +2,17 @@ package com.ctrip.framework.apollo.configservice.integration;
import com.google.common.base.Joiner;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import com.ctrip.framework.apollo.configservice.controller.NotificationController;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.jdbc.Sql;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
......@@ -27,9 +22,6 @@ import static org.junit.Assert.assertNotEquals;
* @author Jason Song(song_s@ctrip.com)
*/
public class NotificationControllerIntegrationTest extends AbstractBaseIntegrationTest {
@Autowired
private NotificationController notificationController;
private String someAppId;
private String someCluster;
private String defaultNamespace;
......@@ -49,7 +41,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithDefaultNamespace() throws Exception {
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(someAppId, someCluster, defaultNamespace), stop);
periodicSendMessage(executorService, assembleKey(someAppId, someCluster, defaultNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}",
......@@ -68,7 +60,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithDefaultNamespaceAsFile() throws Exception {
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(someAppId, someCluster, defaultNamespace), stop);
periodicSendMessage(executorService, assembleKey(someAppId, someCluster, defaultNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}",
......@@ -89,7 +81,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
public void testPollNotificationWithPrivateNamespaceAsFile() throws Exception {
String namespace = "someNamespace.xml";
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(someAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, namespace), stop);
periodicSendMessage(executorService, assembleKey(someAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, namespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity(
......@@ -144,7 +136,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
String publicAppId = "somePublicAppId";
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(publicAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, somePublicNamespace), stop);
periodicSendMessage(executorService, assembleKey(publicAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, somePublicNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity(
......@@ -168,7 +160,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
String someDC = "someDC";
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(publicAppId, someDC, somePublicNamespace), stop);
periodicSendMessage(executorService, assembleKey(publicAppId, someDC, somePublicNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity(
......@@ -192,7 +184,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
String someDC = "someDC";
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(publicAppId, someDC, somePublicNamespace), stop);
periodicSendMessage(executorService, assembleKey(publicAppId, someDC, somePublicNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity(
......@@ -228,23 +220,4 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
private String assembleKey(String appId, String cluster, String namespace) {
return Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).join(appId, cluster, namespace);
}
private void periodicSendMessage(String message, AtomicBoolean stop) {
executorService.submit((Runnable) () -> {
//wait for the request connected to server
while (!stop.get() && !Thread.currentThread().isInterrupted()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
//double check
if (stop.get()) {
break;
}
sendReleaseMessage(message);
}
});
}
}
package com.ctrip.framework.apollo.configservice.util;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.core.ConfigConsts;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.Collection;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class WatchKeysUtilTest {
@Mock
private AppNamespaceService appNamespaceService;
@Mock
private AppNamespace someAppNamespace;
@Mock
private AppNamespace anotherAppNamespace;
@Mock
private AppNamespace somePublicAppNamespace;
private WatchKeysUtil watchKeysUtil;
private String someAppId;
private String someCluster;
private String someNamespace;
private String anotherNamespace;
private String somePublicNamespace;
private String defaultCluster;
private String someDC;
private String somePublicAppId;
@Before
public void setUp() throws Exception {
watchKeysUtil = new WatchKeysUtil();
someAppId = "someId";
someCluster = "someCluster";
someNamespace = "someName";
anotherNamespace = "anotherName";
somePublicNamespace = "somePublicName";
defaultCluster = ConfigConsts.CLUSTER_NAME_DEFAULT;
someDC = "someDC";
somePublicAppId = "somePublicId";
when(someAppNamespace.getName()).thenReturn(someNamespace);
when(anotherAppNamespace.getName()).thenReturn(anotherNamespace);
when(appNamespaceService.findByAppIdAndNamespaces(someAppId, Sets.newHashSet(someNamespace)))
.thenReturn(Lists.newArrayList(someAppNamespace));
when(appNamespaceService
.findByAppIdAndNamespaces(someAppId, Sets.newHashSet(someNamespace, anotherNamespace)))
.thenReturn(Lists.newArrayList(someAppNamespace, anotherAppNamespace));
when(appNamespaceService
.findByAppIdAndNamespaces(someAppId,
Sets.newHashSet(someNamespace, anotherNamespace, somePublicNamespace)))
.thenReturn(Lists.newArrayList(someAppNamespace, anotherAppNamespace));
when(somePublicAppNamespace.getAppId()).thenReturn(somePublicAppId);
when(somePublicAppNamespace.getName()).thenReturn(somePublicNamespace);
when(appNamespaceService.findPublicNamespacesByNames(Sets.newHashSet(somePublicNamespace)))
.thenReturn(Lists.newArrayList(somePublicAppNamespace));
ReflectionTestUtils.setField(watchKeysUtil, "appNamespaceService", appNamespaceService);
}
@Test
public void testAssembleAllWatchKeysWithOneNamespaceAndDefaultCluster() throws Exception {
Set<String> watchKeys =
watchKeysUtil.assembleAllWatchKeys(someAppId, defaultCluster, someNamespace, null);
Set<String> clusters = Sets.newHashSet(defaultCluster);
assertEquals(clusters.size(), watchKeys.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeys);
}
@Test
public void testAssembleAllWatchKeysWithOneNamespaceAndSomeDC() throws Exception {
Set<String> watchKeys =
watchKeysUtil.assembleAllWatchKeys(someAppId, someDC, someNamespace, someDC);
Set<String> clusters = Sets.newHashSet(defaultCluster, someDC);
assertEquals(clusters.size(), watchKeys.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeys);
}
@Test
public void testAssembleAllWatchKeysWithOneNamespaceAndSomeDCAndSomeCluster() throws Exception {
Set<String> watchKeys =
watchKeysUtil.assembleAllWatchKeys(someAppId, someCluster, someNamespace, someDC);
Set<String> clusters = Sets.newHashSet(defaultCluster, someCluster, someDC);
assertEquals(clusters.size(), watchKeys.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeys);
}
@Test
public void testAssembleAllWatchKeysWithMultipleNamespaces() throws Exception {
Multimap<String, String> watchKeysMap =
watchKeysUtil.assembleAllWatchKeys(someAppId, someCluster,
Sets.newHashSet(someNamespace, anotherNamespace), someDC);
Set<String> clusters = Sets.newHashSet(defaultCluster, someCluster, someDC);
assertEquals(clusters.size() * 2, watchKeysMap.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeysMap.get(someNamespace));
assertWatchKeys(someAppId, clusters, anotherNamespace, watchKeysMap.get(anotherNamespace));
}
@Test
public void testAssembleAllWatchKeysWithPrivateAndPublicNamespaces() throws Exception {
Multimap<String, String> watchKeysMap =
watchKeysUtil.assembleAllWatchKeys(someAppId, someCluster,
Sets.newHashSet(someNamespace, anotherNamespace, somePublicNamespace), someDC);
Set<String> clusters = Sets.newHashSet(defaultCluster, someCluster, someDC);
assertEquals(clusters.size() * 4, watchKeysMap.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeysMap.get(someNamespace));
assertWatchKeys(someAppId, clusters, anotherNamespace, watchKeysMap.get(anotherNamespace));
assertWatchKeys(someAppId, clusters, somePublicNamespace, watchKeysMap.get(somePublicNamespace));
assertWatchKeys(somePublicAppId, clusters, somePublicNamespace, watchKeysMap.get(somePublicNamespace));
}
private void assertWatchKeys(String appId, Set<String> clusters, String namespaceName,
Collection<String> watchedKeys) {
for (String cluster : clusters) {
String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(appId, cluster, namespaceName);
assertTrue(watchedKeys.contains(key));
}
}
}
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigFile;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.model.ConfigChange;
import com.ctrip.framework.apollo.model.ConfigChangeEvent;
......@@ -18,11 +16,12 @@ import java.io.InputStreamReader;
*/
public class ApolloConfigDemo {
private static final Logger logger = LoggerFactory.getLogger(ApolloConfigDemo.class);
private String DEFAULT_VALUE = "undefined";
private Config config;
private Config publicConfig;
public ApolloConfigDemo() {
config = ConfigService.getAppConfig();
config.addChangeListener(new ConfigChangeListener() {
ConfigChangeListener changeListener = new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
logger.info("Changes for namespace {}", changeEvent.getNamespace());
......@@ -33,12 +32,19 @@ public class ApolloConfigDemo {
change.getChangeType());
}
}
});
};
config = ConfigService.getAppConfig();
config.addChangeListener(changeListener);
publicConfig = ConfigService.getConfig("FX.apollo");
publicConfig.addChangeListener(changeListener);
}
private String getConfig(String key) {
String result = config.getProperty(key, "undefined");
logger.info(String.format("Loading key: %s with value: %s", key, result));
String result = config.getProperty(key, DEFAULT_VALUE);
if (DEFAULT_VALUE.equals(result)) {
result = publicConfig.getProperty(key, DEFAULT_VALUE);
}
logger.info(String.format("Loading key : %s with value: %s", key, result));
return result;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment