分布式锁的实现——Zookeeper/Curator

基于docker安装和简单实用Zookeeper的文章:

http://www.jiguiquan.com/archives/1395

场景描述:和Redis版本的一样,还是借助Redis减库存场景;

一、基础篇——Zookeeper原生实现分布式锁(有缺陷版)

了解一个设计模式——模板方法

1、创建一个项目:pom.xml(zookeeper和zkclient):

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.9</version>
</dependency>

2、application.yml(只是端口和redis的信息,zookeeper的我放到了代码里面):

server:
  port: 9000

spring:
  redis:
    host: 192.168.174.141
    port: 6379
    timeout: 5000

3、养成好的变成习惯:干大事前先定义接口 ZkLock.java:

/**
 * 养成好习惯,做大事前先定义接口
 */
public interface ZkLock {
    public void zkLock();
    public void zkUnLock();
}

4、实现上面的接口 ZkAbstractTemplateLock.java :使用的是抽象方法:即实现得不完全;——使用了 "模板方法" 设计模式:

/**
 * 使用“模板设计模式:将骨架定义为抽象类,将具体的实现交付给子类自己去完成”
 */
public abstract class ZkAbstractTemplateLock implements ZkLock {
    // 配置ZkClient的连接参数
    private static final String ZK_URL = "192.168.174.141:2181";
    private static final int TIME_OUT = 45*1000;
    ZkClient zkClient = new ZkClient(ZK_URL, TIME_OUT);

    protected String path = "/jgqlock";  //想在Zookeeper中创建的临时节点
    //使用CountDownLatch是为了让waitZkLock主程序停在那不向下走,但是当path节点变化时,就可以向下走;
    protected CountDownLatch countDownLatch = null;

    @Override
    public void zkLock() {
        //因为不知道能不能加锁成功,所以需要tryLock()
        if(tryZkLock()){

        }else {
            waitZkLock();  //注意此处不会产生多层的递归调用,耗费内存,因为我们得实现使用的是CountDownLatch堵塞
            zkLock();   //wait结束后,再次尝试获取锁;
        }
    }

    //重点:使用“模板设计模式:将骨架定义为抽象类,将具体的实现交付给子类自己去完成”
    public abstract boolean tryZkLock();
    public abstract void waitZkLock();


    //释放锁比较简单,其实就是关闭客户端连接即可
    @Override
    public void zkUnLock() {
        if (zkClient != null){
            zkClient.close();
        }
    }
}

注意,这里面有几个亮点:

  • 模板方法设计模式,定义方法的骨架,将具体实现,交给不同的子类自己去实现;

  • 使用了CountDownLatch阻塞,不然会无限递归zkLock方法,消耗内存;

5、具体子类,实现父类的未完成方法:ZkDistributeLock.java 

public class ZkDistributeLock extends ZkAbstractTemplateLock {
    @Override
    public boolean tryZkLock() {
        //尝试创建一个临时节点
        try {
            zkClient.createEphemeral(path);
            return true;  //创建临时节点成功,则代表抢占锁成功
        }catch (Exception e){
            return false;
        }
    }

    @Override
    public void waitZkLock() {
        //在等待锁的过程中,要对刚刚的path临时节点进行监听watch
        //创建监听器
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                //当监听的节点被删除时,countDownLatch减1
                if (countDownLatch != null){
                    countDownLatch.countDown();
                }
            }
        };

        //注册监听器
        zkClient.subscribeDataChanges(path, iZkDataListener);

        //如果有path这个节点,程序不可以继续向下走,
        // 只有当这个节点down掉我们才能继续向下走,如何阻止主程序往下走,但是却能在path节点发生变化时,就可以继续向下走
        // 我们使用CountDownLatch方式堵塞实现
        if (zkClient.exists(path)){
            countDownLatch = new CountDownLatch(1);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //释放监听器
        zkClient.unsubscribeDataChanges(path, iZkDataListener);
    }
}

二、在业务中使用并测试上面我们自定义的ZkLock

1、测试类:LoclController.java:

@RestController
public class LockController {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Value("${server.port}")
    private String port;


    @GetMapping("/lock/deduct")
    public String deductStock(){

        ZkLock zkLock = new ZkDistributeLock();

        zkLock.zkLock();
        //获取到锁之后,才进行后面的操作
        try{
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
            if (stock > 0){
                int realStock = stock - 1;
                redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                System.out.println(port + "扣减成功,剩余库存:" + realStock);
            }else {
                System.out.println(port + "扣减失败, 库存不足");
            }
        }finally {
            zkLock.zkUnLock();
        }

        return port + "--end";
    }
}

