BlockingQueue详解--一文让你彻底明白阻塞队列
下文笔者讲述BlockingQueue的相关说明
检索元素
BlockingQueue简介
BlockingQueue是一个接口,它位于java.util.concurrent包下面 BlockingQueue用于处理并发生产者和消费者的问题 BlockingQueue是线程安全的类 无论何时只有一个线程对其进行take或put操作 BlockingQueue还提供超时机制
Queue队列类型
一、无限队列(unbounded queue):可无限增长 二、有限队列(bounded queue):定义最大容量
队列数据结构
队列实质是一种存储数据的结构 可使用链表或数组实现 队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列 队列主要操作分为:入队(EnQueue)与出队(Dequeue)
常见的4种阻塞队列
ArrayBlockingQueue: 由数组支持的有界队列 LinkedBlockingQueue: 由链接节点支持的可选有界队列 PriorityBlockingQueue: 由优先级堆支持的无界优先级队列 DelayQueue: 由优先级堆支持的、基于时间的调度队列
ArrayBlockingQueue
队列基于数组实现 容量大小在创建ArrayBlockingQueue对象时已定义好数据结构
队列创建
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();
应用场景
在线程池中有比较多的应用 生产者消费者场景 队列工作原理 基于ReentrantLock保证线程安全 根据Condition实现队列满时的阻塞
LinkedBlockingQueue
是一个基于链表的无界队列(理论上有界)
//创建队列 BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(); blockingQueue 的容量将设置为 Integer.MAX_VALUE 。 向无限队列添加元素的所有操作都将永远不会阻塞[注意这里不是说不会加锁保证线程安全]因此它可以增长到非常大的容量。 无限队列需注意,消费者应该像添加消息一样快速消费消息,否则内存会撑爆,出现OutOfMemory
DelayQueue
由优先级堆支持的、 基于时间的调度队列 内部基于无界队列PriorityQueue实现 而无界队列基于数组的扩容实现。 队列创建: BlockingQueue<String> blockingQueue = new DelayQueue(); 入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口 应用场景: 电影票 工作原理: 队列内部会根据时间优先级进行排序。延迟类线程池周期执行。
BlockingQueue API
BlockingQueue 接口的所有方法可以分为两大类 负责向队列添加元素的方法和检索这些元素的方法 在队列满/空的情况下 来自这两个组的每个方法的行为都不同。添加元素
方法 | 备注 |
add() | 如果插入成功则返回 true 否则抛出 IllegalStateException 异常 |
put() | 将指定的元素插入队列 当队列满了 那么会阻塞直到有空间插入 |
offer() | 如果插入成功则返回 true 否则返回false |
offer(E e, long timeout, TimeUnit unit) | 尝试将元素插入队列 当队列已满 则会阻塞直到有空间插入 |
方法 | 备注 |
take() | 获取队列的头部元素并将其删除 当队列为空 则阻塞并等待元素变为可用 |
poll(long timeout, TimeUnit unit) | 检索并删除队列的头部 如有必要 等待指定的等待时间以使元素可用 如果超时,则返回 null |
多线程生产者-消费者示例
生产者将生成一个 0 到 100 的随机数,并将该数字放在 BlockingQueue中 将创建 16 个线程用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。 从生产者向消费者发出信号的好方法生产者代码
@Slf4j public class NumbersProducer implements Runnable { private final int poisonPill; private final int poisonPillPerProducer; private BlockingQueue<Integer> numbersQueue; public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } @Override public void run() { try { generateNumbers (); } catch (InterruptedException e) { Thread.currentThread ().interrupt (); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put (ThreadLocalRandom.current ().nextInt (100)); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put (poisonPill); } } }
消费者代码
@Slf4j public class NumbersConsumer implements Runnable { private final int poisonPill; private BlockingQueue<Integer> queue; public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } @Override public void run() { try { while (true) { Integer number = queue.take (); if (number.equals (poisonPill)) { return; } } } catch (InterruptedException e) { Thread.currentThread ().interrupt (); } } }例:创建4个生产者,N个消费者
public class Main { public static void main(String[] args) { int BOUND = 10; int N_PRODUCERS = 16; int N_CONSUMERS = Runtime.getRuntime ().availableProcessors (); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue<Integer> queue = new LinkedBlockingQueue<> (BOUND); for (int i = 1; i < N_PRODUCERS; i++) { new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer)).start (); } for (int j = 0; j < N_CONSUMERS; j++) { new Thread (new NumbersConsumer (queue, poisonPill)).start (); } new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer + mod)).start (); } }
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。