Commit 436357c7 by Johannes Edmeier

Trigger endpoint detection and immediate health check on registration update

fixes #753
parent f7a41f58
/*
* Copyright 2014-2017 the original author or authors.
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -17,6 +17,7 @@
package de.codecentric.boot.admin.server.services;
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegistrationUpdatedEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceStatusChangedEvent;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
......@@ -24,20 +25,23 @@ import reactor.core.scheduler.Schedulers;
import org.reactivestreams.Publisher;
public class EndpointDetectionTrigger extends ResubscribingEventHandler<InstanceStatusChangedEvent> {
public class EndpointDetectionTrigger extends ResubscribingEventHandler<InstanceEvent> {
private final EndpointDetector endpointDetector;
public EndpointDetectionTrigger(EndpointDetector endpointDetector, Publisher<InstanceEvent> publisher) {
super(publisher, InstanceStatusChangedEvent.class);
super(publisher, InstanceEvent.class);
this.endpointDetector = endpointDetector;
}
@Override
protected Publisher<?> handle(Flux<InstanceStatusChangedEvent> publisher) {
return publisher.subscribeOn(Schedulers.newSingle("endpoint-detector")).flatMap(this::detectEndpoints);
protected Publisher<?> handle(Flux<InstanceEvent> publisher) {
return publisher.subscribeOn(Schedulers.newSingle("endpoint-detector"))
.filter(event -> event instanceof InstanceStatusChangedEvent ||
event instanceof InstanceRegistrationUpdatedEvent)
.flatMap(this::detectEndpoints);
}
protected Mono<Void> detectEndpoints(InstanceStatusChangedEvent event) {
protected Mono<Void> detectEndpoints(InstanceEvent event) {
return endpointDetector.detectEndpoints(event.getInstance());
}
}
......@@ -18,6 +18,7 @@ package de.codecentric.boot.admin.server.services;
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegisteredEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegistrationUpdatedEvent;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
......@@ -34,7 +35,7 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StatusUpdateTrigger extends ResubscribingEventHandler<InstanceRegisteredEvent> {
public class StatusUpdateTrigger extends ResubscribingEventHandler<InstanceEvent> {
private static final Logger log = LoggerFactory.getLogger(StatusUpdateTrigger.class);
private final StatusUpdater statusUpdater;
private Map<InstanceId, Instant> lastQueried = new HashMap<>();
......@@ -44,7 +45,7 @@ public class StatusUpdateTrigger extends ResubscribingEventHandler<InstanceRegis
public StatusUpdateTrigger(StatusUpdater statusUpdater, Publisher<InstanceEvent> publisher) {
super(publisher, InstanceRegisteredEvent.class);
super(publisher, InstanceEvent.class);
this.statusUpdater = statusUpdater;
}
......@@ -65,8 +66,10 @@ public class StatusUpdateTrigger extends ResubscribingEventHandler<InstanceRegis
}
@Override
protected Publisher<?> handle(Flux<InstanceRegisteredEvent> publisher) {
protected Publisher<?> handle(Flux<InstanceEvent> publisher) {
return publisher.subscribeOn(Schedulers.newSingle("status-updater"))
.filter(event -> event instanceof InstanceRegisteredEvent ||
event instanceof InstanceRegistrationUpdatedEvent)
.flatMap(event -> updateStatus(event.getInstance()));
}
......
......@@ -19,6 +19,7 @@ package de.codecentric.boot.admin.server.services;
import de.codecentric.boot.admin.server.domain.entities.Instance;
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegisteredEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegistrationUpdatedEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceStatusChangedEvent;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import de.codecentric.boot.admin.server.domain.values.Registration;
......@@ -26,6 +27,7 @@ import de.codecentric.boot.admin.server.domain.values.StatusInfo;
import reactor.core.publisher.Mono;
import reactor.test.publisher.TestPublisher;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.any;
......@@ -39,28 +41,45 @@ import static org.mockito.Mockito.when;
public class EndpointDetectionTriggerTest {
private final Instance instance = Instance.create(InstanceId.of("id-1"))
.register(Registration.create("foo", "http://health-1").build());
private TestPublisher<InstanceEvent> events = TestPublisher.create();
private EndpointDetector detector = mock(EndpointDetector.class);
private EndpointDetectionTrigger trigger;
@Test
public void should_detect_on_event() throws InterruptedException {
//given
EndpointDetector detector = mock(EndpointDetector.class);
@Before
public void setUp() throws Exception {
when(detector.detectEndpoints(any(InstanceId.class))).thenReturn(Mono.empty());
TestPublisher<InstanceEvent> events = TestPublisher.create();
EndpointDetectionTrigger trigger = new EndpointDetectionTrigger(detector, events.flux());
trigger = new EndpointDetectionTrigger(detector, events.flux());
trigger.start();
Thread.sleep(50L); //wait for subscription
}
//when some non-status-change event is emitted
events.next(new InstanceRegisteredEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
//then should not update
verify(detector, never()).detectEndpoints(instance.getId());
@Test
public void should_detect_on_status_changed() {
//when status-change event is emitted
events.next(new InstanceStatusChangedEvent(instance.getId(), instance.getVersion(), StatusInfo.ofDown()));
//then should update
verify(detector, times(1)).detectEndpoints(instance.getId());
}
@Test
public void should_detect_on_registration_updated() {
//when status-change event is emitted
events.next(
new InstanceRegistrationUpdatedEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
//then should update
verify(detector, times(1)).detectEndpoints(instance.getId());
}
@Test
public void should_not_detect_on_non_relevant_event() {
//when some non-status-change event is emitted
events.next(new InstanceRegisteredEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
//then should not update
verify(detector, never()).detectEndpoints(instance.getId());
}
@Test
public void should_not_detect_on_trigger_stopped() {
//when registered event is emitted but the trigger has been stopped
trigger.stop();
clearInvocations(detector);
......@@ -68,5 +87,4 @@ public class EndpointDetectionTriggerTest {
//then should not update
verify(detector, never()).detectEndpoints(instance.getId());
}
}
......@@ -20,6 +20,7 @@ import de.codecentric.boot.admin.server.domain.entities.Instance;
import de.codecentric.boot.admin.server.domain.events.InstanceEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceInfoChangedEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegisteredEvent;
import de.codecentric.boot.admin.server.domain.events.InstanceRegistrationUpdatedEvent;
import de.codecentric.boot.admin.server.domain.values.Info;
import de.codecentric.boot.admin.server.domain.values.InstanceId;
import de.codecentric.boot.admin.server.domain.values.Registration;
......@@ -27,6 +28,7 @@ import reactor.core.publisher.Mono;
import reactor.test.publisher.TestPublisher;
import java.time.Duration;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.ArgumentMatchers.any;
......@@ -41,22 +43,29 @@ import static org.mockito.Mockito.when;
public class StatusUpdateTriggerTest {
private final Instance instance = Instance.create(InstanceId.of("id-1"))
.register(Registration.create("foo", "http://health-1").build());
private StatusUpdater updater = mock(StatusUpdater.class);
private TestPublisher<InstanceEvent> events = TestPublisher.create();
private StatusUpdateTrigger trigger;
@Before
public void setUp() throws Exception {
when(updater.updateStatus(any(InstanceId.class))).thenReturn(Mono.empty());
trigger = new StatusUpdateTrigger(updater, events.flux());
trigger.start();
Thread.sleep(50L); //wait for subscription
}
@Test
public void should_start_and_stop_monitor() throws Exception {
//given
StatusUpdater updater = mock(StatusUpdater.class);
when(updater.updateStatus(any(InstanceId.class))).thenReturn(Mono.empty());
TestPublisher<InstanceEvent> publisher = TestPublisher.create();
StatusUpdateTrigger trigger = new StatusUpdateTrigger(updater, publisher);
trigger.stop();
trigger.setUpdateInterval(Duration.ofMillis(10));
trigger.setStatusLifetime(Duration.ofMillis(10));
//when trigger is initialized and an appliation is registered
trigger.start();
Thread.sleep(50L); //wait for subscription
publisher.next(new InstanceRegisteredEvent(instance.getId(), 0L, instance.getRegistration()));
events.next(new InstanceRegisteredEvent(instance.getId(), 0L, instance.getRegistration()));
Thread.sleep(50L);
//then it should start updating one time for registration and at least once for monitor
......@@ -82,26 +91,32 @@ public class StatusUpdateTriggerTest {
}
@Test
public void should_update_on_event() throws InterruptedException {
//given
StatusUpdater updater = mock(StatusUpdater.class);
when(updater.updateStatus(any(InstanceId.class))).thenReturn(Mono.empty());
public void should_update_on_instance_registered_event() {
//when registered event is emitted
events.next(new InstanceRegisteredEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
//then should update
verify(updater, times(1)).updateStatus(instance.getId());
}
TestPublisher<InstanceEvent> events = TestPublisher.create();
StatusUpdateTrigger trigger = new StatusUpdateTrigger(updater, events.flux());
trigger.start();
Thread.sleep(50L); //wait for subscription
@Test
public void should_update_on_instance_registration_update_event() {
//when registered event is emitted
events.next(
new InstanceRegistrationUpdatedEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
//then should update
verify(updater, times(1)).updateStatus(instance.getId());
}
@Test
public void should_not_update_on_non_relevant_event() {
//when some non-registered event is emitted
events.next(new InstanceInfoChangedEvent(instance.getId(), instance.getVersion(), Info.empty()));
//then should not update
verify(updater, never()).updateStatus(instance.getId());
}
//when registered event is emitted
events.next(new InstanceRegisteredEvent(instance.getId(), instance.getVersion(), instance.getRegistration()));
//then should update
verify(updater, times(1)).updateStatus(instance.getId());
@Test
public void should_not_update_when_stopped() {
//when registered event is emitted but the trigger has been stopped
trigger.stop();
clearInvocations(updater);
......@@ -109,5 +124,4 @@ public class StatusUpdateTriggerTest {
//then should not update
verify(updater, never()).updateStatus(instance.getId());
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment