Commit 1eaf3791 by Jason Song

Use cache for notification controller

parent efcfe940
......@@ -16,6 +16,7 @@ import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Component
public class BizConfig extends RefreshableConfig {
......@@ -80,5 +81,28 @@ public class BizConfig extends RefreshableConfig {
return getValue("clogging.server.port");
}
public int appNamespaceCacheScanInterval() {
return getIntProperty("apollo.app-namespace-cache-scan.interval", 1);
}
public TimeUnit appNamespaceCacheScanIntervalTimeUnit() {
return TimeUnit.SECONDS;
}
public int appNamespaceCacheRebuildInterval() {
return getIntProperty("apollo.app-namespace-cache-rebuild.interval", 60);
}
public TimeUnit appNamespaceCacheRebuildIntervalTimeUnit() {
return TimeUnit.SECONDS;
}
public int releaseMessageCacheScanInterval() {
return getIntProperty("apollo.release-message-cache-scan.interval", 1);
}
public TimeUnit releaseMessageCacheScanIntervalTimeUnit() {
return TimeUnit.SECONDS;
}
}
......@@ -30,7 +30,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
......@@ -250,12 +249,7 @@ public class GrayReleaseRulesHolder implements ReleaseMessageListener, Initializ
}
private void populateDataBaseInterval() {
try {
databaseScanInterval = bizConfig.grayReleaseRuleScanInterval();
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Load apollo gray release rule scan interval from server config failed", ex);
}
}
private int getDatabaseScanIntervalSecond() {
......
......@@ -20,4 +20,6 @@ public interface AppNamespaceRepository extends PagingAndSortingRepository<AppNa
List<AppNamespace> findByAppIdAndIsPublic(String appId, boolean isPublic);
List<AppNamespace> findFirst500ByIdGreaterThanOrderByIdAsc(long id);
}
......@@ -75,7 +75,7 @@ public abstract class RefreshableConfig {
try {
String value = getValue(key);
return value == null ? defaultValue : Integer.parseInt(value);
} catch (Exception e) {
} catch (Throwable e) {
Tracer.logError("Get int property failed.", e);
return defaultValue;
}
......@@ -85,7 +85,7 @@ public abstract class RefreshableConfig {
try {
String value = getValue(key);
return value == null ? defaultValue : "true".equals(value);
} catch (Exception e) {
} catch (Throwable e) {
Tracer.logError("Get boolean property failed.", e);
return defaultValue;
}
......@@ -95,7 +95,7 @@ public abstract class RefreshableConfig {
try {
String value = getValue(key);
return Strings.isNullOrEmpty(value) ? defaultValue : value.split(LIST_SEPARATOR);
} catch (Exception e) {
} catch (Throwable e) {
Tracer.logError("Get array property failed.", e);
return defaultValue;
}
......@@ -104,7 +104,7 @@ public abstract class RefreshableConfig {
public String getValue(String key, String defaultValue) {
try {
return environment.getProperty(key, defaultValue);
} catch (Exception e) {
} catch (Throwable e) {
Tracer.logError("Get value failed.", e);
return defaultValue;
}
......
......@@ -5,6 +5,7 @@ import com.ctrip.framework.apollo.biz.message.ReleaseMessageScanner;
import com.ctrip.framework.apollo.configservice.controller.ConfigFileController;
import com.ctrip.framework.apollo.configservice.controller.NotificationController;
import com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2;
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
......@@ -30,10 +31,14 @@ public class ConfigServiceAutoConfiguration {
private NotificationControllerV2 notificationControllerV2;
@Autowired
private GrayReleaseRulesHolder grayReleaseRulesHolder;
@Autowired
private ReleaseMessageServiceWithCache releaseMessageServiceWithCache;
@Bean
public ReleaseMessageScanner releaseMessageScanner() {
ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
//0. handle release message cache
releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
//1. handle gray release rule
releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
//2. handle server cache
......
......@@ -10,8 +10,8 @@ import com.google.common.collect.Multimaps;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.core.ConfigConsts;
......@@ -51,7 +51,7 @@ public class NotificationController implements ReleaseMessageListener {
private WatchKeysUtil watchKeysUtil;
@Autowired
private ReleaseMessageService releaseMessageService;
private ReleaseMessageServiceWithCache releaseMessageService;
@Autowired
private EntityManagerUtil entityManagerUtil;
......
......@@ -15,9 +15,9 @@ import com.google.gson.reflect.TypeToken;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.common.exception.BadRequestException;
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.core.ConfigConsts;
......@@ -65,7 +65,7 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
private WatchKeysUtil watchKeysUtil;
@Autowired
private ReleaseMessageService releaseMessageService;
private ReleaseMessageServiceWithCache releaseMessageService;
@Autowired
private EntityManagerUtil entityManagerUtil;
......
package com.ctrip.framework.apollo.configservice.service;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.ctrip.framework.apollo.biz.repository.AppNamespaceRepository;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.tracer.Tracer;
import com.ctrip.framework.apollo.tracer.spi.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Service
public class AppNamespaceServiceWithCache implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(AppNamespaceServiceWithCache.class);
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.skipNulls();
@Autowired
private AppNamespaceRepository appNamespaceRepository;
@Autowired
private BizConfig bizConfig;
private int scanInterval;
private TimeUnit scanIntervalTimeUnit;
private int rebuildInterval;
private TimeUnit rebuildIntervalTimeUnit;
private ScheduledExecutorService scheduledExecutorService;
private long maxIdScanned;
//store namespaceName -> AppNamespace
private Map<String, AppNamespace> publicAppNamespaceCache;
//store appId+namespaceName -> AppNamespace
private Map<String, AppNamespace> appNamespaceCache;
//store id -> AppNamespace
private Map<Long, AppNamespace> appNamespaceIdCache;
public AppNamespaceServiceWithCache() {
maxIdScanned = 0;
publicAppNamespaceCache = Maps.newConcurrentMap();
appNamespaceCache = Maps.newConcurrentMap();
appNamespaceIdCache = Maps.newConcurrentMap();
scheduledExecutorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
.create("AppNamespaceServiceWithCache", true));
}
public List<AppNamespace> findByAppIdAndNamespaces(String appId, Set<String> namespaceNames) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(appId), "appId must not be null");
if (namespaceNames == null || namespaceNames.isEmpty()) {
return Collections.emptyList();
}
// return appNamespaceRepository.findByAppIdAndNameIn(appId, namespaceNames);
List<AppNamespace> result = Lists.newArrayList();
for (String namespaceName : namespaceNames) {
AppNamespace appNamespace = appNamespaceCache.get(STRING_JOINER.join(appId, namespaceName));
if (appNamespace != null) {
result.add(appNamespace);
}
}
return result;
}
public List<AppNamespace> findPublicNamespacesByNames(Set<String> namespaceNames) {
if (namespaceNames == null || namespaceNames.isEmpty()) {
return Collections.emptyList();
}
// return appNamespaceRepository.findByNameInAndIsPublicTrue(namespaceNames);
List<AppNamespace> result = Lists.newArrayList();
for (String namespaceName : namespaceNames) {
AppNamespace appNamespace = publicAppNamespaceCache.get(namespaceName);
if (appNamespace != null) {
result.add(appNamespace);
}
}
return result;
}
@Override
public void afterPropertiesSet() throws Exception {
populateDataBaseInterval();
scanNewAppNamespaces(); //block the startup process until load finished
scheduledExecutorService.scheduleAtFixedRate(() -> {
Transaction transaction = Tracer.newTransaction("Apollo.AppNamespaceServiceWithCache",
"rebuildCache");
try {
this.updateAndDeleteCache();
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
logger.error("Rebuild cache failed", ex);
} finally {
transaction.complete();
}
}, rebuildInterval, rebuildInterval, rebuildIntervalTimeUnit);
scheduledExecutorService.scheduleWithFixedDelay(this::scanNewAppNamespaces, scanInterval,
scanInterval, scanIntervalTimeUnit);
}
private void scanNewAppNamespaces() {
Transaction transaction = Tracer.newTransaction("Apollo.AppNamespaceServiceWithCache",
"scanNewAppNamespaces");
try {
this.loadNewAppNamespaces();
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
logger.error("Load new app namespaces failed", ex);
} finally {
transaction.complete();
}
}
//for those new app namespaces
private void loadNewAppNamespaces() {
boolean hasMore = true;
while (hasMore && !Thread.currentThread().isInterrupted()) {
//current batch is 500
List<AppNamespace> appNamespaces = appNamespaceRepository
.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(appNamespaces)) {
break;
}
mergeAppNamespaces(appNamespaces);
int scanned = appNamespaces.size();
maxIdScanned = appNamespaces.get(scanned - 1).getId();
hasMore = scanned == 500;
logger.info("Loaded {} new app namespaces with startId {}", scanned, maxIdScanned);
}
}
private void mergeAppNamespaces(List<AppNamespace> appNamespaces) {
for (AppNamespace appNamespace : appNamespaces) {
appNamespaceCache.put(assembleAppNamespaceKey(appNamespace), appNamespace);
appNamespaceIdCache.put(appNamespace.getId(), appNamespace);
if (appNamespace.isPublic()) {
publicAppNamespaceCache.put(appNamespace.getName(), appNamespace);
}
}
}
//for those updated or deleted app namespaces
private void updateAndDeleteCache() {
List<Long> ids = Lists.newArrayList(appNamespaceIdCache.keySet());
if (CollectionUtils.isEmpty(ids)) {
return;
}
List<List<Long>> partitionIds = Lists.partition(ids, 500);
for (List<Long> toRebuild : partitionIds) {
Iterable<AppNamespace> appNamespaces = appNamespaceRepository.findAll(toRebuild);
if (appNamespaces == null) {
continue;
}
//handle updated
Set<Long> foundIds = handleUpdatedAppNamespaces(appNamespaces);
//handle deleted
handleDeletedAppNamespaces(Sets.difference(Sets.newHashSet(toRebuild), foundIds));
}
}
//for those updated app namespaces
private Set<Long> handleUpdatedAppNamespaces(Iterable<AppNamespace> appNamespaces) {
Set<Long> foundIds = Sets.newHashSet();
for (AppNamespace appNamespace : appNamespaces) {
foundIds.add(appNamespace.getId());
AppNamespace thatInCache = appNamespaceIdCache.get(appNamespace.getId());
if (thatInCache != null && appNamespace.getDataChangeLastModifiedTime().after(thatInCache
.getDataChangeLastModifiedTime())) {
appNamespaceIdCache.put(appNamespace.getId(), appNamespace);
String oldKey = assembleAppNamespaceKey(thatInCache);
String newKey = assembleAppNamespaceKey(appNamespace);
appNamespaceCache.put(newKey, appNamespace);
//in case appId or namespaceName changes
if (!newKey.equals(oldKey)) {
appNamespaceCache.remove(oldKey);
}
if (appNamespace.isPublic()) {
publicAppNamespaceCache.put(appNamespace.getName(), appNamespace);
//in case namespaceName changes
if (!appNamespace.getName().equals(thatInCache.getName()) && thatInCache.isPublic()) {
publicAppNamespaceCache.remove(thatInCache.getName());
}
} else if (thatInCache.isPublic()) {
//just in case isPublic changes
publicAppNamespaceCache.remove(thatInCache.getName());
}
logger.info("Found AppNamespace changes, old: {}, new: {}", thatInCache, appNamespace);
}
}
return foundIds;
}
//for those deleted app namespaces
private void handleDeletedAppNamespaces(Set<Long> deletedIds) {
if (CollectionUtils.isEmpty(deletedIds)) {
return;
}
for (Long deletedId : deletedIds) {
AppNamespace deleted = appNamespaceIdCache.remove(deletedId);
if (deleted == null) {
continue;
}
appNamespaceCache.remove(assembleAppNamespaceKey(deleted));
if (deleted.isPublic()) {
publicAppNamespaceCache.remove(deleted.getName());
}
logger.info("Found AppNamespace deleted, {}", deleted);
}
}
private String assembleAppNamespaceKey(AppNamespace appNamespace) {
return STRING_JOINER.join(appNamespace.getAppId(), appNamespace.getName());
}
private void populateDataBaseInterval() {
scanInterval = bizConfig.appNamespaceCacheScanInterval();
scanIntervalTimeUnit = bizConfig.appNamespaceCacheScanIntervalTimeUnit();
rebuildInterval = bizConfig.appNamespaceCacheRebuildInterval();
rebuildIntervalTimeUnit = bizConfig.appNamespaceCacheRebuildIntervalTimeUnit();
}
}
package com.ctrip.framework.apollo.configservice.service;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.tracer.Tracer;
import com.ctrip.framework.apollo.tracer.spi.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Service
public class ReleaseMessageServiceWithCache implements ReleaseMessageListener, InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageServiceWithCache
.class);
@Autowired
private ReleaseMessageRepository releaseMessageRepository;
@Autowired
private BizConfig bizConfig;
private int scanInterval;
private TimeUnit scanIntervalTimeUnit;
private volatile long maxIdScanned;
private ConcurrentMap<String, ReleaseMessage> releaseMessageCache;
private AtomicBoolean doScan;
private ExecutorService executorService;
public ReleaseMessageServiceWithCache() {
initialize();
}
private void initialize() {
releaseMessageCache = Maps.newConcurrentMap();
doScan = new AtomicBoolean(true);
executorService = Executors.newSingleThreadExecutor(ApolloThreadFactory
.create("ReleaseMessageServiceWithCache", true));
}
public ReleaseMessage findLatestReleaseMessageForMessages(Set<String> messages) {
if (CollectionUtils.isEmpty(messages)) {
return null;
}
long maxReleaseMessageId = 0;
ReleaseMessage result = null;
for (String message : messages) {
ReleaseMessage releaseMessage = releaseMessageCache.get(message);
if (releaseMessage != null && releaseMessage.getId() > maxReleaseMessageId) {
maxReleaseMessageId = releaseMessage.getId();
result = releaseMessage;
}
}
return result;
}
public List<ReleaseMessage> findLatestReleaseMessagesGroupByMessages(Set<String> messages) {
if (CollectionUtils.isEmpty(messages)) {
return Collections.emptyList();
}
List<ReleaseMessage> releaseMessages = Lists.newArrayList();
for (String message : messages) {
ReleaseMessage releaseMessage = releaseMessageCache.get(message);
if (releaseMessage != null) {
releaseMessages.add(releaseMessage);
}
}
return releaseMessages;
}
@Override
public void handleMessage(ReleaseMessage message, String channel) {
//Could stop once the ReleaseMessageScanner starts to work
doScan.set(false);
logger.info("message received - channel: {}, message: {}", channel, message);
String content = message.getMessage();
Tracer.logEvent("Apollo.ReleaseMessageService.UpdateCache", String.valueOf(message.getId()));
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
long gap = message.getId() - maxIdScanned;
if (gap == 1) {
mergeReleaseMessage(message);
} else if (gap > 1) {
//gap found!
loadReleaseMessages(maxIdScanned);
}
}
@Override
public void afterPropertiesSet() throws Exception {
populateDataBaseInterval();
//block the startup process until load finished
//this should happen before ReleaseMessageScanner due to autowire
loadReleaseMessages(0);
executorService.submit(() -> {
while (doScan.get() && !Thread.currentThread().isInterrupted()) {
Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageServiceWithCache",
"scanNewReleaseMessages");
try {
loadReleaseMessages(maxIdScanned);
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
logger.error("Scan new release messages failed", ex);
} finally {
transaction.complete();
}
try {
scanIntervalTimeUnit.sleep(scanInterval);
} catch (InterruptedException e) {
//ignore
}
}
});
}
private synchronized void mergeReleaseMessage(ReleaseMessage releaseMessage) {
ReleaseMessage old = releaseMessageCache.get(releaseMessage.getMessage());
if (old == null || releaseMessage.getId() > old.getId()) {
releaseMessageCache.put(releaseMessage.getMessage(), releaseMessage);
maxIdScanned = releaseMessage.getId();
}
}
private void loadReleaseMessages(long startId) {
boolean hasMore = true;
while (hasMore && !Thread.currentThread().isInterrupted()) {
//current batch is 500
List<ReleaseMessage> releaseMessages = releaseMessageRepository
.findFirst500ByIdGreaterThanOrderByIdAsc(startId);
if (CollectionUtils.isEmpty(releaseMessages)) {
break;
}
releaseMessages.forEach(this::mergeReleaseMessage);
int scanned = releaseMessages.size();
startId = releaseMessages.get(scanned - 1).getId();
hasMore = scanned == 500;
logger.info("Loaded {} release messages with startId {}", scanned, startId);
}
}
private void populateDataBaseInterval() {
scanInterval = bizConfig.releaseMessageCacheScanInterval();
scanIntervalTimeUnit = bizConfig.releaseMessageCacheScanIntervalTimeUnit();
}
//only for test use
private void reset() throws Exception {
executorService.shutdownNow();
initialize();
afterPropertiesSet();
}
}
......@@ -7,7 +7,7 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
import com.ctrip.framework.apollo.configservice.service.AppNamespaceServiceWithCache;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.core.ConfigConsts;
......@@ -26,7 +26,7 @@ import java.util.Set;
public class WatchKeysUtil {
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
@Autowired
private AppNamespaceService appNamespaceService;
private AppNamespaceServiceWithCache appNamespaceService;
/**
* Assemble watch keys for the given appId, cluster, namespace, dataCenter combination
......
......@@ -8,6 +8,8 @@ import com.ctrip.framework.apollo.configservice.integration.ConfigControllerInte
import com.ctrip.framework.apollo.configservice.integration.ConfigFileControllerIntegrationTest;
import com.ctrip.framework.apollo.configservice.integration.NotificationControllerIntegrationTest;
import com.ctrip.framework.apollo.configservice.integration.NotificationControllerV2IntegrationTest;
import com.ctrip.framework.apollo.configservice.service.AppNamespaceServiceWithCacheTest;
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCacheTest;
import com.ctrip.framework.apollo.configservice.util.InstanceConfigAuditUtilTest;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtilTest;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtilTest;
......@@ -22,7 +24,8 @@ import org.junit.runners.Suite.SuiteClasses;
NamespaceUtilTest.class, ConfigFileControllerTest.class,
ConfigFileControllerIntegrationTest.class, WatchKeysUtilTest.class,
NotificationControllerV2Test.class, NotificationControllerV2IntegrationTest.class,
InstanceConfigAuditUtilTest.class
InstanceConfigAuditUtilTest.class, AppNamespaceServiceWithCacheTest.class,
ReleaseMessageServiceWithCacheTest.class
})
public class AllTests {
......
......@@ -6,8 +6,8 @@ import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.core.ConfigConsts;
......@@ -45,7 +45,7 @@ public class NotificationControllerTest {
private long someNotificationId;
private String someClientIp;
@Mock
private ReleaseMessageService releaseMessageService;
private ReleaseMessageServiceWithCache releaseMessageService;
@Mock
private EntityManagerUtil entityManagerUtil;
@Mock
......
......@@ -9,8 +9,8 @@ import com.google.gson.Gson;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.core.ConfigConsts;
......@@ -50,7 +50,7 @@ public class NotificationControllerV2Test {
private long someNotificationId;
private String someClientIp;
@Mock
private ReleaseMessageService releaseMessageService;
private ReleaseMessageServiceWithCache releaseMessageService;
@Mock
private EntityManagerUtil entityManagerUtil;
@Mock
......
......@@ -2,14 +2,19 @@ package com.ctrip.framework.apollo.configservice.integration;
import com.google.common.base.Joiner;
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.jdbc.Sql;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -28,8 +33,12 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
private String somePublicNamespace;
private ExecutorService executorService;
@Autowired
private ReleaseMessageServiceWithCache releaseMessageServiceWithCache;
@Before
public void setUp() throws Exception {
ReflectionTestUtils.invokeMethod(releaseMessageServiceWithCache, "reset");
someAppId = "someAppId";
someCluster = ConfigConsts.CLUSTER_NAME_DEFAULT;
defaultNamespace = ConfigConsts.NAMESPACE_APPLICATION;
......
......@@ -5,9 +5,11 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.ctrip.framework.apollo.configservice.service.ReleaseMessageServiceWithCache;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -16,6 +18,7 @@ import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.jdbc.Sql;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.List;
import java.util.Set;
......@@ -33,6 +36,9 @@ public class NotificationControllerV2IntegrationTest extends AbstractBaseIntegra
@Autowired
private Gson gson;
@Autowired
private ReleaseMessageServiceWithCache releaseMessageServiceWithCache;
private String someAppId;
private String someCluster;
private String defaultNamespace;
......@@ -42,6 +48,7 @@ public class NotificationControllerV2IntegrationTest extends AbstractBaseIntegra
@Before
public void setUp() throws Exception {
ReflectionTestUtils.invokeMethod(releaseMessageServiceWithCache, "reset");
someAppId = "someAppId";
someCluster = ConfigConsts.CLUSTER_NAME_DEFAULT;
defaultNamespace = ConfigConsts.NAMESPACE_APPLICATION;
......
package com.ctrip.framework.apollo.configservice.service;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class ReleaseMessageServiceWithCacheTest {
private ReleaseMessageServiceWithCache releaseMessageServiceWithCache;
@Mock
private ReleaseMessageRepository releaseMessageRepository;
@Mock
private BizConfig bizConfig;
private int scanInterval;
private TimeUnit scanIntervalTimeUnit;
@Before
public void setUp() throws Exception {
releaseMessageServiceWithCache = new ReleaseMessageServiceWithCache();
ReflectionTestUtils.setField(releaseMessageServiceWithCache, "releaseMessageRepository",
releaseMessageRepository);
ReflectionTestUtils.setField(releaseMessageServiceWithCache, "bizConfig", bizConfig);
scanInterval = 10;
scanIntervalTimeUnit = TimeUnit.MILLISECONDS;
when(bizConfig.releaseMessageCacheScanInterval()).thenReturn(scanInterval);
when(bizConfig.releaseMessageCacheScanIntervalTimeUnit()).thenReturn(scanIntervalTimeUnit);
}
@Test
public void testWhenNoReleaseMessages() throws Exception {
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn
(Collections.emptyList());
releaseMessageServiceWithCache.afterPropertiesSet();
String someMessage = "someMessage";
String anotherMessage = "anotherMessage";
Set<String> messages = Sets.newHashSet(someMessage, anotherMessage);
assertNull(releaseMessageServiceWithCache.findLatestReleaseMessageForMessages(messages));
assertTrue(releaseMessageServiceWithCache.findLatestReleaseMessagesGroupByMessages(messages)
.isEmpty());
}
@Test
public void testWhenHasReleaseMsgAndHasRepeatMsg() throws Exception {
String someMsgContent = "msg1";
ReleaseMessage someMsg = assembleReleaseMsg(1, someMsgContent);
String anotherMsgContent = "msg2";
ReleaseMessage anotherMsg = assembleReleaseMsg(2, anotherMsgContent);
ReleaseMessage anotherRepeatMsg = assembleReleaseMsg(3, anotherMsgContent);
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L))
.thenReturn(Arrays.asList(someMsg, anotherMsg, anotherRepeatMsg));
releaseMessageServiceWithCache.afterPropertiesSet();
verify(bizConfig).releaseMessageCacheScanInterval();
ReleaseMessage latestReleaseMsg =
releaseMessageServiceWithCache
.findLatestReleaseMessageForMessages(Sets.newHashSet(someMsgContent, anotherMsgContent));
assertNotNull(latestReleaseMsg);
assertEquals(3, latestReleaseMsg.getId());
assertEquals(anotherMsgContent, latestReleaseMsg.getMessage());
List<ReleaseMessage> latestReleaseMsgGroupByMsgContent =
releaseMessageServiceWithCache
.findLatestReleaseMessagesGroupByMessages(Sets.newHashSet(someMsgContent, anotherMsgContent));
assertEquals(2, latestReleaseMsgGroupByMsgContent.size());
assertEquals(1, latestReleaseMsgGroupByMsgContent.get(1).getId());
assertEquals(someMsgContent, latestReleaseMsgGroupByMsgContent.get(1).getMessage());
assertEquals(3, latestReleaseMsgGroupByMsgContent.get(0).getId());
assertEquals(anotherMsgContent, latestReleaseMsgGroupByMsgContent.get(0).getMessage());
}
@Test
public void testWhenReleaseMsgSizeBiggerThan500() throws Exception {
String someMsgContent = "msg1";
List<ReleaseMessage> firstBatchReleaseMsg = new ArrayList<>(500);
for (int i = 0; i < 500; i++) {
firstBatchReleaseMsg.add(assembleReleaseMsg(i + 1, someMsgContent));
}
String antherMsgContent = "msg2";
ReleaseMessage antherMsg = assembleReleaseMsg(501, antherMsgContent);
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L))
.thenReturn(firstBatchReleaseMsg);
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(500L))
.thenReturn(Collections.singletonList(antherMsg));
releaseMessageServiceWithCache.afterPropertiesSet();
verify(releaseMessageRepository, times(1)).findFirst500ByIdGreaterThanOrderByIdAsc(500L);
ReleaseMessage latestReleaseMsg =
releaseMessageServiceWithCache
.findLatestReleaseMessageForMessages(Sets.newHashSet(someMsgContent, antherMsgContent));
assertNotNull(latestReleaseMsg);
assertEquals(501, latestReleaseMsg.getId());
assertEquals(antherMsgContent, latestReleaseMsg.getMessage());
List<ReleaseMessage> latestReleaseMsgGroupByMsgContent =
releaseMessageServiceWithCache
.findLatestReleaseMessagesGroupByMessages(Sets.newHashSet(someMsgContent, antherMsgContent));
assertEquals(2, latestReleaseMsgGroupByMsgContent.size());
assertEquals(500, latestReleaseMsgGroupByMsgContent.get(1).getId());
assertEquals(501, latestReleaseMsgGroupByMsgContent.get(0).getId());
}
@Test
public void testNewReleaseMessagesBeforeHandleMessage() throws Exception {
String someMessageContent = "someMessage";
long someMessageId = 1;
ReleaseMessage someMessage = assembleReleaseMsg(someMessageId, someMessageContent);
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn(Lists.newArrayList
(someMessage));
releaseMessageServiceWithCache.afterPropertiesSet();
ReleaseMessage latestReleaseMsg =
releaseMessageServiceWithCache
.findLatestReleaseMessageForMessages(Sets.newHashSet(someMessageContent));
List<ReleaseMessage> latestReleaseMsgGroupByMsgContent =
releaseMessageServiceWithCache
.findLatestReleaseMessagesGroupByMessages(Sets.newHashSet(someMessageContent));
assertEquals(someMessageId, latestReleaseMsg.getId());
assertEquals(someMessageContent, latestReleaseMsg.getMessage());
assertEquals(latestReleaseMsg, latestReleaseMsgGroupByMsgContent.get(0));
long newMessageId = 2;
ReleaseMessage newMessage = assembleReleaseMsg(newMessageId, someMessageContent);
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(someMessageId)).thenReturn(Lists
.newArrayList(newMessage));
scanIntervalTimeUnit.sleep(scanInterval * 3);
ReleaseMessage newLatestReleaseMsg =
releaseMessageServiceWithCache
.findLatestReleaseMessageForMessages(Sets.newHashSet(someMessageContent));
List<ReleaseMessage> newLatestReleaseMsgGroupByMsgContent =
releaseMessageServiceWithCache
.findLatestReleaseMessagesGroupByMessages(Sets.newHashSet(someMessageContent));
assertEquals(newMessageId, newLatestReleaseMsg.getId());
assertEquals(someMessageContent, newLatestReleaseMsg.getMessage());
assertEquals(newLatestReleaseMsg, newLatestReleaseMsgGroupByMsgContent.get(0));
}
@Test
public void testNewReleasesWithHandleMessage() throws Exception {
String someMessageContent = "someMessage";
long someMessageId = 1;
ReleaseMessage someMessage = assembleReleaseMsg(someMessageId, someMessageContent);
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn(Lists.newArrayList
(someMessage));
releaseMessageServiceWithCache.afterPropertiesSet();
ReleaseMessage latestReleaseMsg =
releaseMessageServiceWithCache
.findLatestReleaseMessageForMessages(Sets.newHashSet(someMessageContent));
List<ReleaseMessage> latestReleaseMsgGroupByMsgContent =
releaseMessageServiceWithCache
.findLatestReleaseMessagesGroupByMessages(Sets.newHashSet(someMessageContent));
assertEquals(someMessageId, latestReleaseMsg.getId());
assertEquals(someMessageContent, latestReleaseMsg.getMessage());
assertEquals(latestReleaseMsg, latestReleaseMsgGroupByMsgContent.get(0));
long newMessageId = 2;
ReleaseMessage newMessage = assembleReleaseMsg(newMessageId, someMessageContent);
releaseMessageServiceWithCache.handleMessage(newMessage, Topics.APOLLO_RELEASE_TOPIC);
ReleaseMessage newLatestReleaseMsg =
releaseMessageServiceWithCache
.findLatestReleaseMessageForMessages(Sets.newHashSet(someMessageContent));
List<ReleaseMessage> newLatestReleaseMsgGroupByMsgContent =
releaseMessageServiceWithCache
.findLatestReleaseMessagesGroupByMessages(Sets.newHashSet(someMessageContent));
assertEquals(newMessageId, newLatestReleaseMsg.getId());
assertEquals(someMessageContent, newLatestReleaseMsg.getMessage());
assertEquals(newLatestReleaseMsg, newLatestReleaseMsgGroupByMsgContent.get(0));
}
private ReleaseMessage assembleReleaseMsg(long id, String msgContent) {
ReleaseMessage msg = new ReleaseMessage(msgContent);
msg.setId(id);
return msg;
}
}
......@@ -5,7 +5,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
import com.ctrip.framework.apollo.configservice.service.AppNamespaceServiceWithCache;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.core.ConfigConsts;
......@@ -29,7 +29,7 @@ import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class WatchKeysUtilTest {
@Mock
private AppNamespaceService appNamespaceService;
private AppNamespaceServiceWithCache appNamespaceService;
@Mock
private AppNamespace someAppNamespace;
@Mock
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment