名言警句:
线程操纵资源类;
判断、干活、唤醒通知;
严防多线程并发状态下的虚假唤醒;
一、生产者消费者模式的实现方式
在我们学习阻塞队列之前,要想实现生产者和消费者模式,共有两种方案:第一代锁(Synchronized)和第二代锁(Lock);
以上两种方案都可以被称为传统实现方案;
二、传统版——ProdConsumer_TraditionalDemo
题目:一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮 |
实现代码:
package com.jiguiquan.www; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** ** 题目:一个初始值为0的变量,两个线程同时对其操作,一个加1,一个减1,执行5次; * 1、线程 操纵 资源类 * 2、判断 干活 唤醒通知 * 3、严防多线程并发状态下的虚假唤醒 * 多线程中的判断必须使用while,不可以使用if * @author jiguiquan * */ public class ProdConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); new Thread(() -> { for (int i = 1; i <= 5; i++) { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } } },"AA").start(); new Thread(() -> { for (int i = 1; i <= 5; i++) { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } } },"BB").start(); } } class ShareData { //资源类 private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); //加1的操作 public void increment() throws Exception { lock.lock(); try { //1、判断 while (number != 0) { //等待,不能生产,刚开始0!=0必为false,跳出循环,不会执行await()等待 condition.await(); } //2、干活 number++; System.out.println(Thread.currentThread().getName()+"\t"+number); //3、唤醒通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } //减1操作 public void decrement() { lock.lock(); try { while (number == 0) { //很容易理解,已经等于0了,肯定不能再减了,所以得await等待 condition.await(); } number--; System.out.println(Thread.currentThread().getName()+"\t"+number); condition.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
执行结果:
很显然,严格遵循,AA线程生产一个,BB线程消费一个;
附1:注意,在多线程并发情况下,判断操作只能使用while循环判断,千万不能使用if判断;
可查看java.lang.Object类中的wait和notify方法;——注意wait和notify是Object带油的方法,而不是JUC包下的方法;
附2:**我们将上面代码中的while循环修改为if判断试试看运行结果**
//1、判断 if (number != 0) { condition.await(); }
发现2个线程的情况下,执行结果也没有问题;
附3:我们再试试四个线程;
AA 1 BB 0 CC 1 AA 2 BB 1 BB 0 CC 1 AA 2 DD 1 DD 0 BB -1 CC 0 CC 1 AA 2 DD 1 DD 0 CC 1 DD 0
明显的错误发生了,出现了明显的虚假唤醒问题;
附4:我们这次将if判断修改为while循环判断再试试,4个线程的情况;
运行结果:
CC 1 BB 0 CC 1 BB 0 CC 1 BB 0 CC 1 BB 0 CC 1 DD 0 AA 1 DD 0 AA 1 DD 0 AA 1 DD 0 AA 1 DD 0
毫无问题,这就更明白,为什么JAVA的API文档中都明确要求,在多线程下的判断要是用while而不可以使用if判断的原因,这就是严防虚假唤醒的有效解决办法;
三、阻塞队列版——ProdConsumer_BlockingQueueDemo
一个例子,将 volatile/CAS/AtomicInteger/BlockQueue/线程交互 全部串联起来
实现代码:
package com.jiguiquan.www; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** ** 一个Demo,整合了 volatile/CAS/AtomicInteger/BlockingQueue/线程交互 串联起来 * @author jiguiquan * */ class MyResource { //控制整个流程的开启/停止,默认开启,必须使用volatile保证线程间共享变量的可见性 private volatile boolean flag = true; private AtomicInteger atomicInteger = new AtomicInteger(); //阻塞队列的落地有7个,到底选择哪一个,我们不知道,所以留一个抽象接口(通顺、适配、通用) BlockingQueue<String> blockingQueue = null; //使用构造方法的方式注入实际使用的BlockingQueue类型,注入思想很重要 public MyResource(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; //打印得知实际传进来的BlockingQueue的落地类型 System.out.println(blockingQueue.getClass().getName()); } //生产方法 public void myProd() throws Exception { String data = null; boolean retValue; while (flag) { //++i data = atomicInteger.incrementAndGet()+""; retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (retValue) { System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"成功"); }else { System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"失败"); } TimeUnit.SECONDS.sleep(1); //1秒钟生产一个 } System.out.println(Thread.currentThread().getName()+"\t 大老板叫停,表示flag=false,生产动作结束"); } //消费方法 public void myConsumer() throws Exception { String result = null; while (flag) { result = blockingQueue.poll(2, TimeUnit.SECONDS); if (result == null || result.equalsIgnoreCase("")) { flag = false; System.out.println(Thread.currentThread().getName()+"\t 超过2秒钟没有渠道蛋糕,消费退出"); System.out.println(); System.out.println(); return; //必须让自己退出,flag变为false后,生产者就要停了,消费者不停也是干等,跳出循环 } System.out.println(Thread.currentThread().getName()+"\t 消费蛋糕"+result+"成功"); } } //叫停方法 public void stop() { this.flag = false; } } public class ProdConsumer_BlockingQueueDemo { public static void main(String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10)); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t 生产线程启动"); try { myResource.myProd(); } catch (Exception e) { e.printStackTrace(); } },"prod").start(); new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t 消费线程启动"); System.out.println(); System.out.println(); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } },"consumer").start(); //大老板叫停,不然程序停不下来 try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();} System.out.println(); System.out.println(); System.out.println("5秒钟时间到,大bossmain线程叫停"); myResource.stop(); } }
执行结果:
java.util.concurrent.ArrayBlockingQueue prod 生产线程启动 consumer 消费线程启动 prod 插入队列1成功 consumer 消费蛋糕1成功 prod 插入队列2成功 consumer 消费蛋糕2成功 prod 插入队列3成功 consumer 消费蛋糕3成功 prod 插入队列4成功 consumer 消费蛋糕4成功 prod 插入队列5成功 consumer 消费蛋糕5成功 5秒钟时间到,大bossmain线程叫停 prod 大老板叫停,表示flag=false,生产动作结束 consumer 超过2秒钟没有渠道蛋糕,消费退出
显然,阻塞队列版生产者消费者模式得到了实现