Commit 562b1689 by Johannes Edmeier

REFA: Use the reactive WebClient in ApplicationOperations

parent c70f4bb7
......@@ -30,19 +30,18 @@ import de.codecentric.boot.admin.server.registry.store.SimpleApplicationStore;
import de.codecentric.boot.admin.server.web.client.ApplicationOperations;
import de.codecentric.boot.admin.server.web.client.BasicAuthHttpHeaderProvider;
import de.codecentric.boot.admin.server.web.client.HttpHeadersProvider;
import io.netty.channel.ChannelOption;
import org.reactivestreams.Publisher;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
@Configuration
@EnableConfigurationProperties(AdminServerProperties.class)
......@@ -74,18 +73,13 @@ public class AdminServerCoreConfiguration {
@Bean
@ConditionalOnMissingBean
public ApplicationOperations applicationOperations(RestTemplateBuilder restTemplBuilder,
HttpHeadersProvider headersProvider) {
RestTemplateBuilder builder = restTemplBuilder.messageConverters(new MappingJackson2HttpMessageConverter())
.errorHandler(new DefaultResponseErrorHandler() {
@Override
protected boolean hasError(HttpStatus statusCode) {
return false;
}
});
builder = builder.setConnectTimeout(adminServerProperties.getMonitor().getConnectTimeout())
.setReadTimeout(adminServerProperties.getMonitor().getReadTimeout());
return new ApplicationOperations(builder.build(), headersProvider);
public ApplicationOperations applicationOperations(HttpHeadersProvider headersProvider) {
WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(options -> {
options.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
adminServerProperties.getMonitor().getConnectTimeout());
options.option(ChannelOption.SO_TIMEOUT, adminServerProperties.getMonitor().getReadTimeout());
})).build();
return new ApplicationOperations(webClient, headersProvider);
}
@Bean
......
......@@ -20,9 +20,11 @@ import de.codecentric.boot.admin.server.model.Application;
import de.codecentric.boot.admin.server.model.Info;
import de.codecentric.boot.admin.server.registry.store.ApplicationStore;
import de.codecentric.boot.admin.server.web.client.ApplicationOperations;
import reactor.core.publisher.Mono;
import java.io.Serializable;
import java.util.Map;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
......@@ -36,8 +38,7 @@ import org.springframework.http.ResponseEntity;
* @author Johannes Edmeier
*/
public class InfoUpdater implements ApplicationEventPublisherAware {
private static final Logger LOGGER = LoggerFactory.getLogger(InfoUpdater.class);
private static final Logger log = LoggerFactory.getLogger(InfoUpdater.class);
private final ApplicationStore store;
private final ApplicationOperations applicationOps;
private ApplicationEventPublisher publisher;
......@@ -52,35 +53,32 @@ public class InfoUpdater implements ApplicationEventPublisherAware {
return;
}
Info info = queryInfo(application);
if (!info.equals(application.getInfo())) {
queryInfo(application).filter(info -> !info.equals(application.getInfo())).doOnNext(info -> {
Application newState = Application.copyOf(application).info(info).build();
store.save(newState);
publisher.publishEvent(new ClientApplicationInfoChangedEvent(newState, info));
}
}).subscribe();
}
protected Info queryInfo(Application application) {
try {
ResponseEntity<Map<String, Serializable>> response = applicationOps.getInfo(application);
if (response.getStatusCode().is2xxSuccessful() && response.hasBody()) {
return convertInfo(response);
} else {
LOGGER.info("Couldn't retrieve info for {}: {} - {}", application, response.getStatusCode(),
response.getBody());
return Info.empty();
}
} catch (Exception ex) {
LOGGER.warn("Couldn't retrieve info for {}", application, ex);
return convertInfo(ex);
}
protected Mono<Info> queryInfo(Application application) {
return applicationOps.getInfo(application)
.log(log.getName(), Level.FINEST)
.map(response -> convertInfo(application, response))
.onErrorResume(ex -> Mono.just(convertInfo(application, ex)));
}
protected Info convertInfo(ResponseEntity<Map<String, Serializable>> response) {
return Info.from(response.getBody());
protected Info convertInfo(Application application, ResponseEntity<Map<String, Serializable>> response) {
if (response.getStatusCode().is2xxSuccessful() && response.hasBody()) {
return Info.from(response.getBody());
} else {
log.info("Couldn't retrieve info for {}: {} - {}", application, response.getStatusCode(),
response.getBody());
return Info.empty();
}
}
protected Info convertInfo(Exception ex) {
protected Info convertInfo(Application application, Throwable ex) {
log.warn("Couldn't retrieve info for {}", application, ex);
return Info.empty();
}
......
......@@ -21,10 +21,12 @@ import de.codecentric.boot.admin.server.model.ApplicationId;
import de.codecentric.boot.admin.server.model.StatusInfo;
import de.codecentric.boot.admin.server.registry.store.ApplicationStore;
import de.codecentric.boot.admin.server.web.client.ApplicationOperations;
import reactor.core.publisher.Mono;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
......@@ -38,7 +40,7 @@ import org.springframework.http.ResponseEntity;
* @author Johannes Edmeier
*/
public class StatusUpdater implements ApplicationEventPublisherAware {
private static final Logger LOGGER = LoggerFactory.getLogger(StatusUpdater.class);
private static final Logger log = LoggerFactory.getLogger(StatusUpdater.class);
private final ApplicationStore store;
private final ApplicationOperations applicationOps;
private ApplicationEventPublisher publisher;
......@@ -61,27 +63,25 @@ public class StatusUpdater implements ApplicationEventPublisherAware {
public void updateStatus(Application application) {
StatusInfo oldStatus = application.getStatusInfo();
StatusInfo newStatus = queryStatus(application);
if (!newStatus.getStatus().equals(oldStatus.getStatus())) {
Application newState = Application.copyOf(application).statusInfo(newStatus).build();
store.save(newState);
publisher.publishEvent(new ClientApplicationStatusChangedEvent(newState, oldStatus, newStatus));
}
queryStatus(application).filter(newStatus -> !newStatus.getStatus().equals(oldStatus.getStatus()))
.doOnNext(newStatus -> {
Application newState = Application.copyOf(application)
.statusInfo(newStatus)
.build();
store.save(newState);
publisher.publishEvent(
new ClientApplicationStatusChangedEvent(newState, oldStatus, newStatus));
})
.subscribe();
}
protected StatusInfo queryStatus(Application application) {
LOGGER.trace("Updating status for {}", application);
protected Mono<StatusInfo> queryStatus(Application application) {
log.trace("Updating status for {}", application);
lastQueried.put(application.getId(), System.currentTimeMillis());
try {
return convertStatusInfo(applicationOps.getHealth(application));
} catch (Exception ex) {
if ("OFFLINE".equals(application.getStatusInfo().getStatus())) {
LOGGER.debug("Couldn't retrieve status for {}", application, ex);
} else {
LOGGER.info("Couldn't retrieve status for {}", application, ex);
}
return convertStatusInfo(ex);
}
return applicationOps.getHealth(application)
.log(log.getName(), Level.FINEST)
.map(this::convertStatusInfo)
.onErrorResume(ex -> Mono.just(convertStatusInfo(application, ex)));
}
protected StatusInfo convertStatusInfo(ResponseEntity<Map<String, Serializable>> response) {
......@@ -100,7 +100,13 @@ public class StatusUpdater implements ApplicationEventPublisherAware {
return StatusInfo.ofDown(details);
}
protected StatusInfo convertStatusInfo(Exception ex) {
protected StatusInfo convertStatusInfo(Application application, Throwable ex) {
if ("OFFLINE".equals(application.getStatusInfo().getStatus())) {
log.debug("Couldn't retrieve status for {}", application, ex);
} else {
log.info("Couldn't retrieve status for {}", application, ex);
}
Map<String, Serializable> details = new HashMap<>();
details.put("message", ex.getMessage());
details.put("exception", ex.getClass().getName());
......
/*
* Copyright 2016 the original author or authors.
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......@@ -16,63 +16,55 @@
package de.codecentric.boot.admin.server.web.client;
import de.codecentric.boot.admin.server.model.Application;
import reactor.core.publisher.Mono;
import java.io.Serializable;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import static java.util.Collections.singletonList;
/**
* Handles all rest operations invoked on a registered application.
*
* @author Johannes Edmeier
*/
public class ApplicationOperations {
private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationOperations.class);
private static final Logger log = LoggerFactory.getLogger(ApplicationOperations.class);
@SuppressWarnings("unchecked")
private static final Class<Map<String, Serializable>> RESPONSE_TYPE_MAP = (Class<Map<String, Serializable>>) (Class<?>) Map.class;
private final RestTemplate restTemplate;
private final WebClient webClient;
private final HttpHeadersProvider httpHeadersProvider;
public ApplicationOperations(RestTemplate restTemplate, HttpHeadersProvider httpHeadersProvider) {
this.restTemplate = restTemplate;
public ApplicationOperations(WebClient webClient, HttpHeadersProvider httpHeadersProvider) {
this.webClient = webClient;
this.httpHeadersProvider = httpHeadersProvider;
}
public ResponseEntity<Map<String, Serializable>> getInfo(Application application) {
public Mono<ResponseEntity<Map<String, Serializable>>> getInfo(Application application) {
URI uri = UriComponentsBuilder.fromHttpUrl(application.getRegistration().getManagementUrl())
.pathSegment("info")
.build()
.toUri();
return doGet(application, uri, RESPONSE_TYPE_MAP);
return doGet(application, uri);
}
public ResponseEntity<Map<String, Serializable>> getHealth(Application application) {
public Mono<ResponseEntity<Map<String, Serializable>>> getHealth(Application application) {
URI uri = UriComponentsBuilder.fromHttpUrl(application.getRegistration().getHealthUrl()).build().toUri();
return doGet(application, uri, RESPONSE_TYPE_MAP);
return doGet(application, uri);
}
protected <T> ResponseEntity<T> doGet(Application application, URI uri, Class<T> responseType) {
LOGGER.debug("Fetching '{}' for {}", uri, application);
HttpHeaders headers = new HttpHeaders();
headers.setAccept(singletonList(MediaType.APPLICATION_JSON));
headers.putAll(httpHeadersProvider.getHeaders(application));
ResponseEntity<T> response = restTemplate.exchange(uri, HttpMethod.GET, new HttpEntity<Void>(headers),
responseType);
LOGGER.debug("'{}' responded with {}", uri, response);
return response;
protected Mono<ResponseEntity<Map<String, Serializable>>> doGet(Application application, URI uri) {
return webClient.get()
.uri(uri)
.accept(MediaType.APPLICATION_JSON)
.headers(headers -> headers.putAll(httpHeadersProvider.getHeaders(application)))
.exchange()
.flatMap(r -> r.toEntity(RESPONSE_TYPE_MAP))
.doOnSubscribe((s) -> log.debug("Fetching '{}' for {}", uri, application));
}
}
......@@ -24,6 +24,7 @@ import de.codecentric.boot.admin.server.model.Registration;
import de.codecentric.boot.admin.server.model.StatusInfo;
import de.codecentric.boot.admin.server.registry.store.SimpleApplicationStore;
import de.codecentric.boot.admin.server.web.client.ApplicationOperations;
import reactor.core.publisher.Mono;
import org.junit.Before;
import org.junit.Test;
......@@ -69,7 +70,8 @@ public class InfoUpdaterTest {
.id(ApplicationId.of("unk"))
.statusInfo(StatusInfo.ofUnknown())
.build();
when(applicationOps.getInfo(any(Application.class))).thenReturn(ResponseEntity.ok(singletonMap("foo", "bar")));
when(applicationOps.getInfo(any(Application.class))).thenReturn(
Mono.just(ResponseEntity.ok(singletonMap("foo", "bar"))));
updater.updateInfo(offline);
verify(publisher, never()).publishEvent(isA(ClientApplicationInfoChangedEvent.class));
......@@ -90,7 +92,7 @@ public class InfoUpdaterTest {
.info(Info.from(singletonMap("foo", "bar")))
.build();
when(applicationOps.getInfo(any(Application.class))).thenReturn(ResponseEntity.status(500).build());
when(applicationOps.getInfo(any(Application.class))).thenReturn(Mono.just(ResponseEntity.status(500).build()));
updater.updateInfo(application);
assertThat(store.find(application.getId()).getInfo()).isEqualTo(Info.empty());
......@@ -105,7 +107,8 @@ public class InfoUpdaterTest {
.info(Info.from(singletonMap("foo", "bar")))
.build();
when(applicationOps.getHealth(any(Application.class))).thenThrow(new ResourceAccessException("error"));
when(applicationOps.getInfo(any(Application.class))).thenReturn(
Mono.error(new ResourceAccessException("error")));
updater.updateInfo(application);
assertThat(store.find(application.getId()).getInfo()).isEqualTo(Info.empty());
......
......@@ -22,6 +22,7 @@ import de.codecentric.boot.admin.server.model.Registration;
import de.codecentric.boot.admin.server.model.StatusInfo;
import de.codecentric.boot.admin.server.registry.store.SimpleApplicationStore;
import de.codecentric.boot.admin.server.web.client.ApplicationOperations;
import reactor.core.publisher.Mono;
import org.junit.Before;
import org.junit.Test;
......@@ -61,7 +62,7 @@ public class StatusUpdaterTest {
@Test
public void test_update_statusChanged() {
when(applicationOps.getHealth(isA(Application.class))).thenReturn(
ResponseEntity.ok().body(singletonMap("status", "UP")));
Mono.just(ResponseEntity.ok().body(singletonMap("status", "UP"))));
updater.updateStatus(application);
......@@ -74,7 +75,7 @@ public class StatusUpdaterTest {
@Test
public void test_update_statusUnchanged() {
when(applicationOps.getHealth(any(Application.class))).thenReturn(
ResponseEntity.ok(singletonMap("status", "UNKNOWN")));
Mono.just(ResponseEntity.ok(singletonMap("status", "UNKNOWN"))));
updater.updateStatus(application);
......@@ -84,7 +85,7 @@ public class StatusUpdaterTest {
@Test
public void test_update_up_noBody() {
when(applicationOps.getHealth(any(Application.class))).thenReturn(ResponseEntity.ok().build());
when(applicationOps.getHealth(any(Application.class))).thenReturn(Mono.just(ResponseEntity.ok().build()));
updater.updateStatus(application);
......@@ -94,7 +95,7 @@ public class StatusUpdaterTest {
@Test
public void test_update_down() {
when(applicationOps.getHealth(any(Application.class))).thenReturn(
ResponseEntity.status(503).body(singletonMap("foo", "bar")));
Mono.just(ResponseEntity.status(503).body(singletonMap("foo", "bar"))));
updater.updateStatus(application);
......@@ -105,7 +106,8 @@ public class StatusUpdaterTest {
@Test
public void test_update_down_noBody() {
when(applicationOps.getHealth(any(Application.class))).thenReturn(ResponseEntity.status(503).body(null));
when(applicationOps.getHealth(any(Application.class))).thenReturn(
Mono.just(ResponseEntity.status(503).body(null)));
updater.updateStatus(application);
......@@ -117,7 +119,8 @@ public class StatusUpdaterTest {
@Test
public void test_update_offline() {
when(applicationOps.getHealth(any(Application.class))).thenThrow(new ResourceAccessException("error"));
when(applicationOps.getHealth(any(Application.class))).thenReturn(
Mono.error(new ResourceAccessException("error")));
updater.updateStatus(application);
......@@ -138,13 +141,13 @@ public class StatusUpdaterTest {
Registration.create("foo", "http://health-2").build()).build();
store.save(app2);
when(applicationOps.getHealth(eq(app1))).thenReturn(ResponseEntity.ok().build());
when(applicationOps.getHealth(eq(app2))).thenReturn(ResponseEntity.ok().build());
when(applicationOps.getHealth(eq(app1))).thenReturn(Mono.just(ResponseEntity.ok().build()));
when(applicationOps.getHealth(eq(app2))).thenReturn(Mono.just(ResponseEntity.ok().build()));
Thread.sleep(120L); //let both statuses expire
updater.updateStatus(app2); //and refresh it for app2
reset(applicationOps);
when(applicationOps.getHealth(eq(app1))).thenReturn(ResponseEntity.ok().build());
when(applicationOps.getHealth(eq(app1))).thenReturn(Mono.just(ResponseEntity.ok().build()));
updater.updateStatusForAllApplications();
......
/*
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.codecentric.boot.admin.server.web.client;
import de.codecentric.boot.admin.server.model.Application;
import de.codecentric.boot.admin.server.model.ApplicationId;
import de.codecentric.boot.admin.server.model.Registration;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.io.Serializable;
import java.net.URI;
import java.util.Map;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
public class ApplicationOperationsTest {
private WebClient webClient = mock(WebClient.class, withSettings().defaultAnswer(a -> {
if (a.getMethod().getReturnType().equals(Mono.class)) {
return Mono.empty();
}
return a.getMock();
}).extraInterfaces(WebClient.RequestHeadersSpec.class, WebClient.UriSpec.class, WebClient.ResponseSpec.class));
private RestTemplate restTemplate = mock(RestTemplate.class);
private HttpHeadersProvider headersProvider = mock(HttpHeadersProvider.class);
private ApplicationOperations ops = new ApplicationOperations(restTemplate, headersProvider);
private final Application application = Application.create(ApplicationId.of("id"),
Registration.create("test", "http://health").managementUrl("http://mgmt").build()).build();
private ApplicationOperations ops = new ApplicationOperations(webClient, headersProvider);
private final Application application = Application.create(ApplicationId.of("id"),Registration.create("test", "http://health")
.managementUrl("http://mgmt")
.build()).build();
@Test
@Ignore("Needs resolving of https://jira.spring.io/browse/SPR-15286")
public void test_getInfo() {
ArgumentCaptor<HttpEntity> requestEntity = ArgumentCaptor.forClass(HttpEntity.class);
HttpHeaders headers = new HttpHeaders();
headers.add("auth", "foo:bar");
when(headersProvider.getHeaders(eq(application))).thenReturn(headers);
when(restTemplate.exchange(eq(URI.create("http://mgmt/info")), eq(HttpMethod.GET), requestEntity.capture(),
eq(Map.class))).thenReturn(ResponseEntity.ok().body(singletonMap("foo", "bar")));
StepVerifier.create(ops.getInfo(application)).verifyComplete();
ResponseEntity<Map<String, Serializable>> response = ops.getInfo(application);
ArgumentCaptor<URI> uriCaptor = ArgumentCaptor.forClass(URI.class);
ArgumentCaptor<HttpHeaders> headersCaptor = ArgumentCaptor.forClass(HttpHeaders.class);
verify(webClient, times(1)).get().uri(uriCaptor.capture())
// .headers(headersCaptor.capture())
.accept(eq(MediaType.APPLICATION_JSON)).retrieve();
assertThat(response.getBody()).containsEntry("foo", "bar");
assertThat(requestEntity.getValue().getHeaders()).containsEntry("auth", singletonList("foo:bar"));
assertThat(uriCaptor.getValue()).isEqualTo(URI.create("http://mgmt/info"));
assertThat(headersCaptor.getValue()).containsEntry("auth", singletonList("foo:bar"));
assertThat(headersCaptor.getValue()).containsEntry("Accept", singletonList(MediaType.APPLICATION_JSON_VALUE));
}
@Test
@Ignore("Needs resolving of https://jira.spring.io/browse/SPR-15286")
public void test_getHealth() {
ArgumentCaptor<HttpEntity> requestEntity = ArgumentCaptor.forClass(HttpEntity.class);
HttpHeaders headers = new HttpHeaders();
headers.add("auth", "foo:bar");
when(headersProvider.getHeaders(eq(application))).thenReturn(headers);
when(restTemplate.exchange(eq(URI.create("http://health")), eq(HttpMethod.GET), requestEntity.capture(),
eq(Map.class))).thenReturn(ResponseEntity.ok().body(singletonMap("foo", "bar")));
StepVerifier.create(ops.getHealth(application)).verifyComplete();
ResponseEntity<Map<String, Serializable>> response = ops.getHealth(application);
ArgumentCaptor<URI> uriCaptor = ArgumentCaptor.forClass(URI.class);
ArgumentCaptor<HttpHeaders> headersCaptor = ArgumentCaptor.forClass(HttpHeaders.class);
assertThat(response.getBody()).containsEntry("foo", "bar");
assertThat(requestEntity.getValue().getHeaders()).containsEntry("auth", singletonList("foo:bar"));
}
verify(webClient, times(1)).get().uri(uriCaptor.capture())
// .headers(headersCaptor.capture())
.accept(eq(MediaType.APPLICATION_JSON)).retrieve();
assertThat(uriCaptor.getValue()).isEqualTo(URI.create("http://health"));
assertThat(headersCaptor.getValue()).containsEntry("auth", singletonList("foo:bar"));
assertThat(headersCaptor.getValue()).containsEntry("Accept", singletonList(MediaType.APPLICATION_JSON_VALUE));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment