Commit 25bf8ecc by Johannes Edmeier

Polish

parent 159dd261
......@@ -40,6 +40,8 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
......@@ -74,11 +76,16 @@ public class AdminServerCoreConfiguration {
@Bean
@ConditionalOnMissingBean
public ApplicationOperations applicationOperations(HttpHeadersProvider headersProvider) {
WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(options -> {
ReactorClientHttpConnector httpConnector = new ReactorClientHttpConnector(options -> {
options.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
adminServerProperties.getMonitor().getConnectTimeout());
options.option(ChannelOption.SO_TIMEOUT, adminServerProperties.getMonitor().getReadTimeout());
})).build();
});
WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.defaultHeader(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.build();
return new ApplicationOperations(webClient, headersProvider);
}
......
......@@ -43,7 +43,8 @@ public class NotificationTrigger {
subscription = Flux.from(events)
.log(log.getName(), Level.FINEST)
.subscribeOn(Schedulers.newSingle("notifications"))
.doOnNext(this::sendNotifications).retryWhen(ReactiveUtils.logAndRetryAny(log))
.doOnNext(this::sendNotifications)
.retryWhen(ReactiveUtils.logAndRetryAny(log))
.subscribe();
}
......
......@@ -46,7 +46,8 @@ public class InfoUpdateTrigger {
.subscribeOn(Schedulers.newSingle("info-updater"))
.ofType(ClientApplicationStatusChangedEvent.class)
.cast(ClientApplicationStatusChangedEvent.class)
.doOnNext(this::updateInfo).retryWhen(ReactiveUtils.logAndRetryAny(log))
.doOnNext(this::updateInfo)
.retryWhen(ReactiveUtils.logAndRetryAny(log))
.subscribe();
}
......
......@@ -49,7 +49,8 @@ public class StatusUpdateTrigger {
.subscribeOn(Schedulers.newSingle("status-updater"))
.ofType(ClientApplicationRegisteredEvent.class)
.cast(ClientApplicationRegisteredEvent.class)
.doOnNext(this::updateStatus).retryWhen(ReactiveUtils.logAndRetryAny(log))
.doOnNext(this::updateStatus)
.retryWhen(ReactiveUtils.logAndRetryAny(log))
.subscribe();
log.debug("Scheduled status update every {}ms", updateInterval);
......
......@@ -68,7 +68,7 @@ public class StatusUpdater implements ApplicationEventPublisherAware {
}
}
private void updateStatus(Application application) {
protected void updateStatus(Application application) {
StatusInfo oldStatus = application.getStatusInfo();
queryStatus(application).filter(newStatus -> !newStatus.getStatus().equals(oldStatus.getStatus()))
.doOnNext(newStatus -> {
......
......@@ -88,8 +88,7 @@ public class AdminApplicationTest {
deregisterApplication(location.get());
})
.assertNext((event) -> assertThat(event.opt("type")).isEqualTo("DEREGISTERED"))
.then(this::listEmptyApplications)
.thenCancel().verify(Duration.ofSeconds(30));
.then(this::listEmptyApplications).thenCancel().verify(Duration.ofSeconds(30));
}
private Flux<JSONObject> getEventStream() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment