该文章主要讲解普通线程池子类调度线程池的源码分析
定时线程池源码分析
简介
ScheduledThreadPoolExecutor
继承了ThreadPoolExecutor, 并且可以延迟执行某个任务或定期执行一个任务
类继承图如下
ScheduledExecutorService
作为ScheduledThreadPoolExecutor的父接口, 其定义了schedule
等方法来向线程池提交定时任务.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public interface ScheduledExecutorService extends ExecutorService {
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); }
|
scheduleAtFixedRate
和scheduleWithFixedDelay
方法都是提交的周期性任务. 但是两个方法唯一的区别是参考的计算时间点不同. 一个是上次任务执行开始时间, 一个是上次任务的执行结束时间
.
区别
和ThreadPoolExecutor相比, ScheduledThreadPoolExecutor使用了新的ScheduledFutureTask
类来包装向线程池提交的任务, 重写了父类的submit
方法. 并且是使用了DelayQueue
的变体DelayedWorkQueue
来存储任务.
核心构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue()); }
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); }
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
|
可以看出线程池的最大线程数都是Integer.MAX_VALUE
, 且空闲线程的存活时间为 0 . 则表明非核心线程存活时间为0, 即没有任务就会退出
, 因为 getTask 方法中的 poll 方法很快就会超时.
核心方法
调度线程池核心方法为: submit
和 schedule
两个, 由于调度线程池使用的ScheduledFutureTask, 所以submit方法最终都调用的是schedule
方法.
submit方法原理
1 2 3 4 5 6 7 8 9 10 11
| public Future<?> submit(Runnable task) { return schedule(task, 0, NANOSECONDS); }
public <T> Future<T> submit(Runnable task, T result) { return schedule(Executors.callable(task, result), 0, NANOSECONDS); }
public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, NANOSECONDS); }
|
可以看出调度线程池重写了所有的submit方法, 并最终调用了schedule, 但是延迟相关的参数都是0.
schedule方法原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit))); delayedExecute(t); return t; }
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable,triggerTime(delay, unit))); delayedExecute(t); return t; }
|
使用ScheduledFutureTask对提交的任务进行了封装, 并调用triggerTime
方法计算出延迟的时间.
可以看出核心方法应该存在于delayedExecute
中.
delayedExecute方法原理
此方法为延迟或周期性执行任务的核心方法.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
|
需要注意canRunInCurrentRunState
方法其判断的条件, 同时了解ensurePrestart
方法执行细节.
ensurePrestart方法原理
此方法处于ThreadPoolExecutor类中.
1 2 3 4 5 6 7 8 9 10 11 12
| void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
|
scheduleAtFixedRate原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t; delayedExecute(t); return t; }
|
scheduleWithFixedDelay原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
|
scheduleAtFixedRate和scheduleWithFixedDelay代码大致相同, 只是构建ScheduledFutureTask的参数有所差异.
triggerTime方法原理
该方法用于计算出周期性任务的执行时间, 此方法在schedule
也有用调用.
1 2 3 4
| private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); }
|
调用下面的重载方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; }
|
总结
至此可以看出所有 schedule 开头的方法都是先构建ScheduledFutureTask对象, 然后在 delayedExecute
方法将构建好的任务提交到线程池并添加一个Worker.
ScheduledFutureTask
看完池的任务提交代码, 可以知道池的核心逻辑在父类ThreadPoolExecutor已经实现好了, 唯一需要了解的就是ScheduledFutureTask, 此类决定了任务的执行逻辑, 和执行时间.
此类类似于FutureTask.
核心变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| private final long sequenceNumber;
private final long period;
RunnableScheduledFuture<V> outerTask = this;
int heapIndex;
private long time;
|
核心构造函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); }
ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
|
核心方法
run方法原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
|
setNextRunTime方法原理
设置Task下次执行的事件
1 2 3 4 5 6 7
| private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); }
|
DelayedWorkQueue
此队列用来存储向调度线程池提交的任务. 底层使用数组来存储任务, 并将任务在数组中的索引存储在任务中.
核心变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[16]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; private Thread leader = null; private final Condition available = lock.newCondition(); }
|
核心方法
在线程池的源码分析中, 我们知道了线程池操作队列的基本方法:
- offer: 向队列中添加任务
- poll: 超时阻塞获取队列中的任务
- take: 阻塞获取队列中的任务
- remove: 删除队列中的任务
所以重点关注这几个方法.
offer方法原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public boolean offer(Runnable x) { RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow();
size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; }
|
poll方法原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) { if (nanos <= 0) return null; else nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); if (nanos <= 0) return null;
first = null;
if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
|
在线程池的runWorker
源码中, 只有当工作线程大于核心线程数时才会使用poll方法来超时获取任务.
而此方法的逻辑, 则是开启循环, 不断计算任务的延迟时间, 当任务的延迟时间到了, 则出队. 反之则一直循环.
take方法原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return finishPoll(first); first = null; if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
|
take方法和poll比较类似, 只是判断的条件少了很多.
总结
调度线程池复用了很多线程池的代码. 特别注意在初始化调度线程池时部分参数的特殊性. 且是使用的队列和普通线程池不同.
调度线程池执行的任务由 ScheduledFutureTask
封装, 且如果任务为周期性任务, 则任务的再次入队操作在 run
方法中完成提交. 同时使用了 time
和 period
变量来分别存储任务的执行时间
和任务执行间隔
.
调度线程池使用 DelayedWorkQueue
来存储提交的任务, 底层使用数组结构. 并且在 poll
和 take
方法中不断读取 ScheduledFutureTask
的延迟时间以便执行此任务(循环+条件队列). 同时可以使用 offer
方法将任务入队, 同时唤醒等待的线程和对队列中的任务进行排序. (保证任务能够执行)
.