堵塞队列知道吗?

堵塞队列:消息中间件MQ的底层核心原理

一、基本概念?

队列:排队,先到先得;

1、什么是堵塞队列?

顾名思义,首先它是一个队列,一个堵塞队列在数据结构中所起的作用大致如图所示:

9.png

线程1向队列中添加元素,而线程2从队列中移除元素

当堵塞队列为空时,从队列中获取元素的操作将会被堵塞;

当堵塞队列为满时,向队列中添加元素的操作将会被堵塞;

试图从已空的堵塞队列中获取元素的线程将会被堵塞,直到其他线程往空的队列插入新的元素;

同样,试图往已满的阻塞队列中插入新元素的线程也会被堵塞,直到其他的线程从队列中移除一个或多个元素或完全清空队列后,使队列重新变得空闲起来的时候,添加操作的线程才能继续工作;

2、堵塞队列有没有好的一面?

  • 不得不堵塞,该如何管理,(去医院体检);

  • 阻塞是好事,好比“海底捞火锅”,越堵越开心;

3、堵塞队列的好处?——解放程序员

在多线程领域,所谓堵塞,在某些情况下会挂起线程(即堵塞),一旦条件满足,被挂起的线程又会自动被唤醒,不需要程序员人工控制堵塞wait或者唤醒notify(notifyAll);

在concurrent并发包发布之前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。


二、堵塞队列有哪些?

11.png

12.jpg

首先,由上图可以清晰的看到,所有的Queue都是Collection家族的成员,这个一定要记牢,在以前的单线程业务中,很少提及Queue;

阻塞队列种类分析:(BlockingQueue和List平级,列出的7个都是它的实现)

image.png

其中:

ArrayBlockingQueue使用可参考ArrayList;

LinkedBlockingQueue使用可参考LinkedList;

SynchronousQueue:专属定制版,生产一个取用一个,绝不多生产!

线程池的底层只使用:标红的3个


三、阻塞队列(BlockingQueue)的核心方法

image.png

队列的进出遵循FIFO规则,先进先出
抛出异常

当阻塞队列满时,再往队里里add插入元素将会抛出异常“IllegalStateException:Queue full”;

当阻塞队列空时,再从队列中remove移除元素会抛出异常“NoSuchElementException”;

特殊值

(略温和)

插入方法(offer),成功为true,失败为false;

移除方法(poll),成功返回获取到的队列中的元素,队列里面为空时返回null;

一直堵塞

(慎用)

当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出;

当阻塞队列空时,消费者线程试图从队列中take元素,队列会一直阻塞消费者线程直到队列中有了元素;

超时退出

(折中)

当阻塞队列满时,队列会阻塞生产者线程一段时间,超出限时后,生产者线程会退出;

当阻塞队列空时,队列会阻塞消费者线程一段时间,超出限时后,消费者线程会退出;


四、阻塞队列之同步SynchronousQueue队列

SynchronousQueue没有容量;

它与其他BlockingQueue队列不同,SynchronousQueue是一个不存储元素的BlockingQueue;

每一个put操作,必须要等待一个take操作完成后才可以进行,否则不能继续添加元素;

反之亦然。

示例代码:

package com.jiguiquan.www;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueDemo {
	public static void main(String[] args) {
		BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
		
		new Thread(() -> {
			try {
				System.out.println(Thread.currentThread().getName()+"\t put 1");
				blockingQueue.put("1");
				System.out.println(Thread.currentThread().getName()+"\t put 2");
				blockingQueue.put("2");
				System.out.println(Thread.currentThread().getName()+"\t put 3");
				blockingQueue.put("3");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		},"AAA").start();
		
		new Thread(() -> {
			try {
				//故意让BBB线程睡5秒的目的就是让看出,即使给AAA足够长的时间,它也无法put 2,直到BBB线程取走1之后
				TimeUnit.SECONDS.sleep(5);
				System.out.println(Thread.currentThread().getName()+"\t take"+blockingQueue.take());
				TimeUnit.SECONDS.sleep(5);
				System.out.println(Thread.currentThread().getName()+"\t take"+blockingQueue.take());
				TimeUnit.SECONDS.sleep(5);
				System.out.println(Thread.currentThread().getName()+"\t take"+blockingQueue.take());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		},"BBB").start();
	}
}

执行结果:

image.png

很显然:

即使给了AAA线程足够长的时间,在它put完一组数据后,也必须要等到BBB取走后,才会开始生产第一批的数据;

SynchronousQueue线程就是容量为1的特殊BlockingQueue;


五、阻塞队列BlockingQueue的应用场景有哪些?

  • 生产者消费者模式;

  • 线程池

  • 消息中间件MQ

生产者和消费者模式内容及引申出来的知识点也比较多,所以放到下一章节!

jiguiquan@163.com

文章作者信息...

1 Comment

  • 你好

留下你的评论

*评论支持代码高亮<pre class="prettyprint linenums">代码</pre>

相关推荐