Commit 23aa468d by Johannes Edmeier

Polish servlet.InstancesProxyController

parent 9146943a
......@@ -20,16 +20,17 @@ import de.codecentric.boot.admin.server.services.InstanceRegistry;
import de.codecentric.boot.admin.server.web.AbstractInstancesProxyController;
import de.codecentric.boot.admin.server.web.AdminController;
import de.codecentric.boot.admin.server.web.client.InstanceWebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.time.Duration;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
......@@ -52,7 +53,6 @@ import org.springframework.web.util.UriComponentsBuilder;
*/
@AdminController
public class InstancesProxyController extends AbstractInstancesProxyController {
private static final Logger log = LoggerFactory.getLogger(InstancesProxyController.class);
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
public InstancesProxyController(String adminContextPath,
......@@ -80,7 +80,7 @@ public class InstancesProxyController extends AbstractInstancesProxyController {
.build(true)
.toUri();
//We need to explicitly block until the headers are recieved.
//We need to explicitly block until the headers are recieved and write them before the async dispatch.
//otherwise the FrameworkServlet will add wrong Allow header for OPTIONS request
ClientResponse clientResponse = super.forward(instanceId, uri, request.getMethod(), request.getHeaders(),
() -> BodyInserters.fromDataBuffers(
......@@ -88,24 +88,22 @@ public class InstancesProxyController extends AbstractInstancesProxyController {
response.setStatusCode(clientResponse.statusCode());
response.getHeaders().addAll(filterHeaders(clientResponse.headers().asHttpHeaders()));
response.flush();
OutputStream responseBody = response.getBody();
return clientResponse.body(BodyExtractors.toDataBuffers()).window(1).flatMap(body -> {
return clientResponse.body(BodyExtractors.toDataBuffers())
.window(1)
.flatMap(body -> writeAndFlush(body, responseBody))
.then();
}
private Mono<Void> writeAndFlush(Flux<DataBuffer> body, OutputStream responseBody) {
return DataBufferUtils.write(body, responseBody).map(DataBufferUtils::release).then(Mono.create(sink -> {
try {
return DataBufferUtils.write(body, response.getBody())
.map(DataBufferUtils::release)
.then(Mono.fromRunnable(() -> {
try {
response.getBody().flush();
} catch (IOException ex) {
//It's most likely that the client has disconnected.
//We can't handle that properly so we're ignoring the exception
log.debug("Error flushing the response", ex);
}
}));
responseBody.flush();
sink.success();
} catch (IOException ex) {
return Mono.error(ex);
sink.error(ex);
}
}).then();
}));
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment