本次剖析得源码基于Zookeeper3.5.8
一、核心知识点总结前置:
集群核心配置文件如下:
tickTime=2000 # 心跳间隔时间 initLimit=10 # LF初始通讯时限(tickTime的倍数) syncLimit=5 # LF同步通讯时限(tickTime的倍数) dataDir=/data/zk1 # 指定对应的数据存放目录 clientPort=2181 # 指定对应的节点端口 # 集群配置:server.A=B:C:D:E ## A:节点编号,与myid一致 ## B:IP ## C:与Leader节点的通讯端口 ## D:选举端口 ## E:节点类型,participant:参与选举,observer:不参与选举的观察者 server.1=192.168.121.121:2001:3001:participant server.2=192.168.121.121:2002:3002:participant server.3=192.168.121.121:2003:3003:participant server.4=192.168.121.121:2004:3004:observer
1. 在配置集群时,会定义三个端口(以以上配置为例):
-
2181:对外服务端口(NIO | Netty)
-
2001:集群内部数据同步端口,包括数据同步、发送事务proposal提交及ack、发送commit,ping心跳检测等;(BIO)
-
3001:集群选举端口,包括集群启动、Leader宕机重新选举等;(BIO)
2. 集群节点状态分为4大状态:
-
LOOKING:选举进行中;
-
FOLLOWING:成为Follower节点;
-
LEADING:成为Leader节点;
-
OBSERVING:观察者节点的状态,不参与选举;
3. 集群选举过程中的状态机运转:
// org.apache.zookeeper.server.quorum.QuorumPeer#run public void run() { while (running) { switch (getPeerState()) { case LOOKING: ... break; case OBSERVING: ... break; case FOLLOWING: ... break; case LEADING: ... break; } } }
-
在Zookeeper集群启动时,首先就会进行Leader选举,刚开始所有participant节点的状态都为LOOKING,选举成功后,分别变为LEADING和FOLLOWING;
-
选举成功后,Follower节点通过“死循环”从Leader节点同步数据+接受ping包心跳检测;
-
如果Leader宕机,会触发Following节点读包异常——会打破Follower节点“死循环”,从而将自己状态从Following修改为Looking;
-
再次回到核心run()方法中,再次进行判断节点状态,开始新一轮的选举过程!
4、Zookeeper集群的核心选举逻辑:
Vote选票包含3个重要参数:
-
epoch:epoch是Zookeeper中非常特殊的一个变量,字面意思为“纪元”,表示了“当前的Leader周期”,初始值为0,每次产生一个新的Leader后,就+1,维护在数据目录下的currentEpoch文件中,在运行期间,集群通讯时都会带上这个epoch,以确保彼此在同一个Leader周期内;
[root@jiguiquan version-2]# cat /data/zk1/version-2/currentEpoch 1 # 刚起动后,第一次稳定运行,0 + 1 = 1
-
zxid:节点上数据的最大“事务id”,zxid越大,说明数据越新;
-
id(serverid):集群配置时,我们为每个节点设置的myid;
核心判断逻辑代码:
// org.apache.zookeeper.server.quorum.FastLeaderElection#totalOrderPredicate // 选票的构造很简单 Vote = (serverid, zxid) // protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
-
先比较epoch选举周期,直接选择epoch较大的,如果收到的大于自己,说明自己可能是后加入集群的,所以要先更新自己的选举周期。
-
先比较 zxid :zxid越大,说明数据越新;
-
再比较 serverid :约定的规则;
所有的节点在投票时,第一票都是投给自己,之后收到新的投票时进行比较,如果自己败了,更新自己的投票,再下一轮投票时发送出去。
当某个时候,自己收到的投票中,某台机器的票数大于总participant机器数量的半数,则可以确定Leader,更新自己的状态。
当系统中已经有了Leader后,再有机器加入系统,由于epoch < 其它机器,必然会成为Follower。
二、寻找核心启动类与核心方法
1、通过启动脚本 zkServer.sh 寻找到启动类:
// 入参指令 start 分支,定义了一个 Java 启动变量:$ZOOMAIN case $1 in start) echo -n "Starting zookeeper ... " if [ -f "$ZOOPIDFILE" ]; then if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then echo $command already running as process `cat "$ZOOPIDFILE"`. exit 1 fi fi nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \ "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \ -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \ -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null & if [ $? -eq 0 ] // 找到 $ZOOMAIN 定义的位置 ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" // 这样就找到了启动类:
2、通过启动类找到核心入口方法:
main() | main.initializeAndRun(args); | runFromConfig(config); // config配置就是从入参解析而得 | quorumPeer.start(); super.start(); // quorumPeer的父类为Thread,所以start就是执行quorumPeer的run()方法 | public void run() { while (running) { switch (getPeerState()) { case LOOKING: ... setCurrentVote(makeLEStrategy().lookForLeader()); // 核心选举过程 ... break; case OBSERVING: ... break; case FOLLOWING: ... break; case LEADING: ... break; } } } // 至此,我们就算是找到了zkServer启动时的核心入口方法了!
3、核心选举过程 makeLEStrategy().lookForLeader():
// 默认肯定是快速选举算法,其它的已经废弃:FastLeaderElection#lookForLeader public Vote lookForLeader() throws InterruptedException { Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS); // 如果recvqueue队列中没有选票,说明可能当启动,所以需要去与其它节点建立socket if(n == null){ if(manager.haveDelivered()){ sendNotifications(); } else { manager.connectAll(); } } // 如果收到选票,则对选票进行处理,校验选票状态 else if (validVoter(n.sid) && validVoter(n.leader)) { switch (n.state) { case LOOKING: ...还在投票选举过程中... if (n.electionEpoch > logicalclock.get()) { //选票epoch大于自己 logicalclock.set(n.electionEpoch); // 说明自己加入的晚了,更新自己的epoch值 recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { // 如果收到的选票胜,则更新自己的选票为收到的选票 updateProposal(n.leader, n.zxid, n.peerEpoch); } else { // 如果自己胜出,则更新选票为自己(因为epoch有变动,所以要更新) updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); // 发出自己的新一轮投票 } else if (n.electionEpoch < logicalclock.get()) { ...过期的选票,不做处理,直接丢弃... break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) { // 如果收到的选票胜,则更新自己的选票为收到的选票,如果自己胜,保持即可。 updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); // 发出自己的新一轮投票 } break; case OBSERVING: ...不需要处理... break; case FOLLOWING: case LEADING: ...已经明确Leader,对应的处理... break; } } // 选票pk方法:先比epoch,再比zxid,再比serverid protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }
三、集群间发送选票的过程为了提升效率,使用了“多层队列”架构设计:
1、业务数据侧(抽象理解):
当Zookeeper启动时,会在业务数据侧,准备两个阻塞队列 “sendqueue”和“recequeue”,用来存放要“即将要发送的选票”和“收到待处理的选票”
LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<ToSend>(); LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<Notification>();
以及两个工作线程“WorkerSender”和“WorkerReceiver”,负责在“业务数据侧”和“网络传输侧”的队列中搬运“选票”
class WorkerSender extends ZooKeeperThread { public void run() { ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); //从 sendqueue 中poll选票 ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY); addToSendQueue(bq, b); // 放入网络传输侧的queueSendMap中 } } } class WorkerReceiver extends ZooKeeperThread { response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); // 从 recvQueue 中poll选票 recvqueue.offer(n); // 放入到 recvqueue 队列中 }
2、网络传输侧(抽象理解):
Zookeeper会在“网络传输侧”创建一个“queueSendMap队列数组”和 一个“recvQueue接收队列”:
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();; public final ArrayBlockingQueue<Message> recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);;
-
queueSendMap队列数组:业务侧的“选票”数据,会根据目的地(遍历配置得到的所有participant节点),放入到不通的队列中;如果目标机器时自己,则自己放到自己的recvQueue队列中。
-
recvQueue接收队列:接受来自其它机器的选票,还有自己的选票;
Zookeeper在节点创建选举Connection连接时,还会为每个连接创建一个“SendWorker”和“RecvWorker”,负责网络通信层面“发送和接收选票”。
private void handleConnection(Socket sock, DataInputStream din) { SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); }
每组选举socket连接,都有一对“SendWorker”和“RecvWorker”。
3、为什么要把“选票”的发送和接收设计这么复杂?
-
“业务侧”和“网络侧”隔离,便于后期的维护与升级;
-
“网络侧”的 queueSendMap + senderWorkerMap + reccWorkerMap 这种以 sid 为 key 的 Map,可以确保:不会因为某两台节点之间的响应慢,而阻塞其它节点间的数据传输;
-
“业务侧”与 “网络侧”的数据拷贝发生在内存中,效率非常的快,拷贝的性能损失几乎可以忽略。
四、Zookeeper集群节点数为什么推荐“奇数”?
是从Zookeeper集群的宕机节点容忍度:Zookeeper的集群选举机制决定了,只要集群中有“大于half的机器存活,集群就可以选举Leader成功”,这样就不难计算出“集群中允许的宕机节点数量”
比如集群有5个节点,最大允许的宕机数为2;而当集群有6个节点时,最大允许的宕机数依然为2;
不难得出结论,2n 与 2n-1 的“宕机节点容忍度”是一样的,都为n-1,所以为了节约机器,我们推荐节点数为“奇数”!