JavaGuide/docs/java/collection/delayqueue-source-code.md

360 lines
17 KiB
Markdown
Raw Normal View History

---
title: DelayQueue 源码分析
category: Java
tag:
- Java集合
---
2023-06-30 19:46:59 +08:00
## DelayQueue 简介
2023-06-30 19:46:59 +08:00
`DelayQueue` 是 JUC 包(`java.util.concurrent)`为我们提供的延迟队列,用于实现延时任务比如订单下单 15 分钟未支付直接取消。它是 `BlockingQueue` 的一种,底层是一个基于 `PriorityQueue` 实现的一个无界队列,是线程安全的。关于`PriorityQueue`可以参考笔者编写的这篇文章:[PriorityQueue 源码分析](./priorityqueue-source-code.md) 。
2023-06-30 19:46:59 +08:00
![BlockingQueue 的实现类](https://oss.javaguide.cn/github/javaguide/java/collection/blocking-queue-hierarchy.png)
2023-06-30 19:46:59 +08:00
`DelayQueue` 中存放的元素必须实现 `Delayed` 接口,并且需要重写 `getDelay()`方法(计算是否到期)。
2023-06-30 19:46:59 +08:00
```java
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
```
2023-06-30 19:46:59 +08:00
2023-10-08 16:33:50 +08:00
默认情况下, `DelayQueue` 会按照到期时间升序编排任务。只有当元素过期时(`getDelay()`方法返回值小于等于 0才能从队列中取出。
2023-06-30 19:46:59 +08:00
## DelayQueue 发展史
2023-06-30 19:46:59 +08:00
- `DelayQueue` 最早是在 Java 5 中引入的,作为 `java.util.concurrent` 包中的一部分,用于支持基于时间的任务调度和缓存过期删除等场景,该版本仅仅支持延迟功能的实现,还未解决线程安全问题。
- 在 Java 6 中,`DelayQueue` 的实现进行了优化,通过使用 `ReentrantLock``Condition` 解决线程安全及线程间交互的效率,提高了其性能和可靠性。
- 在 Java 7 中,`DelayQueue` 的实现进行了进一步的优化,通过使用 CAS 操作实现元素的添加和移除操作,提高了其并发操作性能。
- 在 Java 8 中,`DelayQueue` 的实现没有进行重大变化,但是在 `java.time` 包中引入了新的时间类,如 `Duration``Instant`,使得使用 `DelayQueue` 进行基于时间的调度更加方便和灵活。
- 在 Java 9 中,`DelayQueue` 的实现进行了一些微小的改进,主要是对代码进行了一些优化和精简。
2023-06-30 19:46:59 +08:00
总的来说,`DelayQueue` 的发展史主要是通过优化其实现方式和提高其性能和可靠性,使其更加适用于基于时间的调度和缓存过期删除等场景。
2023-06-30 19:46:59 +08:00
## DelayQueue 常见使用场景示例
2023-06-30 19:46:59 +08:00
我们这里希望任务可以按照我们预期的时间执行,例如提交 3 个任务,分别要求 1s、2s、3s 后执行即使是乱序添加1s 后要求 1s 执行的任务会准时执行。
2023-06-30 19:46:59 +08:00
![延迟任务](https://oss.javaguide.cn/github/javaguide/java/collection/delayed-task.png)
2023-06-30 19:46:59 +08:00
对此我们可以使用 `DelayQueue` 来实现,所以我们首先需要继承 `Delayed` 实现 `DelayedTask`,实现 `getDelay` 方法以及优先级比较 `compareTo`
2023-06-30 19:46:59 +08:00
```java
2023-06-30 19:46:59 +08:00
/**
* 延迟任务
*/
public class DelayedTask implements Delayed {
/**
* 任务到期时间
*/
private long executeTime;
/**
* 任务
*/
private Runnable task;
public DelayedTask(long delay, Runnable task) {
this.executeTime = System.currentTimeMillis() + delay;
this.task = task;
}
/**
* 查看当前任务还有多久到期
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 延迟队列需要到期时间升序入队所以我们需要实现compareTo进行到期时间比较
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((DelayedTask) o).executeTime);
}
public void execute() {
task.run();
}
}
```
完成任务的封装之后,使用就很简单了,设置好多久到期然后将任务提交到延迟队列中即可。
```java
// 创建延迟队列,并添加任务
DelayQueue < DelayedTask > delayQueue = new DelayQueue < > ();
//分别添加1s、2s、3s到期的任务
delayQueue.add(new DelayedTask(2000, () -> System.out.println("Task 2")));
delayQueue.add(new DelayedTask(1000, () -> System.out.println("Task 1")));
delayQueue.add(new DelayedTask(3000, () -> System.out.println("Task 3")));
// 取出任务并执行
while (!delayQueue.isEmpty()) {
//阻塞获取最先到期的任务
DelayedTask task = delayQueue.take();
if (task != null) {
task.execute();
}
}
2023-06-30 19:46:59 +08:00
```
从输出结果可以看出,即使笔者先提到 2s 到期的任务1s 到期的任务 Task1 还是优先执行的。
2023-06-30 19:46:59 +08:00
```java
2023-06-30 19:46:59 +08:00
Task 1
Task 2
Task 3
```
## DelayQueue 源码解析
2023-06-30 19:46:59 +08:00
2023-09-01 11:31:03 +08:00
这里以 JDK1.8 为例,分析一下 `DelayQueue` 的底层核心源码。
2023-06-30 19:46:59 +08:00
`DelayQueue` 的类定义如下:
2023-06-30 19:46:59 +08:00
```java
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
{
//...
2023-06-30 19:46:59 +08:00
}
```
2023-09-01 11:31:03 +08:00
`DelayQueue` 继承了 `AbstractQueue` 类,实现了 `BlockingQueue` 接口。
2023-06-30 19:46:59 +08:00
![DelayQueue类图](https://oss.javaguide.cn/github/javaguide/java/collection/delayqueue-class-diagram.png)
2023-06-30 19:46:59 +08:00
### 核心成员变量
`DelayQueue` 的 4 个核心成员变量如下:
2023-06-30 19:46:59 +08:00
```java
//可重入锁,实现线程安全的关键
private final transient ReentrantLock lock = new ReentrantLock();
//延迟队列底层存储数据的集合,确保元素按照到期时间升序排列
private final PriorityQueue<E> q = new PriorityQueue<E>();
2023-06-30 19:46:59 +08:00
//指向准备执行优先级最高的线程
private Thread leader = null;
//实现多线程之间等待唤醒的交互
private final Condition available = lock.newCondition();
2023-06-30 19:46:59 +08:00
```
- `lock` : 我们都知道 `DelayQueue` 存取是线程安全的,所以为了保证存取元素时线程安全,我们就需要在存取时上锁,而 `DelayQueue` 就是基于 `ReentrantLock` 独占锁确保存取操作的线程安全。
- `q` : 延迟队列要求元素按照到期时间进行升序排列,所以元素添加时势必需要进行优先级排序,所以 `DelayQueue` 底层元素的存取都是通过这个优先队列 `PriorityQueue` 的成员变量 `q` 来管理的。
- `leader` : 延迟队列的任务只有到期之后才会执行,对于没有到期的任务只有等待,为了确保优先级最高的任务到期后可以即刻被执行,设计者就用 `leader` 来管理延迟任务,只有 `leader` 所指向的线程才具备定时等待任务到期执行的权限,而其他那些优先级低的任务只能无限期等待,直到 `leader` 线程执行完手头的延迟任务后唤醒它。
- `available` : 上文讲述 `leader` 线程时提到的等待唤醒操作的交互就是通过 `available` 实现的,假如线程 1 尝试在空的 `DelayQueue` 获取任务时,`available` 就会将其放入等待队列中。直到有一个线程添加一个延迟任务后通过 `available``signal` 方法将其唤醒。
2023-06-30 19:46:59 +08:00
### 构造方法
2023-06-30 19:46:59 +08:00
相较于其他的并发容器,延迟队列的构造方法比较简单,它只有两个构造方法,因为所有成员变量在类加载时都已经初始完成了,所以默认构造方法什么也没做。还有一个传入 `Collection` 对象的构造方法,它会将调用 `addAll()`方法将集合元素存到优先队列 `q` 中。
2023-06-30 19:46:59 +08:00
```java
public DelayQueue() {}
2023-06-30 19:46:59 +08:00
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
2023-06-30 19:46:59 +08:00
```
### 添加元素
`DelayQueue` 添加元素的方法无论是 `add`、`put` 还是 `offer`,本质上就是调用一下 `offer` ,所以了解延迟队列的添加逻辑我们只需阅读 offer 方法即可。
2023-06-30 19:46:59 +08:00
`offer` 方法的整体逻辑为:
2023-06-30 19:46:59 +08:00
1. 尝试获取 `lock`
2. 如果上锁成功,则调 `q``offer` 方法将元素存放到优先队列中。
3. 调用 `peek` 方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素),于是将 `leader` 设置为空,通知因为队列为空时调用 `take` 等方法导致阻塞的线程来争抢元素。
4. 上述步骤执行完成,释放 `lock`
5. 返回 true。
2023-06-30 19:46:59 +08:00
源码如下,笔者已详细注释,读者可自行参阅:
```java
2023-06-30 19:46:59 +08:00
public boolean offer(E e) {
//尝试获取lock
final ReentrantLock lock = this.lock;
lock.lock();
try {
2023-06-30 19:46:59 +08:00
//如果上锁成功,则调q的offer方法将元素存放到优先队列中
q.offer(e);
//调用peek方法看看当前队首元素是否就是本次入队的元素,如果是则说明当前这个元素是即将到期的任务(即优先级最高的元素)
if (q.peek() == e) {
2023-06-30 19:46:59 +08:00
//将leader设置为空,通知调用取元素方法而阻塞的线程来争抢这个任务
leader = null;
available.signal();
2023-06-30 19:46:59 +08:00
}
return true;
} finally {
//上述步骤执行完成释放lock
lock.unlock();
2023-06-30 19:46:59 +08:00
}
}
2023-06-30 19:46:59 +08:00
```
### 获取元素
`DelayQueue` 中获取元素的方式分为阻塞式和非阻塞式,先来看看逻辑比较复杂的阻塞式获取元素方法 `take`,为了让读者可以更直观的了解阻塞式获取元素的全流程,笔者将以 3 个线程并发获取元素为例讲述 `take` 的工作流程。
2023-06-30 19:46:59 +08:00
> 想要理解下面的内容,需要用到 AQS 相关的知识,推荐阅读下面这两篇文章:
>
> - [图文讲解 AQS ,一起看看 AQS 的源码……(图文较长)](https://xie.infoq.cn/article/5a3cc0b709012d40cb9f41986)
> - [AQS 都看完了Condition 原理可不能少!](https://xie.infoq.cn/article/0223d5e5f19726b36b084b10d)
2023-06-30 19:46:59 +08:00
1、首先 3 个线程会尝试获取可重入锁 `lock`,假设我们现在有 3 个线程分别是 t1、t2、t3,随后 t1 得到了锁,而 t2、t3 没有抢到锁,故将这两个线程存入等待队列中。
2023-06-30 19:46:59 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/collection/delayqueue-take-0.png)
2023-06-30 19:46:59 +08:00
2、紧接着 t1 开始进行元素获取的逻辑。
2023-06-30 19:46:59 +08:00
3、线程 t1 首先会查看 `DelayQueue` 队列首元素是否为空。
2023-06-30 19:46:59 +08:00
4、如果元素为空则说明当前队列没有任何元素故 t1 就会被阻塞存到 `conditionWaiter` 这个队列中。
2023-06-30 19:46:59 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/collection/delayqueue-take-1.png)
2023-06-30 19:46:59 +08:00
注意,调用 `await` 之后 t1 就会释放 `lcok` 锁,假如 `DelayQueue` 持续为空,那么 t2、t3 也会像 t1 一样执行相同的逻辑并进入 `conditionWaiter` 队列中。
2023-06-30 19:46:59 +08:00
![](https://oss.javaguide.cn/github/javaguide/java/collection/delayqueue-take-2.png)
2023-06-30 19:46:59 +08:00
如果元素不为空,则判断当前任务是否到期,如果元素到期,则直接返回出去。如果元素未到期,则判断当前 `leader` 线程(`DelayQueue` 中唯一一个可以等待并获取元素的线程引用)是否为空,若不为空,则说明当前 `leader` 正在等待执行一个优先级比当前元素还高的元素到期,故当前线程 t1 只能调用 `await` 进入无限期等待,等到 `leader` 取得元素后唤醒。反之,若 `leader` 线程为空,则将当前线程设置为 leader 并进入有限期等待,到期后取出元素并返回。
2023-06-30 19:46:59 +08:00
自此我们阻塞式获取元素的逻辑都已完成后,源码如下,读者可自行参阅:
```java
2023-06-30 19:46:59 +08:00
public E take() throws InterruptedException {
// 尝试获取可重入锁,将底层AQS的state设置为1,并设置为独占锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//查看队列第一个元素
E first = q.peek();
//若为空,则将当前线程放入ConditionObject的等待队列中并将底层AQS的state设置为0表示释放锁并进入无限期等待
if (first == null)
available.await();
else {
//若元素不为空,则查看当前元素多久到期
long delay = first.getDelay(NANOSECONDS);
//如果小于0则说明已到期直接返回出去
if (delay <= 0)
return q.poll();
//如果大于0则说明任务还没到期首先需要释放对这个元素的引用
first = null; // don't retain ref while waiting
//判断leader是否为空如果不为空则说明正有线程作为leader并等待一个任务到期则当前线程进入无限期等待
if (leader != null)
2023-06-30 19:46:59 +08:00
available.await();
else {
//反之将我们的线程成为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//并进入有限期等待
available.awaitNanos(delay);
} finally {
2023-06-30 19:46:59 +08:00
//等待任务到期时释放leader引用进入下一次循环将任务return出去
if (leader == thisThread)
leader = null;
2023-06-30 19:46:59 +08:00
}
}
}
}
} finally {
2023-12-30 17:14:13 +08:00
// 收尾逻辑:当leader为null并且队列中有任务时唤醒等待的获取元素的线程。
if (leader == null && q.peek() != null)
available.signal();
//释放锁
lock.unlock();
2023-06-30 19:46:59 +08:00
}
}
2023-06-30 19:46:59 +08:00
```
我们再来看看非阻塞的获取元素方法 `poll` ,逻辑比较简单,整体步骤如下:
2023-06-30 19:46:59 +08:00
1. 尝试获取可重入锁。
2. 查看队列第一个元素,判断元素是否为空。
3. 若元素为空,或者元素未到期,则直接返回空。
4. 若元素不为空且到期了,直接调用 `poll` 返回出去。
5. 释放可重入锁 `lock`
2023-06-30 19:46:59 +08:00
源码如下,读者可自行参阅源码及注释:
```java
2023-06-30 19:46:59 +08:00
public E poll() {
//尝试获取可重入锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
2023-06-30 19:46:59 +08:00
//查看队列第一个元素,判断元素是否为空
E first = q.peek();
2023-06-30 19:46:59 +08:00
//若元素为空,或者元素未到期,则直接返回空
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
2023-06-30 19:46:59 +08:00
//若元素不为空且到期了直接调用poll返回出去
return q.poll();
} finally {
//释放可重入锁lock
lock.unlock();
2023-06-30 19:46:59 +08:00
}
}
2023-06-30 19:46:59 +08:00
```
### 查看元素
上文获取元素时都会调用到 `peek` 方法peek 顾名思义仅仅窥探一下队列中的元素,它的步骤就 4 步:
2023-06-30 19:46:59 +08:00
1. 上锁。
2. 调用优先队列 q 的 peek 方法查看索引 0 位置的元素。
2023-06-30 19:46:59 +08:00
3. 释放锁。
4. 将元素返回出去。
```java
2023-06-30 19:46:59 +08:00
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
2023-06-30 19:46:59 +08:00
}
}
2023-06-30 19:46:59 +08:00
```
## DelayQueue 常见面试题
2023-06-30 19:46:59 +08:00
### DelayQueue 的实现原理是什么?
`DelayQueue` 底层是使用优先队列 `PriorityQueue` 来存储元素,而 `PriorityQueue` 采用二叉小顶堆的思想确保值小的元素排在最前面,这就使得 `DelayQueue` 对于延迟任务优先级的管理就变得十分方便了。同时 `DelayQueue` 为了保证线程安全还用到了可重入锁 `ReentrantLock`,确保单位时间内只有一个线程可以操作延迟队列。最后,为了实现多线程之间等待和唤醒的交互效率,`DelayQueue` 还用到了 `Condition`,通过 `Condition``await``signal` 方法完成多线程之间的等待唤醒。
2023-06-30 19:46:59 +08:00
### DelayQueue 的实现是否线程安全?
2023-06-30 19:46:59 +08:00
`DelayQueue` 的实现是线程安全的,它通过 `ReentrantLock` 实现了互斥访问和 `Condition` 实现了线程间的等待和唤醒操作,可以保证多线程环境下的安全性和可靠性。
2023-06-30 19:46:59 +08:00
### DelayQueue 的使用场景有哪些?
`DelayQueue` 通常用于实现定时任务调度和缓存过期删除等场景。在定时任务调度中,需要将需要执行的任务封装成延迟任务对象,并将其添加到 `DelayQueue` 中,`DelayQueue` 会自动按照剩余延迟时间进行升序排序(默认情况),以保证任务能够按照时间先后顺序执行。对于缓存过期这个场景而言,在数据被缓存到内存之后,我们可以将缓存的 key 封装成一个延迟的删除任务,并将其添加到 `DelayQueue` 中,当数据过期时,拿到这个任务的 key将这个 key 从内存中移除。
2023-06-30 19:46:59 +08:00
### DelayQueue 中 Delayed 接口的作用是什么?
`Delayed` 接口定义了元素的剩余延迟时间(`getDelay`)和元素之间的比较规则(该接口继承了 `Comparable` 接口)。若希望元素能够存放到 `DelayQueue` 中,就必须实现 `Delayed` 接口的 `getDelay()` 方法和 `compareTo()` 方法,否则 `DelayQueue` 无法得知当前任务剩余时长和任务优先级的比较。
2023-06-30 19:46:59 +08:00
### DelayQueue 和 Timer/TimerTask 的区别是什么?
`DelayQueue``Timer/TimerTask` 都可以用于实现定时任务调度,但是它们的实现方式不同。`DelayQueue` 是基于优先级队列和堆排序算法实现的,可以实现多个任务按照时间先后顺序执行;而 `Timer/TimerTask` 是基于单线程实现的,只能按照任务的执行顺序依次执行,如果某个任务执行时间过长,会影响其他任务的执行。另外,`DelayQueue` 还支持动态添加和移除任务,而 `Timer/TimerTask` 只能在创建时指定任务。
2023-06-30 19:46:59 +08:00
## 参考文献
2023-10-08 16:33:50 +08:00
- 《深入理解高并发编程JDK 核心技术》:
- 一口气说出 Java 6 种延时队列的实现方法(面试官也得服):<https://www.jb51.net/article/186192.htm>
- 图解 DelayQueue 源码java 8——延时队列的小九九: <https://blog.csdn.net/every__day/article/details/113810985>
2023-09-01 11:31:03 +08:00
<!-- @include: @article-footer.snippet.md -->