Commit 6b2eee28 by Johannes Edmeier

REFA: Trigger the status/info updates via reactive fluxxes

parent 87a2eb10
...@@ -16,14 +16,14 @@ ...@@ -16,14 +16,14 @@
package de.codecentric.boot.admin.server.config; package de.codecentric.boot.admin.server.config;
import de.codecentric.boot.admin.server.event.ClientApplicationEvent; import de.codecentric.boot.admin.server.event.ClientApplicationEvent;
import de.codecentric.boot.admin.server.event.ClientApplicationStatusChangedEvent;
import de.codecentric.boot.admin.server.eventstore.ClientApplicationEventStore; import de.codecentric.boot.admin.server.eventstore.ClientApplicationEventStore;
import de.codecentric.boot.admin.server.eventstore.SimpleEventStore; import de.codecentric.boot.admin.server.eventstore.SimpleEventStore;
import de.codecentric.boot.admin.server.registry.ApplicationIdGenerator; import de.codecentric.boot.admin.server.registry.ApplicationIdGenerator;
import de.codecentric.boot.admin.server.registry.ApplicationRegistry; import de.codecentric.boot.admin.server.registry.ApplicationRegistry;
import de.codecentric.boot.admin.server.registry.HashingApplicationUrlIdGenerator; import de.codecentric.boot.admin.server.registry.HashingApplicationUrlIdGenerator;
import de.codecentric.boot.admin.server.registry.InfoUpdateTrigger;
import de.codecentric.boot.admin.server.registry.InfoUpdater; import de.codecentric.boot.admin.server.registry.InfoUpdater;
import de.codecentric.boot.admin.server.registry.StatusUpdateApplicationListener; import de.codecentric.boot.admin.server.registry.StatusUpdateTrigger;
import de.codecentric.boot.admin.server.registry.StatusUpdater; import de.codecentric.boot.admin.server.registry.StatusUpdater;
import de.codecentric.boot.admin.server.registry.store.ApplicationStore; import de.codecentric.boot.admin.server.registry.store.ApplicationStore;
import de.codecentric.boot.admin.server.registry.store.SimpleApplicationStore; import de.codecentric.boot.admin.server.registry.store.SimpleApplicationStore;
...@@ -31,8 +31,7 @@ import de.codecentric.boot.admin.server.web.client.ApplicationOperations; ...@@ -31,8 +31,7 @@ import de.codecentric.boot.admin.server.web.client.ApplicationOperations;
import de.codecentric.boot.admin.server.web.client.BasicAuthHttpHeaderProvider; import de.codecentric.boot.admin.server.web.client.BasicAuthHttpHeaderProvider;
import de.codecentric.boot.admin.server.web.client.HttpHeadersProvider; import de.codecentric.boot.admin.server.web.client.HttpHeadersProvider;
import org.springframework.beans.factory.annotation.Autowired; import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.boot.web.client.RestTemplateBuilder;
...@@ -43,7 +42,6 @@ import org.springframework.core.Ordered; ...@@ -43,7 +42,6 @@ import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.client.DefaultResponseErrorHandler; import org.springframework.web.client.DefaultResponseErrorHandler;
@Configuration @Configuration
...@@ -93,40 +91,35 @@ public class AdminServerCoreConfiguration { ...@@ -93,40 +91,35 @@ public class AdminServerCoreConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public StatusUpdater statusUpdater(ApplicationStore applicationStore, ApplicationOperations applicationOperations) { public StatusUpdater statusUpdater(ApplicationStore applicationStore, ApplicationOperations applicationOperations) {
StatusUpdater statusUpdater = new StatusUpdater(applicationStore, applicationOperations); StatusUpdater statusUpdater = new StatusUpdater(applicationStore, applicationOperations);
statusUpdater.setStatusLifetime(adminServerProperties.getMonitor().getStatusLifetime()); statusUpdater.setStatusLifetime(adminServerProperties.getMonitor().getStatusLifetime());
return statusUpdater; return statusUpdater;
} }
@Bean @Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnMissingBean @ConditionalOnMissingBean
public InfoUpdater infoUpdater(ApplicationStore applicationStore, ApplicationOperations applicationOperations) { public StatusUpdateTrigger statusUpdateTrigger(StatusUpdater statusUpdater,
return new InfoUpdater(applicationStore, applicationOperations); Publisher<ClientApplicationEvent> events) {
StatusUpdateTrigger trigger = new StatusUpdateTrigger(statusUpdater, events);
trigger.setUpdateInterval(adminServerProperties.getMonitor().getPeriod());
return trigger;
} }
@Bean @Bean
@Qualifier("updateTaskScheduler") @ConditionalOnMissingBean
public ThreadPoolTaskScheduler updateTaskScheduler() { public InfoUpdater infoUpdater(ApplicationStore applicationStore, ApplicationOperations applicationOperations) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); return new InfoUpdater(applicationStore, applicationOperations);
taskScheduler.setPoolSize(1);
taskScheduler.setRemoveOnCancelPolicy(true);
taskScheduler.setThreadNamePrefix("updateTask");
return taskScheduler;
} }
@Bean @Bean(initMethod = "start", destroyMethod = "stop")
@ConditionalOnMissingBean @ConditionalOnMissingBean
public StatusUpdateApplicationListener statusUpdateApplicationListener(StatusUpdater statusUpdater, public InfoUpdateTrigger infoUpdateTrigger(InfoUpdater infoUpdater, Publisher<ClientApplicationEvent> events) {
@Qualifier("updateTaskScheduler") ThreadPoolTaskScheduler taskScheduler) { return new InfoUpdateTrigger(infoUpdater, events);
StatusUpdateApplicationListener listener = new StatusUpdateApplicationListener(statusUpdater, taskScheduler);
listener.setUpdatePeriod(adminServerProperties.getMonitor().getPeriod());
return listener;
} }
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public ClientApplicationEventStore journaledEventStore() { public ClientApplicationEventStore eventStore() {
return new SimpleEventStore(); return new SimpleEventStore();
} }
...@@ -136,21 +129,9 @@ public class AdminServerCoreConfiguration { ...@@ -136,21 +129,9 @@ public class AdminServerCoreConfiguration {
return new SimpleApplicationStore(); return new SimpleApplicationStore();
} }
@Autowired
private InfoUpdater infoUpdater;
@EventListener
public void onClientApplicationStatusChangedEvent(ClientApplicationStatusChangedEvent event) {
infoUpdater.updateInfo(event.getApplication());
}
@Autowired
private ClientApplicationEventStore eventStore;
@Order(Ordered.HIGHEST_PRECEDENCE) @Order(Ordered.HIGHEST_PRECEDENCE)
@EventListener @EventListener
public void onClientApplicationEvent(ClientApplicationEvent event) { public void onClientApplicationEvent(ClientApplicationEvent event) {
eventStore.store(event); eventStore().store(event);
} }
} }
...@@ -66,7 +66,7 @@ public class HazelcastStoreConfiguration { ...@@ -66,7 +66,7 @@ public class HazelcastStoreConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
public ClientApplicationEventStore journaledEventStore() { public ClientApplicationEventStore eventStore() {
IList<ClientApplicationEvent> list = hazelcastInstance.getList(eventListName); IList<ClientApplicationEvent> list = hazelcastInstance.getList(eventListName);
return new HazelcastEventStore(list); return new HazelcastEventStore(list);
} }
......
/*
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.codecentric.boot.admin.server.registry;
import de.codecentric.boot.admin.server.event.ClientApplicationEvent;
import de.codecentric.boot.admin.server.event.ClientApplicationStatusChangedEvent;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InfoUpdateTrigger {
private static final Logger log = LoggerFactory.getLogger(InfoUpdateTrigger.class);
private final Publisher<ClientApplicationEvent> events;
private final InfoUpdater infoUpdater;
private Disposable subscription;
public InfoUpdateTrigger(InfoUpdater infoUpdater, Publisher<ClientApplicationEvent> events) {
this.infoUpdater = infoUpdater;
this.events = events;
}
public void start() {
log.debug("Subscribed to {} events for info updates", ClientApplicationStatusChangedEvent.class);
subscription = Flux.from(events)
.log(log.getName(), Level.FINEST)
.subscribeOn(Schedulers.newSingle("info-updater"))
.ofType(ClientApplicationStatusChangedEvent.class)
.cast(ClientApplicationStatusChangedEvent.class)
.doOnNext(this::updateInfo)
.retry()
.subscribe();
}
public void stop() {
if (subscription != null) {
subscription.dispose();
}
}
protected void updateInfo(ClientApplicationStatusChangedEvent event) {
infoUpdater.updateInfo(event.getApplication());
}
}
/*
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.codecentric.boot.admin.server.registry;
import de.codecentric.boot.admin.server.event.ClientApplicationEvent;
import de.codecentric.boot.admin.server.event.ClientApplicationRegisteredEvent;
import de.codecentric.boot.admin.server.event.ClientApplicationRegistrationUpdatedEvent;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.context.WebApplicationContext;
public class StatusUpdateApplicationListener {
private static final Logger LOGGER = LoggerFactory.getLogger(StatusUpdateApplicationListener.class);
private final ThreadPoolTaskScheduler taskScheduler;
private final StatusUpdater statusUpdater;
private long updatePeriod = 10_000L;
private ScheduledFuture<?> scheduledTask;
public StatusUpdateApplicationListener(StatusUpdater statusUpdater, ThreadPoolTaskScheduler taskScheduler) {
this.statusUpdater = statusUpdater;
this.taskScheduler = taskScheduler;
}
@EventListener
public void onApplicationReady(ApplicationReadyEvent event) {
if (event.getApplicationContext() instanceof WebApplicationContext) {
startStatusUpdate();
}
}
@EventListener
public void onContextClosed(ContextClosedEvent event) {
if (event.getApplicationContext() instanceof WebApplicationContext) {
stopStatusUpdate();
}
}
@EventListener
public void onClientApplicationRegistered(ClientApplicationEvent event) {
if (event instanceof ClientApplicationRegisteredEvent ||
event instanceof ClientApplicationRegistrationUpdatedEvent) {
taskScheduler.submit(() -> statusUpdater.updateStatus(event.getApplication()));
}
}
public void startStatusUpdate() {
if (scheduledTask != null && !scheduledTask.isDone()) {
return;
}
scheduledTask = taskScheduler.scheduleAtFixedRate(statusUpdater::updateStatusForAllApplications, updatePeriod);
LOGGER.debug("Scheduled status-updater task for every {}ms", updatePeriod);
}
public void stopStatusUpdate() {
if (scheduledTask != null && !scheduledTask.isDone()) {
scheduledTask.cancel(true);
LOGGER.debug("Canceled status-updater task");
}
}
public void setUpdatePeriod(long updatePeriod) {
this.updatePeriod = updatePeriod;
}
}
/*
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.codecentric.boot.admin.server.registry;
import de.codecentric.boot.admin.server.event.ClientApplicationEvent;
import de.codecentric.boot.admin.server.event.ClientApplicationRegisteredEvent;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.logging.Level;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StatusUpdateTrigger {
private static final Logger log = LoggerFactory.getLogger(StatusUpdateTrigger.class);
private final Publisher<ClientApplicationEvent> events;
private final StatusUpdater statusUpdater;
private long updateInterval = 10_000L;
private Disposable eventSubscription;
private Disposable intervalSubscription;
public StatusUpdateTrigger(StatusUpdater statusUpdater, Publisher<ClientApplicationEvent> events) {
this.statusUpdater = statusUpdater;
this.events = events;
}
public void start() {
log.debug("Subscribed to {} events for status updates", ClientApplicationRegisteredEvent.class);
eventSubscription = Flux.from(events)
.log(log.getName(), Level.FINEST)
.subscribeOn(Schedulers.newSingle("status-updater"))
.ofType(ClientApplicationRegisteredEvent.class)
.cast(ClientApplicationRegisteredEvent.class)
.doOnNext(this::updateStatus)
.retry()
.subscribe();
log.debug("Scheduled status update every {}ms", updateInterval);
intervalSubscription = Flux.interval(Duration.ofMillis(updateInterval))
.log(log.getName(), Level.FINEST)
.subscribeOn(Schedulers.newSingle("status-monitor"))
.doOnNext((i) -> this.updateStatusForAllApplications())
.retry()
.subscribe();
}
public void stop() {
if (eventSubscription != null) {
eventSubscription.dispose();
}
if (intervalSubscription != null) {
intervalSubscription.dispose();
}
}
protected void updateStatusForAllApplications() {
statusUpdater.updateStatusForAllApplications();
}
protected void updateStatus(ClientApplicationRegisteredEvent event) {
statusUpdater.updateStatus(event.getApplication());
}
public void setUpdateInterval(long updateInterval) {
this.updateInterval = updateInterval;
}
}
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
*/ */
package de.codecentric.boot.admin.server; package de.codecentric.boot.admin.server;
import de.codecentric.boot.admin.server.AdminApplicationTest.TestAdminApplication;
import de.codecentric.boot.admin.server.config.EnableAdminServer; import de.codecentric.boot.admin.server.config.EnableAdminServer;
import de.codecentric.boot.admin.server.model.Registration; import de.codecentric.boot.admin.server.model.Registration;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
...@@ -24,20 +23,16 @@ import reactor.test.StepVerifier; ...@@ -24,20 +23,16 @@ import reactor.test.StepVerifier;
import java.net.URI; import java.net.URI;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.json.JSONObject; import org.json.JSONObject;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.springframework.boot.SpringApplication;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringBootConfiguration; import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.test.web.reactive.server.WebTestClient;
import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.ExchangeStrategies;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
...@@ -46,18 +41,18 @@ import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; ...@@ -46,18 +41,18 @@ import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = TestAdminApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT, properties = {
"spring.main.web-application-type=servlet", "management.context-path=/mgmt", "info.test=foobar"})
public class AdminApplicationTest { public class AdminApplicationTest {
private WebTestClient webClient; private WebTestClient webClient;
@LocalServerPort
private int port; private int port;
@Autowired private ServletWebServerApplicationContext instance;
private ObjectMapper mapper;
@Before @Before
public void setup() { public void setUp() {
instance = (ServletWebServerApplicationContext) SpringApplication.run(TestAdminApplication.class,
"--server.port=0", "--management.context-path=/mgmt", "--info.test=foobar");
port = instance.getWebServer().getPort();
ObjectMapper mapper = new ObjectMapper().registerModule(new JsonOrgModule());
webClient = WebTestClient.bindToServer() webClient = WebTestClient.bindToServer()
.baseUrl("http://localhost:" + port) .baseUrl("http://localhost:" + port)
.exchangeStrategies(ExchangeStrategies.builder().codecs((configurer) -> { .exchangeStrategies(ExchangeStrategies.builder().codecs((configurer) -> {
...@@ -67,6 +62,11 @@ public class AdminApplicationTest { ...@@ -67,6 +62,11 @@ public class AdminApplicationTest {
.build(); .build();
} }
@After
public void shutdown() {
instance.close();
}
@Test @Test
public void lifecycle() throws InterruptedException { public void lifecycle() throws InterruptedException {
Flux<JSONObject> events = getEventStream(); Flux<JSONObject> events = getEventStream();
...@@ -163,9 +163,5 @@ public class AdminApplicationTest { ...@@ -163,9 +163,5 @@ public class AdminApplicationTest {
@EnableAutoConfiguration @EnableAutoConfiguration
@SpringBootConfiguration @SpringBootConfiguration
public static class TestAdminApplication { public static class TestAdminApplication {
@Bean
public JsonOrgModule jsonOrgModule() {
return new JsonOrgModule();
}
} }
} }
/*
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.codecentric.boot.admin.server.registry;
import de.codecentric.boot.admin.server.event.ClientApplicationEvent;
import de.codecentric.boot.admin.server.event.ClientApplicationRegisteredEvent;
import de.codecentric.boot.admin.server.event.ClientApplicationStatusChangedEvent;
import de.codecentric.boot.admin.server.model.Application;
import de.codecentric.boot.admin.server.model.ApplicationId;
import de.codecentric.boot.admin.server.model.Registration;
import de.codecentric.boot.admin.server.model.StatusInfo;
import reactor.test.publisher.TestPublisher;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class InfoUpdateTriggerTest {
private final Application application = Application.create(ApplicationId.of("id-1"),
Registration.create("foo", "http://health-1").build()).build();
@Test
public void should_update_on_event() {
//given
InfoUpdater updater = mock(InfoUpdater.class);
TestPublisher<ClientApplicationEvent> events = TestPublisher.create();
InfoUpdateTrigger trigger = new InfoUpdateTrigger(updater, events);
trigger.start();
//when some non-registered event is emitted
events.next(new ClientApplicationRegisteredEvent(application, application.getRegistration()));
//then should not update
verify(updater, never()).updateInfo(application);
//when registered event is emitted
events.next(new ClientApplicationStatusChangedEvent(application, StatusInfo.ofUp(), StatusInfo.ofDown()));
//then should update
verify(updater, times(1)).updateInfo(application);
//when registered event is emitted but the trigger has been stopped
trigger.stop();
reset(updater);
events.next(new ClientApplicationRegisteredEvent(application, application.getRegistration()));
//then should not update
verify(updater, never()).updateInfo(application);
}
}
/*
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.codecentric.boot.admin.server.registry;
import de.codecentric.boot.admin.server.event.ClientApplicationRegisteredEvent;
import de.codecentric.boot.admin.server.model.Application;
import de.codecentric.boot.admin.server.model.ApplicationId;
import de.codecentric.boot.admin.server.model.Registration;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.concurrent.SettableListenableFuture;
import org.springframework.web.context.ConfigurableWebApplicationContext;
import org.springframework.web.context.WebApplicationContext;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class StatusUpdateApplicationListenerTest {
@SuppressWarnings({"unchecked", "rawtypes"})
@Test
public void test_start_stop() throws Exception {
StatusUpdater statusUpdater = mock(StatusUpdater.class);
ThreadPoolTaskScheduler scheduler = mock(ThreadPoolTaskScheduler.class);
StatusUpdateApplicationListener listener = new StatusUpdateApplicationListener(statusUpdater, scheduler);
ScheduledFuture task = mock(ScheduledFuture.class);
when(scheduler.scheduleAtFixedRate(isA(Runnable.class), eq(10_000L))).thenReturn(task);
listener.onApplicationReady(new ApplicationReadyEvent(mock(SpringApplication.class), null,
mock(ConfigurableWebApplicationContext.class)));
verify(scheduler).scheduleAtFixedRate(isA(Runnable.class), eq(10_000L));
listener.onContextClosed(new ContextClosedEvent(mock(WebApplicationContext.class)));
verify(task).cancel(true);
}
@Test
public void test_newApplication() throws Exception {
StatusUpdater statusUpdater = mock(StatusUpdater.class);
ThreadPoolTaskScheduler scheduler = mock(ThreadPoolTaskScheduler.class);
when(scheduler.submit(any(Runnable.class))).then((Answer<Future<?>>) invocation -> {
invocation.<Runnable>getArgument(0).run();
SettableListenableFuture<?> future = new SettableListenableFuture<Void>();
future.set(null);
return future;
});
StatusUpdateApplicationListener listener = new StatusUpdateApplicationListener(statusUpdater, scheduler);
Application application = Application.create(ApplicationId.of("id"),
Registration.builder().name("test").healthUrl("http://example.com").build()).build();
listener.onClientApplicationRegistered(
new ClientApplicationRegisteredEvent(application, application.getRegistration()));
verify(statusUpdater).updateStatus(eq(application));
}
}
/*
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package de.codecentric.boot.admin.server.registry;
import de.codecentric.boot.admin.server.event.ClientApplicationEvent;
import de.codecentric.boot.admin.server.event.ClientApplicationInfoChangedEvent;
import de.codecentric.boot.admin.server.event.ClientApplicationRegisteredEvent;
import de.codecentric.boot.admin.server.model.Application;
import de.codecentric.boot.admin.server.model.ApplicationId;
import de.codecentric.boot.admin.server.model.Info;
import de.codecentric.boot.admin.server.model.Registration;
import reactor.test.publisher.TestPublisher;
import org.junit.Test;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class StatusUpdateTriggerTest {
private final Application application = Application.create(ApplicationId.of("id-1"),
Registration.create("foo", "http://health-1").build()).build();
@Test
public void should_start_and_stop_monitor() throws Exception {
//given
StatusUpdater updater = mock(StatusUpdater.class);
StatusUpdateTrigger trigger = new StatusUpdateTrigger(updater, TestPublisher.create());
trigger.setUpdateInterval(10L);
//when trigger is initialized
trigger.start();
Thread.sleep(15L);
//then it should start updating
verify(updater, atLeastOnce()).updateStatusForAllApplications();
//when trigger ist destroyed
trigger.stop();
reset(updater);
Thread.sleep(15L);
// it should stop updating
verify(updater, never()).updateStatusForAllApplications();
}
@Test
public void should_update_on_event() {
//given
StatusUpdater updater = mock(StatusUpdater.class);
TestPublisher<ClientApplicationEvent> events = TestPublisher.create();
StatusUpdateTrigger trigger = new StatusUpdateTrigger(updater, events);
trigger.start();
//when some non-registered event is emitted
events.next(new ClientApplicationInfoChangedEvent(application, Info.empty()));
//then should not update
verify(updater, never()).updateStatus(application);
//when registered event is emitted
events.next(new ClientApplicationRegisteredEvent(application, application.getRegistration()));
//then should update
verify(updater, times(1)).updateStatus(application);
//when registered event is emitted but the trigger has been stopped
trigger.stop();
reset(updater);
events.next(new ClientApplicationRegisteredEvent(application, application.getRegistration()));
//then should not update
verify(updater, never()).updateStatus(application);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment