Commit de40f824 by nobodyiam

async refreshing multiple meta server addresses and do some refactoring

parent 83b04df5
......@@ -36,6 +36,11 @@
<!-- end of log -->
<!-- test -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
......
package com.ctrip.framework.apollo.core;
import com.ctrip.framework.apollo.core.enums.Env;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.core.utils.NetUtil;
import com.ctrip.framework.apollo.core.utils.ResourceUtils;
import com.ctrip.framework.apollo.tracer.Tracer;
import com.ctrip.framework.apollo.tracer.spi.Transaction;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The meta domain will load the meta server from System environment first, if not exist, will load
* from apollo-env.properties. If neither exists, will load the default meta url.
* The meta domain will load the meta server from System environment first, if not exist, will load from
* apollo-env.properties. If neither exists, will load the default meta url.
*
* Currently, apollo supports local/dev/fat/uat/lpt/pro environments.
*/
public class MetaDomainConsts {
private static Map<Env, Object> domains = new HashMap<>();
public static final String DEFAULT_META_URL = "http://config.local";
private static final long REFRESH_INTERVAL_IN_SECOND = 60;//1 min
private static final Logger logger = LoggerFactory.getLogger(MetaDomainConsts.class);
private static Map<Env, String> domains = new HashMap<>();
private static Map<String, String> metaServerAddressCache = Maps.newConcurrentMap();
private static AtomicBoolean periodicRefreshStarted = new AtomicBoolean(false);
static {
initialize();
}
static void initialize() {
Properties prop = new Properties();
prop = ResourceUtils.readConfigFile("apollo-env.properties", prop);
Properties env = System.getProperties();
......@@ -39,8 +60,97 @@ public class MetaDomainConsts {
}
public static String getDomain(Env env) {
String metaAddress = String.valueOf(domains.get(env));
String validAddress = NetUtil.getValidAddress( metaAddress );
return validAddress;
String metaAddress = domains.get(env);
//if there is more than one address, need to select one
if (metaAddress != null && metaAddress.contains(",")) {
return selectMetaServerAddress(metaAddress);
}
return metaAddress;
}
/**
* Select one available meta server from the comma separated meta server addresses, e.g.
* http://1.2.3.4:8080,http://2.3.4.5:8080
*
* <br />
*
* In production environment, we still suggest using one single domain
* like http://config.xxx.com(backed by software load balancers like nginx) instead of multiple ip addresses
*/
private static String selectMetaServerAddress(String metaServerAddresses) {
String metaAddressSelected = metaServerAddressCache.get(metaServerAddresses);
if (metaAddressSelected == null) {
//initialize
if (periodicRefreshStarted.compareAndSet(false, true)) {
schedulePeriodicRefresh();
}
updateMetaServerAddresses(metaServerAddresses);
metaAddressSelected = metaServerAddressCache.get(metaServerAddresses);
}
return metaAddressSelected;
}
private static void updateMetaServerAddresses(String metaServerAddresses) {
logger.debug("Selecting meta server address for: {}", metaServerAddresses);
Transaction transaction = Tracer.newTransaction("Apollo.MetaService", "refreshMetaServerAddress");
transaction.addData("Url", metaServerAddresses);
try {
List<String> metaServers = Lists.newArrayList(metaServerAddresses.split(","));
//random load balancing
Collections.shuffle(metaServers);
boolean serverAvailable = false;
for (String address : metaServers) {
if (NetUtil.pingUrl(address)) {
//select the first available meta server
metaServerAddressCache.put(metaServerAddresses, address);
serverAvailable = true;
logger.debug("Selected meta server address {} for {}", address, metaServerAddresses);
break;
}
}
//we need to make sure the map is not empty, e.g. the first update might be failed
if (!metaServerAddressCache.containsKey(metaServerAddresses)) {
metaServerAddressCache.put(metaServerAddresses, metaServers.get(0));
}
if (!serverAvailable) {
logger.warn("Could not find available meta server for configured meta server addresses: {}, fallback to: {}", metaServerAddresses,
metaServerAddressCache.get(metaServerAddresses));
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
private static void schedulePeriodicRefresh() {
ScheduledExecutorService scheduledExecutorService = Executors
.newScheduledThreadPool(1, ApolloThreadFactory.create("MetaServiceLocator", true));
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
for (String metaServerAddresses : metaServerAddressCache.keySet()) {
updateMetaServerAddresses(metaServerAddresses);
}
} catch (Throwable ex) {
logger.warn(
String.format("Refreshing meta server address failed, will retry in %d seconds",
REFRESH_INTERVAL_IN_SECOND), ex
);
}
}
}, REFRESH_INTERVAL_IN_SECOND, REFRESH_INTERVAL_IN_SECOND, TimeUnit.SECONDS);
}
}
package com.ctrip.framework.apollo.core.utils;
import com.google.common.io.CharStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
/**
* Created by gl49 on 2018/6/8.
*/
public class NetUtil {
public static int getUrlStatus(String address){
int statusCode = 0;
private static final int DEFAULT_TIMEOUT_IN_SECONDS = 5000;
/**
* ping the url, return true if ping ok, false otherwise
*/
public static boolean pingUrl(String address) {
try {
URL urlObj = new URL(address);
HttpURLConnection oc = (HttpURLConnection) urlObj.openConnection();
oc.setUseCaches(false);
oc.setConnectTimeout(5000);
statusCode = oc.getResponseCode();
if (200 == statusCode) {
return statusCode;
}
} catch (Exception ignore) {
HttpURLConnection connection = (HttpURLConnection) urlObj.openConnection();
connection.setRequestMethod("GET");
connection.setUseCaches(false);
connection.setConnectTimeout(DEFAULT_TIMEOUT_IN_SECONDS);
connection.setReadTimeout(DEFAULT_TIMEOUT_IN_SECONDS);
int statusCode = connection.getResponseCode();
cleanUpConnection(connection);
return (200 <= statusCode && statusCode <= 399);
} catch (Throwable ignore) {
}
return statusCode;
return false;
}
public static boolean checkUrl(String address){
int status = getUrlStatus(address);
if( 0 == status ){ //异常状态重试一次
status = getUrlStatus(address);
/**
* according to https://docs.oracle.com/javase/7/docs/technotes/guides/net/http-keepalive.html, we should clean up the
* connection by reading the response body so that the connection could be reused.
*/
private static void cleanUpConnection(HttpURLConnection conn) {
InputStreamReader isr = null;
InputStreamReader esr = null;
try {
isr = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
CharStreams.toString(isr);
} catch (IOException e) {
InputStream errorStream = conn.getErrorStream();
if (errorStream != null) {
esr = new InputStreamReader(errorStream, StandardCharsets.UTF_8);
try {
CharStreams.toString(esr);
} catch (IOException ioe) {
//ignore
}
if( 200 == status ){
return true;
}
return false;
} finally {
if (isr != null) {
try {
isr.close();
} catch (IOException ex) {
// ignore
}
public static String getValidAddress(String metaAddress) {
String validAddress = null;
String[] addressArr = changeAddressArr(metaAddress);
for(String address : addressArr){
if(NetUtil.checkUrl(address)){
validAddress = address;
break;
}
if (esr != null) {
try {
esr.close();
} catch (IOException ex) {
// ignore
}
if(null == validAddress){
throw new RuntimeException("invalid meta address, please check $env_meta config!");
}
return validAddress;
}
private static String[] changeAddressArr(String address) {
String[] addressArr = address.split(",");
return addressArr;
}
}
package com.ctrip.framework.apollo;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.ctrip.framework.apollo.tracer.internals.MockMessageProducerManager;
import com.ctrip.framework.apollo.tracer.spi.MessageProducer;
import com.ctrip.framework.apollo.tracer.spi.Transaction;
import java.io.IOException;
import java.net.ServerSocket;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.junit.After;
import org.junit.Before;
public abstract class BaseIntegrationTest {
protected static final int PORT = findFreePort();
private Server server;
/**
* init and start a jetty server, remember to call server.stop when the task is finished
*/
protected Server startServerWithHandlers(ContextHandler... handlers) throws Exception {
server = new Server(PORT);
ContextHandlerCollection contexts = new ContextHandlerCollection();
contexts.setHandlers(handlers);
server.setHandler(contexts);
server.start();
return server;
}
@Before
public void setUp() throws Exception {
MessageProducer someProducer = mock(MessageProducer.class);
MockMessageProducerManager.setProducer(someProducer);
Transaction someTransaction = mock(Transaction.class);
when(someProducer.newTransaction(anyString(), anyString())).thenReturn(someTransaction);
}
@After
public void tearDown() throws Exception {
if (server != null && server.isStarted()) {
server.stop();
}
}
protected ContextHandler mockServerHandler(final int statusCode, final String response) {
ContextHandler context = new ContextHandler("/");
context.setHandler(new AbstractHandler() {
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request,
HttpServletResponse response) throws IOException, ServletException {
response.setContentType("text/plain;charset=UTF-8");
response.setStatus(statusCode);
response.getWriter().println(response);
baseRequest.setHandled(true);
}
});
return context;
}
/**
* Returns a free port number on localhost.
*
* Heavily inspired from org.eclipse.jdt.launching.SocketUtil (to avoid a dependency to JDT just because of this).
* Slightly improved with close() missing in JDT. And throws exception instead of returning -1.
*
* @return a free port number on localhost
* @throws IllegalStateException if unable to find a free port
*/
protected static int findFreePort() {
ServerSocket socket = null;
try {
socket = new ServerSocket(0);
socket.setReuseAddress(true);
int port = socket.getLocalPort();
try {
socket.close();
} catch (IOException e) {
// Ignore IOException on close()
}
return port;
} catch (IOException e) {
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
}
}
}
throw new IllegalStateException("Could not find a free TCP/IP port to start embedded Jetty HTTP Server on");
}
}
package com.ctrip.framework.apollo.core;
import com.ctrip.framework.apollo.core.enums.Env;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Assert;
import com.ctrip.framework.apollo.BaseIntegrationTest;
import com.ctrip.framework.apollo.core.enums.Env;
import javax.servlet.http.HttpServletResponse;
import org.junit.After;
import org.junit.Test;
public class MetaDomainTest {
public class MetaDomainTest extends BaseIntegrationTest {
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
System.clearProperty("fat_meta");
System.clearProperty("uat_meta");
System.clearProperty("lpt_meta");
}
@Test
public void testGetMetaDomain() {
Assert.assertEquals("http://localhost:8080", MetaDomainConsts.getDomain(Env.LOCAL));
Assert.assertEquals("http://dev:8080", MetaDomainConsts.getDomain(Env.DEV));
Assert.assertEquals(MetaDomainConsts.DEFAULT_META_URL, MetaDomainConsts.getDomain(Env.PRO));
assertEquals("http://localhost:8080", MetaDomainConsts.getDomain(Env.LOCAL));
assertEquals("http://dev:8080", MetaDomainConsts.getDomain(Env.DEV));
assertEquals(MetaDomainConsts.DEFAULT_META_URL, MetaDomainConsts.getDomain(Env.PRO));
}
@Test
public void testGetValidAddress() throws Exception {
String someResponse = "some response";
startServerWithHandlers(mockServerHandler(HttpServletResponse.SC_OK, someResponse));
String validServer = "http://localhost:" + PORT;
String invalidServer = "http://localhost:" + findFreePort();
System.setProperty("fat_meta", validServer + "," + invalidServer);
System.setProperty("uat_meta", invalidServer + "," + validServer);
MetaDomainConsts.initialize();
assertEquals(validServer, MetaDomainConsts.getDomain(Env.FAT));
assertEquals(validServer, MetaDomainConsts.getDomain(Env.UAT));
}
@Test
public void testInvalidAddress() throws Exception {
String invalidServer = "http://localhost:" + findFreePort();
String anotherInvalidServer = "http://localhost:" + findFreePort();
System.setProperty("lpt_meta", invalidServer + "," + anotherInvalidServer);
MetaDomainConsts.initialize();
String metaServer = MetaDomainConsts.getDomain(Env.LPT);
assertTrue(metaServer.equals(invalidServer) || metaServer.equals(anotherInvalidServer));
}
}
package com.ctrip.framework.apollo.core.utils;
import org.junit.Assert;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.ctrip.framework.apollo.BaseIntegrationTest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.junit.Test;
/**
* Created by gl49 on 2018/6/8.
*/
public class NetUtilTest {
public class NetUtilTest extends BaseIntegrationTest {
@Test
public void testCheckUrl(){
String apolloUrl = "http://www.ctrip.com";
Assert.assertEquals( NetUtil.checkUrl(apolloUrl), true);
public void testPingUrlWithStatusCode200() throws Exception {
String someResponse = "some response";
ContextHandler handler = mockServerHandler(HttpServletResponse.SC_OK, someResponse);
startServerWithHandlers(handler);
assertTrue(NetUtil.pingUrl("http://localhost:" + PORT));
}
@Test
public void testPingUrlWithStatusCode404() throws Exception {
String someResponse = "some response";
startServerWithHandlers(mockServerHandler(HttpServletResponse.SC_NOT_FOUND, someResponse));
assertFalse(NetUtil.pingUrl("http://localhost:" + PORT));
}
@Test
public void testGetValidAddress(){
String addressStr = "http://www.ctrip.com,http://www.ctrip.com2";
String address = NetUtil.getValidAddress(addressStr);
Assert.assertEquals( address, "http://www.ctrip.com");
public void testPingUrlWithServerNotStarted() throws Exception {
assertFalse(NetUtil.pingUrl("http://localhost:" + PORT));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment