Commit 44e28a57 by Jason Song

add rate limiter to protect server

parent a80d1174
......@@ -97,7 +97,7 @@ public class ConfigServiceLocator implements Initializable {
tryUpdateConfigServices();
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshTimeUnit());
m_configUtil.getRefreshIntervalTimeUnit());
}
private synchronized void updateConfigServices() {
......
......@@ -6,6 +6,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.escape.Escaper;
import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.RateLimiter;
import com.ctrip.framework.apollo.Apollo;
import com.ctrip.framework.apollo.core.ConfigConsts;
......@@ -60,9 +61,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private final ExecutorService m_longPollingService;
private final AtomicBoolean m_longPollingStopped;
private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
private SchedulePolicy m_longPollSuccessSchedulePolicyInMS;
private AtomicReference<ServiceDTO> m_longPollServiceDto;
private AtomicReference<ApolloConfigNotification> m_longPollResult;
private RateLimiter m_longPollRateLimiter;
private RateLimiter m_loadConfigRateLimiter;
static {
m_executorService = Executors.newScheduledThreadPool(1,
......@@ -87,12 +89,13 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
throw new ApolloConfigException("Unable to load component!", ex);
}
m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
m_longPollSuccessSchedulePolicyInMS = new ExponentialSchedulePolicy(100, 1000); //in millisecond
m_longPollingStopped = new AtomicBoolean(false);
m_longPollingService = Executors.newSingleThreadExecutor(
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
m_longPollServiceDto = new AtomicReference<>();
m_longPollResult = new AtomicReference<>();
m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
......@@ -113,7 +116,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshTimeUnit());
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
this.m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
......@@ -124,7 +127,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
Cat.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshTimeUnit());
m_configUtil.getRefreshIntervalTimeUnit());
}
@Override
......@@ -158,6 +161,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
}
private ApolloConfig loadApolloConfig() {
m_loadConfigRateLimiter.acquire();
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();
......@@ -281,8 +285,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
m_longPollRateLimiter.acquire();
Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
long sleepTime = 50; //default 50 ms
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
......@@ -316,12 +320,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
trySync();
}
});
m_longPollSuccessSchedulePolicyInMS.success();
}
if (response.getStatusCode() == 304) {
sleepTime = m_longPollSuccessSchedulePolicyInMS.fail();
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS);
......@@ -333,14 +333,13 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespace: {}, reason: {}",
sleepTimeInSecond, appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
sleepTime = sleepTimeInSecond * 1000;
} finally {
transaction.complete();
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}
......
......@@ -27,12 +27,15 @@ public class ConfigUtil {
private int connectTimeout = 5000; //5 seconds
private int readTimeout = 10000; //10 seconds
private String cluster;
private int loadConfigQPS = 2; //2 times per second
private int longPollQPS = 2; //2 times per second
public ConfigUtil() {
initRefreshInterval();
initConnectTimeout();
initReadTimeout();
initCluster();
initQPS();
}
/**
......@@ -145,7 +148,35 @@ public class ConfigUtil {
return refreshInterval;
}
public TimeUnit getRefreshTimeUnit() {
public TimeUnit getRefreshIntervalTimeUnit() {
return refreshIntervalTimeUnit;
}
private void initQPS() {
String customizedLoadConfigQPS = System.getProperty("apollo.loadConfigQPS");
if (!Strings.isNullOrEmpty(customizedLoadConfigQPS)) {
try {
loadConfigQPS = Integer.parseInt(customizedLoadConfigQPS);
} catch (Throwable ex) {
logger.error("Config for apollo.loadConfigQPS is invalid: {}", customizedLoadConfigQPS);
}
}
String customizedLongPollQPS = System.getProperty("apollo.longPollQPS");
if (!Strings.isNullOrEmpty(customizedLongPollQPS)) {
try {
longPollQPS = Integer.parseInt(customizedLongPollQPS);
} catch (Throwable ex) {
logger.error("Config for apollo.longPollQPS is invalid: {}", customizedLongPollQPS);
}
}
}
public int getLoadConfigQPS() {
return loadConfigQPS;
}
public int getLongPollQPS() {
return longPollQPS;
}
}
......@@ -153,7 +153,7 @@ public abstract class BaseIntegrationTest extends ComponentTestCase {
}
@Override
public TimeUnit getRefreshTimeUnit() {
public TimeUnit getRefreshIntervalTimeUnit() {
return refreshTimeUnit;
}
......@@ -166,6 +166,16 @@ public abstract class BaseIntegrationTest extends ComponentTestCase {
public String getDataCenter() {
return someDataCenter;
}
@Override
public int getLoadConfigQPS() {
return 200;
}
@Override
public int getLongPollQPS() {
return 200;
}
}
/**
......
......@@ -192,6 +192,16 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
public String getDataCenter() {
return null;
}
@Override
public int getLoadConfigQPS() {
return 200;
}
@Override
public int getLongPollQPS() {
return 200;
}
}
public static class MockConfigServiceLocator extends ConfigServiceLocator {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment