堵塞队列:消息中间件MQ的底层核心原理
一、基本概念?
队列:排队,先到先得;
1、什么是堵塞队列?
顾名思义,首先它是一个队列,一个堵塞队列在数据结构中所起的作用大致如图所示:

线程1向队列中添加元素,而线程2从队列中移除元素
当堵塞队列为空时,从队列中获取元素的操作将会被堵塞;
当堵塞队列为满时,向队列中添加元素的操作将会被堵塞;
试图从已空的堵塞队列中获取元素的线程将会被堵塞,直到其他线程往空的队列插入新的元素;
同样,试图往已满的阻塞队列中插入新元素的线程也会被堵塞,直到其他的线程从队列中移除一个或多个元素或完全清空队列后,使队列重新变得空闲起来的时候,添加操作的线程才能继续工作;
2、堵塞队列有没有好的一面?
-
不得不堵塞,该如何管理,(去医院体检);
-
阻塞是好事,好比“海底捞火锅”,越堵越开心;
3、堵塞队列的好处?——解放程序员
在多线程领域,所谓堵塞,在某些情况下会挂起线程(即堵塞),一旦条件满足,被挂起的线程又会自动被唤醒,不需要程序员人工控制堵塞wait或者唤醒notify(notifyAll);
在concurrent并发包发布之前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
二、堵塞队列有哪些?


首先,由上图可以清晰的看到,所有的Queue都是Collection家族的成员,这个一定要记牢,在以前的单线程业务中,很少提及Queue;
阻塞队列种类分析:(BlockingQueue和List平级,列出的7个都是它的实现)

其中:
ArrayBlockingQueue使用可参考ArrayList;
LinkedBlockingQueue使用可参考LinkedList;
SynchronousQueue:专属定制版,生产一个取用一个,绝不多生产!
线程池的底层只使用:标红的3个
三、阻塞队列(BlockingQueue)的核心方法

| 队列的进出遵循FIFO规则,先进先出 | |
| 抛出异常 |
当阻塞队列满时,再往队里里add插入元素将会抛出异常“IllegalStateException:Queue full”; 当阻塞队列空时,再从队列中remove移除元素会抛出异常“NoSuchElementException”; |
|
特殊值 (略温和) |
插入方法(offer),成功为true,失败为false; 移除方法(poll),成功返回获取到的队列中的元素,队列里面为空时返回null; |
|
一直堵塞 (慎用) |
当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出; 当阻塞队列空时,消费者线程试图从队列中take元素,队列会一直阻塞消费者线程直到队列中有了元素; |
|
超时退出 (折中) |
当阻塞队列满时,队列会阻塞生产者线程一段时间,超出限时后,生产者线程会退出;
当阻塞队列空时,队列会阻塞消费者线程一段时间,超出限时后,消费者线程会退出; |
四、阻塞队列之同步SynchronousQueue队列
SynchronousQueue没有容量;
它与其他BlockingQueue队列不同,SynchronousQueue是一个不存储元素的BlockingQueue;
每一个put操作,必须要等待一个take操作完成后才可以进行,否则不能继续添加元素;
反之亦然。
示例代码:
package com.jiguiquan.www;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+"\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+"\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+"\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"AAA").start();
new Thread(() -> {
try {
//故意让BBB线程睡5秒的目的就是让看出,即使给AAA足够长的时间,它也无法put 2,直到BBB线程取走1之后
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\t take"+blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\t take"+blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName()+"\t take"+blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"BBB").start();
}
}
执行结果:

很显然:
即使给了AAA线程足够长的时间,在它put完一组数据后,也必须要等到BBB取走后,才会开始生产第一批的数据;
SynchronousQueue线程就是容量为1的特殊BlockingQueue;
五、阻塞队列BlockingQueue的应用场景有哪些?
-
生产者消费者模式;
-
线程池
-
消息中间件MQ
生产者和消费者模式内容及引申出来的知识点也比较多,所以放到下一章节!




1 Comment
你好