2、上面的程序,起了2个服务,使用Nginx负载均衡后,使用Jmeter进行压力测试:

会出现一个严重的问题:

当并发很小时,抢锁还是可以成功的;

但是当并发很大的时候,由于每调用一次 deductStock() 方法,程序都会创建一个ZkClient连接,这样导致的结果就是,同一个IP创建的Zk连接过多,但是zookeeper为了防止 Ddos 攻击,其实是限制了每个IP地址建立的连接数的,所以很多连接会直接不成功,问题严重;——且连接数非常大时候,CPU内存消耗也非常严重,机器会非常卡,根本没法玩!

针对上面单节点zk连接数过多情况,我简单实用线程池做了个限制,使程序才能勉强跑起来(实际生产中当然不使像我这么简单粗暴地处理);——要限制也是会在专门的Zk锁工具类中统一进行,不会放在业务代码中的;

@RestController
public class LockController {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Value("${server.port}")
    private String port;

    private static ExecutorService executorService = Executors.newFixedThreadPool(5);

    @GetMapping("/lock/deduct")
    public String deductStock(){

        executorService.execute(() -> {
            ZkLock zkLock = new ZkDistributeLock();

            zkLock.zkLock();
            //获取到锁之后,才进行后面的操作
            try{
                int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
                if (stock > 0){
                    int realStock = stock - 1;
                    redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                    System.out.println(port + "扣减成功,剩余库存:" + realStock);
                }else {
                    System.out.println(port + "扣减失败, 库存不足");
                }
            }finally {
                zkLock.zkUnLock();
            }
        });

        return port + "--end";
    }
}

3、使用Jmeter进行400*2的并发压测:

21.jpg

  2服务负载均衡,处理 400*2 = 800 个并发,全部成功,无“超卖”现象,耗时约18s;  


三、以上方案存在问题分析

1、单节点Zk连接数过大问题:

可以通过线程池的方式,约束单节点同一时间对 Zookeeper 的连接数;

2、惊群效应(羊群效应)

惊群效应是Zookeeper实现分布式锁的过程中一个很常见的问题! 

所谓惊群效应:由于很多个节点,在添加加点不成功的时候,会去监听已经存在的那个节点,这样当第一个获得锁的client释放锁之后,那些监听状态的客户端肯定都会被唤醒,同时也许有上千个监听的客户端(这还是建立问题1的基础上在,限制了每个服务的zk连接数),但是别忘了,我们还有很多的服务被负载均衡了呢;

一次解锁,就要唤醒1000个监听客户端,这就有点像羊群收到惊吓一样,非常形象!

那么该如何解决呢?

我们上面实现得分布式锁,使用的是zookeeper临时节点的特性,连接断开,节点清楚(防止程序宕机时候的死锁发生,这点好过redis的手动实现);

Zookeeper实现分布式锁还有一种实现,是借助zk的“临时顺序节点”;

3、解决办法?

其实一次解锁,即使我们唤醒了1000个监听客户端,但是下一次又只会有一个客户端可以获取到锁,而其他的客户端再次进入监听等待;

既然这样,为什么我们不能只唤醒一个呢?

这时候我们就需要使用到zk“临时顺序节点”的特性了;


四、图解临时顺序节点实现分布式锁——解决“惊群效应”

1、获取锁

首先,在 Zookeeper 当中创建一个持久节点 ParentLock。当第一个客户端想要获得锁时,需要在 ParentLock 这个节点下面创建一个临时顺序节点 Lock1。

27.png

之后,Client1 查找 ParentLock 下面所有的临时顺序节点并排序,判断自己所创建的节点 Lock1 是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。

28.png

这时候,如果再有一个客户端 Client2 前来获取锁,则在 ParentLock 下载再创建一个临时顺序节点 Lock2。

29.png

Client2 查找 ParentLock 下面所有的临时顺序节点并排序,判断自己所创建的节点 Lock2 是不是顺序最靠前的一个,结果发现节点 Lock2 并不是最小的。

于是,Client2 向排序仅比它靠前的节点 Lock1 注册 Watcher,用于监听 Lock1 节点是否存在。这意味着 Client2 抢锁失败,进入了等待状态。

30.png

这时候,如果又有一个客户端 Client3 前来获取锁,则在 ParentLock 下载再创建一个临时顺序节点 Lock3。

31.png

Client3 查找 ParentLock 下面所有的临时顺序节点并排序,判断自己所创建的节点 Lock3 是不是顺序最靠前的一个,结果同样发现节点 Lock3 并不是最小的。

于是,Client3 向排序仅比它靠前的节点 Lock2 注册 Watcher,用于监听 Lock2 节点是否存在。这意味着 Client3 同样抢锁失败,进入了等待状态。

