一、Nacos CP集群架构的基础知识
1、Nacos集群部署后,可以同时支持AP和CP(注意,不是同时支持CAP)
-
AP架构:临时实例
-
CP架构:持久化实例
在注册服务时,如果我们让我们的节点注册为:持久化实例,即自动会走CP架构!
spring: application: name: nacos-config-client cloud: nacos: discovery: server-addr: 10.206.73.156:8848 namespace: haier-iot group: dev cluster-name: BJ ephemeral: true # 持久化实例走CP架构
2、Nacos CP架构使用的分布式一致性协议?(简化版的Raft)
Raft分布式一致性协议 和 Zookeeper使用的ZAB原子广播协议非常相似;
都是一个Leader带领多个Follower,区别在于,Leader选举时的投票机制:
-
ZAB投票时:所有候选节点都会发起投票,然后进行选票PK,决定谁胜出;
-
Raft投票时:会让所有节点随机睡眠,先睡醒的节点发起投票,投自己,并将选票发到其它节点等待结果;
但是,对结果的判断,ZAB 和 Raft 都遵循“半数机制”。
二、CP架构下,持久化实例的注册逻辑
1、注册实例的API接口不变:/nacos/v1/ns/instance

这里当然时选择RaftConsistencyServiceImpl的实现:
public void put(String key, Record value) throws NacosException {
checkIsStopWork();
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
......
}
}
2、整个CP架构下的主节点注册逻辑都在signalPublish()方法中
public void signalPublish(String key, Record value) throws Exception {
if (stopWork) {
throw new IllegalStateException("old raft protocol already stop work");
}
// 判断自己是不是Leader
if (!isLeader()) {
ObjectNode params = JacksonUtils.createEmptyJsonNode();
params.put("key", key);
params.replace("value", JacksonUtils.transferToJsonNode(value));
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
final RaftPeer leader = getLeader();
// 如果本节点不是Leader,就把请求转发给Leader节点处理
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
OPERATE_LOCK.lock();
try {
......
// 核心方法,写本地数据,写内存缓存,发布事件——更新内存服务注册表
onPublish(datum, peers.local());
final String content = json.toString();
// 通过CountDownLatch实现半数ack的统计,如果获得到半数以上的ack,则Countdownlatch逻辑才可以继续向下走!
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) { // 首先自己的钥匙先用上
latch.countDown();
continue;
}
final String url = buildUrl(server, API_ON_PUB); // /v1/ns/raft/datum/commit
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
latch.countDown(); // Http调用正常后,则当作一次ack
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCancel() {
}
});
}
} finally {
OPERATE_LOCK.unlock();
}
其实Nacos实现的简单Raft协议,逻辑有点不太严谨,就是:即使本次同步不成功,但是主节点的本地磁盘文件 + 内存文件 已经都修改过了,不像Zookeeper的两阶段提交;
后期Nacos的一致性协议会修改为JRaft,这点肯定会解决!
3、Leader本节点保存数据的逻辑:onPublish(datum, peers.local())
public void onPublish(Datum datum, RaftPeer source) throws Exception {
......
// 逻辑能走到这,这个if正常都为true
if (KeyBuilder.matchPersistentKey(datum.key)) {
// 核心,将数据写到本地磁盘
raftStore.write(datum);
}
// 往内存中保存一些注册表信息
datums.put(datum.key, datum);
if (isLeader()) {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
} else {
if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
//set leader term:
getLeader().term.set(source.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
}
}
raftStore.updateTerm(local.term.get());
// 发布一个ValueChangeEvent事件,PersistentNotifier.onEvent(ValueChangeEvent)会去处理这个事件
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
}
Leader保存本节点数据,分为三个过程:
-
1. 保存数据到磁盘文件
-
2. 保存部分信息到缓存datums
-
3. 保存文件到内存中的服务注册表(双层ConcurrentHashMap)—— 但不是同步保存,而是通过事件发布,实现异步保存
需要保存到内存中的服务注册表时,会发布一个ValueChangeEvent事件,该事件会被PersisNotifier.onEvent(ValueChangeEvent)捕捉到,并进行处理:
// com.alibaba.nacos.naming.consistency.persistent.PersistentNotifier#onEvent
public void onEvent(ValueChangeEvent event) {
notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
}
|
|
public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
......
for (RecordListener listener : listenerMap.get(key)) {
try {
if (action == DataOperation.CHANGE) {
listener.onChange(key, value); //执行updateIps更新内存注册表
continue;
}
if (action == DataOperation.DELETE) {
listener.onDelete(key);
}
} catch (Throwable e) {
......
}
}
}
|
|
public void onChange(String key, Instances value) throws Exception {
......
// 更新注册表(这个方法,在AP架构师,重点介绍过)
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
recalculateChecksum();
}
至此,整个Leader节点的持久化节点注册逻辑就完成了!
同步数据给其它Follower节点,就是HTTP调用“raft/datum/commit”接口,处理逻辑比主节点简单!
三、Nacos CP集群选举过程
1、集群节点启动时,会执行RaftCore.init()核心方法:
这个Init()核心方法,会启动2个核心定时任务(每500ms执行一次):
-
选举任务:new MasterElection()
-
心跳任务:new HeartBeat()
@Component
public class RaftCore implements Closeable {
@PostConstruct
public void init() throws Exception {
// 启动CP集群节点
raftStore.loadDatums(notifier, datums); //从本地磁盘文件加载数据
// 如果Leader周期不存在,则置为0
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
initialized = true;
// 每500ms做一次选举任务
masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
// 每500ms做一次心跳任务
heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
versionJudgement.registerObserver(isAllNewVersion -> {
stopWork = isAllNewVersion;
if (stopWork) {
try {
shutdown();
raftListener.removeOldRaftMetadata();
} catch (NacosException e) {
throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
}
}
}, 100);
// 注册PersistentNotifier监听器,用来监听处理 ValueChangeEvent 事件,保存内存中服务注册表时用的
NotifyCenter.registerSubscriber(notifier);
}
}
2、选举任务的核心run()方法:
public class MasterElection implements Runnable {
@Override
public void run() {
try {
RaftPeer local = peers.local();
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
// Raft选举前的随机休眠阶段(15s到20s之间的随机值)
local.resetLeaderDue();
// 重新心跳时间为5s
local.resetHeartbeatDue();
// 率先跳出休眠的节点,发起投票
sendVote();
} catch (Exception e) {
}
}
private void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
// 重置集群节点投票
peers.reset();
// 选举周期+1
local.term.incrementAndGet();
// 默认投给自己
local.voteFor = local.ip;
// 把自己的状态改为 “候选者”
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<>(1);
params.put("vote", JacksonUtils.toJson(local));
for (final String server : peers.allServersWithoutMySelf()) {
// 其它节点的API接口为:/raft/vote
final String url = buildUrl(server, API_VOTE);
try {
// 向其它节点发出选票
HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
@Override
//其它节点给本节点的响应
public void onReceive(RestResult<String> result) {
if (!result.ok()) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
return;
}
RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
// 判断是否达到半数选票,成为Leader
peers.decideLeader(peer);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
}
}
}
}
3、其它节点收到投票后的处理逻辑:
如果收到的候选节点的term小于自己本地节点的term,则voteFor自己;(我更适合做Leader,这一票我投给自己)
否则,重置自己的election timeout,设置voteFor为收到的候选节点,更新集群周期term为候选节点的term;(我同意收到的节点做Leader)
给Http的调用方返回response;
四、Nacos CP集群的心跳任务
1、心跳任务由Leader节点发出,有2个作用:
-
确定Follower节点在线;
-
帮助Follower节点判断数据是否一致;(因为服务注册或变更时,Leader节点自己修改了,且收到了“过半”以上节点的ack,但是不排除有些节点没有执行成功,所以通过心跳任务,进行纠错容错)
2、心跳任务的核心run()方法(只有Leader节点才可以向其它节点发送心跳包):
public class HeartBeat implements Runnable {
@Override
public void run() {
try {
if (stopWork) {
return;
}
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
// 任务每0.5s执行一次,每次减0.5s,总共5s减完后,就可以开始sendBeat()
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
// 重置心跳间隔时间为5s
local.resetHeartbeatDue();
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
private void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
// 如果当前是单机模式,或者本节点不是Leader节点,则无权发送心跳,直接跳过
if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
return;
}
local.resetLeaderDue();
// build data
ObjectNode packet = JacksonUtils.createEmptyJsonNode();
packet.replace("peer", JacksonUtils.transferToJsonNode(local));
ArrayNode array = JacksonUtils.createEmptyArrayNode();
if (switchDomain.isSendBeatOnly()) {
Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
}
// 封装心跳包,从内存中获取Leader节点注册表缓存中抽取数据的key和timestamp值
if (!switchDomain.isSendBeatOnly()) {
for (Datum datum : datums.values()) {
ObjectNode element = JacksonUtils.createEmptyJsonNode();
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp.get());
array.add(element);
}
}
packet.replace("datums", array);
// broadcast
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JacksonUtils.toJson(packet));
String content = JacksonUtils.toJson(params);
// 对心跳包做gzip压缩
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
// 把压缩后的心跳包,发送给除自己外的其他所有节点
for (final String server : peers.allServersWithoutMySelf()) {
try {
// 心跳包的API为:/raft/beat
final String url = buildUrl(server, API_BEAT);
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCancel() {
}
});
} catch (Exception e) {
}
}
}
}
3、Follower节点收到心跳包后的处理逻辑:
-
Leader发出的心跳包中,包含了数据中的所有key和timestamp,Follower节点通过遍历对比,可以排查自己数据是否为最新最全数据;
-
如果数据不是最新或最全的,则批量从Leader节点获取不一致的数据的最新值;(Leader节点新增或修改的数据)
-
同时要删除掉自己比Leader多出来的数据;(Leader节点删除掉的数据)
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
...从心跳包中解析数据...
// 设置Leader为发送心跳包给我的机器,因为只有Leader才可以发送心跳包
peers.makeLeader(remote);
if (!switchDomain.isSendBeatOnly()) {
// receivedKeysMap 的作用是判断出本节点 比 Leader节点多出来的数据(见方法的最后)
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
// 如果这个Map中的数据为0,则代表是本地自己的数据;
// 接收到的主节点数据时,把对应的值改为1;
// 那么直到处理最后,这个Map中还有0,说明这条数据在主节点并没有,只有一种可能,这条数据在主节点中被删除掉了!(妙)
// 最后,可以把这些数据,在本地清除掉
receivedKeysMap.put(entry.getKey(), 0);
}
// batch用来收集本节点没有的数据,或者不是最新的数据
List<String> batch = new ArrayList<>();
int processedCount = 0; // 已处理的数据条数
for (Object object : beatDatums) {
processedCount = processedCount + 1;
......
receivedKeysMap.put(datumKey, 1);
try {
// 包含,且我自己缓存中这条key对应的数据的时间戳>=收到的心跳中的数据,则代表这条数据我有,就可以跳出本轮
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
&& processedCount < beatDatums.size()) {
continue;
}
// 取反,不满足上面的条件,则说明这条数据我没有,或者不是最新的,则收集到batch中
// 因为节点变化的时候,虽然Leader节点收到了半数以上的ack,但是毕竟还有可能有些节点没有收到,或者处理不成功,所以这里通过心跳包进行数据同步的容错处理
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}
// 当batch的数据量>=50或者数据已经全部处理完,则就可以继续下面向Leader节点发起批量请求数据的逻辑;
// 反过来,如果batch<50,且数据还没有处理完,那么这里先跳过,不要向Leader节点发起批量获取数据的请求
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}
String keys = StringUtils.join(batch, ",");
// 如果batch为空,当然也不用发请求
if (batch.size() <= 0) {
continue;
}
// update datum entry
String url = buildUrl(remote.ip, API_GET);
Map<String, String> queryParam = new HashMap<>(1);
queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
// 从Leader批量获取本节点缺少的或过时的数据
HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
...获取缺少的或过时的数据成功后...
for (JsonNode datumJson : datumList) {
Datum newDatum = null;
OPERATE_LOCK.lock();
try {
......
// 和上面Leader节点新增数据时候逻辑相同,写内存注册表
raftStore.write(newDatum);
datums.put(newDatum.key, newDatum);
notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
......
} catch (Throwable e) {
} finally {
OPERATE_LOCK.unlock();
}
}
return;
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onCancel() {
}
});
batch.clear();
} catch (Exception e) {
}
}
// 如果最后receivedKeysMap中还有value为0的数据,说明这些数据在主节点已经被删除了,那我们从节点也主动删除一下
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}
for (String deadKey : deadKeys) {
try {
deleteDatum(deadKey); //删除本节点多出来的数据逻辑
} catch (Exception e) {
}
}
}
return local;
}



