Commit 463dacc9 by Jason Song Committed by GitHub

Merge pull request #265 from nobodyiam/client-rate-limiter

add rate limiter to protect server
parents a80d1174 44e28a57
...@@ -97,7 +97,7 @@ public class ConfigServiceLocator implements Initializable { ...@@ -97,7 +97,7 @@ public class ConfigServiceLocator implements Initializable {
tryUpdateConfigServices(); tryUpdateConfigServices();
} }
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(), }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshTimeUnit()); m_configUtil.getRefreshIntervalTimeUnit());
} }
private synchronized void updateConfigServices() { private synchronized void updateConfigServices() {
......
...@@ -6,6 +6,7 @@ import com.google.common.collect.Lists; ...@@ -6,6 +6,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.escape.Escaper; import com.google.common.escape.Escaper;
import com.google.common.net.UrlEscapers; import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.RateLimiter;
import com.ctrip.framework.apollo.Apollo; import com.ctrip.framework.apollo.Apollo;
import com.ctrip.framework.apollo.core.ConfigConsts; import com.ctrip.framework.apollo.core.ConfigConsts;
...@@ -60,9 +61,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -60,9 +61,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private final ExecutorService m_longPollingService; private final ExecutorService m_longPollingService;
private final AtomicBoolean m_longPollingStopped; private final AtomicBoolean m_longPollingStopped;
private SchedulePolicy m_longPollFailSchedulePolicyInSecond; private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
private SchedulePolicy m_longPollSuccessSchedulePolicyInMS;
private AtomicReference<ServiceDTO> m_longPollServiceDto; private AtomicReference<ServiceDTO> m_longPollServiceDto;
private AtomicReference<ApolloConfigNotification> m_longPollResult; private AtomicReference<ApolloConfigNotification> m_longPollResult;
private RateLimiter m_longPollRateLimiter;
private RateLimiter m_loadConfigRateLimiter;
static { static {
m_executorService = Executors.newScheduledThreadPool(1, m_executorService = Executors.newScheduledThreadPool(1,
...@@ -87,12 +89,13 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -87,12 +89,13 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
throw new ApolloConfigException("Unable to load component!", ex); throw new ApolloConfigException("Unable to load component!", ex);
} }
m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
m_longPollSuccessSchedulePolicyInMS = new ExponentialSchedulePolicy(100, 1000); //in millisecond
m_longPollingStopped = new AtomicBoolean(false); m_longPollingStopped = new AtomicBoolean(false);
m_longPollingService = Executors.newSingleThreadExecutor( m_longPollingService = Executors.newSingleThreadExecutor(
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true)); ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
m_longPollServiceDto = new AtomicReference<>(); m_longPollServiceDto = new AtomicReference<>();
m_longPollResult = new AtomicReference<>(); m_longPollResult = new AtomicReference<>();
m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
this.trySync(); this.trySync();
this.schedulePeriodicRefresh(); this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh(); this.scheduleLongPollingRefresh();
...@@ -113,7 +116,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -113,7 +116,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private void schedulePeriodicRefresh() { private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}", logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshTimeUnit()); m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
this.m_executorService.scheduleAtFixedRate( this.m_executorService.scheduleAtFixedRate(
new Runnable() { new Runnable() {
@Override @Override
...@@ -124,7 +127,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -124,7 +127,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
Cat.logEvent("Apollo.Client.Version", Apollo.VERSION); Cat.logEvent("Apollo.Client.Version", Apollo.VERSION);
} }
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(), }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshTimeUnit()); m_configUtil.getRefreshIntervalTimeUnit());
} }
@Override @Override
...@@ -158,6 +161,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -158,6 +161,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
} }
private ApolloConfig loadApolloConfig() { private ApolloConfig loadApolloConfig() {
m_loadConfigRateLimiter.acquire();
String appId = m_configUtil.getAppId(); String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster(); String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter(); String dataCenter = m_configUtil.getDataCenter();
...@@ -281,8 +285,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -281,8 +285,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
final Random random = new Random(); final Random random = new Random();
ServiceDTO lastServiceDto = null; ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) { while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
m_longPollRateLimiter.acquire();
Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification"); Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
long sleepTime = 50; //default 50 ms
try { try {
if (lastServiceDto == null) { if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices(); List<ServiceDTO> configServices = getConfigServices();
...@@ -316,12 +320,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -316,12 +320,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
trySync(); trySync();
} }
}); });
m_longPollSuccessSchedulePolicyInMS.success();
} }
if (response.getStatusCode() == 304) {
sleepTime = m_longPollSuccessSchedulePolicyInMS.fail();
}
m_longPollFailSchedulePolicyInSecond.success(); m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode()); transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS); transaction.setStatus(Message.SUCCESS);
...@@ -333,14 +333,13 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -333,14 +333,13 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
logger.warn( logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespace: {}, reason: {}", "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespace: {}, reason: {}",
sleepTimeInSecond, appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex)); sleepTimeInSecond, appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
sleepTime = sleepTimeInSecond * 1000;
} finally {
transaction.complete();
try { try {
TimeUnit.MILLISECONDS.sleep(sleepTime); TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
//ignore //ignore
} }
} finally {
transaction.complete();
} }
} }
} }
......
...@@ -27,12 +27,15 @@ public class ConfigUtil { ...@@ -27,12 +27,15 @@ public class ConfigUtil {
private int connectTimeout = 5000; //5 seconds private int connectTimeout = 5000; //5 seconds
private int readTimeout = 10000; //10 seconds private int readTimeout = 10000; //10 seconds
private String cluster; private String cluster;
private int loadConfigQPS = 2; //2 times per second
private int longPollQPS = 2; //2 times per second
public ConfigUtil() { public ConfigUtil() {
initRefreshInterval(); initRefreshInterval();
initConnectTimeout(); initConnectTimeout();
initReadTimeout(); initReadTimeout();
initCluster(); initCluster();
initQPS();
} }
/** /**
...@@ -145,7 +148,35 @@ public class ConfigUtil { ...@@ -145,7 +148,35 @@ public class ConfigUtil {
return refreshInterval; return refreshInterval;
} }
public TimeUnit getRefreshTimeUnit() { public TimeUnit getRefreshIntervalTimeUnit() {
return refreshIntervalTimeUnit; return refreshIntervalTimeUnit;
} }
private void initQPS() {
String customizedLoadConfigQPS = System.getProperty("apollo.loadConfigQPS");
if (!Strings.isNullOrEmpty(customizedLoadConfigQPS)) {
try {
loadConfigQPS = Integer.parseInt(customizedLoadConfigQPS);
} catch (Throwable ex) {
logger.error("Config for apollo.loadConfigQPS is invalid: {}", customizedLoadConfigQPS);
}
}
String customizedLongPollQPS = System.getProperty("apollo.longPollQPS");
if (!Strings.isNullOrEmpty(customizedLongPollQPS)) {
try {
longPollQPS = Integer.parseInt(customizedLongPollQPS);
} catch (Throwable ex) {
logger.error("Config for apollo.longPollQPS is invalid: {}", customizedLongPollQPS);
}
}
}
public int getLoadConfigQPS() {
return loadConfigQPS;
}
public int getLongPollQPS() {
return longPollQPS;
}
} }
...@@ -153,7 +153,7 @@ public abstract class BaseIntegrationTest extends ComponentTestCase { ...@@ -153,7 +153,7 @@ public abstract class BaseIntegrationTest extends ComponentTestCase {
} }
@Override @Override
public TimeUnit getRefreshTimeUnit() { public TimeUnit getRefreshIntervalTimeUnit() {
return refreshTimeUnit; return refreshTimeUnit;
} }
...@@ -166,6 +166,16 @@ public abstract class BaseIntegrationTest extends ComponentTestCase { ...@@ -166,6 +166,16 @@ public abstract class BaseIntegrationTest extends ComponentTestCase {
public String getDataCenter() { public String getDataCenter() {
return someDataCenter; return someDataCenter;
} }
@Override
public int getLoadConfigQPS() {
return 200;
}
@Override
public int getLongPollQPS() {
return 200;
}
} }
/** /**
......
...@@ -192,6 +192,16 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -192,6 +192,16 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
public String getDataCenter() { public String getDataCenter() {
return null; return null;
} }
@Override
public int getLoadConfigQPS() {
return 200;
}
@Override
public int getLongPollQPS() {
return 200;
}
} }
public static class MockConfigServiceLocator extends ConfigServiceLocator { public static class MockConfigServiceLocator extends ConfigServiceLocator {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment