资讯专栏INFORMATION COLUMN

ArrayBlockingQueue与LinkedBlockingQueue

jackwang / 940人阅读

摘要:序本文主要简单介绍下与。有界无界有界,适合已知最大存储容量的场景可有界可以无界吞吐量在大多数并发的场景下吞吐量比,但是性能不稳定。测试结果表明,的可伸缩性要高于。

本文主要简单介绍下ArrayBlockingQueue与LinkedBlockingQueue。

对比
queue 阻塞与否 是否有界 线程安全保障 适用场景 注意事项
ArrayBlockingQueue 阻塞 有界 一把全局锁 生产消费模型,平衡两边处理速度 用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)
LinkedBlockingQueue 阻塞 可配置 存取采用2把锁 生产消费模型,平衡两边处理速度 无界的时候注意内存溢出问题,用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。
ConcurrentLinkedQueue 非阻塞 无界 CAS 对全局的集合进行操作的场景 size() 是要遍历一遍集合,慎用
内存方面

ArrayBlockingQueue
用于存储队列元素的存储空间是预先分配的,使用过程中内存开销较小(无须动态申请存储空间)

LinkedBlockingQueue
用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。

有界无界

ArrayBlockingQueue
有界,适合已知最大存储容量的场景

LinkedBlockingQueue
可有界可以无界

吞吐量

LinkedBlockingQueue在大多数并发的场景下吞吐量比ArrayBlockingQueue,但是性能不稳定。

</>复制代码

  1. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.

测试结果表明,LinkedBlockingQueue的可伸缩性要高于ArrayBlockingQueue。初看起来,这个结果有些奇怪:链表队列在每次插入元素时,都必须分配一个链表节点对象,这似乎比基于数组的队列执行了更多的工作。然而,虽然它拥有更好的内存分配与GC等开销,但与基于数组的队列相比,链表队列的put和take等方法支持并发性更高的访问,因为一些优化后的链接队列算法能将队列头节点的更新操作与尾节点的更新操作分离开来。由于内存分配操作通常是线程本地的,因此如果算法能通过多执行一些内存分配操作来降低竞争程度,那么这种算法通常具有更高的可伸缩性。

并发方面

ArrayBlockingQueue
采用一把锁,两个condition

</>复制代码

  1. /** Main lock guarding all access */
  2. final ReentrantLock lock;
  3. /** Condition for waiting takes */
  4. private final Condition notEmpty;
  5. /** Condition for waiting puts */
  6. private final Condition notFull;
  7. /**
  8. * Inserts element at current put position, advances, and signals.
  9. * Call only when holding lock.
  10. */
  11. private void enqueue(E x) {
  12. // assert lock.getHoldCount() == 1;
  13. // assert items[putIndex] == null;
  14. final Object[] items = this.items;
  15. items[putIndex] = x;
  16. if (++putIndex == items.length)
  17. putIndex = 0;
  18. count++;
  19. notEmpty.signal();
  20. }
  21. /**
  22. * Extracts element at current take position, advances, and signals.
  23. * Call only when holding lock.
  24. */
  25. private E dequeue() {
  26. // assert lock.getHoldCount() == 1;
  27. // assert items[takeIndex] != null;
  28. final Object[] items = this.items;
  29. @SuppressWarnings("unchecked")
  30. E x = (E) items[takeIndex];
  31. items[takeIndex] = null;
  32. if (++takeIndex == items.length)
  33. takeIndex = 0;
  34. count--;
  35. if (itrs != null)
  36. itrs.elementDequeued();
  37. notFull.signal();
  38. return x;
  39. }

此外还支持公平锁

