BlockingQueue详解--一文让你彻底明白阻塞队列

书欣 Java经验 发布时间:2023-01-28 21:37:51 阅读数:8287 1
下文笔者讲述BlockingQueue的相关说明

BlockingQueue简介

 BlockingQueue是一个接口,它位于java.util.concurrent包下面
 BlockingQueue用于处理并发生产者和消费者的问题
 BlockingQueue是线程安全的类
    无论何时只有一个线程对其进行take或put操作
BlockingQueue还提供超时机制

Queue队列类型

一、无限队列(unbounded queue):可无限增长
二、有限队列(bounded queue):定义最大容量

队列数据结构

队列实质是一种存储数据的结构
  可使用链表或数组实现
队列具备FIFO先进先出的特性,当然也有双端队列(Deque)优先级队列
队列主要操作分为:入队(EnQueue)与出队(Dequeue)
blockingQueue

常见的4种阻塞队列

ArrayBlockingQueue:
    由数组支持的有界队列

LinkedBlockingQueue:
    由链接节点支持的可选有界队列

PriorityBlockingQueue:
    由优先级堆支持的无界优先级队列

DelayQueue:
    由优先级堆支持的、基于时间的调度队列

ArrayBlockingQueue

队列基于数组实现
  容量大小在创建ArrayBlockingQueue对象时已定义好数据结构
ArrayBlockingQueue

队列创建

 BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();
 

应用场景

在线程池中有比较多的应用
  生产者消费者场景
队列工作原理
  基于ReentrantLock保证线程安全
  根据Condition实现队列满时的阻塞

LinkedBlockingQueue

是一个基于链表的无界队列(理论上有界)
//创建队列
  BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
  blockingQueue 的容量将设置为 Integer.MAX_VALUE 。
  向无限队列添加元素的所有操作都将永远不会阻塞[注意这里不是说不会加锁保证线程安全]因此它可以增长到非常大的容量。
  
   无限队列需注意,消费者应该像添加消息一样快速消费消息,否则内存会撑爆,出现OutOfMemory 

DelayQueue

由优先级堆支持的、
基于时间的调度队列
  内部基于无界队列PriorityQueue实现
而无界队列基于数组的扩容实现。

 队列创建:
  BlockingQueue<String> blockingQueue = new DelayQueue();      
 入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口

 应用场景:
  电影票

 工作原理:
   队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

BlockingQueue API

BlockingQueue 接口的所有方法可以分为两大类
   负责向队列添加元素的方法和检索这些元素的方法
   在队列满/空的情况下
  来自这两个组的每个方法的行为都不同。
添加元素
方法 备注
add() 如果插入成功则返回 true
否则抛出 IllegalStateException 异常
put() 将指定的元素插入队列
当队列满了
那么会阻塞直到有空间插入
offer() 如果插入成功则返回 true
否则返回false
offer(E e, long timeout, TimeUnit unit) 尝试将元素插入队列
当队列已满
则会阻塞直到有空间插入
检索元素
方法 备注
take() 获取队列的头部元素并将其删除
当队列为空
则阻塞并等待元素变为可用
poll(long timeout, TimeUnit unit) 检索并删除队列的头部
如有必要
等待指定的等待时间以使元素可用
如果超时,则返回 null

多线程生产者-消费者示例

 
生产者将生成一个 0 到 100 的随机数,并将该数字放在 BlockingQueue中
将创建 16 个线程用于生成随机数并使用 put() 方法阻塞,直到队列中有可用空间。
从生产者向消费者发出信号的好方法
生产者代码
@Slf4j
public class NumbersProducer implements Runnable {
private final int poisonPill;
private final int poisonPillPerProducer;
private BlockingQueue<Integer> numbersQueue;
public NumbersProducer(BlockingQueue<Integer> numbersQueue,
int poisonPill,
int poisonPillPerProducer) {
this.numbersQueue = numbersQueue;
this.poisonPill = poisonPill;
this.poisonPillPerProducer = poisonPillPerProducer;
  }
@Override
  public void run() {
try {
      generateNumbers ();
    } catch (InterruptedException e) {
Thread.currentThread ().interrupt ();
    }
  }
private void generateNumbers() throws InterruptedException {
for (int i = 0; i < 100; i++) {
numbersQueue.put (ThreadLocalRandom.current ().nextInt (100));
      
    }
for (int j = 0; j < poisonPillPerProducer; j++) {
numbersQueue.put (poisonPill);
    
    }
  }
}

消费者代码

@Slf4j
public class NumbersConsumer implements Runnable {
private final int poisonPill;
private BlockingQueue<Integer> queue;
public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
this.queue = queue;
this.poisonPill = poisonPill;
  }
@Override
  public void run() {
try {
while (true) {
Integer number = queue.take ();
if (number.equals (poisonPill)) {
return;
        } 
      }
    } catch (InterruptedException e) {
Thread.currentThread ().interrupt ();
    }
  }
}
例:创建4个生产者,N个消费者
public class Main {
public static void main(String[] args) {
int BOUND = 10;
int N_PRODUCERS = 16;
int N_CONSUMERS = Runtime.getRuntime ().availableProcessors ();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
int mod = N_CONSUMERS % N_PRODUCERS;
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<> (BOUND);
    
    for (int i = 1; i < N_PRODUCERS; i++) {
       new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer)).start ();
    } 

    for (int j = 0; j < N_CONSUMERS; j++) {
      new Thread (new NumbersConsumer (queue, poisonPill)).start ();
    }
   new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer + mod)).start ();
  }
}
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

本文链接: https://www.Java265.com/JavaJingYan/202301/16749164525561.html

最近发表

热门文章

好文推荐

Java265.com

https://www.java265.com

站长统计|粤ICP备14097017号-3

Powered By Java265.com信息维护小组

使用手机扫描二维码

关注我们看更多资讯

java爱好者