debug ScheduledThreadPoolExecutor 与 f.cancel(false);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 2022-02-23 10:40 debug ScheduledThreadPoolExecutor 与 f.cancel(false);
public static void main(String[] args) throws Exception {
// 1、ScheduledThreadPoolExecutor 底层最终执行也是 ThreadPoolExecutor.addWorker
// 2、把任务封装为 ScheduledFutureTask 对象
// ===> ScheduledFutureTask 继承了 FutureTask
// ===> ScheduledFutureTask 成员变量 long time => 封装了 纳秒值 delayNano + System.nanoTime()
// ===> ScheduledFutureTask 成员变量 long period => 封装了 延迟时间的 循环纳秒值
// 3、ScheduledThreadPoolExecutor 内部默认使用 DelayedWorkQueue 队列来维护
// ===> 在 ThreadPoolExecutor.runWorker
// ===> ThreadPoolExecutor.getTask 获取任务的时候,进行 workQueue.take() 获取任务的时候阻塞住
// ===> 而 take 执行的是 DelayedWorkQueue.take 方法,如果时间未到会执行 available.awaitNanos(delay);
// ===> 等待是:AbstractQueuedSynchronizer.ConditionObject.awaitNanos
// ===> 最终执行:LockSupport.parkNanos 进行阻塞当前线程指定的时候
// 4、阻塞时间到了之后,会从 LockSupport.parkNanos 继续往下执行,跳出循环
// ===> 最终 DelayedWorkQueue.take 方法返回 ScheduledThreadPoolExecutor.DelayedWorkQueue.finishPoll,从而线程池 ThreadPoolExecutor.getTask 获取到任务
// ===> 进而执行 ScheduledFutureTask.run 默认执行 ScheduledFutureTask.super.run(); 也就是 FutureTask.run
// ===> FutureTask.run 做了两件事
// ===> 1、如果不是 NEW 状态,直接退出
// ===> 2、是 NEW 状态,执行 call() 方法获取返回值,并执行 FutureTask.set 设置状态为 COMPLETING 成功后,再通知 get() 阻塞队列
// 5、如果调用了 ScheduledThreadPoolExecutor.ScheduledFutureTask.cancel 方法,进行取消任务
// ===> 传入参数false 表示不中断正在执行。传入参数 true 表示如果正在执行,可能会抛出中断。
// ===> 也会调用到 FutureTask.cancel
// ===> 状态是 NEW 的话,直接修改状态为 INTERRUPTING 或 CANCELLED。
// ===> 状态不是 NEW 的话,传入参数为 true 会进行中断调用 Thread.interrupt,并修改状态为 INTERRUPTED
// ===> 这个 cancel 方法最终 finally 会执行 FutureTask.finishCompletion(通知 get() 阻塞队列)
// 6、周期队列:执行的是 ScheduledThreadPoolExecutor.ScheduledFutureTask.run
// ===> 从而执行 FutureTask.runAndReset,然后设置下一次定时任务
// ===> 然后往延时队列中扔任务
// ===> ensurePrestart 确保预启动:小于核心 添加任务,无核心 当做临时任务
// 7、DelayedWorkQueue 内部使用 leader 来保证一个在执行,其他的 follower,await 等待。
// ===> ScheduledThreadPoolExecutor.scheduleAtFixedRate:time += p; (1点整 2点整 ...)
// ===> ScheduledThreadPoolExecutor.scheduleWithFixedDelay:now() + delay (当前时间往后推 delay)
// ===> 具体参考 ScheduledThreadPoolExecutor.ScheduledFutureTask.setNextRunTime
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);
Runnable runnable1 = () -> {
log.info("+++++++++++++++++++++++++++++ runnable1");
};
ScheduledFuture<?> f = executor.schedule(runnable1, 1, TimeUnit.SECONDS);

log.info("executor.getQueue().size() : {}", executor.getQueue().size());// 1
log.info("f.isCancelled() : {}", f.isCancelled());// false
log.info("f.isDone() : {}", f.isDone());// false
boolean cancel = f.cancel(false);// true取消成功 false取消失败(传入参数false 表示不中断正在执行)
log.info("===============================");
log.info("executor.getQueue().size() : {}", executor.getQueue().size());// 1
log.info("f.isCancelled() : {}", f.isCancelled());// true
log.info("f.isDone() : {}", f.isDone());// true


Runnable runnable2 = () -> {
log.info("+++++++++++++++++++++++++++++ runnable2");
};
executor.scheduleWithFixedDelay(runnable2, 10L, 30L, TimeUnit.SECONDS);// 初次延迟1L,后续 3L执行一次。

}