技术标签: 阻塞队列 Java多线程 多线程 线程安全 队列 并发容器
并发编程最佳学习路线
【Java多线程】高并发修炼基础之高并发必须了解的概念
【Java多线程】了解线程的锁池和等待池概念
【Java多线程】了解Java锁机制
【Java多线程】线程通信
【Java基础】多线程从入门到掌握-第十五节.使用Concurrent集合
【Java多线程】JUC之线程池(一)与线程池的初识第四节.线程池的工作队列
ArrayBlockingQueue:由数组
实现的有界阻塞队列
,在初始化时必须指定容器大小,按照FIFO
的方式存储元素。内部使用ReentrantLock和Condition实现,支持公平锁和非公平锁。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//内部数组
final Object[] items;
// 下一个待删除元素的索引: take, poll, peek, remove方法使用
int takeIndex;
//下一个待插入元素的索引: put, offer, add方法使用
int putIndex;
//当前队列中元素的个数
int count;
//唯一全局可重入独占“”:掌管所有读写操作的锁
final ReentrantLock lock;
//两个等待队列
/** 取元素条件队列:队列为空时,用于阻塞读线程,唤醒写线程 */
private final Condition notEmpty;
/** 写元素条件队列:队列已满时,用于阻塞写线程,唤醒读线程 */
private final Condition notFull
// Itrs表示队列和迭代器之间的共享数据,其实用来存储多个迭代器实例的
transient Itrs itrs = null;
可重入锁 ReentrantLock
实现的访问公平性,通过2个 Condition
保证了写入和获取元素的等待通知
从上面的入队/出队操作,可以看出,ArrayBlockingQueue的内部数组其实是一种环形结构
。
6
,我们来看下整个入队过程:初始时
插入元素“9”
插入元素“2”、“10”、“25”、“93”
插入元素 “90”
注意,此时再插入一个元素“90”,则putIndex变成6
,等于队列容量6
,由于是循环队列,所以会将takeIndex重置为0
:
这时队列已经满了(count==6)
,如果再有线程尝试插入元素
,并不会覆盖原有值,而是被阻塞
。
我们再来看下出队过程:
“9”
出队元素“2”、“10”、“25”、“93”
出队元素“90”
注意,此时再出队一个元素“90”,则takeIndex变成6
,等于队列容量6
,由于是循环队列,所以会将takeIndex重置为0
:
这时队列已经空了(count==0)
,如果再有线程尝试出队元素
,则会被阻塞
。
// 必须指定初始容量, 默认采用非公平策略
public ArrayBlockingQueue(int capacity) {
this(capacity, false);//默认构造非公平锁的阻塞队列
}
// 指定队列初始容量和公平/非公平策略的构造器.
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//初始化ReentrantLock重入锁,出队入队拥有这同一个锁
lock = new ReentrantLock(fair);
//初始化非空等待队列
notEmpty = lock.newCondition();
//初始化非满等待队列
notFull = lock.newCondition();
}
/**
* 根据已有集合构造队列
*/
public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
//遍历添加指定集合的元素
checkNotNull(e);// 不能有null元素
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
//如果传入集合的个数超过了容量,抛出异常被catch,最多放capacity个
throw new IllegalArgumentException();
}
//循环结束,i刚好是写入元素的个数
count = i;
//修改 putIndex 为 c 的容量 +1,如果队列已满,则重置puIndex索引为0
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
可以看到,有3种构造函数:
核心思想:
- 将元素x置入数组中。
- 计算下一个元素应该存放的下标位置。
- 元素个数器递增,这里count前加了锁,值都是从主内存中获取,不会存在内存不可见问题,并且更新也会直接刷新回主内存中。
- 最后唤醒在条件队列notEmpty因取出元素(take)而被阻塞的一个线程。
//入队操作
private void enqueue(E x) {
final Object[] items = this.items;
//通过putIndex索引直接将元素添加到数组items中
items[putIndex] = x;
//下一个元素应该存放的下标位置:当putIndex索引大小等于数组长度时,将putIndex重置为0
//当队列索引(从0开始)与数组长度相等时,下次就需要从数组头部重写开始写入
if (++putIndex == items.length) putIndex = 0;
count++; // 元素个数+1
notEmpty.signal();// 唤醒一个notEmpty上的等待线程(可以来队列取元素了)
}
可以看到,enqueue(E) 方法会将元插入到数组尾部。
重置 putIndex 为 0
,添加后调用notEmpty.signal()
通知唤醒阻塞在取出元素(take)的线程。队尾阻塞式插入元素
,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列为空闲,或者元素被其他线程取出
。
InterruptedException异常
并返回。 //put操作将向队尾插入元素,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列不满。
//如果线程在阻塞时被其他线程设置了中断标志,则抛出InterruptedException异常并返回。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock; // 唯一锁
// 可响应中断式地获取锁
lock.lockInterruptibly();
try {
//如果队列已满,则将当前线程包装为等待节点置入notFull的条件队列中。这里必须用while,防止虚假唤醒
while (count == items.length)
notFull.await();
// 队列非满,或者被消费者线程唤醒了,执行入队操作,往队尾写入一个元素,然后唤醒等待在notEmpty条件队列的首节点
enqueue(e);
} finally {
lock.unlock();// 解锁
}
}
响应中断
,当队列满了,就调用notFull.await() 阻塞等待
,等有消费者获取元素后继续执行;
enqueue(E)
。public boolean add(E e) {
return super.add(e);
}
//super.add() 的实现
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer(E)
,如果返回 false 就抛出异常。如果队列未满,则插入成功并返回true
, 如果队列已满则返回false。 public boolean offer(E e) {
checkNotNull(e); // 如果插入元素为null,则抛出NullPointerException异常
// 获取独占锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列已满, 则返回false
if (count == items.length)
return false;
else {
// 否则则入队
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
可以看到 offer(E) 方法要先获取锁
,如果当前队列中元素已满,就立即返回 false,这点比 add() 友好一些;
enqueue(E)
入队:超时功能
,传入一个timeout
,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回false
。public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
//获取可中断锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
//如果队列已满,但是时间到了,直接返回false
if (nanos <= 0)
return false;
//阻塞当前线程指定纳秒数,并更新剩余时间
nanos = notFull.awaitNanos(nanos);
}
//队列非满,或者在设定时间内被消费者线程唤醒了,执行入队操作,后唤醒等待在notEmpty条件队列的首节点
enqueue(e);
return true;
} finally {
lock.unlock();//释放锁
}
}
offer() 和 put()
方法很相似,不同之处在于需要设置等待超时时间
,超时未写入,就返回 false;否则调用enqueue(E)入队
,然后返回 true。核心思想:
- 获取元素,并将当前位置置null。
- 重新设置队头下标。
- 元素计数器递减。
- 更新迭代器中的元素数据,itrs默认情况下都是为null的,只有使用迭代器的时候才会实例化Itrs。
- 唤醒在条件队列notFull因写入操作(put)而被阻塞的一个线程。
//删除队列头元素并返回
private E dequeue() {
//拿到当前数组的数据
final Object[] items = this.items;
//获取要删除的对象
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//将数组中takeIndex索引位置设置为null
items[takeIndex] = null;
//takeIndex索引加1并判断是否与数组长度相等,如果相等说明队列已空,重置takeIndex为0
if (++takeIndex == items.length) takeIndex = 0;
count--;//队列个数减1
// 更新迭代器中的元素数据,itrs只用在使用迭代器的时候才实例化
if (itrs != null)
itrs.elementDequeued();//同时更新迭代器中的元素数据
notFull.signal(); // 唤醒一个notFull上的等待线程(可以插入元素到队列了)
return x;
}
默认情况下dequeue()
方法会从队首移除元素(即 takeIndex 位置)
。
向后移动 takeIndex
,如果已经到队尾,就归零(takeIndex =0)
。结合前面添加元素时的归零,可以看到,其实 ArrayBlockingQueue 是个环形数组。itrs. elementDequeued()
,这个 itrs 是 ArrayBlockingQueue 的内部类 Itrs 的对象
,看起来像是个迭代器,实际上它的作用是保证循环数组迭代时的正确性,具体实现比较复杂,这里暂不介绍。 //从队列头部删除,队列没有元素就阻塞,可中断
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 可响应中断式地获取锁
lock.lockInterruptibly();
try {
//如果队列为空,则将当前写线程包装为等待节点加入notEmpty的条件队列中。这里必须用while,防止虚假唤醒
while (count == 0)
notEmpty.await();
// 队列非空,或者被生产者线程唤醒了,执行出队操作,出队时唤醒notEmpty的条件队列中的首节点
return dequeue();
} finally {
lock.unlock();// 释放锁
}
}
响应中断
,与 poll() 不同的是,如果队列为空会一直阻塞等待
,直到中断或者有元素,有元素时还是调用dequeue()
方法入队。返回null
。//poll方法,该方法获取并移除此队列的头元素,若队列为空,则返回 null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果为空,返回null, 否则执行出队操作
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
dequeue()出队
:timeout
,获取元素超时会立即返回null
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
// 队列仍为空,但是时间到了,必须返回了
if (nanos <= 0)
return null;
// 在条件队列里等着,但是需要更新时间
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
允许阻塞一段时间
,如果在阻塞一段时间还没有元素写入队列,就返回 null。直接获取队首元素
,只获取不出队。可能返回为null(队列为空)public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); //直接返回当前队列的头元素,但不删除
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
指的是equals方法判定相同
】的元素,移除成功返回true,如果队列为空或没有匹配元素,则返回false。 public boolean remove(Object o) {
if (o == null) return false;
//获取数组数据
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
//如果此时队列不为null,这里是为了防止并发情况
if (count > 0) {
//获取下一个要添加元素时的索引
final int putIndex = this.putIndex;
//获取当前要被删除元素的索引
int i = takeIndex;
//执行循环查找要删除的元素
do {
// 找到了对应的元素的位置,removeAt删除该位置的元素
if (o.equals(items[i])) {
removeAt(i);//执行删除
return true;//删除成功返回true
}
//当前删除索引执行加1后判断是否与数组长度相等
//若为true,说明索引已到数组尽头,将i设置为0
if (++i == items.length)
i = 0;
} while (i != putIndex);
//到达区间[takeIndex, putIndex)的边界,说明所有非null元素都找遍了
}
return false;//没有找到元素
} finally {
lock.unlock();//解锁
}
}
//移除removeIndex位置的元素:根据索引删除元素,实际上是把删除索引之后的元素往前移动一个位置
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//如果刚好删除的是队首,那刚好是一个出队动作
if (removeIndex == takeIndex) {
//如果是直接删除
items[takeIndex] = null;
//当前队列头元素加1并判断是否与数组长度相等,若为true设置为0
if (++takeIndex == items.length)
takeIndex = 0;
count--;//队列元素减1
if (itrs != null)
itrs.elementDequeued();//更新迭代器中的数据
}
//其他情况
else {
//如果要删除的元素不在队列头部,
//那么只需循环迭代把删除元素后面的所有元素往前移动一个位置
//获取下一个要被添加的元素的索引,作为循环判断结束条件
final int putIndex = this.putIndex;
//执行循环
for (int i = removeIndex;;) {
//获取要删除节点索引的下一个索引
int next = i + 1;
//判断是否已为数组长度,如果是从数组头部(索引为0)开始找
if (next == items.length)
next = 0;
//如果查找的索引不等于要添加元素的索引,说明移除的不是队尾,后面的元素补充上来
if (next != putIndex) {
items[i] = items[next];//把后一个元素前移覆盖要删除的元
i = next;
} else {
//在removeIndex索引之后的元素都往前移动完毕后清空最后一个元素
items[i] = null;
//最后putIndex当然也得左移,i此时肯定是putIndex - 1
this.putIndex = i;
break;//结束循环
}
}
count--;//队列元素减1
if (itrs != null)
itrs.removedAt(removeIndex);//更新迭代器数据
}
notFull.signal();//唤醒添加线程
}
一波源码看下来,ArrayBlockingQueue 使用可重入锁 ReentrantLock保证线程安全,通过两个 Condition 实现生产者-消费者模型,看起来很简单的样子,这背后要感谢 ReentrantLock 和 Condition 的功劳!
获取到AQS独占锁
才能进行操作如果队列为空,则读线程
将会被包装为条件节点扔到读线程等待条件队列中阻塞
,等待写线程写入新的元素,并唤醒等待中的读线程,反之亦然。超高并发的环境
,由于生产者-消息者共用一把锁,可能出现性能瓶
颈。后续,我们会介绍另一种基于单链表实现的阻塞队列——LinkedBlockingQueue
,该队列的最大特点是使用了“两把锁”
,以提升吞吐量。
文章浏览阅读1.6k次。安装配置gi、安装数据库软件、dbca建库见下:http://blog.csdn.net/kadwf123/article/details/784299611、检查集群节点及状态:[root@rac2 ~]# olsnodes -srac1 Activerac2 Activerac3 Activerac4 Active[root@rac2 ~]_12c查看crs状态
文章浏览阅读1.3w次,点赞45次,收藏99次。我个人用的是anaconda3的一个python集成环境,自带jupyter notebook,但在我打开jupyter notebook界面后,却找不到对应的虚拟环境,原来是jupyter notebook只是通用于下载anaconda时自带的环境,其他环境要想使用必须手动下载一些库:1.首先进入到自己创建的虚拟环境(pytorch是虚拟环境的名字)activate pytorch2.在该环境下下载这个库conda install ipykernelconda install nb__jupyter没有pytorch环境
文章浏览阅读5.2k次,点赞19次,收藏28次。选择scoop纯属意外,也是无奈,因为电脑用户被锁了管理员权限,所有exe安装程序都无法安装,只可以用绿色软件,最后被我发现scoop,省去了到处下载XXX绿色版的烦恼,当然scoop里需要管理员权限的软件也跟我无缘了(譬如everything)。推荐添加dorado这个bucket镜像,里面很多中文软件,但是部分国外的软件下载地址在github,可能无法下载。以上两个是官方bucket的国内镜像,所有软件建议优先从这里下载。上面可以看到很多bucket以及软件数。如果官网登陆不了可以试一下以下方式。_scoop-cn
文章浏览阅读4.5k次,点赞2次,收藏3次。首先要有一个color-picker组件 <el-color-picker v-model="headcolor"></el-color-picker>在data里面data() { return {headcolor: ’ #278add ’ //这里可以选择一个默认的颜色} }然后在你想要改变颜色的地方用v-bind绑定就好了,例如:这里的:sty..._vue el-color-picker
文章浏览阅读640次。基于芯片日益增长的问题,所以内核开发者们引入了新的方法,就是在内核中只保留函数,而数据则不包含,由用户(应用程序员)自己把数据按照规定的格式编写,并放在约定的地方,为了不占用过多的内存,还要求数据以根精简的方式编写。boot启动时,传参给内核,告诉内核设备树文件和kernel的位置,内核启动时根据地址去找到设备树文件,再利用专用的编译器去反编译dtb文件,将dtb还原成数据结构,以供驱动的函数去调用。firmware是三星的一个固件的设备信息,因为找不到固件,所以内核启动不成功。_exynos 4412 刷机
文章浏览阅读2w次,点赞24次,收藏42次。Linux系统配置jdkLinux学习教程,Linux入门教程(超详细)_linux配置jdk
文章浏览阅读3.3k次,点赞5次,收藏19次。xlabel('\delta');ylabel('AUC');具体符号的对照表参照下图:_matlab微米怎么输入
文章浏览阅读119次。顺序读写指的是按照文件中数据的顺序进行读取或写入。对于文本文件,可以使用fgets、fputs、fscanf、fprintf等函数进行顺序读写。在C语言中,对文件的操作通常涉及文件的打开、读写以及关闭。文件的打开使用fopen函数,而关闭则使用fclose函数。在C语言中,可以使用fread和fwrite函数进行二进制读写。 Biaoge 于2024-03-09 23:51发布 阅读量:7 ️文章类型:【 C语言程序设计 】在C语言中,用于打开文件的函数是____,用于关闭文件的函数是____。
文章浏览阅读3.4k次,点赞2次,收藏13次。跟随鼠标移动的粒子以grid(SOP)为partical(SOP)的资源模板,调整后连接【Geo组合+point spirit(MAT)】,在连接【feedback组合】适当调整。影响粒子动态的节点【metaball(SOP)+force(SOP)】添加mouse in(CHOP)鼠标位置到metaball的坐标,实现鼠标影响。..._touchdesigner怎么让一个模型跟着鼠标移动
文章浏览阅读178次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:Springboot + mybatis + Maven +mysql5.7或8.0+html+css+js等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。_基于java技术的停车场管理系统实现与设计
文章浏览阅读3.5k次。前言对于MediaPlayer播放器的源码分析内容相对来说比较多,会从Java-&amp;gt;Jni-&amp;gt;C/C++慢慢分析,后面会慢慢更新。另外,博客只作为自己学习记录的一种方式,对于其他的不过多的评论。MediaPlayerDemopublic class MainActivity extends AppCompatActivity implements SurfaceHolder.Cal..._android多媒体播放源码分析 时序图
文章浏览阅读2.4k次,点赞41次,收藏13次。java 数据结构与算法 ——快速排序法_快速排序法