Nacos官方文档:https://nacos.io/zh-cn/docs/quick-start.html
服务端对外暴露的API:https://nacos.io/zh-cn/docs/open-api.html
Nacos的Server端其实就是一个Web服务,对外提供了Http服务接口,所有的客户端与服务端的通讯都通过Http调用完成(短链接)。
Nacos注册服务核心类:NacosNamingService
Nacos配置中心核心类:NacosConfigService
一、微服务中常用的注册中心对比:
-
Zookeeper(Apache):典型的CP架构,有Leader节点,在选举Leader的过程中,整个集群对外不可用,为了强一致性,牺牲高可用性!(Client与Server之间为心跳维持的TCP长连接)
-
Eureka(Netflix):AP架构,为了高可用性,牺牲强一致性;服务提供者新节点注册后,消费者需要一定的时间后才能拿到最新服务列表,最长可达60s;
-
Nacos(阿里):参考了Zookeeper+Eureka,同时支持AP/CP架构,集群默认为AP架构,也可以通过配置切换为CP架构(Raft);服务列表变动后,消费者获取最新列表最然会有一点延迟,但是比Eureka好很多,而且还可以通过udp实时通知,虽然UDP可靠性无法保证!(Client与Server之间为短链接Http调用)
二、Euraka的服务架构图

-
服务注册+服务心跳:首先无论是“服务提供者”还是“服务消费者”都会将自己注册到nacos,并维持心跳。(每5秒发送一次心跳包)
-
服务健康检查:服务端再启动后,会以Service为单位,开启ClientBeatCheckTask心跳检查任务。(每5秒检查一次,如果某个客户端最后一次心跳超过15秒,标记为不健康,超过30秒踢除)
-
服务发现:“服务消费者”会根据需要自己的自己所需的目标服务的namespace/group/serviceName/cluster只根据需要查询对应的服务注册表,保存在本地。(定时每10s去服务端更新一次)!
-
服务同步:服务端集群之间会同步服务注册表,用来保证服务信息的一致性!(注意AP架构的集群中,即使配置了mysql,也不是用来存放注册表)
三、Nacos的核心注册表结构(双层ConcurrentHashMap)

