Commit bb88ac3b by Johannes Edmeier

Fix netty buffer memory leak

The WebClient's responses should be always consumed, so that underlying buffers are released. Fixes #666
parent 412613fd
......@@ -29,8 +29,11 @@ import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import static de.codecentric.boot.admin.server.utils.MediaType.ACTUATOR_V2_MEDIATYPE;
/**
* The StatusUpdater is responsible for updating the status of all or a single application querying
* the healthUrl.
......@@ -73,12 +76,16 @@ public class InfoUpdater {
}
protected Mono<Info> convertInfo(Instance instance, ClientResponse response) {
if (response.statusCode().is2xxSuccessful()) {
if (response.statusCode().is2xxSuccessful() &&
response.headers()
.contentType()
.map(mt -> mt.isCompatibleWith(MediaType.APPLICATION_JSON) ||
mt.isCompatibleWith(ACTUATOR_V2_MEDIATYPE))
.orElse(false)) {
return response.bodyToMono(RESPONSE_TYPE).map(Info::from).defaultIfEmpty(Info.empty());
}
log.info("Couldn't retrieve info for {}: {}", instance, response.statusCode());
return Mono.just(Info.empty());
return response.bodyToMono(Void.class).then(Mono.just(Info.empty()));
}
protected Info convertInfo(Instance instance, Throwable ex) {
......
......@@ -34,6 +34,7 @@ import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ClientResponse;
import static de.codecentric.boot.admin.server.utils.MediaType.ACTUATOR_V2_MEDIATYPE;
import static java.util.Collections.emptyMap;
/**
......@@ -77,7 +78,11 @@ public class StatusUpdater {
}
protected Mono<StatusInfo> convertStatusInfo(ClientResponse response) {
if (response.headers().contentType().map(MediaType.APPLICATION_JSON::includes).orElse(false)) {
if (response.headers()
.contentType()
.map(mt -> mt.isCompatibleWith(MediaType.APPLICATION_JSON) ||
mt.isCompatibleWith(ACTUATOR_V2_MEDIATYPE))
.orElse(false)) {
return response.bodyToMono(RESPONSE_TYPE).map(body -> {
if (body.get("status") instanceof String) {
return StatusInfo.from(body);
......@@ -85,7 +90,7 @@ public class StatusUpdater {
return getStatusInfoFromStatus(response, body);
});
}
return Mono.just(this.getStatusInfoFromStatus(response, emptyMap()));
return response.bodyToMono(Void.class).then(Mono.just(this.getStatusInfoFromStatus(response, emptyMap())));
}
protected StatusInfo getStatusInfoFromStatus(ClientResponse response, Map<String, ?> body) {
......
/*
* Copyright 2014-2017 the original author or authors.
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -60,7 +60,10 @@ public class ProbeEndpointsStrategy implements EndpointDetectionStrategy {
.options()
.uri(uri)
.exchange()
.filter(response -> response.statusCode().is2xxSuccessful())
.flatMap(response -> response.bodyToMono(Void.class)
.then(response.statusCode().is2xxSuccessful() ?
Mono.just(true) :
Mono.empty()))
.map(r -> Endpoint.of(endpoint.getId(), uri.toString()));
}
......
/*
* Copyright 2014-2017 the original author or authors.
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -50,12 +50,17 @@ public class QueryIndexEndpointStrategy implements EndpointDetectionStrategy {
.get()
.uri(instance.getRegistration().getManagementUrl())
.exchange()
.filter(response -> response.statusCode().is2xxSuccessful() &&
response.headers()
.contentType()
.map(actuatorMediaType::isCompatibleWith)
.orElse(false))
.flatMap(r -> r.bodyToMono(Response.class))
.flatMap(response -> {
if (response.statusCode().is2xxSuccessful() &&
response.headers()
.contentType()
.map(actuatorMediaType::isCompatibleWith)
.orElse(false)) {
return response.bodyToMono(Response.class);
} else {
return response.bodyToMono(Void.class).then(Mono.empty());
}
})
.flatMap(this::convert);
}
......
/*
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.codecentric.boot.admin.server.utils;
import org.springframework.boot.actuate.endpoint.http.ActuatorMediaType;
public final class MediaType {
public static final org.springframework.http.MediaType ACTUATOR_V1_MEDIATYPE = org.springframework.http.MediaType.parseMediaType(
ActuatorMediaType.V1_JSON);
public static final org.springframework.http.MediaType ACTUATOR_V2_MEDIATYPE = org.springframework.http.MediaType.parseMediaType(
ActuatorMediaType.V2_JSON);
private MediaType() {
}
}
......@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Function;
import org.springframework.boot.actuate.endpoint.http.ActuatorMediaType;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
......@@ -46,13 +45,13 @@ import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
import static de.codecentric.boot.admin.server.utils.MediaType.ACTUATOR_V1_MEDIATYPE;
import static de.codecentric.boot.admin.server.utils.MediaType.ACTUATOR_V2_MEDIATYPE;
import static java.util.Collections.singletonList;
public final class InstanceExchangeFilterFunctions {
public static final String ATTRIBUTE_INSTANCE = "instance";
public static final String ATTRIBUTE_ENDPOINT = "endpointId";
private static final MediaType ACTUATOR_V1_MEDIATYPE = MediaType.parseMediaType(ActuatorMediaType.V1_JSON);
private static final MediaType ACTUATOR_V2_MEDIATYPE = MediaType.parseMediaType(ActuatorMediaType.V2_JSON);
private InstanceExchangeFilterFunctions() {
}
......
......@@ -53,6 +53,7 @@ public class InstanceWebClient {
filters.add(InstanceExchangeFilterFunctions.addHeaders(httpHeadersProvider));
filters.add(InstanceExchangeFilterFunctions.rewriteEndpointUrl());
filters.add(InstanceExchangeFilterFunctions.convertLegacyEndpoint(LegacyEndpointConverters.health()));
filters.add(InstanceExchangeFilterFunctions.convertLegacyEndpoint(LegacyEndpointConverters.info()));
filters.add(InstanceExchangeFilterFunctions.convertLegacyEndpoint(LegacyEndpointConverters.env()));
filters.add(InstanceExchangeFilterFunctions.convertLegacyEndpoint(LegacyEndpointConverters.httptrace()));
filters.add(InstanceExchangeFilterFunctions.convertLegacyEndpoint(LegacyEndpointConverters.threaddump()));
......
......@@ -98,6 +98,10 @@ public class LegacyEndpointConverters {
convertUsing(RESPONSE_TYPE_LIST_MAP, RESPONSE_TYPE_MAP, LegacyEndpointConverters::convertFlyway));
}
public static LegacyEndpointConverter info() {
return new LegacyEndpointConverter(Endpoint.INFO, flux -> flux);
}
@SuppressWarnings("unchecked")
private static <S, T> Function<Flux<DataBuffer>, Flux<DataBuffer>> convertUsing(ParameterizedTypeReference<S> sourceType,
ParameterizedTypeReference<T> targetType,
......
......@@ -32,6 +32,7 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.springframework.boot.actuate.endpoint.http.ActuatorMediaType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import com.github.tomakehurst.wiremock.core.Options;
......@@ -39,8 +40,10 @@ import com.github.tomakehurst.wiremock.junit.WireMockClassRule;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.ok;
import static com.github.tomakehurst.wiremock.client.WireMock.okForContentType;
import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
import static com.github.tomakehurst.wiremock.client.WireMock.status;
import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
public class StatusUpdaterTest {
......@@ -68,9 +71,10 @@ public class StatusUpdaterTest {
@Test
public void test_update_statusChanged() {
String body = "{ \"status\" : \"UP\" }";
wireMock.stubFor(
get("/health").willReturn(okJson(body).withHeader("Content-Length", Integer.toString(body.length()))));
String body = "{ \"status\" : \"UP\", \"details\" : { \"foo\" : \"bar\" } }";
wireMock.stubFor(get("/health").willReturn(
okForContentType(ActuatorMediaType.V2_JSON, body).withHeader("Content-Length",
Integer.toString(body.length()))));
StepVerifier.create(eventStore)
.expectSubscription()
......@@ -80,6 +84,8 @@ public class StatusUpdaterTest {
assertThat(event.getInstance()).isEqualTo(instance.getId());
InstanceStatusChangedEvent statusChangedEvent = (InstanceStatusChangedEvent) event;
assertThat(statusChangedEvent.getStatusInfo().getStatus()).isEqualTo("UP");
assertThat(statusChangedEvent.getStatusInfo().getDetails()).isEqualTo(
singletonMap("foo", "bar"));
})
.thenCancel()
.verify();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment