生产者消费者模式的3种实现方式

名言警句:

线程操纵资源类;

判断、干活、唤醒通知;

严防多线程并发状态下的虚假唤醒;

一、生产者消费者模式的实现方式

在我们学习阻塞队列之前,要想实现生产者和消费者模式,共有两种方案:第一代锁(Synchronized)和第二代锁(Lock);

image.png

以上两种方案都可以被称为传统实现方案;


二、传统版——ProdConsumer_TraditionalDemo

题目:一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮

实现代码:

package com.jiguiquan.www;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 ** 题目:一个初始值为0的变量,两个线程同时对其操作,一个加1,一个减1,执行5次;
 *	1、线程		操纵		资源类
 *	2、判断		干活		唤醒通知
 *	3、严防多线程并发状态下的虚假唤醒
 *	多线程中的判断必须使用while,不可以使用if
 * @author jiguiquan
 *
 */
public class ProdConsumer_TraditionDemo {
	public static void main(String[] args) {
		ShareData shareData = new ShareData();
		new Thread(() -> {
			for (int i = 1; i <= 5; i++) {
				try {
					shareData.increment();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		},"AA").start();
		
		new Thread(() -> {
			for (int i = 1; i <= 5; i++) {
				try {
					shareData.decrement();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		},"BB").start();
	}
}

class ShareData { //资源类
	private int number = 0;
	private Lock lock = new ReentrantLock();
	private Condition condition = lock.newCondition();
	
	//加1的操作
	public void increment() throws Exception {
		lock.lock();
		try {
			//1、判断
			while (number != 0) {  //等待,不能生产,刚开始0!=0必为false,跳出循环,不会执行await()等待
				condition.await();
			}
			//2、干活
			number++;
			System.out.println(Thread.currentThread().getName()+"\t"+number);
			//3、唤醒通知
			condition.signalAll();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}
	
	//减1操作
	public void decrement() {
		lock.lock();
		try {
			while (number == 0) {   //很容易理解,已经等于0了,肯定不能再减了,所以得await等待
				condition.await();
			}
			number--;
			System.out.println(Thread.currentThread().getName()+"\t"+number);
			condition.signal();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			lock.unlock();
		}
	}
}

执行结果:

image.png

很显然,严格遵循,AA线程生产一个,BB线程消费一个;

附1:注意,在多线程并发情况下,判断操作只能使用while循环判断,千万不能使用if判断;

可查看java.lang.Object类中的wait和notify方法;——注意wait和notify是Object带油的方法,而不是JUC包下的方法;

1563280539727538.png

附2:**我们将上面代码中的while循环修改为if判断试试看运行结果**

    //1、判断
    if (number != 0) {
	condition.await();
    }

发现2个线程的情况下,执行结果也没有问题;

附3:我们再试试四个线程;

AA	1
BB	0
CC	1
AA	2
BB	1
BB	0
CC	1
AA	2
DD	1
DD	0
BB	-1
CC	0
CC	1
AA	2
DD	1
DD	0
CC	1
DD	0

明显的错误发生了,出现了明显的虚假唤醒问题;

附4:我们这次将if判断修改为while循环判断再试试,4个线程的情况;

运行结果:

CC	1
BB	0
CC	1
BB	0
CC	1
BB	0
CC	1
BB	0
CC	1
DD	0
AA	1
DD	0
AA	1
DD	0
AA	1
DD	0
AA	1
DD	0

毫无问题,这就更明白,为什么JAVA的API文档中都明确要求,在多线程下的判断要是用while而不可以使用if判断的原因,这就是严防虚假唤醒的有效解决办法;


三、阻塞队列版——ProdConsumer_BlockingQueueDemo

一个例子,将 volatile/CAS/AtomicInteger/BlockQueue/线程交互 全部串联起来

实现代码:

package com.jiguiquan.www;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 ** 一个Demo,整合了 volatile/CAS/AtomicInteger/BlockingQueue/线程交互 串联起来
 * @author jiguiquan
 *
 */
class MyResource {
	//控制整个流程的开启/停止,默认开启,必须使用volatile保证线程间共享变量的可见性
	private volatile boolean flag = true;  
	private AtomicInteger atomicInteger = new AtomicInteger();
	//阻塞队列的落地有7个,到底选择哪一个,我们不知道,所以留一个抽象接口(通顺、适配、通用)
	BlockingQueue<String> blockingQueue = null;
	
	//使用构造方法的方式注入实际使用的BlockingQueue类型,注入思想很重要
	public MyResource(BlockingQueue<String> blockingQueue) {
		this.blockingQueue = blockingQueue;
		//打印得知实际传进来的BlockingQueue的落地类型
		System.out.println(blockingQueue.getClass().getName());
	}
	
	//生产方法
	public void myProd() throws Exception {
		String data = null;
		boolean retValue;
		while (flag) {  //++i
			data = atomicInteger.incrementAndGet()+"";
			retValue = blockingQueue.offer(data, 2, TimeUnit.SECONDS);
			if (retValue) {
				System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"成功");
			}else {
				System.out.println(Thread.currentThread().getName()+"\t 插入队列"+data+"失败");
			}
			TimeUnit.SECONDS.sleep(1);  //1秒钟生产一个
		}
		System.out.println(Thread.currentThread().getName()+"\t 大老板叫停,表示flag=false,生产动作结束");
	}
	
	//消费方法
	public void myConsumer() throws Exception {
		String result = null;
		while (flag) {
			result = blockingQueue.poll(2, TimeUnit.SECONDS);
			if (result == null || result.equalsIgnoreCase("")) {
				flag = false;
				System.out.println(Thread.currentThread().getName()+"\t 超过2秒钟没有渠道蛋糕,消费退出");
				System.out.println();
				System.out.println();
				return;  //必须让自己退出,flag变为false后,生产者就要停了,消费者不停也是干等,跳出循环
			}
			System.out.println(Thread.currentThread().getName()+"\t 消费蛋糕"+result+"成功");
		}
	}
	
	//叫停方法
	public void stop() {
		this.flag = false;
	}
}

public class ProdConsumer_BlockingQueueDemo {
	public static void main(String[] args) {
		MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));
		new Thread(() -> {
			System.out.println(Thread.currentThread().getName()+"\t 生产线程启动");
			try {
				myResource.myProd();
			} catch (Exception e) {
				e.printStackTrace();
			}
		},"prod").start();
		
		new Thread(() -> {
			System.out.println(Thread.currentThread().getName()+"\t 消费线程启动");
			System.out.println();
			System.out.println();
			try {
				myResource.myConsumer();
			} catch (Exception e) {
				e.printStackTrace();
			}
		},"consumer").start();
		
		//大老板叫停,不然程序停不下来
		try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}
		System.out.println();
		System.out.println();
		System.out.println("5秒钟时间到,大bossmain线程叫停");
		myResource.stop();
	}
}

执行结果:

java.util.concurrent.ArrayBlockingQueue
prod	 生产线程启动
consumer	 消费线程启动


prod	 插入队列1成功
consumer	 消费蛋糕1成功
prod	 插入队列2成功
consumer	 消费蛋糕2成功
prod	 插入队列3成功
consumer	 消费蛋糕3成功
prod	 插入队列4成功
consumer	 消费蛋糕4成功
prod	 插入队列5成功
consumer	 消费蛋糕5成功


5秒钟时间到,大bossmain线程叫停
prod	 大老板叫停,表示flag=false,生产动作结束
consumer	 超过2秒钟没有渠道蛋糕,消费退出

显然,阻塞队列版生产者消费者模式得到了实现

jiguiquan@163.com

文章作者信息...

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