JUC的PriorityBlockingQueue如何使用


本篇内容介绍了“JUC的PriorityBlockingQueue如何使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!PriorityBlockingQueue 底层依赖于数组作为存储结构,最大容量上限是 Integer.MAX_VALUE - 8,所以几乎可以将其视为无界的。同 PriorityQueue 一样,PriorityBlockingQueue 同样引入了堆数据结构来编排队列元素的优先级,默认使用最小堆结构。此外,由 Blocking 字样我们可以推断出 PriorityBlockingQueue 是一个阻塞队列。PriorityBlockingQueue 实现自 BlockingQueue 接口,并基于 ReentrantLock 锁保证线程安全。不过需要注意的一点是,PriorityBlockingQueue 的阻塞仅针对出队列操作而言,当队列为空时出队列的线程会阻塞等待其它线程往队列中添加新的元素。对于入队列操作来说,因为 PriorityBlockingQueue 定义为无界,所以执行入队列的线程会立即得到响应,如果队列底层数组已满则该线程会尝试对底层数组进行扩容,当底层数据达到容量上限而无法继续扩容时会抛出 OOM 异常。下面先来了解一下 PriorityBlockingQueue 的字段定义,如下:PriorityBlockingQueue 默认初始时的底层数组大小设置为 11,并在元素已满时触发扩容操作,字段 PriorityBlockingQueue#allocationSpinLock 用于控制同一时间只有一个线程在执行扩容。当某个线程检测到当前底层数组已满时会基于 CAS 操作尝试将该字段值由 0 改为 1,然后开始执行扩容,并在完成之后重置该标记字段。字段 PriorityBlockingQueue#comparator 用于指定元素比较器以判定队列元素的优先级,如果该字段为 null,则 PriorityBlockingQueue 会基于元素自带的比较器排列优先级。对于基本类型而言则参考元素的自然顺序,对于自定义对象来说,需要保证这些对象实现了 java.lang.Comparable 接口,否则会抛出 ClassCastException 异常。PriorityBlockingQueue 实现自 BlockingQueue 接口,下面针对核心方法的实现逐一进行分析。针对添加元素的操作,PriorityBlockingQueue 实现了 PriorityBlockingQueue#offerPriorityBlockingQueue#addPriorityBlockingQueue#put 方法,不过后两者都是直接调用了 PriorityBlockingQueue#offer 方法。此外,该方法的超时版本 PriorityBlockingQueue#offer(E, long, TimeUnit) 也是直接委托给 PriorityBlockingQueue#offer 方法执行,并没有真正实现超时等待机制,这主要是因为 PriorityBlockingQueue 是无界的,所有的添加操作都能够被立即响应,而不会阻塞。下面展开分析一下 PriorityBlockingQueue#offer 方法的实现,如下:PriorityBlockingQueue 同样不允许往其中添加 null 元素,如果待添加的元素值合法则执行:加锁,保证同一时间只有一个线程在操作队列;判断队列是否已满,如果是则执行扩容操作;将元素基于最小堆数据结构的约束插入到底层数据的合适位置;队列结点计数加 1;因为当前队列至少包含一个元素,所以尝试唤醒一个之前因为队列为空而阻塞的线程;释放锁并返回。继续来看一下上述步骤中的扩容过程,实现位于 PriorityBlockingQueue#tryGrow 方法中,如下:在开始执行扩容之前,当前线程会释放持有的锁,以避免在扩容期间阻塞其它线程的出队列操作,然后基于 CAS 操作修改扩容标记位 PriorityBlockingQueue#allocationSpinLock,保证同一时间只有一个线程在执行扩容。一开始数组较小(长度小于 64)时,线程将对底层数组成倍扩容(即 2(n + 1)),然后再按照 50% 的比例进行扩容(即 (1 + 1/2) * n),如果底层数组已经到达容量上限,则会抛出 OOM 异常。线程在完成扩容操作之后会重置扩容标记,如果有线程在竞争 CAS 时失败则会尝试让渡其它线程获取锁。这里主要是让渡给成功完成扩容操作的线程,因为此时扩容操作还未真正完成,该线程需要尝试获取锁以继续用扩容后的数组替换当前底层数组。继续回到 PriorityBlockingQueue#offer 方法,如果扩容操作完成或者本次入队列操作无需触发扩容,则接下去线程会将待添加的元素按照最小堆的约束插入到底层数据的合适位置。此时需要区分两种情况,如果在构造 PriorityBlockingQueue 对象时指定了比较器 Comparator,则会调用 PriorityBlockingQueue#siftUpUsingComparator 方法基于该比较器执行最小堆插入操作,否则调用 PriorityBlockingQueue#siftUpComparable 方法按照元素的自然顺序将当前元素插入到最小堆中。基于数组实现的堆结构,在操作上是比较简单的,读者可以自行参考源码,本文不对最小堆 siftUp*siftDown* 操作展开分析。前面几篇介绍的队列都满足 FIFO 的特性,在执行出队列时返回的都是在队列中存活时间最长的元素。对于 PriorityBlockingQueue 香港云主机而言,结点的顺序则按照优先级进行编排,所以这里获取元素的操作返回的是队列中优先级最高的结点。针对获取元素的操作,PriorityBlockingQueue 实现了 PriorityBlockingQueue#pollPriorityBlockingQueue#peekPriorityBlockingQueue#take 方法。其中 PriorityBlockingQueue#peek 方法仅获取最小堆堆顶结点元素值,而不移除该结点,实现上比较简单。方法 PriorityBlockingQueue#take 相对于 PriorityBlockingQueue#poll 的区别在于,当队列为空时该方法会无限期阻塞,直到有其它线程往队列中插入新的元素,或者该线程被中断。实现层面,二者大同小异,所以下面以 PriorityBlockingQueue#poll 方法为例展开分析从 PriorityBlockingQueue 中获取元素操作的具体实现。PriorityBlockingQueue 针对 PriorityBlockingQueue#poll 方法定义了两个版本,区别在于当队列为空时是立即返回还是阻塞等待一段时间,而在实现思路上是一致的。这里以不带超时参数的版本为例展开分析,实现如下:对于优先级队列而言,出队列操作获取到的是队列中优先级最高的元素,因为底层依赖于最小堆实现,所以只需要移除最小堆堆顶结点,并返回结点元素即可。但是因为这样破坏了堆的结构,所以需要调用 shiftDown* 方法从上往下进行调整,以再次满足最小堆结构的约束。针对移除元素的操作,PriorityBlockingQueue 实现了 PriorityBlockingQueue#remove 方法,并提供了有参和无参的版本,其中无参版本实际上是委托给 PriorityBlockingQueue#poll 方法执行的。下面来分析一下有参版本的实现,如下:如果待删除的元素是优先级最低的元素,则只需要将底层数组末尾结点置为 null 即可,否则,对于其它优先级的元素来说,在执行删除之后需要调整堆结构以满足最小堆定义。方法 PriorityBlockingQueue#contains 接收一个参数,用于判断队列中是否包含值等于参数的结点。方法 PriorityBlockingQueue#size 用于返回当前队列中包含的结点个数,因为 PriorityBlockingQueue 已经定义了 PriorityBlockingQueue#size 字段,用于对队列中的结点进行计数,所以该方法只需要返回字段值即可。“JUC的PriorityBlockingQueue如何使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注开发云网站,小编将为大家输出更多高质量的实用文章!

相关推荐: Hbase备份与恢复的方法是什么

这篇文章主要介绍“Hbase备份与恢复的方法是什么”,在日常操作中,相信很多人在Hbase备份与恢复的方法是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Hbase备份与恢复的方法是什么”的疑惑有所帮助!接下来,请跟 香港云…

免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。

Like (0)
Donate 微信扫一扫 微信扫一扫
Previous 10/06 10:46
Next 10/06 10:47

相关推荐