摘要:实现阻塞队列在自己实现之前先搞清楚阻塞队列的几个特点基本队列特性先进先出。消费队列空时会阻塞直到写入线程写入了队列数据后唤醒消费线程。
实现Java 阻塞队列
</>复制代码
在自己实现之前先搞清楚阻塞队列的几个特点:
基本队列特性:先进先出。
写入队列空间不可用时会阻塞。
获取队列数据时当队列为空时将阻塞。
实现队列的方式多种,总的来说就是数组和链表;其实我们只需要搞清楚其中一个即可,不同的特性主要表现为数组和链表的区别。
这里的 ArrayBlockingQueue 看名字很明显是由数组实现。
我们先根据它这三个特性尝试自己实现试试。
初始化队列我这里自定义了一个类:ArrayBlockQueue,它的构造函数如下:
</>复制代码
//队列参数
public int size;
private volatile Object[] items;
private volatile int pollPoint = 0;
private volatile int addPoint = 0;
private volatile int count = 0;
//初始化队列容量
public ArrayBlockQueue(int size) {
this.size = size;
this.items = new Object[size];
}
写入操作
有几个需要注意的点:
队列满的时候,写入的线程需要被阻塞。
写入过队列的数量大于队列大小时需要从第一个下标开始写。
先看第一个队列满的时候,写入的线程需要被阻塞,先来考虑下如何才能使一个线程被阻塞,看起来的表象线程卡住啥事也做不了。
</>复制代码
其实这样的一个特点很容易让我们想到 Java 的等待通知机制来实现线程间通信。
所以我这里的做法是,一旦队列满时就将写入线程调用 addLock.wait() 进入 waiting 状态,直到空间可用时再进行唤醒。
初始化 两个锁
</>复制代码
//初始化两个锁
private Object addLock = new Object();
private Object pollLock = new Object();
写入操作代码
</>复制代码
public void add(Object item) {
synchronized (addLock) {
//队列满则阻塞
while (count >= size) {
try {
addLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
items[addPoint] = item;
addPoint = (addPoint + 1) % size;
count++;
//当队列从0个元素添加到1个元素,则说明从空状态转非空状态可以通知一次取元素线程
if (count == 1)
synchronized (pollLock) {
pollLock.notifyAll();
}
}
}
所以这里声明了两个对象用于队列满、空情况下的互相通知作用。
在写入数据成功后需要使用 pollLock.notifyAll(),这样的目的是当获取队列为空时,一旦写入数据成功就可以把消费队列的线程唤醒。
</>复制代码
这里的 wait 和 notify 操作都需要对各自的对象使用 synchronized 方法块,这是因为 wait 和 notifyAll 都需要获取到各自的锁。
消费队列
上文也提到了:当队列为空时,获取队列的线程需要被阻塞,直到队列中有数据时才被唤醒。
代码和写入的非常类似,也很好理解;只是这里的等待、唤醒恰好是相反的。
总的来说就是:
写入队列满时会阻塞直到获取线程消费了队列数据后唤醒写入线程。
消费队列空时会阻塞直到写入线程写入了队列数据后唤醒消费线程。
</>复制代码
public Object poll() {
synchronized (pollLock) {
//队列空则阻塞
while (count == 0) {
try {
pollLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object value = items[pollPoint];
pollPoint = (pollPoint + 1) % size;
count--;
//当队列从满到非满,可以通知一次增添元素的线程
if (count == (size - 1))
synchronized (addLock) {
addLock.notifyAll();
}
return value;
}
}
测试
每一秒出队1个
</>复制代码
//消费者
//每一秒出队1个
class Counsum extends Thread {
private ArrayBlockQueue queue;
public Counsum(ArrayBlockQueue queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 出队一个数据: " + queue.poll());
}
}
}
每5秒入队2个,可见生产明显慢于消费
</>复制代码
//生产者
//每5秒入队2个
class Product extends Thread {
private ArrayBlockQueue queue;
public Product(ArrayBlockQueue queue) {
this.queue = queue;
}
@Override
public void run() {
int i = 0;
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 0; j < 2; j++, i++) {
queue.add(i);
System.out.println(Thread.currentThread().getName() + "【入队一个数据: " + i+"】");
}
}
}
}
main
</>复制代码
public class Test extends Object {
public static void main(String[] args) throws CloneNotSupportedException {
ArrayBlockQueue queue = new ArrayBlockQueue(5);
Thread counsum = new Counsum(queue);
Thread counsum1 = new Counsum(queue);
Thread counsum2 = new Counsum(queue);
Thread product = new Product(queue);
Thread product1 = new Product(queue);
Thread product2 = new Product(queue);
counsum.start();
product.start();
counsum1.start();
product1.start();
counsum2.start();
product2.start();
}
}
//output
消费者线程-1出队一个数据: 0
生成者线程-3入队一个数据: 0
消费者线程-3出队一个数据: 0
消费者线程-2出队一个数据: 0
生成者线程-2入队一个数据: 0
生成者线程-1入队一个数据: 0
生成者线程-2入队一个数据: 1
生成者线程-3入队一个数据: 1
生成者线程-1入队一个数据: 1
消费者线程-3出队一个数据: 1
消费者线程-1出队一个数据: 1
消费者线程-2出队一个数据: 1
消费者线程-2出队一个数据: 2
生成者线程-2入队一个数据: 2
消费者线程-1出队一个数据: 2
生成者线程-3入队一个数据: 2
生成者线程-1入队一个数据: 2
消费者线程-3出队一个数据: 2
生成者线程-1入队一个数据: 3
生成者线程-3入队一个数据: 3
生成者线程-2入队一个数据: 3
消费者线程-1出队一个数据: 3
消费者线程-3出队一个数据: 3
消费者线程-2出队一个数据: 3
引用
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/75935.html
摘要:什么是阻塞队列阻塞队列是一个在队列基础上又支持了两个附加操作的队列。阻塞队列的应用场景阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。由链表结构组成的无界阻塞队列。 什么是阻塞队列? 阻塞队列是一个在队列基础上又支持了两个附加操作的队列。 2个附加操作: 支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。 支持阻塞的...
摘要:知识点总结容器知识点总结容器接口与是在同一级别,都是继承了接口。另一种队列则是双端队列,支持在头尾两端插入和移除元素,主要包括。一个由链表结构组成的无界阻塞队列。是一个阻塞的线程安全的队列,底层实现也是使用链式结构。 Java知识点总结(Java容器-Queue) @(Java知识点总结)[Java, Java容器] Queue Queue接口与List、Set是在同一级别,都是继承了...
摘要:尽管中已经包含了阻塞队列的官方实现,但是熟悉其背后的原理还是很有帮助的。阻塞队列的实现阻塞队列的实现类似于带上限的的实现。下面是阻塞队列的一个简单实现必须注意到,在和方法内部,只有队列的大小等于上限或者下限时,才调用方法。 阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程...
摘要:并发编程实战水平很高,然而并不是本好书。一是多线程的控制,二是并发同步的管理。最后,使用和来关闭线程池,停止其中的线程。当线程调用或等阻塞时,对这个线程调用会使线程醒来,并受到,且线程的中断标记被设置。 《Java并发编程实战》水平很高,然而并不是本好书。组织混乱、长篇大论、难以消化,中文翻译也较死板。这里是一篇批评此书的帖子,很是贴切。俗话说:看到有这么多人骂你,我就放心了。 然而知...
阅读 3353·2021-11-08 13:21
阅读 1305·2021-08-12 13:28
阅读 1516·2019-08-30 14:23
阅读 2010·2019-08-30 11:09
阅读 917·2019-08-29 13:22
阅读 2765·2019-08-29 13:12
阅读 2636·2019-08-26 17:04
阅读 2419·2019-08-26 13:22
极致性价比!云服务器续费无忧!
Tesla A100/A800、Tesla V100S等多种GPU云主机特惠2折起,不限台数,续费同价。
NVIDIA RTX 40系,高性价比推理显卡,满足AI应用场景需要。
乌兰察布+上海青浦,满足东推西训AI场景需要