</>复制代码

  1. /**
  2. * Creates an {@code ArrayBlockingQueue} with the given (fixed)
  3. * capacity and the specified access policy.
  4. *
  5. * @param capacity the capacity of this queue
  6. * @param fair if {@code true} then queue accesses for threads blocked
  7. * on insertion or removal, are processed in FIFO order;
  8. * if {@code false} the access order is unspecified.
  9. * @throws IllegalArgumentException if {@code capacity < 1}
  10. */
  11. public ArrayBlockingQueue(int capacity, boolean fair) {
  12. if (capacity <= 0)
  13. throw new IllegalArgumentException();
  14. this.items = new Object[capacity];
  15. lock = new ReentrantLock(fair);
  16. notEmpty = lock.newCondition();
  17. notFull = lock.newCondition();
  18. }

LinkedBlockingQueue
头尾各1把锁

</>复制代码

  1. /** Lock held by take, poll, etc */
  2. private final ReentrantLock takeLock = new ReentrantLock();
  3. /** Wait queue for waiting takes */
  4. private final Condition notEmpty = takeLock.newCondition();
  5. /** Lock held by put, offer, etc */
  6. private final ReentrantLock putLock = new ReentrantLock();
  7. /** Wait queue for waiting puts */
  8. private final Condition notFull = putLock.newCondition();
  9. /**
  10. * Inserts the specified element at the tail of this queue if it is
  11. * possible to do so immediately without exceeding the queue"s capacity,
  12. * returning {@code true} upon success and {@code false} if this queue
  13. * is full.
  14. * When using a capacity-restricted queue, this method is generally
  15. * preferable to method {@link BlockingQueue#add add}, which can fail to
  16. * insert an element only by throwing an exception.
  17. *
  18. * @throws NullPointerException if the specified element is null
  19. */
  20. public boolean offer(E e) {
  21. if (e == null) throw new NullPointerException();
  22. final AtomicInteger count = this.count;
  23. if (count.get() == capacity)
  24. return false;
  25. int c = -1;
  26. Node node = new Node(e);
  27. final ReentrantLock putLock = this.putLock;
  28. putLock.lock();
  29. try {
  30. if (count.get() < capacity) {
  31. enqueue(node);
  32. c = count.getAndIncrement();
  33. if (c + 1 < capacity)
  34. notFull.signal();
  35. }
  36. } finally {
  37. putLock.unlock();
  38. }
  39. if (c == 0)
  40. signalNotEmpty();
  41. return c >= 0;
  42. }
  43. public E poll(long timeout, TimeUnit unit) throws InterruptedException {
  44. E x = null;
  45. int c = -1;
  46. long nanos = unit.toNanos(timeout);
  47. final AtomicInteger count = this.count;
  48. final ReentrantLock takeLock = this.takeLock;
  49. takeLock.lockInterruptibly();
  50. try {
  51. while (count.get() == 0) {
  52. if (nanos <= 0)
  53. return null;
  54. nanos = notEmpty.awaitNanos(nanos);
  55. }
  56. x = dequeue();
  57. c = count.getAndDecrement();
  58. if (c > 1)
  59. notEmpty.signal();
  60. } finally {
  61. takeLock.unlock();
  62. }
  63. if (c == capacity)
  64. signalNotFull();
  65. return x;
  66. }
应用实例 Executors

里头用了LinkedBlockingQueue

</>复制代码

  1. public static ExecutorService newFixedThreadPool(int nThreads) {
  2. return new ThreadPoolExecutor(nThreads, nThreads,
  3. 0L, TimeUnit.MILLISECONDS,
  4. new LinkedBlockingQueue());
  5. }
  6. public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
  7. return new ThreadPoolExecutor(nThreads, nThreads,
  8. 0L, TimeUnit.MILLISECONDS,
  9. new LinkedBlockingQueue(),
  10. threadFactory);
  11. }
  12. public static ExecutorService newSingleThreadExecutor() {
  13. return new FinalizableDelegatedExecutorService
  14. (new ThreadPoolExecutor(1, 1,
  15. 0L, TimeUnit.MILLISECONDS,
  16. new LinkedBlockingQueue()));
  17. }
  18. public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
  19. return new FinalizableDelegatedExecutorService
  20. (new ThreadPoolExecutor(1, 1,
  21. 0L, TimeUnit.MILLISECONDS,
  22. new LinkedBlockingQueue(),
  23. threadFactory));
  24. }

使用LinkedBlockingQueue实现logger

</>复制代码

  1. public class BungeeLogger extends Logger {
  2. private final ColouredWriter writer;
  3. private final Formatter formatter = new ConciseFormatter();
  4. // private final LogDispatcher dispatcher = new LogDispatcher(this);
  5. private final BlockingQueue queue = new LinkedBlockingQueue<>();
  6. volatile boolean running = true;
  7. Thread recvThread = new Thread(){
  8. @Override
  9. public void run() {
  10. while (!isInterrupted() && running) {
  11. LogRecord record;
  12. try {
  13. record = queue.take();
  14. } catch (InterruptedException ex) {
  15. continue;
  16. }
  17. doLog(record);
  18. }
  19. for (LogRecord record : queue) {
  20. doLog(record);
  21. }
  22. }
  23. };
  24. public BungeeLogger() throws IOException {
  25. super("BungeeCord", null);
  26. this.writer = new ColouredWriter(new ConsoleReader());
  27. try {
  28. FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true);
  29. handler.setFormatter(formatter);
  30. addHandler(handler);
  31. } catch (IOException ex) {
  32. System.err.println("Could not register logger!");
  33. ex.printStackTrace();
  34. }
  35. recvThread.start();
  36. Runtime.getRuntime().addShutdownHook(new Thread(){
  37. @Override
  38. public void run() {
  39. running = false;
  40. }
  41. });
  42. }
  43. @Override
  44. public void log(LogRecord record) {
  45. if (running) {
  46. queue.add(record);
  47. }
  48. }
  49. void doLog(LogRecord record) {
  50. super.log(record);
  51. writer.print(formatter.format(record));
  52. }
  53. }
doc

BungeeCord

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70386.html

相关文章

  • 通俗易懂,JDK 并发容器总结

    摘要:线程安全的线程安全的,在读多写少的场合性能非常好,远远好于高效的并发队列,使用链表实现。这样带来的好处是在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。 该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...

    curlyCheng 评论0 收藏0
  • (十五)java多线程之并发集合ArrayBlockingQueue

    摘要:本人邮箱欢迎转载转载请注明网址代码已经全部托管有需要的同学自行下载引言做的同学们或多或少的接触过集合框架在集合框架中大多的集合类是线程不安全的比如我们常用的等等我们写一个例子看为什么说是不安全的例子证明是线程不安全的我们开启个线程每个线程向 本人邮箱: 欢迎转载,转载请注明网址 http://blog.csdn.net/tianshi_kcogithub: https://github...

    stefan 评论0 收藏0
  • Java多线程进阶(三三)—— J.U.C之collections框架:LinkedBlocking

    摘要:在章节中,我们说过,维护了一把全局锁,无论是出队还是入队,都共用这把锁,这就导致任一时间点只有一个线程能够执行。入队锁对应的是条件队列,出队锁对应的是条件队列,所以每入队一个元素,应当立即去唤醒可能阻塞的其它入队线程。 showImg(https://segmentfault.com/img/bVbgCD9?w=1920&h=1080); 本文首发于一世流云专栏:https://seg...

    W_BinaryTree 评论0 收藏0
  • BlockingQueue学习

    摘要:引言在包中,很好的解决了在多线程中,如何高效安全传输数据的问题。同时,也用于自带线程池的缓冲队列中,了解也有助于理解线程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解决了在多线程中,如何高效安全传输数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。同时,BlockingQueue也用于...

    xuhong 评论0 收藏0
  • 使用 Executors,ThreadPoolExecutor,创建线程池,源码分析理解

    摘要:源码分析创建可缓冲的线程池。源码分析使用创建线程池源码分析的构造函数构造函数参数核心线程数大小,当线程数,会创建线程执行最大线程数,当线程数的时候,会把放入中保持存活时间,当线程数大于的空闲线程能保持的最大时间。 之前创建线程的时候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...

    Chiclaim 评论0 收藏0

发表评论

0条评论

jackwang

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<