Commit a5acb67f by Yiming Liu

Merge pull request #117 from nobodyiam/config-client-update-push

Add client side long polling support and server side mock impl
parents 93d1ce67 2781e18c
......@@ -30,7 +30,7 @@ public class ConfigServiceLocator {
private Type m_responseType;
/**
* Create a config service locator
* Create a config service locator.
*/
public ConfigServiceLocator() {
List<ServiceDTO> initial = Lists.newArrayList();
......@@ -88,8 +88,8 @@ public class ConfigServiceLocator {
throw new RuntimeException("Get config services failed", exception);
}
private void logConfigServicesToCat(List<ServiceDTO> serviceDTOs) {
for (ServiceDTO serviceDTO : serviceDTOs) {
private void logConfigServicesToCat(List<ServiceDTO> serviceDtos) {
for (ServiceDTO serviceDTO : serviceDtos) {
Cat.logEvent("Apollo.Config.Services", serviceDTO.getHomepageUrl());
}
}
......
......@@ -95,7 +95,14 @@ public class DefaultConfig extends AbstractConfig implements RepositoryChangeLis
Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties);
//check double checked result
if (actualChanges.isEmpty()) {
return;
}
this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));
Cat.logEvent("Apollo.Client.ConfigChanges", m_namespace);
}
private Map<String, ConfigChange> updateAndCalcConfigChanges(Properties newConfigProperties) {
......
......@@ -120,7 +120,9 @@ public class LocalFileConfigRepository extends AbstractConfigRepository
updateFileProperties(properties);
} catch (Throwable ex) {
Cat.logError(ex);
logger.warn("Sync config from fallback repository {} failed, reason: {}", m_fallback.getClass(), ex);
logger
.warn("Sync config from fallback repository {} failed, reason: {}", m_fallback.getClass(),
ex);
}
}
......
package com.ctrip.apollo.internals;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.escape.Escaper;
import com.google.common.net.UrlEscapers;
import com.ctrip.apollo.core.dto.ApolloConfig;
import com.ctrip.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.apollo.core.dto.ServiceDTO;
import com.ctrip.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.apollo.util.ConfigUtil;
......@@ -22,10 +27,14 @@ import org.unidal.lookup.ContainerLoader;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
......@@ -40,6 +49,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private volatile AtomicReference<ApolloConfig> m_configCache;
private final String m_namespace;
private final ScheduledExecutorService m_executorService;
private final AtomicBoolean m_longPollingStopped;
/**
* Constructor.
......@@ -58,10 +68,12 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
Cat.logError(ex);
throw new IllegalStateException("Unable to load component!", ex);
}
this.m_longPollingStopped = new AtomicBoolean(false);
this.m_executorService = Executors.newScheduledThreadPool(1,
ApolloThreadFactory.create("RemoteConfigRepository", true));
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
}
@Override
......@@ -84,7 +96,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
new Runnable() {
@Override
public void run() {
Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "periodicRefresh");
trySync();
transaction.setStatus(Message.SUCCESS);
transaction.complete();
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshTimeUnit());
......@@ -113,11 +128,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
return result;
}
private ApolloConfig loadApolloConfig() {
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
Cat.logEvent("Apollo.Client.Config", String.format("%s-%s-%s", appId, cluster, m_namespace));
Cat.logEvent("Apollo.Client.ConfigInfo",
String.format("%s-%s-%s", appId, cluster, m_namespace));
int maxRetries = 2;
Throwable exception = null;
......@@ -128,7 +143,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
for (ServiceDTO configService : randomConfigServices) {
String url =
assembleUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
m_configCache.get());
logger.debug("Loading config from {}", url);
......@@ -172,18 +187,19 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
throw new RuntimeException(message, exception);
}
private String assembleUrl(String uri, String appId, String cluster, String namespace,
ApolloConfig previousConfig) {
private String assembleQueryConfigUrl(String uri, String appId, String cluster, String namespace,
ApolloConfig previousConfig) {
Escaper escaper = UrlEscapers.urlPathSegmentEscaper();
String path = "configs/%s/%s";
List<String> params = Lists.newArrayList(appId, cluster);
List<String> params = Lists.newArrayList(escaper.escape(appId), escaper.escape(cluster));
if (!Strings.isNullOrEmpty(namespace)) {
path = path + "/%s";
params.add(namespace);
params.add(escaper.escape(namespace));
}
if (previousConfig != null) {
path = path + "?releaseId=%s";
params.add(String.valueOf(previousConfig.getReleaseId()));
params.add(escaper.escape(String.valueOf(previousConfig.getReleaseId())));
}
String pathExpanded = String.format(path, params.toArray());
......@@ -193,6 +209,106 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
return uri + pathExpanded;
}
private void scheduleLongPollingRefresh() {
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final ExecutorService longPollingService =
Executors.newFixedThreadPool(2,
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
longPollingService.submit(new Runnable() {
@Override
public void run() {
doLongPollingRefresh(appId, cluster, longPollingService);
}
});
}
private void doLongPollingRefresh(String appId, String cluster,
ExecutorService longPollingService) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
Transaction transaction = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}
String url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster,
m_namespace, m_configCache.get());
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
//no timeout for read
request.setReadTimeout(0);
transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
transaction.addData("Url", url);
HttpResponse<ApolloConfigNotification> response =
m_httpUtil.doGet(request, ApolloConfigNotification.class);
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200) {
longPollingService.submit(new Runnable() {
@Override
public void run() {
trySync();
}
});
}
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) {
logger.warn("Long polling failed for appId: {}, cluster: {}, namespace: {}, reason: {}",
appId, cluster, m_namespace, ex);
lastServiceDto = null;
Cat.logError(ex);
if (transaction != null) {
transaction.setStatus(ex);
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ie) {
//ignore
}
} finally {
if (transaction != null) {
transaction.complete();
}
}
}
}
private String assembleLongPollRefreshUrl(String uri, String appId, String cluster,
String namespace,
ApolloConfig previousConfig) {
Escaper escaper = UrlEscapers.urlPathSegmentEscaper();
Map<String, String> queryParams = Maps.newHashMap();
queryParams.put("appId", escaper.escape(appId));
queryParams.put("cluster", escaper.escape(cluster));
if (!Strings.isNullOrEmpty(namespace)) {
queryParams.put("namespace", escaper.escape(namespace));
}
if (previousConfig != null) {
queryParams.put("releaseId", escaper.escape(previousConfig.getReleaseId()));
}
String params = Joiner.on("&").withKeyValueSeparator("=").join(queryParams);
if (!uri.endsWith("/")) {
uri += "/";
}
return uri + "notifications?" + params;
}
void stopLongPollingRefresh() {
this.m_longPollingStopped.compareAndSet(false, true);
}
private List<ServiceDTO> getConfigServices() {
List<ServiceDTO> services = m_serviceLocator.getConfigServices();
if (services.size() == 0) {
......
......@@ -74,5 +74,7 @@ public class SimpleConfig extends AbstractConfig implements RepositoryChangeList
m_configProperties = newConfigProperties;
this.fireConfigChange(new ConfigChangeEvent(m_namespace, changeMap));
Cat.logEvent("Apollo.Client.ConfigChanges", m_namespace);
}
}
......@@ -40,7 +40,8 @@ public class ConfigChangeEvent {
}
/**
* Get the changes. Please note that the returned Map is immutable.
* Get the changes as <Key, Change> map.
* Please note that the returned Map is immutable.
* @return changes
*/
public Map<String, ConfigChange> getChanges() {
......
......@@ -27,20 +27,23 @@ public class HttpUtil {
private ConfigUtil m_configUtil;
private Gson gson;
private String basicAuth;
/**
* Constructor.
*/
public HttpUtil() {
gson = new Gson();
try {
basicAuth = "Basic " + BaseEncoding.base64().encode("user:".getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
basicAuth = "Basic " + BaseEncoding.base64().encode("user:".getBytes("UTF-8"));
} catch (UnsupportedEncodingException ex) {
ex.printStackTrace();
}
}
/**
* Do get operation for the http request.
*
* @param httpRequest the request
* @param httpRequest the request
* @param responseType the response type
* @return the response
* @throws RuntimeException if any error happened or response code is neither 200 nor 304
......@@ -59,7 +62,7 @@ public class HttpUtil {
/**
* Do get operation for the http request.
*
* @param httpRequest the request
* @param httpRequest the request
* @param responseType the response type
* @return the response
* @throws RuntimeException if any error happened or response code is neither 200 nor 304
......@@ -76,14 +79,14 @@ public class HttpUtil {
}
private <T> HttpResponse<T> doGetWithSerializeFunction(HttpRequest httpRequest,
Function<String, T> serializeFunction) {
Function<String, T> serializeFunction) {
InputStream is = null;
try {
HttpURLConnection conn = (HttpURLConnection) new URL(httpRequest.getUrl()).openConnection();
conn.setRequestMethod("GET");
conn.setRequestProperty ("Authorization", basicAuth);
conn.setRequestProperty("Authorization", basicAuth);
int connectTimeout = httpRequest.getConnectTimeout();
if (connectTimeout < 0) {
connectTimeout = m_configUtil.getConnectTimeout();
......
......@@ -9,6 +9,7 @@ import com.ctrip.apollo.ConfigChangeListener;
import com.ctrip.apollo.ConfigService;
import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.core.dto.ApolloConfig;
import com.ctrip.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.apollo.core.utils.ClassLoaderUtil;
import com.ctrip.apollo.model.ConfigChangeEvent;
......@@ -196,6 +197,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
config.addChangeListener(new ConfigChangeListener() {
AtomicInteger counter = new AtomicInteger(0);
@Override
public void onChange(ConfigChangeEvent changeEvent) {
//only need to assert once
......@@ -220,6 +222,66 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
assertEquals(anotherValue, config.getProperty(someKey, null));
}
@Test
public void testLongPollRefresh() throws Exception {
final String someKey = "someKey";
final String someValue = "someValue";
final String anotherValue = "anotherValue";
Map<String, String> configurations = Maps.newHashMap();
configurations.put(someKey, someValue);
ApolloConfig apolloConfig = assembleApolloConfig(configurations);
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler =
mockPollNotificationHandler(50, HttpServletResponse.SC_OK,
new ApolloConfigNotification(apolloConfig.getAppId(), apolloConfig.getCluster(),
apolloConfig.getNamespace()), false);
startServerWithHandlers(configHandler, pollHandler);
Config config = ConfigService.getConfig();
assertEquals(someValue, config.getProperty(someKey, null));
apolloConfig.getConfigurations().put(someKey, anotherValue);
TimeUnit.MILLISECONDS.sleep(60);
assertEquals(anotherValue, config.getProperty(someKey, null));
}
private ContextHandler mockPollNotificationHandler(final long pollResultTimeOutInMS,
final int statusCode,
final ApolloConfigNotification result,
final boolean failedAtFirstTime) {
ContextHandler context = new ContextHandler("/notifications");
context.setHandler(new AbstractHandler() {
AtomicInteger counter = new AtomicInteger(0);
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
if (failedAtFirstTime && counter.incrementAndGet() == 1) {
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
baseRequest.setHandled(true);
return;
}
try {
TimeUnit.MILLISECONDS.sleep(pollResultTimeOutInMS);
} catch (InterruptedException e) {
}
response.setContentType("application/json;charset=UTF-8");
response.setStatus(statusCode);
response.getWriter().println(gson.toJson(result));
baseRequest.setHandled(true);
}
});
return context;
}
private ContextHandler mockConfigServerHandler(final int statusCode, final ApolloConfig result,
final boolean failedAtFirstTime) {
ContextHandler context = new ContextHandler("/configs/*");
......@@ -237,15 +299,11 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
response.setContentType("application/json;charset=UTF-8");
response.setStatus(statusCode);
response.getWriter().println(gson.toJson(result));
baseRequest.setHandled(true);
}
});
return context;
}
......
......@@ -5,6 +5,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.ctrip.apollo.core.dto.ApolloConfig;
import com.ctrip.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.apollo.core.dto.ServiceDTO;
import com.ctrip.apollo.util.ConfigUtil;
import com.ctrip.apollo.util.http.HttpRequest;
......@@ -22,6 +23,9 @@ import org.unidal.lookup.ComponentTestCase;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletResponse;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.eq;
......@@ -41,6 +45,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
@Mock
private static HttpResponse<ApolloConfig> someResponse;
@Mock
private static HttpResponse<ApolloConfigNotification> pollResponse;
@Mock
private ConfigUtil someConfigUtil;
@Before
......@@ -48,6 +54,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
super.setUp();
someNamespace = "someName";
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_NOT_MODIFIED);
defineComponent(ConfigUtil.class, MockConfigUtil.class);
defineComponent(ConfigServiceLocator.class, MockConfigServiceLocator.class);
defineComponent(HttpUtil.class, MockHttpUtil.class);
......@@ -65,9 +73,11 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
when(someResponse.getBody()).thenReturn(someApolloConfig);
RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace);
Properties config = remoteConfigRepository.getConfig();
assertEquals(configurations, config);
remoteConfigRepository.stopLongPollingRefresh();
}
@Test(expected = RuntimeException.class)
......@@ -76,6 +86,10 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
when(someResponse.getStatusCode()).thenReturn(500);
RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace);
//must stop the long polling before exception occurred
remoteConfigRepository.stopLongPollingRefresh();
remoteConfigRepository.getConfig();
}
......@@ -102,6 +116,35 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
verify(someListener, times(1)).onRepositoryChange(eq(someNamespace), captor.capture());
assertEquals(newConfigurations, captor.getValue());
remoteConfigRepository.stopLongPollingRefresh();
}
@Test
public void testLongPollingRefresh() throws Exception {
Map<String, String> configurations = ImmutableMap.of("someKey", "someValue");
ApolloConfig someApolloConfig = assembleApolloConfig(configurations);
when(someResponse.getStatusCode()).thenReturn(200);
when(someResponse.getBody()).thenReturn(someApolloConfig);
RepositoryChangeListener someListener = mock(RepositoryChangeListener.class);
RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace);
remoteConfigRepository.addChangeListener(someListener);
final ArgumentCaptor<Properties> captor = ArgumentCaptor.forClass(Properties.class);
Map<String, String> newConfigurations = ImmutableMap.of("someKey", "anotherValue");
ApolloConfig newApolloConfig = assembleApolloConfig(newConfigurations);
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_OK);
when(someResponse.getBody()).thenReturn(newApolloConfig);
TimeUnit.MILLISECONDS.sleep(60);
remoteConfigRepository.stopLongPollingRefresh();
verify(someListener, times(1)).onRepositoryChange(eq(someNamespace), captor.capture());
assertEquals(newConfigurations, captor.getValue());
}
private ApolloConfig assembleApolloConfig(Map<String, String> configurations) {
......@@ -109,7 +152,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
String someClusterName = "cluster";
String someReleaseId = "1";
ApolloConfig apolloConfig =
new ApolloConfig(someAppId, someClusterName, someNamespace, someReleaseId);
new ApolloConfig(someAppId, someClusterName, someNamespace, someReleaseId);
apolloConfig.setConfigurations(configurations);
......@@ -143,6 +186,13 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
public static class MockHttpUtil extends HttpUtil {
@Override
public <T> HttpResponse<T> doGet(HttpRequest httpRequest, Class<T> responseType) {
if (httpRequest.getUrl().contains("notifications?")) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
}
return (HttpResponse<T>) pollResponse;
}
return (HttpResponse<T>) someResponse;
}
}
......
......@@ -40,7 +40,7 @@ public class ConfigController {
@RequestParam(value = "releaseId", defaultValue = "-1") String clientSideReleaseId,
HttpServletResponse response) throws IOException {
Release release = configService.findRelease(appId, clusterName, namespace);
//TODO if namespace != application, should also query config by namespace and DC?
//TODO if namespace != application, should also query config by namespace and DC(default if DC not found)?
//And if found, should merge config, as well as releaseId -> make releaseId a string?
if (release == null) {
response.sendError(HttpServletResponse.SC_NOT_FOUND,
......
package com.ctrip.apollo.configservice.controller;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.apollo.core.utils.ApolloThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletResponse;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RestController
@RequestMapping("/notifications")
public class NotificationController {
private static final Logger logger = LoggerFactory.getLogger(NotificationController.class);
private final static long TIMEOUT = 60 * 60 * 1000;//60 MINUTES
private final Multimap<String, DeferredResult<ApolloConfigNotification>> deferredResults =
Multimaps.synchronizedSetMultimap(HashMultimap.create());
{
startRandomChange();
}
@RequestMapping(method = RequestMethod.GET)
public DeferredResult<ApolloConfigNotification> pollNotification(
@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "namespace", defaultValue = ConfigConsts.NAMESPACE_APPLICATION) String namespace,
@RequestParam(value = "releaseId", defaultValue = "-1") String clientSideReleaseId,
HttpServletResponse response) {
DeferredResult<ApolloConfigNotification> deferredResult =
new DeferredResult<>(TIMEOUT);
String key = assembleKey(appId, cluster, namespace);
this.deferredResults.put(key, deferredResult);
deferredResult.onCompletion(() -> {
logger.info("deferred result for {} {} {} completed", appId, cluster, namespace);
deferredResults.remove(key, deferredResult);
});
deferredResult.onTimeout(() -> {
logger.info("deferred result for {} {} {} timeout", appId, cluster, namespace);
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
});
logger.info("deferred result for {} {} {} returned", appId, cluster, namespace);
return deferredResult;
}
private void startRandomChange() {
Random random = new Random();
ScheduledExecutorService testService = Executors.newScheduledThreadPool(1,
ApolloThreadFactory.create("NotificationController", true));
testService.scheduleAtFixedRate((Runnable) () -> deferredResults
.entries().stream().filter(entry -> random.nextBoolean()).forEach(entry -> {
String[] keys = entry.getKey().split("-");
entry.getValue().setResult(new ApolloConfigNotification(keys[0], keys[1], keys[2]));
}), 30, 30, TimeUnit.SECONDS);
}
private String assembleKey(String appId, String cluster, String namespace) {
return String.format("%s-%s-%s", appId, cluster, namespace);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment