Zookeeper核心源码剖析——集群Leader选举机制

本次剖析得源码基于Zookeeper3.5.8

一、核心知识点总结前置:

集群核心配置文件如下:

tickTime=2000   # 心跳间隔时间
initLimit=10    # LF初始通讯时限(tickTime的倍数)
syncLimit=5     # LF同步通讯时限(tickTime的倍数)
 
dataDir=/data/zk1  # 指定对应的数据存放目录
clientPort=2181    # 指定对应的节点端口
 
# 集群配置:server.A=B:C:D:E
## A:节点编号,与myid一致
## B:IP
## C:与Leader节点的通讯端口
## D:选举端口
## E:节点类型,participant:参与选举,observer:不参与选举的观察者
server.1=192.168.121.121:2001:3001:participant
server.2=192.168.121.121:2002:3002:participant
server.3=192.168.121.121:2003:3003:participant
server.4=192.168.121.121:2004:3004:observer

1. 在配置集群时,会定义三个端口(以以上配置为例):

  • 2181:对外服务端口(NIO | Netty)

  • 2001:集群内部数据同步端口,包括数据同步、发送事务proposal提交及ack、发送commit,ping心跳检测等;(BIO)

  • 3001:集群选举端口,包括集群启动、Leader宕机重新选举等;(BIO)

2. 集群节点状态分为4大状态:

  • LOOKING:选举进行中;

  • FOLLOWING:成为Follower节点;

  • LEADING:成为Leader节点;

  • OBSERVING:观察者节点的状态,不参与选举;

3. 集群选举过程中的状态机运转:

// org.apache.zookeeper.server.quorum.QuorumPeer#run
public void run() {
    while (running) {
        switch (getPeerState()) {
        case LOOKING:
            ...
            break;
        case OBSERVING:
            ...
            break;
        case FOLLOWING:
            ...
            break;
        case LEADING:
            ...
            break;
        }
    }
}
  • 在Zookeeper集群启动时,首先就会进行Leader选举,刚开始所有participant节点的状态都为LOOKING,选举成功后,分别变为LEADING和FOLLOWING;

  • 选举成功后,Follower节点通过“死循环”从Leader节点同步数据+接受ping包心跳检测;

  • 如果Leader宕机,会触发Following节点读包异常——会打破Follower节点“死循环”,从而将自己状态从Following修改为Looking;

  • 再次回到核心run()方法中,再次进行判断节点状态,开始新一轮的选举过程!

4、Zookeeper集群的核心选举逻辑:

image.png

Vote选票包含3个重要参数:

  • epoch:epoch是Zookeeper中非常特殊的一个变量,字面意思为“纪元”,表示了“当前的Leader周期”,初始值为0,每次产生一个新的Leader后,就+1,维护在数据目录下的currentEpoch文件中,在运行期间,集群通讯时都会带上这个epoch,以确保彼此在同一个Leader周期内

[root@jiguiquan version-2]# cat /data/zk1/version-2/currentEpoch 
1  # 刚起动后,第一次稳定运行,0 + 1 = 1
  • zxid:节点上数据的最大“事务id”,zxid越大,说明数据越新;

  • id(serverid):集群配置时,我们为每个节点设置的myid;

核心判断逻辑代码:

// org.apache.zookeeper.server.quorum.FastLeaderElection#totalOrderPredicate
// 选票的构造很简单 Vote = (serverid, zxid)
// 
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
  • 先比较epoch选举周期,直接选择epoch较大的,如果收到的大于自己,说明自己可能是后加入集群的,所以要先更新自己的选举周期。

  • 先比较 zxid :zxid越大,说明数据越新;

  • 再比较 serverid :约定的规则;

  • 所有的节点在投票时,第一票都是投给自己,之后收到新的投票时进行比较,如果自己败了,更新自己的投票,再下一轮投票时发送出去。

  • 当某个时候,自己收到的投票中,某台机器的票数大于总participant机器数量的半数,则可以确定Leader,更新自己的状态。

  • 当系统中已经有了Leader后,再有机器加入系统,由于epoch < 其它机器,必然会成为Follower。


二、寻找核心启动类与核心方法

1、通过启动脚本 zkServer.sh 寻找到启动类:

// 入参指令 start 分支,定义了一个 Java 启动变量:$ZOOMAIN
case $1 in
start)
    echo  -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 1
      fi
    fi
    nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    if [ $? -eq 0 ]
    
// 找到 $ZOOMAIN 定义的位置
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"

// 这样就找到了启动类:

2、通过启动类找到核心入口方法:

main()
|
main.initializeAndRun(args);
|
runFromConfig(config);  // config配置就是从入参解析而得
|
quorumPeer.start();  
super.start();     // quorumPeer的父类为Thread,所以start就是执行quorumPeer的run()方法
|
public void run() {   
    while (running) {
        switch (getPeerState()) {
        case LOOKING:
            ...
            setCurrentVote(makeLEStrategy().lookForLeader()); // 核心选举过程
            ...
            break;
        case OBSERVING:
            ...
            break;
        case FOLLOWING:
            ...
            break;
        case LEADING:
            ...
            break;
        }
    }
}
// 至此,我们就算是找到了zkServer启动时的核心入口方法了!

