Commit f0002da6 by 赵天增

补充注释

parent 74f29998
...@@ -27,6 +27,7 @@ import reactor.core.publisher.Mono; ...@@ -27,6 +27,7 @@ import reactor.core.publisher.Mono;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -43,6 +44,8 @@ import org.springframework.util.PatternMatchUtils; ...@@ -43,6 +44,8 @@ import org.springframework.util.PatternMatchUtils;
/** /**
* Listener for Heartbeats events to publish all services to the instance registry. * Listener for Heartbeats events to publish all services to the instance registry.
* *
* 监听Heartbeats事件以将所有服务发布到实例注册表。
*
* @author Johannes Edmeier * @author Johannes Edmeier
*/ */
public class InstanceDiscoveryListener { public class InstanceDiscoveryListener {
...@@ -89,6 +92,10 @@ public class InstanceDiscoveryListener { ...@@ -89,6 +92,10 @@ public class InstanceDiscoveryListener {
discoverIfNeeded(event.getValue()); discoverIfNeeded(event.getValue());
} }
/**
* 监听eureka的心跳
* @param event
*/
@EventListener @EventListener
public void onApplicationEvent(HeartbeatEvent event) { public void onApplicationEvent(HeartbeatEvent event) {
discoverIfNeeded(event.getValue()); discoverIfNeeded(event.getValue());
...@@ -101,13 +108,31 @@ public class InstanceDiscoveryListener { ...@@ -101,13 +108,31 @@ public class InstanceDiscoveryListener {
} }
protected void discover() { protected void discover() {
// 开始进行发现逻辑,获取所有注册上去的服务
Flux.fromIterable(discoveryClient.getServices()) Flux.fromIterable(discoveryClient.getServices())
// 是否需要进行注册,可以控制有的服务不被发现
.filter(this::shouldRegisterService) .filter(this::shouldRegisterService)
.flatMapIterable(discoveryClient::getInstances) .flatMapIterable(new Function<String, Iterable<ServiceInstance>>() {
.flatMap(this::registerInstance) @Override
public Iterable<ServiceInstance> apply(String s) {
return discoveryClient.getInstances(s);
}
})
.flatMap(new Function<ServiceInstance, Mono<InstanceId>>() {
@Override
public Mono<InstanceId> apply(ServiceInstance serviceInstance) {
return registerInstance(serviceInstance);
}
})
.collect(Collectors.toSet()) .collect(Collectors.toSet())
.flatMap(this::removeStaleInstances) .flatMap(new Function<Set<InstanceId>, Mono<Void>>() {
.subscribe(v -> { }, ex -> log.error("Unexpected error.", ex)); @Override
public Mono<Void> apply(Set<InstanceId> objects) {
return removeStaleInstances(objects);
}
})
.subscribe(v -> {
}, ex -> log.error("Unexpected error.", ex));
} }
protected Mono<Void> removeStaleInstances(Set<InstanceId> registeredInstanceIds) { protected Mono<Void> removeStaleInstances(Set<InstanceId> registeredInstanceIds) {
......
...@@ -48,12 +48,16 @@ public class InstanceRegistry { ...@@ -48,12 +48,16 @@ public class InstanceRegistry {
*/ */
public Mono<InstanceId> register(Registration registration) { public Mono<InstanceId> register(Registration registration) {
Assert.notNull(registration, "'registration' must not be null"); Assert.notNull(registration, "'registration' must not be null");
// 通过某种算法生成InstanceId
InstanceId id = generator.generateId(registration); InstanceId id = generator.generateId(registration);
Assert.notNull(id, "'id' must not be null"); Assert.notNull(id, "'id' must not be null");
// 然后更新这个id对应的实例信息
return repository.compute(id, (key, instance) -> { return repository.compute(id, (key, instance) -> {
// 如果这个对应的实例信息为空,会创建一个信息实例上去
if (instance == null) { if (instance == null) {
instance = Instance.create(key); instance = Instance.create(key);
} }
// 然后把注册实例注册上去
return Mono.just(instance.register(registration)); return Mono.just(instance.register(registration));
}).map(Instance::getId); }).map(Instance::getId);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment