重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章将为大家详细讲解有关java中阻塞队列详细介绍,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
我们提供的服务有:成都做网站、网站制作、微信公众号开发、网站优化、网站认证、襄城ssl等。为上千企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的襄城网站制作公司
阻塞队列(BlockingQueue)首先是一个支持先进先出的队列,与普通的队列完全相同;
其次是一个支持阻塞操作的队列,即:
阻塞队列用在多线程的场景下,因此阻塞队列使用了锁机制来保证同步,这里使用的可重入锁;
而对于阻塞与唤醒机制则有与锁绑定的Condition
实现
应用场景:生产者消费者模式
java中的阻塞队列根据容量可以分为有界队列和无界队列:
java8中提供了7种阻塞队列阻塞队列供开发者使用,如下表:
类名 | 描述 |
ArrayBlockingQueue | 一个由数组结构组成的有界阻塞队列 |
LinkedBlockingQueue | 由链表结构组成的有界阻塞队列(默认大小Integer.MAX_VALUE) |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列 |
DelayQueue | 使用优先级队列实现的延迟无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列,即单个元素的队列 |
LinkedTransferQueue | 由链表结构组成的无界阻塞队列 |
LinkedBlockingDeque | 由链表结构组成的双向阻塞队列 |
另外还有一个在ScheduledThreadPoolExecutor
中实现的DelayedWorkQueue
阻塞队列,
但这个阻塞队列开发者不能使用。它们之间的UML类图如下图:
BlockingQueue接口是阻塞队列对外的访问接口,所有的阻塞队列都实现了BlockQueue中的方法
作为一个队列的核心方法就是入队和出队。由于存在阻塞策略,BlockQueue
将出队入队的情况分为了四组,每组提供不同的方法:
IllegalStateException
异常;当队列为空时,从队列中获取元素则抛出NoSuchElementException
异常。对于每种情况BlockingQueue
提供的方法如下表:
方法\处理方式 | 抛出异常 | 返回特定值(布尔值) | 一直阻塞 | 超时退出 |
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time.unit) |
检查 | element() | peek() | 不可用 | 不可用 |
上述方法一般用于生产者-消费者模型中,是其中的生产和消费操作队列的核心方法。
除了这些方法,BlockingQueue
还提供了一些其他的方法如下表:
方法名称 | 描述 |
remove(Object o) | 从队列中移除一个指定值 |
size() | 获取队列中元素的个数 |
contains(Object o) | 判断队列是否包含指定的元素,但是这个元素在这次判断完可能就会被消费 |
drainTo(Collection<? super E> c) | 将队列中元素放在给定的集合中,并返回添加的元素个数 |
drainTo(Collection<? super E> c, int maxElements) | 将队列中元素取maxElements(不超过队列中元素个数)个放在给定的集合中,并返回添加的元素个数 |
remainingCapacity() | 计算队列中还可以存放的元素个数 |
toArray() | 以objetc数组的形式获取队列中所有的元素 |
toArray(T[] a) | 以给定类型数组的方式获取队列中所有的元素 |
clear() | 清空队列,危险的操作 |
阻塞队列的实现依靠通知模式实现:当生产者向满了的队列中添加元素时,会阻塞住生产者,
直到消费者消费了一个队列中的元素后会通知消费者队列可用,此时再由生产者向队列中添加元素。反之亦然。
阻塞队列的阻塞唤醒依靠Condition
——条件队列来实现。
以ArrayBlockingQueue
为例说明:
ArrayBlockingQueue
的定义:
public class ArrayBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { /** The queued items */ //以数组的结构存储队列的元素,采用的是循环数组 final Object[] items; /** items index for next take, poll, peek or remove */ //队列的队头索引 int takeIndex; /** items index for next put, offer, or add */ //队列的队尾索引 int putIndex; /** Number of elements in the queue */ //队列中元素的个数 int count; /** Main lock guarding all access */ //对于ArrayBlockingQueue所有的操作都需要加锁, final ReentrantLock lock; /** Condition for waiting takes */ //条件队列,当队列为空时阻塞消费者并在生产者生产后唤醒消费者 private final Condition notEmpty; /** Condition for waiting puts */ //条件队列,当队列满时阻塞生产者,并在消费者消费队列后唤醒生产者 private final Condition notFull; }
根据类的定义字段可以看到,有两个Condition
条件队列,猜测以下过程
notEmpty.await()
方法阻塞,并在生产者生产后调用notEmpty.single()
方法notFull.await()
方法阻塞,并在消费者消费队列后调用notFull.single()
方法向队向队列中添加元素put()
方法的添加过程。
/** * 向队列中添加元素 * 当队列已满时需要阻塞当前线程 * 放入元素后唤醒因队列为空阻塞的消费者 */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //当队列已满时需要notFull.await()阻塞当前线程 //offer(e,time,unit)方法就是阻塞的时候加了超时设定 while (count == items.length) notFull.await(); //放入元素的过程 enqueue(e); } finally { lock.unlock(); } } /**enqueue实际添加元素的方法*/ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; //如果条件队列中存在等待的线程 //唤醒 notEmpty.signal(); }
从队列中获取元素take()
方法的获取过程。
/** * 从队列中获取元素 * 当队列已空时阻塞当前线程 * 从队列中消费元素后唤醒等待的生产线程 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //队列为空需要阻塞当前线程 while (count == 0) notEmpty.await(); //获取元素的过程 return dequeue(); } finally { lock.unlock(); } } /**dequeue实际消费元素的方法*/ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //消费元素后从唤醒阻塞的生产者线程 notFull.signal(); return x; }
阻塞队列提供了不同于普通队列的增加、删除元素的方法,核心在与队列满时阻塞生产者和队列空时阻塞消费者。
这一阻塞过程依靠与锁绑定的Condition
对象实现。Condition
接口的实现在AQS中实现,具体的实现类是ConditionObject
关于java中阻塞队列详细介绍就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。