32.png

这样一来, Client1 得到了锁,Client2 监听了 Lock1,Client3 监听了 Lock2 。这恰恰形成了一个等待队列!!!

2、释放锁

释放锁分为两种情况:

2.1、任务完成,客户端显示释放

当任务完成时,Client1 会显示调用删除节点 Lock1 的指令。

22.png

2、任务执行过程中,客户端崩溃

获得锁的 Client1 在任务执行过程中,如果崩溃,则会断开与 Zookeeper 服务端的链接。根据临时节点的特性,相关联的节点 Lock1 会随之自动删除。

23.png

由于 Client2 一直监听着 Lock1 的存在状态,当 Lock1 节点被删除,Client2 会立刻收到通知。这时候 Client2 会再次查询 ParentLock 下面的所有节点,确认自己创建的节点 Lock2 是不是目前最小的节点。如果是最小,则 Client2 顺理成章获得了锁。

24.png

同理,如果 Client2 也因为任务完成或者节点崩溃而删除了节点 Lock2,那么 Client3 就会接到通知。

25.png

最终,Client3 成功得到了锁。

26.png

关于“临时顺序节点”的处理方案,我这里只作图文解释,不做代码Demo;

因为十几生成中,上面两种我们都不用!


五、基于Curator实现分布式锁

1、Curator简介:

Curator是Netflix公司开源的一个ZooKeeper客户端封装。通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁。即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid + 递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。

由于原生zookeeper实现的分布式锁实现步骤繁琐且不好控制,Curator提供了很好的基于zookeeper的实现,提供了InterProcessMutex(可重入锁)、InterProcessSemaphoreMutex、InterProcessReadWriteLock(读写锁)相关锁的操作。

2、Curator相较于Zookeeper,就像Redisson相较于redis一样,原生实现太过繁琐,而已经有开源阻止,专门为分布式环境下的问题封装了专用的客户端,所以我们在实际生产中,就可以直接拿过来用;

效率也比我们自己实现的高;

3、pom.xml添加依赖:

<!-- curator:zk客户端 -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

4、代码实现——通过配置类,注入 CuratorFramework 对象

@Configuration
public class CuratorConfig {
    private int retryCount = 3;
    private int elapsedTimeMs = 5000;
    private String connectString = "192.168.174.141:2181";
    private int sessionTimeoutMs = 60000;
    private int connectionTimeoutMs = 5000;

    @Bean(initMethod = "start")
    public CuratorFramework curatorFramework() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(elapsedTimeMs, retryCount);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .build();
        return curatorFramework;
    }
}

5、修改测试Controller方法:很简单,是就是a、获取InterProcessMutex锁;b、lock.acquire()尝试获取锁;c、lock.release()解锁;

@RestController
public class CuratorLockController {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Value("${server.port}")
    private String port;

    private static String LOCK_PATH = "/jiguiquan";

    @Autowired
    private CuratorFramework curatorFramework;

    @GetMapping("/lock/deduct")
    public String deductStock(){
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, LOCK_PATH);  

        //获取到锁之后,才进行后面的操作
        try{
            lock.acquire();
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock"));
            if (stock > 0){
                int realStock = stock - 1;
                redisTemplate.opsForValue().set("stock", String.valueOf(realStock));
                System.out.println(port + "扣减成功,剩余库存:" + realStock);
            }else {
                System.out.println(port + "扣减失败, 库存不足");
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return port + "--end";
    }
}

6、启动测试,还是双服务负载均衡,Jmeter 400*2 = 800 并发测试

34.jpg 35.jpg

非常顺利,耗时仅5秒,效率非常高!

33.jpg

所以,以后的工作中,需要使用Zookeeper实现分布式锁时候,要选择Curator!


六、Redis和Zookeeper实现分布式锁的对比与选择

1、性能消耗:Zookeeper强

redis:如果获取不到锁,需要自己不断去自旋尝试获取锁,比较消耗性能;

Zookeeper:如果获取不到锁,只需要添加一个监听器就可以了,不用一直轮询,性能消耗较小

2、可靠性:Zookeeper强

因为Redis有效期不是很好控制,可能会产生有效期延迟;

Zookeeper就不一样,zookeeper天生设计定位就是分布式协调,强一致性。锁的模型健壮、简单易用、适合做分布式锁。获取不到锁,只需要添加一个监听器就可以;临时节点也保证了不会出现服务宕机死锁情况发生;

3、性能对比:

Redis的性能高于Zookeeper,可以提供高可用高性能地分布式锁服务;(在不考虑极端情况:集群复制丢锁情况——RedLock解决的问题)

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