3、核心选举过程 makeLEStrategy().lookForLeader():

// 默认肯定是快速选举算法,其它的已经废弃:FastLeaderElection#lookForLeader
public Vote lookForLeader() throws InterruptedException {
    Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);

    // 如果recvqueue队列中没有选票,说明可能当启动,所以需要去与其它节点建立socket
    if(n == null){
        if(manager.haveDelivered()){
            sendNotifications();
        } else {
            manager.connectAll();
        }
    } 
    // 如果收到选票,则对选票进行处理,校验选票状态
    else if (validVoter(n.sid) && validVoter(n.leader)) {
            switch (n.state) {
            case LOOKING:
                ...还在投票选举过程中...
                if (n.electionEpoch > logicalclock.get()) {  //选票epoch大于自己
                    logicalclock.set(n.electionEpoch);  // 说明自己加入的晚了,更新自己的epoch值
                    recvset.clear();
                    if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                        // 如果收到的选票胜,则更新自己的选票为收到的选票
                        updateProposal(n.leader, n.zxid, n.peerEpoch);  
                    } else { 
                        // 如果自己胜出,则更新选票为自己(因为epoch有变动,所以要更新)
                        updateProposal(getInitId(),
                                getInitLastLoggedZxid(),
                                getPeerEpoch());
                    }
                    sendNotifications();    // 发出自己的新一轮投票
                } else if (n.electionEpoch < logicalclock.get()) {
                    ...过期的选票,不做处理,直接丢弃...
                    break;
                } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                        proposedLeader, proposedZxid, proposedEpoch)) {
                    // 如果收到的选票胜,则更新自己的选票为收到的选票,如果自己胜,保持即可。
                    updateProposal(n.leader, n.zxid, n.peerEpoch);    
                    sendNotifications(); // 发出自己的新一轮投票
                }
                break;
            case OBSERVING:
                ...不需要处理...
                break;
            case FOLLOWING:
            case LEADING:
                ...已经明确Leader,对应的处理...
                break;
    }
}


// 选票pk方法:先比epoch,再比zxid,再比serverid
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}


三、集群间发送选票的过程为了提升效率,使用了“多层队列”架构设计:

image.png

1、业务数据侧(抽象理解):

当Zookeeper启动时,会在业务数据侧,准备两个阻塞队列 “sendqueue”和“recequeue”,用来存放要“即将要发送的选票”和“收到待处理的选票”

LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<ToSend>();
LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<Notification>();

以及两个工作线程“WorkerSender”和“WorkerReceiver”,负责在“业务数据侧”和“网络传输侧”的队列中搬运“选票”

class WorkerSender extends ZooKeeperThread {
    public void run() {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); //从 sendqueue 中poll选票
            
            ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
            addToSendQueue(bq, b);  // 放入网络传输侧的queueSendMap中
        }
    }
}

class WorkerReceiver extends ZooKeeperThread  {
    response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);  // 从 recvQueue 中poll选票
    recvqueue.offer(n);  // 放入到 recvqueue 队列中
}

2、网络传输侧(抽象理解):

Zookeeper会在“网络传输侧”创建一个“queueSendMap队列数组”和 一个“recvQueue接收队列

final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();;
public final ArrayBlockingQueue<Message> recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);;
  • queueSendMap队列数组:业务侧的“选票”数据,会根据目的地(遍历配置得到的所有participant节点),放入到不通的队列中;如果目标机器时自己,则自己放到自己的recvQueue队列中。

  • recvQueue接收队列:接受来自其它机器的选票,还有自己的选票;

Zookeeper在节点创建选举Connection连接时,还会为每个连接创建一个“SendWorker”和“RecvWorker”,负责网络通信层面“发送和接收选票”。

private void handleConnection(Socket sock, DataInputStream din) {
    SendWorker sw = new SendWorker(sock, sid);
    RecvWorker rw = new RecvWorker(sock, din, sid, sw);
}

每组选举socket连接,都有一对“SendWorker”和“RecvWorker”。

3、为什么要把“选票”的发送和接收设计这么复杂?

  • “业务侧”和“网络侧”隔离,便于后期的维护与升级;

  • “网络侧”的 queueSendMap + senderWorkerMap + reccWorkerMap 这种以 sid 为 key 的 Map,可以确保:不会因为某两台节点之间的响应慢,而阻塞其它节点间的数据传输;

  • “业务侧”与 “网络侧”的数据拷贝发生在内存中,效率非常的快,拷贝的性能损失几乎可以忽略。


四、Zookeeper集群节点数为什么推荐“奇数”?

是从Zookeeper集群的宕机节点容忍度:Zookeeper的集群选举机制决定了,只要集群中有“大于half的机器存活,集群就可以选举Leader成功”,这样就不难计算出“集群中允许的宕机节点数量

比如集群有5个节点,最大允许的宕机数为2;而当集群有6个节点时,最大允许的宕机数依然为2;

不难得出结论,2n 与 2n-1 的“宕机节点容忍度”是一样的,都为n-1,所以为了节约机器,我们推荐节点数为“奇数”!

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