Commit 34d511fb by 张乐 Committed by GitHub

Merge pull request #529 from nobodyiam/clean-release-message

Clean release message on demand and do async notification if there are too many clients to notify
parents cac111a4 ad94cc77
......@@ -105,4 +105,11 @@ public class BizConfig extends RefreshableConfig {
return TimeUnit.SECONDS;
}
public int releaseMessageNotificationBatch() {
return getIntProperty("apollo.release-message.notification.batch", 100);
}
public int releaseMessageNotificationBatchIntervalInMilli() {
return getIntProperty("apollo.release-message.notification.batch.interval", 100);
}
}
package com.ctrip.framework.apollo.biz.message;
import com.google.common.collect.Queues;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.tracer.Tracer;
import com.ctrip.framework.apollo.tracer.spi.Transaction;
......@@ -9,8 +12,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
/**
* @author Jason Song(song_s@ctrip.com)
......@@ -18,11 +30,21 @@ import java.util.Objects;
@Component
public class DatabaseMessageSender implements MessageSender {
private static final Logger logger = LoggerFactory.getLogger(DatabaseMessageSender.class);
private static final int CLEAN_QUEUE_MAX_SIZE = 100;
private BlockingQueue<Long> toClean = Queues.newLinkedBlockingQueue(CLEAN_QUEUE_MAX_SIZE);
private final ExecutorService cleanExecutorService;
private final AtomicBoolean cleanStopped;
@Autowired
private ReleaseMessageRepository releaseMessageRepository;
public DatabaseMessageSender() {
cleanExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create("DatabaseMessageSender", true));
cleanStopped = new AtomicBoolean(false);
}
@Override
@Transactional
public void sendMessage(String message, String channel) {
logger.info("Sending message {} to channel {}", message, channel);
if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) {
......@@ -33,7 +55,8 @@ public class DatabaseMessageSender implements MessageSender {
Tracer.logEvent("Apollo.AdminService.ReleaseMessage", message);
Transaction transaction = Tracer.newTransaction("Apollo.AdminService", "sendMessage");
try {
releaseMessageRepository.save(new ReleaseMessage(message));
ReleaseMessage newMessage = releaseMessageRepository.save(new ReleaseMessage(message));
toClean.offer(newMessage.getId());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
logger.error("Sending message to database failed", ex);
......@@ -42,4 +65,45 @@ public class DatabaseMessageSender implements MessageSender {
transaction.complete();
}
}
@PostConstruct
private void initialize() {
cleanExecutorService.submit(() -> {
while (!cleanStopped.get() && !Thread.currentThread().isInterrupted()) {
try {
Long rm = toClean.poll(1, TimeUnit.SECONDS);
if (rm != null) {
cleanMessage(rm);
} else {
TimeUnit.SECONDS.sleep(5);
}
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
});
}
private void cleanMessage(Long id) {
boolean hasMore = true;
//double check in case the release message is rolled back
ReleaseMessage releaseMessage = releaseMessageRepository.findOne(id);
if (releaseMessage == null) {
return;
}
while (hasMore && !Thread.currentThread().isInterrupted()) {
List<ReleaseMessage> messages = releaseMessageRepository.findFirst100ByMessageAndIdLessThanOrderByIdAsc(
releaseMessage.getMessage(), releaseMessage.getId());
releaseMessageRepository.delete(messages);
hasMore = messages.size() == 100;
messages.forEach(toRemove -> Tracer.logEvent(
String.format("ReleaseMessage.Clean.%s", toRemove.getMessage()), String.valueOf(toRemove.getId())));
}
}
void stopClean() {
cleanStopped.set(true);
}
}
......@@ -19,6 +19,8 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel
ReleaseMessage findTopByMessageInOrderByIdDesc(Collection<String> messages);
List<ReleaseMessage> findFirst100ByMessageAndIdLessThanOrderByIdAsc(String message, Long id);
@Query("select message, max(id) as id from ReleaseMessage where message in :messages group by message")
List<Object[]> findLatestReleaseMessagesGroupByMessages(@Param("messages") Collection<String> messages);
}
......@@ -12,6 +12,7 @@ import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener;
import com.ctrip.framework.apollo.biz.message.Topics;
......@@ -22,6 +23,7 @@ import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.tracer.Tracer;
import org.slf4j.Logger;
......@@ -41,6 +43,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author Jason Song(song_s@ctrip.com)
......@@ -61,6 +66,8 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
new TypeToken<List<ApolloConfigNotification>>() {
}.getType();
private final ExecutorService largeNotificationBatchExecutorService;
@Autowired
private WatchKeysUtil watchKeysUtil;
......@@ -76,6 +83,14 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
@Autowired
private Gson gson;
@Autowired
private BizConfig bizConfig;
public NotificationControllerV2() {
largeNotificationBatchExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create
("NotificationControllerV2", true));
}
@RequestMapping(method = RequestMethod.GET)
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
@RequestParam(value = "appId") String appId,
......@@ -220,6 +235,27 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
//create a new list to avoid ConcurrentModificationException
List<DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>> results =
Lists.newArrayList(deferredResults.get(content));
//do async notification if too many clients
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
bizConfig.releaseMessageNotificationBatch());
for (int i = 0; i < results.size(); i++) {
if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
} catch (InterruptedException e) {
//ignore
}
}
logger.debug("Async notify {}", results.get(i));
results.get(i).setResult(notification);
}
});
return;
}
logger.debug("Notify {} clients for key {}", results.size(), content);
for (DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result : results) {
......
......@@ -7,6 +7,7 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
......@@ -27,6 +28,7 @@ import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
......@@ -57,6 +59,9 @@ public class NotificationControllerV2Test {
private NamespaceUtil namespaceUtil;
@Mock
private WatchKeysUtil watchKeysUtil;
@Mock
private BizConfig bizConfig;
private Gson gson;
private Multimap<String, DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>>
......@@ -66,11 +71,16 @@ public class NotificationControllerV2Test {
public void setUp() throws Exception {
controller = new NotificationControllerV2();
gson = new Gson();
when(bizConfig.releaseMessageNotificationBatch()).thenReturn(100);
when(bizConfig.releaseMessageNotificationBatchIntervalInMilli()).thenReturn(5);
ReflectionTestUtils.setField(controller, "releaseMessageService", releaseMessageService);
ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil);
ReflectionTestUtils.setField(controller, "namespaceUtil", namespaceUtil);
ReflectionTestUtils.setField(controller, "watchKeysUtil", watchKeysUtil);
ReflectionTestUtils.setField(controller, "gson", gson);
ReflectionTestUtils.setField(controller, "bizConfig", bizConfig);
someAppId = "someAppId";
someCluster = "someCluster";
......@@ -283,6 +293,48 @@ public class NotificationControllerV2Test {
assertEquals(someId, notification.getNotificationId());
}
@Test
public void testPollNotificationWithHandleMessageInBatch() throws Exception {
String someWatchKey = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, defaultNamespace);
int someBatch = 1;
int someBatchInterval = 10;
Multimap<String, String> watchKeysMap =
assembleMultiMap(defaultNamespace, Lists.newArrayList(someWatchKey));
String notificationAsString =
transformApolloConfigNotificationsToString(defaultNamespace, someNotificationId);
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster, Sets.newHashSet(defaultNamespace),
someDataCenter)).thenReturn(watchKeysMap);
when(bizConfig.releaseMessageNotificationBatch()).thenReturn(someBatch);
when(bizConfig.releaseMessageNotificationBatchIntervalInMilli()).thenReturn(someBatchInterval);
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
deferredResult = controller
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
someClientIp);
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
anotherDeferredResult = controller
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
someClientIp);
long someId = 1;
ReleaseMessage someReleaseMessage = new ReleaseMessage(someWatchKey);
someReleaseMessage.setId(someId);
controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC);
assertTrue(!anotherDeferredResult.hasResult());
TimeUnit.MILLISECONDS.sleep(someBatchInterval * 3);
assertTrue(anotherDeferredResult.hasResult());
}
private String transformApolloConfigNotificationsToString(
String namespace, long notificationId) {
List<ApolloConfigNotification> notifications =
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment