8645

阻塞队列(超详细易懂)

目录

一、阻塞队列

1.阻塞队列概述

2.生产者消费者模型

3.阻塞队列的作用

4.标准库中的阻塞队列类

5.例子:简单生产者消费者模型

二、阻塞队列模拟实现

1.实现循环队列(可跳过)

1.1简述环形队列

1.2代码实现

2.实现阻塞队列

2.1实现思路

2.2代码实现

2.3代码解析

①wait和notify的使用,实现自动阻塞和解阻塞

②while循环判断,线程安全的铜墙铁壁

2.4纯享版代码实现(无注释)

一、阻塞队列

1.阻塞队列概述

阻塞队列是一种特殊的队列,同样遵循“先进先出”的原则,支持入队操作和出队操作。在此基础上,阻塞队列会在队列已满或队列为空时陷入阻塞,使其成为一个线程安全的数据结构,它具有如下特性:

当队列已满时,继续入队列就会阻塞,直到有其他线程从队列中取走元素。当队列为空时,继续出队列也会阻塞,直到有其他线程向队列中插入元素。

2.生产者消费者模型

生产者消费者模型有两种角色,生产者和消费者,两者之间通过缓冲容器来达到解耦合的效果。类似于厂商和客户与中转仓库之间的关系,如下图:

厂家生产的商品堆积在中转仓库,当中转仓库满时,入仓阻塞,当中转仓库为空时,出仓阻塞。通过上述结构,生产者和消费者摆脱了“产销一体”的运作模式,即解耦合。同时,无论是客户需求暴增,还是厂家产量飙升,都会被中央仓库协调,避免突发情况导致结构崩溃。

同理,根据生产者消费者模型,我们将线程带入到消费者和生产者的角色,阻塞队列带入到缓冲空间的角色,一个类似的模型很容易就搭建起来了。

所以说,阻塞队列对生产者消费者模型是相当重要的。

3.阻塞队列的作用

①解耦合

作为生产者消费者模式的缓冲空间,将线程(其他)之间分隔,通过阻塞队列间接联系起来,起到降低耦合性的作用,这样即使其中一个挂掉,也不会使另一个也跟着挂掉。

②削峰填谷

因为阻塞队列本身的大小是有限的,所以能起到一个限制作用,即在消费者面对突发暴增的入队操作,依然不受影响。

如电商平台在每年双十一时都会出现请求峰值的情况,如下(杜撰):

而假设电商平台对请求的处理流程是这样的:

因为处理请求需要消耗硬件资源,如果没有消息队列,面对双十一这种请求暴增的情况,请求处理服务器很可能就直接挂掉了。

而有了消息队列之后,请求处理服务器不必直接面对大量请求的冲击,仍旧可以按原先的处理速度来处理请求,避免了被冲爆,这就是‘削峰’。

没有被处理的请求也不是不处理了,而是当消息队列有空闲时再继续流程,即高峰请求被填在低谷中,这就是‘填谷’。

经过‘削峰填谷’之后的请求处理曲线就(大致)变成了下图:

4.标准库中的阻塞队列类

类名说明LinkedBlockingQueue<>基于链表的阻塞队列(常用)LinkedBlockingDeque<>基于链表的双端阻塞队列LinkedTransferQueue<>基于链表的无界阻塞队列ArrayBlockingQueue<>基于顺序表的阻塞队列PriorityBlockingQueue<>带有优先级功能的阻塞队列

方法 解释void put(E e)带有阻塞特性的入队操作方法(常用)E take() 带有阻塞特性的出队操作方法(常用)boolean offer(E e, long timeout, TimeUnit unit) 带有阻塞特性的入队操作方法,并且可以设置最长等待时间E poll(long timeout, TimeUnit unit) 带有阻塞特性的出队操作方法,并且可以设置最长等待时间public boolean contains(Object o) 判断阻塞队列中是否包含某个元素

5.例子:简单生产者消费者模型

可调整生产时间和消费时间观察效果。

//模拟实现的阻塞队列

class MyBlockingQueue {

private Object lock = new Object();

private String[] elems = null;

private int head = 0;

private int tail = 0;

private int size = 0;

public MyBlockingQueue(int capacity) {

elems = new String[capacity];

}

public void put(String elem) throws InterruptedException {

synchronized(lock) {

while(size == elems.length) {

lock.wait();

}

elems[tail] = elem;

tail++;

if(tail >= elems.length) {

tail = 0;

}

size++;

lock.notify();

}

}

public String tack() throws InterruptedException {

String elem = null;

synchronized (lock) {

while(size == 0) {

lock.wait();

}

elem = elems[head];

head++;

if(head == elems.length) {

head = 0;

}

size--;

lock.notify();

}

return elem;

}

}

public class ThreadDemo1 {

public static void main(String[] args) throws InterruptedException {

MyBlockingQueue queue = new MyBlockingQueue(10);

//生产者线程

Thread producerModel = new Thread(() -> {

int number = 0;

while(true) {

try {

queue.put("" + number);

System.out.println("生产了 " + number++);

Thread.sleep(400);

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

});

//消费者线程A

Thread consumerModelA = new Thread(() -> {

String number;

while(true) {

try {

number = queue.tack();

System.out.println("A消费了 " + number);

Thread.sleep(1000);

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

});

//消费者线程B

Thread consumerModelB = new Thread(() -> {

String number;

while(true) {

try {

number = queue.tack();

System.out.println("B消费了 " + number);

Thread.sleep(1000);

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

});

producerModel.start();

consumerModelA.start();

consumerModelB.start();

}

}

二、阻塞队列模拟实现

1.实现环形队列(可跳过)

在正式实现阻塞队列之前,我们需要先将普通环形队列这个框架搭建起来,再考虑为其加入阻塞队列的特性。

1.1简述环形队列

环形队列,也被称为循环队列,是一种特殊的线性数据结构。它的操作基于先进先出(FIFO)的原则,并且队尾被连接在队首之后以形成一个循环,可以使用数组或链表实现,这里采用数组。

环形队列在逻辑上是环形的,但在物理上,它通常是通过一个定长的数组来实现的。环形队列的大小是确定的,一旦创建,它所能存放的元素个数就是固定的。先进先出,队列队首出队,队尾入队

1.2代码实现

class AnnularQueue {

//String类型的数组,存储队列元素

private String[] elems = null;

//队首位置

private int head = 0;

//队尾位置

private int tail = 0;

//存储的元素个数

private int size = 0;

//构造方法,用于构建定长数组,数组长度由参数指定

public AnnularQueue(int capacity) {

elems = new String[capacity];

}

//入队方法

public void put(String elem) throws InterruptedException {

//当队列已满时,拒绝入队

if(size == elems.length) {

return;

}

//将元素存入队尾

elems[tail] = elem;

//存入后,队尾位置后移一位

tail++;

//实现环形队列的关键,超过数组长度后回归数组首位

if(tail >= elems.length) {

//回归数组首位

tail = 0;

}

//存入后元素总数加一

size++;

}

//出队方法

public String tack() throws InterruptedException {

String elem = null;

//当队列为空时,拒绝入队,返回null

if(size == 0) {

return elem;

}

//出队,取出队首值(不用置空,队尾存入时覆盖)

elem = elems[head];

//出队后,队首位置后移一位

head++;

//实现环形队列的关键,超过数组长度后回归数组首位

if(head == elems.length) {

//回归数组首位

head = 0;

}

//存入后元素总数加一

size--;

//返回取出的元素

return elem;

}

}

2.实现阻塞队列

2.1实现思路

前面说到了“阻塞队列是一种特殊的队列,同样遵循‘先进先出’的原则,支持入队操作和出队操作”,实现一个只有入队和出队操作的队列很简单,关键在于如何将阻塞队列的特性加入进去,使其能够判断队列是否已满或是为空,进而阻塞等待。

判断是否未满很简单,只要在队列中定义一个size变量统计已存个数,当已存个数和队列长度相同时就为已满,size为0时队列为空。

阻塞等待也不难,只要引入锁,在入队、出队操作中使用wait和notify就可以了。

真正的难点在于,阻塞队列是适配于多线程程序的,必须要考虑到线程安全问题,而这一问题往往不好解决。

来,先看一下基于环形队列的简单阻塞队列到底是如何实现的吧。

2.2代码实现

测试可使用上文的例子‘简单生产者消费者模型’。

class MyBlockingQueue {

//对象公用锁

private Object lock = new Object();

//String类型的数组,存储队列元素

private String[] elems = null;

//队首位置

private int head = 0;

//队尾位置

private int tail = 0;

//存储的元素个数

private int size = 0;

//构造方法,用于构建定长数组,数组长度由参数指定

public MyBlockingQueue(int capacity) {

elems = new String[capacity];

}

//入队方法

public void put(String elem) throws InterruptedException {

synchronized(lock) {

//已满时入队操作阻塞

while(size == elems.length) {

lock.wait();

}

//将元素存入队尾

elems[tail] = elem;

//存入后,队尾位置后移一位

tail++;

//实现环形队列的关键,超过数组长度后回归数组首位

if(tail >= elems.length) {

//回归数组首位

tail = 0;

}

//存入后元素总数加一

size++;

//当出队操作阻塞时,入队后为其解除阻塞

//(入队后队列不为空了)

lock.notify();

}

}

//出队方法

public String tack() throws InterruptedException {

//存储取出的元素,默认为null

String elem = null;

synchronized (lock) {

//队列为空时出队操作阻塞

while (size == 0) {

lock.wait();

}

//出队,取出队首值(不用置空,队尾存入时覆盖)

elem = elems[head];

//出队后,队首位置后移一位

head++;

//实现环形队列的关键,超过数组长度后回归数组首位

if(head == elems.length) {

//回归数组首位

head = 0;

}

//存入后元素总数加一

size--;

//当入队操作阻塞时,出队后为其解除阻塞

//(出队后队列不满)

lock.notify();

}

//返回取出的元素

return elem;

}

}

2.3代码解析

①wait和notify的使用,实现自动阻塞和解阻塞

首先看wait的位置:

判断条件很易懂,就是当队列已满(为空)时调用wait方法,使调用该方法的线程陷入阻塞,也就是说线程阻塞时,队列是一定陷入已满或为空状态的。

那么什么时候解除阻塞呢?当然是失去已满或为空状态的时候。

调用tack方法出队能够使队列留出位置,不再已满;调用put方法入队能够为队列存入元素,不再为空,两者相辅相成。

所以阻塞的put方法一定是要在别的线程调用tack方法,完成出队后才可能解除阻塞的;阻塞的tack方法也一定是要在别的线程调用put方法,完成入队后才可能解除阻塞的。

注意“可能”二字,因为阻塞的可能有很多线程,所以还要再参与lock锁竞争。

②while循环判断,线程安全的铜墙铁壁

代码中,while循环条件为判断队列状态是否已满(为空),若判断通过,则线程阻塞,等待状态解除。就直观效果而言,使用if语句和while循环的区别不大,但对于多线程程序,我们不得不多考虑一些。

在实现自动阻塞和解阻塞时,我们让put方法(入队操作)和tack方法(出队操作)互相为对方解除阻塞。

但是,put方法和stack方法中用的是同一把锁(lock),并且notify方法的机制是随机解除一个线程的阻塞,那么不管是put方法(入队操作)还是tack方法(出队操作)调用notify方法都可能反而为“同类”解除了阻塞,而我们的原意是要让他们互相解除阻塞。

下图示例就是一种bug可能,在队列容量为100,当前元素量为99的情况下,由于notify的随机唤醒机制,已满状态下的队列又进行了一次入队操作。(出队bug同理)

而使用while循环则不会出现这样的bug,由于wait方法在循环体内部,因此当阻塞结束后仍然会再次判断队列状态,即便再次堵塞后又出现同样的问题也没关系,大不了继续判断,至死方休。

③其他问题请留言😘

2.4纯享版代码实现(无注释)

class MyBlockingQueue {

private static Object lock = new Object();

private String[] elems = null;

private int head = 0;

private int tail = 0;

private int size = 0;

public MyBlockingQueue(int capacity) {

elems = new String[capacity];

}

public void put(String elem) throws InterruptedException {

synchronized(lock) {

while(size == elems.length) {

lock.wait();

}

elems[tail] = elem;

tail++;

if(tail >= elems.length) {

tail = 0;

}

size++;

lock.notify();

}

}

public String tack() throws InterruptedException {

String elem = null;

synchronized (lock) {

while(size == 0) {

lock.wait();

}

elem = elems[head];

head++;

if(head == elems.length) {

head = 0;

}

size--;

lock.notify();

}

return elem;

}

}

博主是Java新人,每位同志的支持都会给博主莫大的动力,欢迎留言讨论,如果有任何疑问,或者发现了任何错误,都欢迎大家在评论区交流“ψ(`∇´)ψ