Commit eba0cc0b by Spencer Gibb

Use Ribbon LoadBalancerContext to record statistics of an execution.

Added LoadBalancerClient.reconstructURI to take an URI with the service as hostname and inject a real host and port. Added tests for RibbonInterceptor and RibbonLoadBalancerClient. fixes gh-37
parent 5cd88deb
...@@ -8,6 +8,5 @@ package org.springframework.cloud.client; ...@@ -8,6 +8,5 @@ package org.springframework.cloud.client;
public interface ServiceInstance { public interface ServiceInstance {
public String getServiceId(); public String getServiceId();
public String getHost(); public String getHost();
public String getIpAddress();
public int getPort(); public int getPort();
} }
...@@ -2,6 +2,8 @@ package org.springframework.cloud.client.loadbalancer; ...@@ -2,6 +2,8 @@ package org.springframework.cloud.client.loadbalancer;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
import java.net.URI;
/** /**
* @author Spencer Gibb * @author Spencer Gibb
*/ */
...@@ -19,5 +21,7 @@ public interface LoadBalancerClient { ...@@ -19,5 +21,7 @@ public interface LoadBalancerClient {
* @param request allows implementations to execute pre and post actions such as incrementing metrics * @param request allows implementations to execute pre and post actions such as incrementing metrics
* @return the result of the LoadBalancerRequest callback on the selected ServiceInstance * @return the result of the LoadBalancerRequest callback on the selected ServiceInstance
*/ */
public <T> T choose(String serviceId, LoadBalancerRequest<T> request); public <T> T execute(String serviceId, LoadBalancerRequest<T> request);
public URI reconstructURI(ServiceInstance instance, URI original);
} }
...@@ -6,5 +6,5 @@ import org.springframework.cloud.client.ServiceInstance; ...@@ -6,5 +6,5 @@ import org.springframework.cloud.client.ServiceInstance;
* @author Spencer Gibb * @author Spencer Gibb
*/ */
public interface LoadBalancerRequest<T> { public interface LoadBalancerRequest<T> {
public T apply(ServiceInstance instance); public T apply(ServiceInstance instance) throws Exception;
} }
...@@ -26,11 +26,6 @@ public class EurekaDiscoveryClient implements DiscoveryClient { ...@@ -26,11 +26,6 @@ public class EurekaDiscoveryClient implements DiscoveryClient {
} }
@Override @Override
public String getIpAddress() {
return config.getIpAddress();
}
@Override
public int getPort() { public int getPort() {
return config.getNonSecurePort(); return config.getNonSecurePort();
} }
......
...@@ -8,7 +8,6 @@ import org.springframework.http.client.ClientHttpRequestExecution; ...@@ -8,7 +8,6 @@ import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor; import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse; import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.support.HttpRequestWrapper; import org.springframework.http.client.support.HttpRequestWrapper;
import org.springframework.web.util.UriComponentsBuilder;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
...@@ -25,25 +24,21 @@ public class RibbonInterceptor implements ClientHttpRequestInterceptor { ...@@ -25,25 +24,21 @@ public class RibbonInterceptor implements ClientHttpRequestInterceptor {
} }
@Override @Override
public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException { public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
return loadBalancer.execute(serviceName, new LoadBalancerRequest<ClientHttpResponse>() {
@Override
public ClientHttpResponse apply(final ServiceInstance instance) throws Exception {
HttpRequestWrapper wrapper = new HttpRequestWrapper(request) { HttpRequestWrapper wrapper = new HttpRequestWrapper(request) {
@Override @Override
public URI getURI() { public URI getURI() {
final URI originalUri = super.getURI(); URI uri = loadBalancer.reconstructURI(instance, originalUri);
String serviceName = originalUri.getHost();
URI uri = loadBalancer.choose(serviceName, new LoadBalancerRequest<URI>() {
@Override
public URI apply(ServiceInstance instance) {
return UriComponentsBuilder.fromUri(originalUri)
.host(instance.getHost())
.port(instance.getPort())
.build()
.toUri();
}
});
return uri; return uri;
} }
}; };
return execution.execute(wrapper, body); return execution.execute(wrapper, body);
} }
});
}
} }
package org.springframework.cloud.netflix.ribbon; package org.springframework.cloud.netflix.ribbon;
import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Throwables;
import com.netflix.loadbalancer.*;
import com.netflix.servo.monitor.Stopwatch;
import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient; import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerRequest; import org.springframework.cloud.client.loadbalancer.LoadBalancerRequest;
import com.netflix.loadbalancer.AbstractLoadBalancer;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
/** /**
* @author Spencer Gibb * @author Spencer Gibb
* @author Dave Syer * @author Dave Syer
...@@ -23,7 +23,8 @@ public class RibbonLoadBalancerClient implements LoadBalancerClient { ...@@ -23,7 +23,8 @@ public class RibbonLoadBalancerClient implements LoadBalancerClient {
private SpringClientFactory clientFactory; private SpringClientFactory clientFactory;
private Map<String, ILoadBalancer> balancers = new HashMap<String, ILoadBalancer>(); private Map<String, ILoadBalancer> balancers = new HashMap<>();
private Map<String, LoadBalancerContext> contexts = new HashMap<>();
public RibbonLoadBalancerClient(RibbonClientPreprocessor ribbonClientPreprocessor, SpringClientFactory clientFactory, List<BaseLoadBalancer> balancers) { public RibbonLoadBalancerClient(RibbonClientPreprocessor ribbonClientPreprocessor, SpringClientFactory clientFactory, List<BaseLoadBalancer> balancers) {
this.ribbonClientPreprocessor = ribbonClientPreprocessor; this.ribbonClientPreprocessor = ribbonClientPreprocessor;
...@@ -34,22 +35,54 @@ public class RibbonLoadBalancerClient implements LoadBalancerClient { ...@@ -34,22 +35,54 @@ public class RibbonLoadBalancerClient implements LoadBalancerClient {
} }
@Override @Override
public URI reconstructURI(ServiceInstance instance, URI original) {
String serviceId = instance.getServiceId();
LoadBalancerContext context = getOrCreateLoadBalancerContext(serviceId, getLoadBalancer(serviceId));
Server server = new Server(instance.getHost(), instance.getPort());
return context.reconstructURIWithServer(server, original);
}
@Override
public ServiceInstance choose(String serviceId) { public ServiceInstance choose(String serviceId) {
return new RibbonServer(serviceId, getServer(serviceId)); return new RibbonServer(serviceId, getServer(serviceId));
} }
@Override @Override
public <T> T choose(String serviceId, LoadBalancerRequest<T> request) { public <T> T execute(String serviceId, LoadBalancerRequest<T> request) {
ILoadBalancer loadBalancer = getLoadBalancer(serviceId); ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
LoadBalancerContext context = getOrCreateLoadBalancerContext(serviceId, loadBalancer);
Server server = getServer(serviceId, loadBalancer); Server server = getServer(serviceId, loadBalancer);
RibbonServer ribbonServer = new RibbonServer(serviceId, server); RibbonServer ribbonServer = new RibbonServer(serviceId, server);
ServerStats serverStats = context.getServerStats(server);
context.noteOpenConnection(serverStats);
Stopwatch tracer = context.getExecuteTracer().start();
try { try {
return request.apply(ribbonServer);
} finally { T returnVal = request.apply(ribbonServer);
if (loadBalancer instanceof AbstractLoadBalancer) { recordStats(context, tracer, serverStats, returnVal, null);
AbstractLoadBalancer.class.cast(loadBalancer).getLoadBalancerStats().incrementNumRequests(server); return returnVal;
} catch (Exception e) {
recordStats(context, tracer, serverStats, null, e);
Throwables.propagate(e);
} }
return null;
} }
private void recordStats(LoadBalancerContext context, Stopwatch tracer, ServerStats serverStats, Object entity, Throwable exception) {
tracer.stop();
long duration = tracer.getDuration(TimeUnit.MILLISECONDS);
context.noteRequestCompletion(serverStats, entity, exception, duration, null/*errorHandler*/);
}
protected LoadBalancerContext getOrCreateLoadBalancerContext(String serviceId, ILoadBalancer loadBalancer) {
LoadBalancerContext context = contexts.get(serviceId);
if (context == null) {
context = new LoadBalancerContext(loadBalancer);
contexts.put(serviceId, context);
}
return context;
} }
protected Server getServer(String serviceId) { protected Server getServer(String serviceId) {
...@@ -73,11 +106,11 @@ public class RibbonLoadBalancerClient implements LoadBalancerClient { ...@@ -73,11 +106,11 @@ public class RibbonLoadBalancerClient implements LoadBalancerClient {
return loadBalancer; return loadBalancer;
} }
private class RibbonServer implements ServiceInstance { protected static class RibbonServer implements ServiceInstance {
private String serviceId; protected String serviceId;
private Server server; protected Server server;
private RibbonServer(String serviceId, Server server) { protected RibbonServer(String serviceId, Server server) {
this.serviceId = serviceId; this.serviceId = serviceId;
this.server = server; this.server = server;
} }
...@@ -93,11 +126,6 @@ public class RibbonLoadBalancerClient implements LoadBalancerClient { ...@@ -93,11 +126,6 @@ public class RibbonLoadBalancerClient implements LoadBalancerClient {
} }
@Override @Override
public String getIpAddress() {
return null; // TODO: ribbon doesn't supply ip
}
@Override
public int getPort() { public int getPort() {
return server.getPort(); return server.getPort();
} }
......
package org.springframework.cloud.netflix.ribbon;
import com.google.common.base.Throwables;
import com.netflix.loadbalancer.Server;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.client.loadbalancer.LoadBalancerRequest;
import org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient.RibbonServer;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.support.HttpRequestWrapper;
import org.springframework.web.util.UriComponentsBuilder;
import java.net.URI;
import java.net.URL;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* @author Spencer Gibb
*/
public class RibbonInterceptorTest {
@Mock
HttpRequest request;
@Mock
ClientHttpRequestExecution execution;
@Mock
ClientHttpResponse response;
@Before
public void init() {
MockitoAnnotations.initMocks(this);
}
@Test
public void testIntercept() throws Exception {
RibbonServer server = new RibbonServer("myservice", new Server("myhost", 8080));
RibbonInterceptor interceptor = new RibbonInterceptor(new MyClient(server));
when(request.getURI()).thenReturn(new URL("http://myservice").toURI());
when(execution.execute(isA(HttpRequest.class), isA(byte[].class))).thenReturn(response);
ArgumentCaptor<HttpRequestWrapper> argument = ArgumentCaptor.forClass(HttpRequestWrapper.class);
ClientHttpResponse response = interceptor.intercept(request, new byte[0], execution);
assertNotNull("response was null", response);
verify(execution).execute(argument.capture(), isA(byte[].class));
HttpRequestWrapper wrapper = argument.getValue();
assertEquals("wrong constructed uri", new URL("http://myhost:8080").toURI(), wrapper.getURI());
}
protected static class MyClient implements LoadBalancerClient {
ServiceInstance instance;
public MyClient(ServiceInstance instance) {
this.instance = instance;
}
@Override
public ServiceInstance choose(String serviceId) {
return instance;
}
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) {
try {
return request.apply(instance);
} catch (Exception e) {
Throwables.propagate(e);
}
return null;
}
@Override
public URI reconstructURI(ServiceInstance instance, URI original) {
return UriComponentsBuilder.fromUri(original)
.host(instance.getHost())
.port(instance.getPort())
.build()
.toUri();
}
}
}
package org.springframework.cloud.netflix.ribbon;
import com.netflix.loadbalancer.BaseLoadBalancer;
import com.netflix.loadbalancer.LoadBalancerStats;
import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerStats;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerRequest;
import org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient.RibbonServer;
import java.net.URI;
import java.net.URL;
import java.util.Arrays;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* @author Spencer Gibb
*/
public class RibbonLoadBalancerClientTest {
@Mock
RibbonClientPreprocessor preprocessor;
@Mock
SpringClientFactory clientFactory;
@Mock
BaseLoadBalancer loadBalancer;
@Mock
LoadBalancerStats loadBalancerStats;
@Mock
ServerStats serverStats;
@Before
public void init() {
MockitoAnnotations.initMocks(this);
}
@Test
public void reconstructURI() throws Exception {
RibbonServer server = getRibbonServer();
RibbonLoadBalancerClient client = getRibbonLoadBalancerClient(server);
ServiceInstance serviceInstance = client.choose(server.getServiceId());
URI uri = client.reconstructURI(serviceInstance, new URL("http://" + server.serviceId).toURI());
assertEquals(server.getHost(), uri.getHost());
assertEquals(server.getPort(), uri.getPort());
}
@Test
public void testChoose() {
RibbonServer server = getRibbonServer();
RibbonLoadBalancerClient client = getRibbonLoadBalancerClient(server);
ServiceInstance serviceInstance = client.choose(server.getServiceId());
assertServiceInstance(server, serviceInstance);
}
@Test
public void testExecute() {
final RibbonServer server = getRibbonServer();
RibbonLoadBalancerClient client = getRibbonLoadBalancerClient(server);
final String returnVal = "myval";
Object actualReturn = client.execute(server.getServiceId(), new LoadBalancerRequest<Object>() {
@Override
public Object apply(ServiceInstance instance) throws Exception {
assertServiceInstance(server, instance);
return returnVal;
}
});
verifyServerStats();
assertEquals("retVal was wrong", returnVal, actualReturn);
}
@Test
public void testExecuteException() {
final RibbonServer ribbonServer = getRibbonServer();
RibbonLoadBalancerClient client = getRibbonLoadBalancerClient(ribbonServer);
try {
client.execute(ribbonServer.getServiceId(), new LoadBalancerRequest<Object>() {
@Override
public Object apply(ServiceInstance instance) throws Exception {
assertServiceInstance(ribbonServer, instance);
throw new RuntimeException();
}
});
fail("Should have thrown exception");
} catch (Exception e) {
assertNotNull(e);
}
verifyServerStats();
}
protected RibbonServer getRibbonServer() {
return new RibbonServer("testService", new Server("myhost", 9080));
}
protected void verifyServerStats() {
verify(serverStats).incrementActiveRequestsCount();
verify(serverStats).decrementActiveRequestsCount();
verify(serverStats).incrementNumRequests();
verify(serverStats).noteResponseTime(anyDouble());
}
protected void assertServiceInstance(RibbonServer ribbonServer, ServiceInstance instance) {
assertNotNull("instance was null", instance);
assertEquals("serviceId was wrong", ribbonServer.getServiceId(), instance.getServiceId());
assertEquals("host was wrong", ribbonServer.getHost(), instance.getHost());
assertEquals("port was wrong", ribbonServer.getPort(), instance.getPort());
}
protected RibbonLoadBalancerClient getRibbonLoadBalancerClient(RibbonServer ribbonServer) {
when(loadBalancer.getName()).thenReturn(ribbonServer.getServiceId());
when(loadBalancer.chooseServer(anyString())).thenReturn(ribbonServer.server);
when(loadBalancer.getLoadBalancerStats()).thenReturn(loadBalancerStats);
when(loadBalancerStats.getSingleServerStat(ribbonServer.server)).thenReturn(serverStats);
return new RibbonLoadBalancerClient(preprocessor, clientFactory, Arrays.asList(loadBalancer));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment