整个Zookeeper集群就是一个多节点分布式一致性算法的实现,底层采用的实现协议为ZAB。
ZAB = Zookeeper Atomic Broadcast(Zookeeper原子广播协议)
ZAB协议是Zookeeper专门设计的,支持 “原子广播”和“崩溃恢复”的协议;
Zookeeper是一个保证CP的主从架构,所有的写操作必须通过Leader节点进行,读操作可以通过Leader、Follower、Observer任意节点进行。
所有的事务操作被设计成 两阶段提交(2PC),大致过程简化为:
一、原子广播
ZAB协议的消息广播过程是一个原子广播协议,类似一个“两阶段提交过程(2PC)”,对于客户端发送的所有写请求(事务操作),全部会由Leader节点处理;
-
Leader节点在收到请求后,会将请求封装成一个事务Proposal,将起发送给所有的Follower节点,Follower节点在收到Proposal后,会给Leader节点返回ack;(第一阶段)
-
如果Leader节点收到了超过半数以上的ack(包含Leader节点自己的一次ack),就会认为本次事务操作可以被执行,然后就像所有的Follower节点发送commit;同时通知Observer节点存储消息(第二阶段)
一些细节点:
-
如果客户端的写请求被Follower节点接收到,那么集群会自动将请求转发到Leader节点进行处理;
-
Leader节点在接收到写请求时,会为本次请求生成一个zxid,并将请求封装为一个Proposal发送给所有的Follower节点,同时自己还要将本次事务操作写入到自己本地的事务日志中(磁盘),并直接给自己返回一次ack(统计ack时候千万别忘了自己的ack);而Follower节点收到Proposal后也执行相同的操作,写入本地事务日志文件(磁盘),再给Leader节点返回ack;
-
ZAB协议要求保证事务的顺序,所有所有节点会将所有的事务,按照zxid进行排序后,再进行处理(主要时通过队列完成);
-
zxid是一个64位的数字,前32位用来表示本次事务所属的epoch(集群周期),后32位是一个自增的计数;有了epoch的设计,就保证了,即使旧的集群leader生成了新的zxid,但是当集群恢复时,由于它的epoch值小于新的集群中的对应的zxid,旧leader的zxid对应的事务就会被丢弃,而以新集群的为准。(旧Leader虽然会有一次多余的zxid,但是绝对不会被commit执行,因为收不到半数的ack,见于“Zookeeper是如何避免脑裂问题的?”)
-
Leader节点和Follower节点完成第一阶段的ack统计后,进行commit操作,同时不要忘记给Observer节点进行inform通知,Observer节点之前可一直没有参与!
-
Leader和Follower在进行数据发送时,中间还做了一层队列,用来实现解耦,防止同步阻塞!
public class LearnerHandler extends ZooKeeperThread { final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>(); } // 循环忘所有的follower节点发送对应的数据,包括Proposal、commit、ping等 void sendPacket(QuorumPacket qp) { synchronized (forwardingFollowers) { for (LearnerHandler f : forwardingFollowers) { f.queuePacket(qp); } } } void queuePacket(QuorumPacket p) { queuedPackets.add(p); }
二、崩溃恢复
当主节点挂掉,或者失去了与过半节点的心跳检测,整个集群就会进入崩溃恢复模式(此时集群不可用,典型的CP牺牲高可用性,保证强一致性!)
整个集群新的选举过程,与集群启动时的选举过程一致,只不过现在的epoch值不再是从0开始!
崩溃恢复模式需要保证解决2个问题:
1、已经被commit的事务不能丢;
如果Leader已经发出了commit后,挂了,那么所有的Follower中只要有一个成哥commit,那么这条数据就能够被成功commit,不会被丢弃;
这也就保证了,如果一条事务被集群的某个节点成功commit执行,他也必然会被集群中的其它节点执行成功!
2、没被commit的事务需要被丢弃;
如果Leader还没有发出commit,就挂了,那么这条事务就会被新的集群丢弃;
因为新的Leader产生后,他会对比自己的“磁盘事务日志文件” 和“内存数据”,对于那些还未被commited的事务日志将直接被丢弃。
三、其它问题
1、Zookeeper如何保证事务消息执行的顺序性
首先就是依赖zxid(前32位为epoch值,后32位为递增的计数),当所有的follower接收到proposal提案时,除了将事务操作写入本地事务日志外,还会将本次request记录到一个阻塞队列pendingTxns中:
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>(); public void logRequest(TxnHeader hdr, Record txn) { Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); if ((request.zxid & 0xffffffffL) != 0) { pendingTxns.add(request); } syncProcessor.processRequest(request); }
当之后,收到来之Leader节点的commit时,会拿commit中的zxid与pendingTxns队列的头部txid进行对比,必须一样,如果不一样,那么本Follower会退出,然后重新加入集群,再次跟Leader进行同步数据!
/** * 当收到 COMMIT 消息时,最终会调用此方法,它将来自 COMMIT 的 zxid 与(希望)pendingTxns 队列的头部进行匹配,并将其交给 commitProcessor 进行提交。 * @param zxid - 如果存在,则必须对应于 pendingTxns 的头部 */ public void commit(long zxid) { if (pendingTxns.size() == 0) { LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn"); return; } long firstElementZxid = pendingTxns.element().zxid; if (firstElementZxid != zxid) { LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid)); System.exit(12); } Request request = pendingTxns.remove(); commitProcessor.commit(request); }
(一言不合就去找Leader同步,而Leader节点中的事务执行顺序肯定是有保障的)
2、Zookeeper的所有节点为什么是先写磁盘,后写内存?
因为整个Zookeeper集群的数据读取,是从内存中的dataTree读取。
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath()); // 从内存dataTree中读取数据 if (n == null) { throw new KeeperException.NoNodeException(); } PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, request.authInfo); Stat stat = new Stat(); byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; } // ZKDatabase中维护的核心就是dataTree public class ZKDatabase { protected DataTree dataTree; // 核心数据 protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts; protected FileTxnSnapLog snapLog; protected long minCommittedLog, maxCommittedLog; } // dataTree的数据结构为: public class DataTree { // 核心数据是一个ConcurrentHashMap = (path, DataNode) private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>(); private final WatchManager dataWatches = new WatchManager(); // get -w 监听本身数据 private final WatchManager childWatches = new WatchManager(); // ls -w 监听子节点 } // 每一个DataNode的结构为: public class DataNode implements Record { byte data[]; // 字节数组存储实际数据 Long acl; public StatPersisted stat; // -s 状态信息 private Set<String> children = null; // 子节点列表 }
如果先写内存的话,那么即使没有完成2PC操作,有客户端来读取时,也会直接从内存中把数据读取走,ZAB一致性协议也就失效了!
4、旧Leader断线后,重新加入集群,会乖乖地做Follower
因为旧的Leader的epoch值小于新集群Leader的epoch值,它只能乖乖做小弟;
对于数据中的zxid不一致的数据,只能以新Leader的为准,因为前32位表示的epoch值比新集群的小。
四、为什么Zookeeper不存在脑裂问题?
所谓的脑裂问题:可能由于网络等问题,导致集群中的机器被分为两个阵营,如果不考虑“过半机制”,那么两个阵营都会选出新的Leader,这样出现了两个Leader的现象就被称为“脑裂”现象!
但是由于Zookeeper的“过半机制”,脑裂问题旧不存在了,体现在两个环节:
-
选举阶段:当网络被分为两个分区时,由于“选举过半机制”,那么只能会选出“1个或者0个”Leader节点,用于不会出现2个Leader!
-
写数据阶段:当旧Leader节点从大网络分区分离出去时,大网络分区会选举出一个新的Leader,而旧Leader并不知道,它还在执行“写数据”的操作,但是由于ZAB的“proposal-ack过半机制”,就导致这一条事务并不会被写成功,同时由于 1/2 以上的心跳检测失败,很快旧Leader将不在是Leader。所以集群很快就会只有1或0个Leader,所以永远不会出现脑裂问题;
五、Zookeeper的监听机制的实现?何时触发?
以“创建节点”为例,任意节点,在接收到commit后,会执行processTxn()方法:
private ProcessTxnResult processTxn(Request request, TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = request != null ? request.type : hdr.getType(); long sessionId = request != null ? request.sessionId : hdr.getClientId(); if (hdr != null) { rc = getZKDatabase().processTxn(hdr, txn); // 调用DataTree创建节点 } else { rc = new ProcessTxnResult(); } }
DataTree中的processTxn()方法(加深印象:Zookeeper的数据就是这个DataTree):
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) { ProcessTxnResult rc = new ProcessTxnResult(); try { rc.clientId = header.getClientId(); rc.cxid = header.getCxid(); rc.zxid = header.getZxid(); rc.type = header.getType(); rc.err = 0; rc.multiResult = null; switch (header.getType()) { case OpCode.create: CreateTxn createTxn = (CreateTxn) txn; rc.path = createTxn.getPath(); createNode( // 去创建DataNode节点 createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(), header.getZxid(), header.getTime(), null); break; } } }
createNode()方法的最后:
public void createNode(final String path, byte data[], List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat){ ...创建节点的操作... // 触发 dataWatches.triggerWatch(path, Event.EventType.NodeCreated); childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); }
org.apache.zookeeper.server.WatchManager#triggerWatch(…):
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); // watch是一次性的,取完一次就remove // ...周边逻辑,如果为空,就返回了... } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); //遍历watch,进行处理,其实就是通过Netty或者NIO给客户端发送event事件 } return watchers; }
我们很有必要看看中各重要的watchTable容器是什么?
private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>(); //通过追溯,我们会发现,watch其实就是cnxn,而cnxn就是NettyServerCnxn或者NIOServerCnxn连接 watch = getDataRequest.getWatch() ? cnxn : null
所以我们最终的结论就是:
Zookeeper服务端在接收到请求时,会判断watch这个字段为true还是false,如果为true,则将本次的cnxn连接作为watch对象,保存在watchTable中,key为path,值为cnxn的HashSet;
watchs只需要保存在处理本次请求的节点即可,不需要同步给其他节点;
当有其他“写请求”发生时,这台服务节点最终肯定会接收到对应的事务处理,如:createNode,在createNode的最后,会尝试从本地的watchTables中获取watchs,如果有就遍历通过对应的cncx连接发送event事件。
所有的watch都是一次性的,因为使用的是ConcurrentHashMap.remove()方法。