1、Nacos和Eureka的注册表底层都是双层ConcurrentHashMap:
// 本篇只介绍Nacos
public class ServiceManager implements RecordListener<Service> {
// Nacos服务注册表的实际存储结构(双层Map)
Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>()
//Map<nameSpaceId, Map<group::serverName, Service>> ————> 通过nameSpaceId + group::serviceName定位到具体的服务(Service)
//其中的Service服务实例的结构:
public class Service {
private Map<String, Cluster> clusterMap = new HashMap<>();
//Map<clusterName, Cluster> ————> 在具体的serviceInstance内通过clusterName定位到具体的Cluster集群
//而Cluster集群,又是这样的结构
public class Cluster {
private Set<Instance> ephemeralInstances = new HashSet<>(); //这就是实际可以对外提供的单个服务(serviceInstanceItem)
}
}
}
// 总结:Nacos底层数据结构,显示一个双层Map,
—— 1、服务发现阶段,通过nameSpaceId, group::serviceName找到对应的服务 Service服务
—— 2、在服务Service内通过clusterName定位到具体的集群Cluster
—— 3、在Cluster集群里面以HashSet的形式,存放着所有能够提供服务的每个实例Instance(这个Instance中有访问它的详细信息),最后把整个Set列表返回给客户端即可!
2、Nacos这么多层的配置,该如何使用?最佳实践?
-
最佳实践一(中小型公司):
// namespace:用来区分不通的项目,如haier-iot / haier-code / cold-chain // group:用来区分不通项目的 prod / test / dev 等环境 // ————spring.application.name———— // cluster:可以以低于来划分集群:BJ / NJ / SH // 示例: spring: application: name: haier-iot-device-manager cloud: nacos: discovery: server-addr: 10.206.73.156:8848 namespace: haier-iot group: dev cluster-name: BJ //可以不区分 config: server-addr: 10.206.73.156:8848 file-extension: yaml namespace: haier-iot group: dev cluster-name: BJ //可以不区分
-
最佳实践二(大型公司):
//与最佳实践一的区别在于,直接使用nacos项目专用,直接使用 namespace 区分环境 // namespace:直接用来区分 prod / test / dev 等环境 // group:使用 DEFAULT_GROUP,因为微服务有可能太多,管理容易混乱;同时这一层可以做扩展,比如多个小服务可能属于另一个大服务下; // ————spring.application.name———— // cluster:可以以低于来划分集群:BJ / NJ / SH // 示例: spring: application: name: haier-iot-device-manager cloud: nacos: discovery: server-addr: 10.206.73.156:8848 namespace: dev group: DEFAULT_GROUP //默认GROUP可以不指定 cluster-name: BJ //可以不区分 config: server-addr: 10.206.73.156:8848 namespace: dev group: DEFAULT_GROUP //默认GROUP可以不指定 file-extension: yaml
3、为什么Nacos要设计这么复杂的数据结构?
因为Nacos是一个开放的产品,为了适应绝大多数使用者的使用场景,所以扩展性一定要好,这么多层的设计,几乎可以满足任意复杂的业务场景!
三、Nacos的注册表写入性能保证
1、Nacos这么负责的注册表结构,如何支撑高并发场景?(阻塞队列、异步注册)
// 使用内存阻塞队列实现异步注册 —— 当接收到provider的注册时,Nacos服务端会将任务封装成Task
public class DistroConsistencyServiceImpl{
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
// 而notifier是一个线程,单线程处理服务注册任务,也避免了“并发覆盖”问题!
public class Notifier implements Runnable {
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
// run()方法就是在处理放入到tasks队列中的Task任务
@Override
public void run() {
for (; ; ) { // 死循环,即使出现异常也不会退出
try {
Pair<String, DataOperation> pair = tasks.take(); //阻塞队列不消耗CPU
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
// 新的Instance任务被封装成Task任务,放入到Notifier中
public void put(String key, Record value) throws NacosException {
onPut(key, value); // 任务被封装成
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
public void onPut(String key, Record value) {
...边缘逻辑...
notifier.addTask(key, DataOperation.CHANGE);
}
}
2、使用阻塞队列实现异步注册,会不会存在不一致问题,还没注册成功就给客户端返回结果?
# 是的,肯定会存在这个问题,但是这是一个取舍,高性能的中间件内部都使用了大量的异步操作; # 想一想,我们的应用程序可能依赖很多第三方服务,如果第三方中间件都用同步的方式去执行自己的内部逻辑,那么应用程序的启动将变得非常地缓慢,最后的效果肯定是难以接受的; —— 支持高并发! —— 要说不及时,之前的Eureka更严重! # 其实正常情况下,几乎不会太过阻塞,因为几乎没有多少公司,是一次性增加n多台服务的,都是慢慢添加的,而且即使个别慢了,也是可以接受的,先用其它服务节点即可,站在服务消费者的角度,也就是provider服务起得有点慢而已。
3、为了解决高并发下的读写冲突问题,Nacos使用了CopyOnWrite方案:
// 在Notifier.run()方法中:
listener.onChange(datumKey, dataStore.get(datumKey).value);
|
com.alibaba.nacos.naming.core.Service#onChange(){
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
}
|
com.alibaba.nacos.naming.core.Service#updateIPs(){
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
|
com.alibaba.nacos.naming.core.Cluster#updateIps{
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// 将旧的临时实例ephemeralInstances列表,复制转化为一个Map进行更新操作
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
//...对旧Set拷贝后转化为HashMap进行更新操作...
toUpdateInstances = new HashSet<>(ips);
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
4、同时n多个实例注册或更新,都进行CopyOnWrite,岂不是会存在“更新覆盖”?
// 1、首先,根据上面的第1条,Notifier的执行是一个单线程执行任务:
/// Notifier所在类DistroConsistencyServiceImpl是一个单例Service,@PostConstruct决定了init方法只会被调用一次:
//// 而GlobalExecutor 是一个单线程的线程池,所以处理实例注册的最终线程只会有一个
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
// 2、CopyOnWrite后的集合中的元素不能直接修改,因为集合中的元素是引用!
// —— 当新增时,直接在新集合中新增Instance,然后替换原注册表中的集合即可!
// —— 当删除时,直接将新集合中的对应Instance删除,然后替换原注册表中的集合即可!
// —— 当更新时,新增一个Instance,然后删除原集合中的Instance元素,增加新的Instance元素即可!
// ————原则就是:永远是替换,不直接修改原Instance对象!
5、随着注册表的不断增大,进行CopyOnWrite时候的成本是不是变得非常大?
// 当然不是每次直接Copy整张注册表,那样开销肯定很大
// 每次Copy的粒度是缩小到Service下对应的Cluster中的Set<Instance>集合,这个粒度是很小的!
public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
private Set<Instance> persistentInstances = new HashSet<>(); // AP模式实例列表(服务发现得到的列表就是它)
private Set<Instance> ephemeralInstances = new HashSet<>(); // CP模式实例列表(服务发现得到的列表就是它)
// 更新节点的操作(Cluster级别)
public void updateIps(List<Instance> ips, boolean ephemeral) {
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
//...对旧Set拷贝后转化为HashMap进行更新操作...
toUpdateInstances = new HashSet<>(ips);
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
}
// 为了性能,CopyOnWrite的粒度一定要越小越好!
四、Nacos的心跳机制(定时去调Nacos服务端Http接口)
核心类:NacosNamingService
1、Client在向服务端注册服务的同时,开启定时任务向服务端发送心跳请求:
// com.alibaba.nacos.client.naming.NacosNamingService#registerInstance()
// 既是注册服务的核心代码,也是发送心跳的核心代码
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
if (instance.isEphemeral()) {
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
beatReactor.addBeatInfo(groupedServiceName, beatInfo); // 发送心跳
}
serverProxy.registerService(groupedServiceName, groupName, instance); // 注册实例
}
|
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
...
// 第一次调用 = 触发心跳任务
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
...
}
|
//BeatTask.run()任务核心代码:
public void run() {
if (!this.beatInfo.isStopped()) {
// 计算下一次发送的时间
long nextTime = this.beatInfo.getPeriod();
try {
//此处就是去调用“发送心跳API”
JsonNode result = BeatReactor.this.serverProxy.sendBeat(this.beatInfo, BeatReactor.this.lightBeatEnabled);
...对心跳发送结果进行处理...
} catch (NacosException var11) {
...log...
}
//第二次发送心跳,循环进行,就形成定时发送心跳的效果
BeatReactor.this.executorService.schedule(BeatReactor.this.new BeatTask(this.beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
我们再看看执行“心跳任务”的线程长啥样:
// 定时任务线程
this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(true); // 守护线程(所有用户线程结束后,守护线程会自动结束)
thread.setName("com.alibaba.nacos.naming.beat.sender");
return thread;
}
});
所以“心跳包”的核心就是:通过一个定时任务守护线程,定时去调用Nacos服务端的发送心跳包的API接口!
2、“心跳包”的默认间隔时间是多少?(5-15-30)
// 构建心跳包的信息
BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
public long getInstanceHeartBeatInterval() {
return this.getMetaDataByKeyWithDefault("preserved.heart.beat.interval", Constants.DEFAULT_HEART_BEAT_INTERVAL);
}
//而Constants.DEFAULT_HEART_BEAT_INTERVAL是个常量
static {
DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15L); //15秒 收不到心跳,则会被标记为“不健康”
DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30L); //30秒 收不到心跳,则“剔除”该实例IP
DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5L); //默认心跳时间 5秒
}
3、Nacos服务端对心跳包的处理逻辑(服务健康检查)?(定时任务,每5秒健康检查)
// 以Service为单位,每个Service在被初始化时,都会创建一个健康检查器HealthCheckReactor
public class Service {
public void init() {
// HealthCheckReactor 健康检查器,也是通过schduled线程池去做检查的
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
}
|
// HealthCheckReactor.scheduleCheck()方法就是开启定时任务线程
|线程池大小为1~核数/2
public static void scheduleCheck(BeatCheckTask task) {
futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
//延迟5秒后,每5秒执行一次
}
健康检查任务的核心 run() 方法逻辑:
public class ClientBeatCheckTask implements BeatCheckTask {
@Override
public void run() {
//拿出Service中所有的实例(后面遍历检查)
List<Instance> instances = service.allIPs(true);
// 系统当前时间 - 最后一次心跳时间 > 不健康阈值(15秒),标记为不健康
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// 系统当前时间 - 最后一次心跳时间 > 可剔除阈值(30秒),直接剔除
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}
}
}
所以“服务健康检查”的逻辑就是:服务端以Service为单位,使用定时任务线程池,每5秒检查一次Service中所有实例的状态:
最后心跳时间距当前超过15秒,标记为不健康;
最后心跳时间距当前超过30秒,将此实例踢除!
五、服务发现
当服务消费者需要查询自己需要的服务列表时,会优先从本地缓存注册表获取数据,第一次获取为空时,才会从远程Server端获取;
从远程Server获取服务列表的粒度为Cluster粒度,同时还会将自己的udp端口告诉Server端,便于Server变化时的主动通知;
从远程Server获取列表的同时,还会启动定时任务,每隔10秒从Server端同步一次自己的注册表(只同步自己需要的);
udp通知的可靠性不能保证,但是影响不大,因为有定时任务同步托底!
1、获取服务实例列表核心方法:NacosNamingService#getAllInstances()
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) {
ServiceInfo serviceInfo;
if (subscribe) { // 默认是开启订阅(udp通知),所以走这一分支
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor
.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ","));
}
List<Instance> list;
if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
return new ArrayList<Instance>();
}
return list;
}
|
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key); // 故障转移功能,从故障转移文件获取服务列表
}
// 从本地缓存的注册表获取服务列表
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) { // 第一次启动时候,缓存肯定为空,所以会走这一分支
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters); // 核心去远程获取服务列表的方法
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
scheduleUpdateIfAbsent(serviceName, clusters); // 开启定时任务,定时更新本地注册表
return serviceInfoMap.get(serviceObj.getKey());
}
从远程获取服务列表没啥看的,我们重点看看定时任务更新本地缓存注册表的逻辑:
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
// UpdateTask见名知意
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
|
// 看看UpdateTask.run()核心方法:
public void run() {
long delayTime = -1;
try {
...一系列逻辑,但是最终都会走finally中的逻辑...
delayTime = serviceObj.getCacheMillis();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
} finally {
if (delayTime > 0) {
// delayTime默认为10秒
executor.schedule(this, delayTime, TimeUnit.MILLISECONDS);
}
}
}
所以“服务发现”的逻辑就是:在客户端启动时,会根据需要从Nacos服务端获取自己需要的服务列表(Cluster级别),
并保存到本地缓存中的注册表中,并开启一个定时任务,每隔10秒去服务端同步一下对应的注册表;
之后每次需要时,都是从本地缓存中的注册表获取服务列表即可!
2、如何尽可能地保证本地注册表的实时性?(开启订阅,开放udp端口):
从第一条中我们看到一个开启订阅的逻辑,在对应的分支中,从服务端获取服务列表时:
updateServiceNow(serviceName, clusters); | String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); | // pushReceiver.getUdpPort() // 可以知道,从服务端获取服务列表时,顺便把自己的udp端口也传给了服务端 // 那么当服务端发现对应的服务列表有变动时,就可以通过此Udp端口通知到本Client
六、服务同步
Nacos集群即使配置了外部mysql数据库,注册表信息也是存储在每个节点的内存中的,而不是存储在mysql中,而当Client向Nacos服务端注册时,只会选择一个Nacos Server节点注册,那么就必须有一套机制能够实现Nacos集群的各个节点都能同步到数据,Nacos自己实现了一套Distro协议,以实现分布式集群各节点之间的数据最终一致性!
1、什么时Distro协议?
Distro协议时Nacos社区自研的一套AP分布式协议,为了集群的高可用,牺牲强一致性,只追求最终一致性!
-
Nacos集群的每个节点时平等的,都可以处理读写请求,同时把数据同步到其他节点;
-
每个节点只负责部分数据(服务健康检查等),定时发送自己负责的数据的校验值到其他节点,以保证数据的一致性;
-
每个节点独立处理请求,不需要经过其他节点同意,及时从本地发起对Client端的相应!
2、Nacos集群中的节点,如何知道其它节点的存在?
得熟悉Nacos AP集群得部署方式,Nacos集群在部署时,需要在配置 cluster.conf 文件中配置集群得各个节点,这样每台机器就都知道集群中得其它节点得ip:port了;
@Component("serverListManager")
public class ServerListManager extends MemberChangeListener {
@PostConstruct
public void init() {
// 集群节点状态同步任务,它会每2秒调用集群其它节点的状态接口,以判断节点是否还在线!
GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());
}
// 集群节点状态同步任务,每2秒执行一次,
ServerStatusReporter.run(){
// 很简单,就是调用其它节点的状态接口,告诉其它机器自己还活着(集群中每两台机器直接都会互相调用);
// 如果某个节点在一定时间内,没有收到其它某个节点的状态报告,那就认为这个节点挂了,就会更新自己本地认为的集群存活节点数;
// 集群存活节点数会直接影响到“服务健康检查”的目标机器核心变量,从而决定每个Service,将会在哪个Server节点被执行健康检查!
synchronizer.send(server.getAddress(), msg);
}
}
3、“服务注册”任务由哪个节点负责?如何同步数据到其他节点?
“服务注册”任务,有Client端发起,根据负载均衡算法挑选一台Server机器进行注册;
被挑选到的Server节点,处理自己的注册任务的同时,通过Distro协议,同步到集群中的其它节点;
// com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
public void put(String key, Record value) throws NacosException {
// 在本机处理服务注册请求
onPut(key, value);
// 同步给其它机器进行注册
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
4、如何判断各个Service的健康检查任务,由集群中的哪个节点负责检查?
// 我们找到心跳检查任务的run()方法:
ClientBeatCheckTask.run(){
// 判断是否该由本节点负责该Service的心跳检查任务
if (!getDistroMapper().responsible(service.getName())) {
return;
}
...如果是自己负责该Service的心跳检查,才会继续执行心跳检查任务...
}
// 判断逻辑
public boolean responsible(String serviceName) {
final List<String> servers = healthyList;
if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
return true;
}
if (CollectionUtils.isEmpty(servers)) {
// means distro config is not ready yet
return false;
}
int index = servers.indexOf(EnvUtil.getLocalAddress());
int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
if (lastIndex < 0 || index < 0) {
return true; // 自己不在集群列表中,那可能当前就不是集群部署,所以自己得检查
}
// 对serviceName进行hash后,对当前集群节点数量取余,看看是不是自己
// 如果不是自己,不用担心,其它机器在被注册时,也会走到这条逻辑,总有一台机器是负责该Service的“健康检查”的
int target = distroHash(serviceName) % servers.size();
return target >= index && target <= lastIndex;
}
5、集群间两个重要的同步任务:
1. ServerListManager下的ServerStatusReporter任务: —— 上面已经讲过,在集群之间通过定时调用状态接口的方式,同步集群各节点的在线状态! 2. ServiceManager下的ServiceReporter任务: —— 当某个节点执行完健康检查后,如果发现某个Service实例状态改变了,它必须要同步给集群中其它节点,修改各自注册表中的状态(通过调用InstanceController中的API接口实现)
6、如果有新节点加入集群,如果从其它节点同步数据?
// 每个节点启动时,会注入一个DistroProtocol的Bean
@Component
public class DistroProtocol {
// 在DistroProtocol的构造函数中,会启动DistroTask数据同步任务
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroConfig = distroConfig;
startDistroTask();
}
private void startDistroTask() {
// 如果时单节点运行,就不用同步啦
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
startVerifyTask();
startLoadTask(); // 开启数据加载任务
}
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}
}
//DistroLoadDataTask任务的核心run()方法:
DistroLoadDataTask.run(){
try {
load(); // 从其它节点加载数据
if (!checkCompleted()) {
// 如果不成功,就开个延时任务,过会儿继续尝试去加载
GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
} else {
loadCallback.onSuccess();
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
}
} catch (Exception e) {
loadCallback.onFailed(e);
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
}
}
真正的load()从远程加载逻辑:
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
// for循环尝试从所有远程节点获取注册表全量文件,只要有一个成功,则跳出for循环
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
resourceType, transportAgent, dataProcessor);
return false;
}
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
// 调取远程节点的获取DatumSnapshot快照数据接口
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
// 处理数据,加载到本节点内存的注册表中,完成新节点数据初始化
boolean result = dataProcessor.processSnapshot(distroData);
if (result) {
return true; // 有一个节点成功,则跳出全部for循环,直接返回成功结果
}
} catch (Exception e) {
......
}
}
return false;
}



