Commit 38452e45 by Johannes Edmeier

Fix Reminding Notifier

parent 3cc312b6
......@@ -30,7 +30,6 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
......@@ -81,19 +80,15 @@ public class SpringBootAdminApplication {
this.repository = repository;
}
@Bean
@Primary
@Bean(initMethod = "start", destroyMethod = "stop")
public RemindingNotifier remindingNotifier() {
RemindingNotifier notifier = new RemindingNotifier(filteringNotifier(), repository);
notifier.setReminderPeriod(Duration.ofMinutes(10));
notifier.setCheckReminderInverval(Duration.ofSeconds(10));
return notifier;
}
@Scheduled(fixedRate = 1_000L)
public void remind() {
remindingNotifier().sendReminders();
}
@Bean
public FilteringNotifier filteringNotifier() {
return new FilteringNotifier(loggerNotifier(), repository);
......
......@@ -34,7 +34,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;
import org.springframework.http.HttpMethod;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.web.authentication.SavedRequestAwareAuthenticationSuccessHandler;
......@@ -108,19 +107,15 @@ public class SpringBootAdminApplication {
this.repository = repository;
}
@Bean
@Primary
@Bean(initMethod = "start", destroyMethod = "stop")
public RemindingNotifier remindingNotifier() {
RemindingNotifier notifier = new RemindingNotifier(filteringNotifier(), repository);
notifier.setReminderPeriod(Duration.ofMinutes(10));
notifier.setCheckReminderInverval(Duration.ofSeconds(10));
return notifier;
}
@Scheduled(fixedRate = 1_000L)
public void remind() {
remindingNotifier().sendReminders();
}
@Bean
public FilteringNotifier filteringNotifier() {
return new FilteringNotifier(loggerNotifier(), repository);
......
......@@ -22,13 +22,19 @@ import de.codecentric.boot.admin.server.domain.events.InstanceDeregisteredEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceStatusChangedEvent;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.retry.Retry;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
/**
......@@ -37,10 +43,13 @@ import org.springframework.util.Assert;
* @author Johannes Edmeier
*/
public class RemindingNotifier extends AbstractEventNotifier {
private static final Logger log = LoggerFactory.getLogger(RemindingNotifier.class);
private final ConcurrentHashMap<InstanceId, Reminder> reminders = new ConcurrentHashMap<>();
private final Notifier delegate;
private Duration checkReminderInverval = Duration.ofSeconds(10);
private Duration reminderPeriod = Duration.ofMinutes(10);
private String[] reminderStatuses = {"DOWN", "OFFLINE"};
private final Notifier delegate;
private Disposable subscription;
public RemindingNotifier(Notifier delegate, InstanceRepository repository) {
super(repository);
......@@ -50,7 +59,7 @@ public class RemindingNotifier extends AbstractEventNotifier {
@Override
public Mono<Void> doNotify(InstanceEvent event, Instance instance) {
return delegate.notify(event).then(Mono.fromRunnable(() -> {
return delegate.notify(event).onErrorResume(error -> Mono.empty()).then(Mono.fromRunnable(() -> {
if (shouldEndReminder(event)) {
reminders.remove(event.getInstance());
} else if (shouldStartReminder(event)) {
......@@ -59,16 +68,37 @@ public class RemindingNotifier extends AbstractEventNotifier {
}));
}
public void sendReminders() {
Instant now = Instant.now();
for (Reminder reminder : new ArrayList<>(reminders.values())) {
if (reminder.getLastNotification().plus(reminderPeriod).isBefore(now)) {
reminder.setLastNotification(now);
delegate.notify(reminder.getEvent());
}
public void start() {
this.subscription = Flux.interval(this.checkReminderInverval, Schedulers.newSingle("reminders"))
.log(log.getName(), Level.FINEST)
.doOnSubscribe(subscription -> log.debug("Started reminders"))
.flatMap(i -> this.sendReminders())
.retryWhen(Retry.any()
.retryMax(Integer.MAX_VALUE)
.doOnRetry(
ctx -> log.error("Resubscribing for reminders after uncaught error",
ctx.exception())))
.subscribe();
}
public void stop() {
if (this.subscription != null && !this.subscription.isDisposed()) {
log.debug("stopped reminders");
this.subscription.dispose();
}
}
protected Mono<Void> sendReminders() {
Instant now = Instant.now();
return Flux.fromIterable(this.reminders.values())
.filter(reminder -> reminder.getLastNotification().plus(reminderPeriod).isBefore(now))
.flatMap(reminder -> delegate.notify(reminder.getEvent())
.doOnSuccess(signal -> reminder.setLastNotification(now)))
.then();
}
protected boolean shouldStartReminder(InstanceEvent event) {
if (event instanceof InstanceStatusChangedEvent) {
return Arrays.binarySearch(reminderStatuses,
......@@ -98,6 +128,10 @@ public class RemindingNotifier extends AbstractEventNotifier {
this.reminderStatuses = copy;
}
public void setCheckReminderInverval(Duration checkReminderInverval) {
this.checkReminderInverval = checkReminderInverval;
}
private static class Reminder {
private final InstanceEvent event;
private Instant lastNotification;
......
......@@ -73,7 +73,7 @@ public class AdminServerNotifierAutoConfigurationTest {
.expectNext(APP_DOWN)
.thenCancel()
.verify();
Thread.sleep(50); //wait for the notifications in different thread
Thread.sleep(100); //wait for the notifications in different thread
assertThat(context.getBean(TestNotifier.class).getEvents()).containsOnly(APP_DOWN);
}
......
......@@ -18,20 +18,26 @@ package de.codecentric.boot.admin.server.notify;
import de.codecentric.boot.admin.server.domain.entities.Instance;
import de.codecentric.boot.admin.server.domain.entities.InstanceRepository;
import de.codecentric.boot.admin.server.domain.events.InstanceDeregisteredEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceEndpointsDetectedEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceStatusChangedEvent;
import de.codecentric.boot.admin.server.domain.values.Endpoints;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import de.codecentric.boot.admin.server.domain.values.Registration;
import de.codecentric.boot.admin.server.domain.values.StatusInfo;
import de.codecentric.boot.admin.server.eventstore.InstanceEventPublisher;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
......@@ -42,63 +48,122 @@ public class RemindingNotifierTest {
private final Instance instance2 = Instance.create(InstanceId.of("id-2"))
.register(Registration.create("App", "http://health").build())
.withStatusInfo(StatusInfo.ofDown());
private final InstanceEvent appDown = new InstanceStatusChangedEvent(instance1.getId(), instance1.getVersion(),
StatusInfo.ofDown());
private final InstanceEvent appUp = new InstanceStatusChangedEvent(instance1.getId(), instance1.getVersion(),
StatusInfo.ofUp());
private final InstanceEvent appDown = new InstanceStatusChangedEvent(instance1.getId(), 0L, StatusInfo.ofDown());
private final InstanceEvent appUp = new InstanceStatusChangedEvent(instance1.getId(), 0L, StatusInfo.ofUp());
private final InstanceEvent appEndpointsDiscovered = new InstanceEndpointsDetectedEvent(instance1.getId(), 0L,
Endpoints.empty());
private final InstanceEvent appDeregister = new InstanceDeregisteredEvent(instance1.getId(), 0L);
private final InstanceEvent otherAppUp = new InstanceStatusChangedEvent(instance2.getId(), 0L, StatusInfo.ofUp());
private InstanceRepository repository;
@Before
public void setUp() {
repository = mock(InstanceRepository.class);
when(repository.find(any())).thenReturn(Mono.empty());
when(repository.find(instance1.getId())).thenReturn(Mono.just(instance1));
when(repository.find(instance2.getId())).thenReturn(Mono.just(instance2));
}
@Test
public void test_ctor_assert() {
public void should_throw_on_invalid_ctor() {
assertThatThrownBy(() -> new CompositeNotifier(null)).isInstanceOf(IllegalArgumentException.class);
}
@Test
public void test_remind() {
public void should_remind_only_down_events() throws InterruptedException {
TestNotifier notifier = new TestNotifier();
RemindingNotifier reminder = new RemindingNotifier(notifier, repository);
reminder.setReminderPeriod(Duration.ZERO);
StepVerifier.create(reminder.notify(appDown)).verifyComplete();
StepVerifier.create(reminder.notify(appEndpointsDiscovered)).verifyComplete();
StepVerifier.create(reminder.notify(otherAppUp)).verifyComplete();
reminder.sendReminders();
reminder.sendReminders();
Thread.sleep(10);
StepVerifier.create(reminder.sendReminders()).verifyComplete();
Thread.sleep(10);
StepVerifier.create(reminder.sendReminders()).verifyComplete();
assertThat(notifier.getEvents()).containsOnly(appDown, otherAppUp, appDown, appDown);
assertThat(notifier.getEvents()).containsExactlyInAnyOrder(appDown, appEndpointsDiscovered, otherAppUp, appDown,
appDown);
}
@Test
public void test_no_remind_after_up() {
public void should_not_remind_remind_after_up() {
TestNotifier notifier = new TestNotifier();
RemindingNotifier reminder = new RemindingNotifier(notifier, repository);
reminder.setReminderPeriod(Duration.ZERO);
StepVerifier.create(reminder.notify(appDown)).verifyComplete();
StepVerifier.create(reminder.notify(appUp)).verifyComplete();
reminder.sendReminders();
StepVerifier.create(reminder.sendReminders()).verifyComplete();
assertThat(notifier.getEvents()).containsOnly(appDown, appUp);
assertThat(notifier.getEvents()).containsExactlyInAnyOrder(appDown, appUp);
}
@Test
public void should_not_remind_remind_after_deregister() {
TestNotifier notifier = new TestNotifier();
RemindingNotifier reminder = new RemindingNotifier(notifier, repository);
reminder.setReminderPeriod(Duration.ZERO);
StepVerifier.create(reminder.notify(appDown)).verifyComplete();
StepVerifier.create(reminder.notify(appDeregister)).verifyComplete();
StepVerifier.create(reminder.sendReminders()).verifyComplete();
assertThat(notifier.getEvents()).containsExactlyInAnyOrder(appDown, appDeregister);
}
@Test
public void test_no_remind_before_end() {
public void should_not_remind_remind_before_period_ends() {
TestNotifier notifier = new TestNotifier();
RemindingNotifier reminder = new RemindingNotifier(notifier, repository);
reminder.setReminderPeriod(Duration.ofHours(24));
StepVerifier.create(reminder.notify(appDown)).verifyComplete();
reminder.sendReminders();
StepVerifier.create(reminder.sendReminders()).verifyComplete();
assertThat(notifier.getEvents()).containsExactlyInAnyOrder(appDown);
}
@Test
public void should_resubscribe_after_error() {
FluxNotifier notifier = new FluxNotifier();
RemindingNotifier reminder = new RemindingNotifier(notifier, repository);
reminder.setCheckReminderInverval(Duration.ofMillis(1));
reminder.setReminderPeriod(Duration.ofMillis(1));
reminder.start();
StepVerifier.create(notifier)
.expectSubscription()
.then(() -> StepVerifier.create(reminder.notify(appDown)).verifyComplete())
.expectNext(appDown)
.expectNext(appDown)
.then(() -> StepVerifier.create(
reminder.notify(new InstanceDeregisteredEvent(InstanceId.of("ERROR"), 0L))).verifyComplete())
.expectNext(appDown)
.expectNext(appDown)
.then(reminder::stop)
.expectNoEvent(Duration.ofMillis(10))
.thenCancel()
.verify();
reminder.stop();
}
private static class FluxNotifier extends InstanceEventPublisher implements Notifier {
assertThat(notifier.getEvents()).containsOnly(appDown);
@Override
public Mono<Void> notify(InstanceEvent event) {
if (event.getInstance().getValue().equals("ERROR")) {
throw new IllegalArgumentException("TEST-ERROR");
}
this.publish(Collections.singletonList(event));
return Mono.empty();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment