Zookeeper核心源码剖析——ZAB原子广播协议+监听机制

整个Zookeeper集群就是一个多节点分布式一致性算法的实现,底层采用的实现协议为ZAB。

ZAB = Zookeeper Atomic Broadcast(Zookeeper原子广播协议)

ZAB协议是Zookeeper专门设计的,支持 “原子广播”和“崩溃恢复”的协议;

Zookeeper是一个保证CP的主从架构,所有的写操作必须通过Leader节点进行,读操作可以通过Leader、Follower、Observer任意节点进行。

所有的事务操作被设计成 两阶段提交(2PC),大致过程简化为:

image.png


一、原子广播

ZAB协议的消息广播过程是一个原子广播协议,类似一个“两阶段提交过程(2PC)”,对于客户端发送的所有写请求(事务操作),全部会由Leader节点处理;

  • Leader节点在收到请求后,会将请求封装成一个事务Proposal,将起发送给所有的Follower节点,Follower节点在收到Proposal后,会给Leader节点返回ack;(第一阶段)

  • 如果Leader节点收到了超过半数以上的ack(包含Leader节点自己的一次ack),就会认为本次事务操作可以被执行,然后就像所有的Follower节点发送commit;同时通知Observer节点存储消息(第二阶段)

image.png

一些细节点:

  • 如果客户端的写请求被Follower节点接收到,那么集群会自动将请求转发到Leader节点进行处理

  • Leader节点在接收到写请求时,会为本次请求生成一个zxid,并将请求封装为一个Proposal发送给所有的Follower节点,同时自己还要将本次事务操作写入到自己本地的事务日志中(磁盘),并直接给自己返回一次ack(统计ack时候千万别忘了自己的ack);而Follower节点收到Proposal后也执行相同的操作,写入本地事务日志文件(磁盘),再给Leader节点返回ack

  • ZAB协议要求保证事务的顺序,所有所有节点会将所有的事务,按照zxid进行排序后,再进行处理(主要时通过队列完成);

  • zxid是一个64位的数字,前32位用来表示本次事务所属的epoch(集群周期),后32位是一个自增的计数;有了epoch的设计,就保证了,即使旧的集群leader生成了新的zxid,但是当集群恢复时,由于它的epoch值小于新的集群中的对应的zxid,旧leader的zxid对应的事务就会被丢弃,而以新集群的为准。(旧Leader虽然会有一次多余的zxid,但是绝对不会被commit执行,因为收不到半数的ack,见于“Zookeeper是如何避免脑裂问题的?”)

  • Leader节点和Follower节点完成第一阶段的ack统计后,进行commit操作,同时不要忘记给Observer节点进行inform通知,Observer节点之前可一直没有参与!

  • Leader和Follower在进行数据发送时,中间还做了一层队列,用来实现解耦,防止同步阻塞!

public class LearnerHandler extends ZooKeeperThread {
    final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
}

// 循环忘所有的follower节点发送对应的数据,包括Proposal、commit、ping等
void sendPacket(QuorumPacket qp) {
    synchronized (forwardingFollowers) {
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}

void queuePacket(QuorumPacket p) {
    queuedPackets.add(p);
}


二、崩溃恢复

当主节点挂掉,或者失去了与过半节点的心跳检测,整个集群就会进入崩溃恢复模式(此时集群不可用,典型的CP牺牲高可用性,保证强一致性!)

整个集群新的选举过程,与集群启动时的选举过程一致,只不过现在的epoch值不再是从0开始!

崩溃恢复模式需要保证解决2个问题:

1、已经被commit的事务不能丢;

如果Leader已经发出了commit后,挂了,那么所有的Follower中只要有一个成哥commit,那么这条数据就能够被成功commit,不会被丢弃;

这也就保证了,如果一条事务被集群的某个节点成功commit执行,他也必然会被集群中的其它节点执行成功!

2、没被commit的事务需要被丢弃;

如果Leader还没有发出commit,就挂了,那么这条事务就会被新的集群丢弃;

因为新的Leader产生后,他会对比自己的“磁盘事务日志文件” 和“内存数据”,对于那些还未被commited的事务日志将直接被丢弃。


三、其它问题

1、Zookeeper如何保证事务消息执行的顺序性

首先就是依赖zxid(前32位为epoch值,后32位为递增的计数),当所有的follower接收到proposal提案时,除了将事务操作写入本地事务日志外,还会将本次request记录到一个阻塞队列pendingTxns中:

LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();

public void logRequest(TxnHeader hdr, Record txn) {
    Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
    if ((request.zxid & 0xffffffffL) != 0) {
        pendingTxns.add(request);
    }
    syncProcessor.processRequest(request);
}

当之后,收到来之Leader节点的commit时,会拿commit中的zxid与pendingTxns队列的头部txid进行对比,必须一样,如果不一样,那么本Follower会退出,然后重新加入集群,再次跟Leader进行同步数据!

/**
 * 当收到 COMMIT 消息时,最终会调用此方法,它将来自 COMMIT 的 zxid 与(希望)pendingTxns 队列的头部进行匹配,并将其交给 commitProcessor 进行提交。
 * @param zxid - 如果存在,则必须对应于 pendingTxns 的头部
 */
public void commit(long zxid) {
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                + " without seeing txn");
        return;
    }
    long firstElementZxid = pendingTxns.element().zxid;
    if (firstElementZxid != zxid) {
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                + " but next pending txn 0x"
                + Long.toHexString(firstElementZxid));
        System.exit(12);
    }
    Request request = pendingTxns.remove();
    commitProcessor.commit(request);
}

(一言不合就去找Leader同步,而Leader节点中的事务执行顺序肯定是有保障的)

2、Zookeeper的所有节点为什么是先写磁盘,后写内存?

因为整个Zookeeper集群的数据读取,是从内存中的dataTree读取。

case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
            getDataRequest);
    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());  // 从内存dataTree中读取数据
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
            ZooDefs.Perms.READ,
            request.authInfo);
    Stat stat = new Stat();
    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
            getDataRequest.getWatch() ? cnxn : null);
    rsp = new GetDataResponse(b, stat);
    break;
}

// ZKDatabase中维护的核心就是dataTree
public class ZKDatabase {
    protected DataTree dataTree;  // 核心数据
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
    protected FileTxnSnapLog snapLog;
    protected long minCommittedLog, maxCommittedLog;
}

// dataTree的数据结构为:
public class DataTree {
    // 核心数据是一个ConcurrentHashMap = (path, DataNode)
    private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
    private final WatchManager dataWatches = new WatchManager();  // get -w 监听本身数据
    private final WatchManager childWatches = new WatchManager();  // ls -w 监听子节点
}

// 每一个DataNode的结构为:
public class DataNode implements Record {
    byte data[];  // 字节数组存储实际数据
    Long acl;
    public StatPersisted stat;  // -s 状态信息
    private Set<String> children = null;  // 子节点列表
}

如果先写内存的话,那么即使没有完成2PC操作,有客户端来读取时,也会直接从内存中把数据读取走,ZAB一致性协议也就失效了!

4、旧Leader断线后,重新加入集群,会乖乖地做Follower

因为旧的Leader的epoch值小于新集群Leader的epoch值,它只能乖乖做小弟;

对于数据中的zxid不一致的数据,只能以新Leader的为准,因为前32位表示的epoch值比新集群的小。


四、为什么Zookeeper不存在脑裂问题?

所谓的脑裂问题:可能由于网络等问题,导致集群中的机器被分为两个阵营,如果不考虑“过半机制”,那么两个阵营都会选出新的Leader,这样出现了两个Leader的现象就被称为“脑裂”现象!

image.png

但是由于Zookeeper的“过半机制”,脑裂问题旧不存在了,体现在两个环节:

  • 选举阶段:当网络被分为两个分区时,由于“选举过半机制”,那么只能会选出“1个或者0个”Leader节点,用于不会出现2个Leader!

  • 写数据阶段:当旧Leader节点从大网络分区分离出去时,大网络分区会选举出一个新的Leader,而旧Leader并不知道,它还在执行“写数据”的操作,但是由于ZAB的“proposal-ack过半机制”,就导致这一条事务并不会被写成功,同时由于 1/2 以上的心跳检测失败,很快旧Leader将不在是Leader。所以集群很快就会只有1或0个Leader,所以永远不会出现脑裂问题;


五、Zookeeper的监听机制的实现?何时触发?

以“创建节点”为例,任意节点,在接收到commit后,会执行processTxn()方法:

private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
                                    Record txn) {
    ProcessTxnResult rc;
    int opCode = request != null ? request.type : hdr.getType();
    long sessionId = request != null ? request.sessionId : hdr.getClientId();
    if (hdr != null) {
        rc = getZKDatabase().processTxn(hdr, txn);  // 调用DataTree创建节点
    } else {
        rc = new ProcessTxnResult();
    }
}

DataTree中的processTxn()方法(加深印象:Zookeeper的数据就是这个DataTree):

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn)
{
    ProcessTxnResult rc = new ProcessTxnResult();

    try {
        rc.clientId = header.getClientId();
        rc.cxid = header.getCxid();
        rc.zxid = header.getZxid();
        rc.type = header.getType();
        rc.err = 0;
        rc.multiResult = null;
        switch (header.getType()) {
            case OpCode.create:
                CreateTxn createTxn = (CreateTxn) txn;
                rc.path = createTxn.getPath();
                createNode(   // 去创建DataNode节点
                        createTxn.getPath(),
                        createTxn.getData(),
                        createTxn.getAcl(),
                        createTxn.getEphemeral() ? header.getClientId() : 0,
                        createTxn.getParentCVersion(),
                        header.getZxid(), header.getTime(), null);
                break;
        }
    }
}

createNode()方法的最后:

public void createNode(final String path, byte data[], List<ACL> acl,
        long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat){
        
    ...创建节点的操作...
    // 触发
    dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
    childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
}

org.apache.zookeeper.server.WatchManager#triggerWatch(…):

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path); // watch是一次性的,取完一次就remove
        // ...周边逻辑,如果为空,就返回了...
    }
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);  //遍历watch,进行处理,其实就是通过Netty或者NIO给客户端发送event事件
    }
    return watchers;
}

我们很有必要看看中各重要的watchTable容器是什么?

private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();

//通过追溯,我们会发现,watch其实就是cnxn,而cnxn就是NettyServerCnxn或者NIOServerCnxn连接
watch = getDataRequest.getWatch() ? cnxn : null

所以我们最终的结论就是:

        Zookeeper服务端在接收到请求时,会判断watch这个字段为true还是false,如果为true,则将本次的cnxn连接作为watch对象,保存在watchTable中,key为path,值为cnxn的HashSet;

        watchs只需要保存在处理本次请求的节点即可,不需要同步给其他节点

        当有其他“写请求”发生时,这台服务节点最终肯定会接收到对应的事务处理,如:createNode,在createNode的最后,会尝试从本地的watchTables中获取watchs,如果有就遍历通过对应的cncx连接发送event事件

        所有的watch都是一次性的,因为使用的是ConcurrentHashMap.remove()方法。

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