Commit 35074676 by Johannes Edmeier

Be more resilient when endpoints with the same id are discovered.

fixes #828
parent c4264c7d
...@@ -27,13 +27,20 @@ import reactor.core.publisher.Mono; ...@@ -27,13 +27,20 @@ import reactor.core.publisher.Mono;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.util.UriComponentsBuilder; import org.springframework.web.util.UriComponentsBuilder;
import static java.util.stream.Collectors.groupingBy;
public class ProbeEndpointsStrategy implements EndpointDetectionStrategy { public class ProbeEndpointsStrategy implements EndpointDetectionStrategy {
private static final Logger log = LoggerFactory.getLogger(ProbeEndpointsStrategy.class);
private final Collection<EndpointDefinition> endpoints; private final Collection<EndpointDefinition> endpoints;
private final InstanceWebClient instanceWebClient; private final InstanceWebClient instanceWebClient;
...@@ -49,7 +56,7 @@ public class ProbeEndpointsStrategy implements EndpointDetectionStrategy { ...@@ -49,7 +56,7 @@ public class ProbeEndpointsStrategy implements EndpointDetectionStrategy {
return Flux.fromIterable(endpoints) return Flux.fromIterable(endpoints)
.flatMap(endpoint -> detectEndpoint(instance, endpoint)) .flatMap(endpoint -> detectEndpoint(instance, endpoint))
.collectList() .collectList()
.flatMap(list -> list.isEmpty() ? Mono.empty() : Mono.just(Endpoints.of(list))); .flatMap(this::convert);
} }
private Mono<Endpoint> detectEndpoint(Instance instance, EndpointDefinition endpoint) { private Mono<Endpoint> detectEndpoint(Instance instance, EndpointDefinition endpoint) {
...@@ -63,9 +70,29 @@ public class ProbeEndpointsStrategy implements EndpointDetectionStrategy { ...@@ -63,9 +70,29 @@ public class ProbeEndpointsStrategy implements EndpointDetectionStrategy {
private Function<ClientResponse, Mono<Endpoint>> convert(EndpointDefinition endpoint, URI uri) { private Function<ClientResponse, Mono<Endpoint>> convert(EndpointDefinition endpoint, URI uri) {
return response -> response.bodyToMono(Void.class) return response -> response.bodyToMono(Void.class)
.then(response.statusCode().is2xxSuccessful() ? .then(response.statusCode()
Mono.just(Endpoint.of(endpoint.getId(), uri.toString())) : .is2xxSuccessful() ? Mono.just(Endpoint.of(endpoint.getId(),
Mono.empty()); uri.toString()
)) : Mono.empty());
}
private Mono<Endpoints> convert(List<Endpoint> endpoints) {
if (endpoints.isEmpty()) {
return Mono.empty();
}
Map<String, List<Endpoint>> endpointsById = endpoints.stream().collect(groupingBy(Endpoint::getId));
List<Endpoint> result = endpointsById.values().stream().map(endpointList -> {
if (endpointList.size() > 1) {
log.warn("Duplicate endpoints for id '{}' detected. Omitting: {}",
endpointList.get(0).getId(),
endpointList.subList(1, endpointList.size())
);
}
return endpointList.get(0);
}).collect(Collectors.toList());
return Mono.just(Endpoints.of(result));
} }
@Data @Data
...@@ -79,7 +106,8 @@ public class ProbeEndpointsStrategy implements EndpointDetectionStrategy { ...@@ -79,7 +106,8 @@ public class ProbeEndpointsStrategy implements EndpointDetectionStrategy {
return new EndpointDefinition(idWithPath, idWithPath); return new EndpointDefinition(idWithPath, idWithPath);
} else { } else {
return new EndpointDefinition(idWithPath.substring(0, idxDelimiter), return new EndpointDefinition(idWithPath.substring(0, idxDelimiter),
idWithPath.substring(idxDelimiter + 1)); idWithPath.substring(idxDelimiter + 1)
);
} }
} }
} }
......
...@@ -63,12 +63,13 @@ public class ProbeEndpointsStrategyTest { ...@@ -63,12 +63,13 @@ public class ProbeEndpointsStrategyTest {
.managementUrl(wireMock.url("/mgmt")) .managementUrl(wireMock.url("/mgmt"))
.build()); .build());
wireMock.stubFor(options(urlEqualTo("/mgmt/metrics")).willReturn(ok()));
wireMock.stubFor(options(urlEqualTo("/mgmt/stats")).willReturn(ok())); wireMock.stubFor(options(urlEqualTo("/mgmt/stats")).willReturn(ok()));
wireMock.stubFor(options(urlEqualTo("/mgmt/info")).willReturn(ok())); wireMock.stubFor(options(urlEqualTo("/mgmt/info")).willReturn(ok()));
wireMock.stubFor(options(urlEqualTo("/mgmt/non-exist")).willReturn(notFound())); wireMock.stubFor(options(urlEqualTo("/mgmt/non-exist")).willReturn(notFound()));
ProbeEndpointsStrategy strategy = new ProbeEndpointsStrategy(instanceWebClient, ProbeEndpointsStrategy strategy = new ProbeEndpointsStrategy(instanceWebClient,
new String[]{"metrics:stats", "info", "non-exist"}); new String[]{"metrics:stats", "metrics", "info", "non-exist"});
//when //when
StepVerifier.create(strategy.detectEndpoints(instance)) StepVerifier.create(strategy.detectEndpoints(instance))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment