浅析 J.U.C 线程池核心源码
前言
简单分析一下线程池的运行机制、Worker、线程池中的锁、异常处理等源码。
线程池运行机制
这里简单提一下线程池的运行机制,相信大家对这个并不陌生。
当一个 Task 被提交,首先检查当前线程池中的工作线程数量,如果尚未超过 corePoolSize,那么可以创建一个新的 Worker,将这个 Task 作为它的 firstTask 执行。
如果此时工作线程的数量来到了 corePoolSize,那么我们尝试将这个 Task 加入到队列中,后续由线程池中的某个线程从队列中拿到这个 Task 执行。
如果我们设置的队列是一个有界队列,那么当任务数量超出队列容量时,再次尝试创建一个新的 Worker。
如果当前线程池中的工作线程数量来到了 maximumPoolSize,显然创建新 Worker 失败,我们执行预设的拒绝策略,否则该 Task 作为新 Worker 的 firstTask 执行。
与这个机制相关的代码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// Worker 数量 < corePoolSize,增加 Worker 来执行 command
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// Worker 数量超过 corePoolSize,那么就先入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 队列满,就增加 Worker 来执行 command
else if (!addWorker(command, false))
// 增加 Worker 失败,就要调用拒绝策略了
reject(command);
}
当然,这只是一个粗略的线程池运行机制,更多细节的内容,接着往下看。
谁在执行任务
经过我们上面的一段描述,一个关键的角色我想你应该注意到了,那就是 Worker。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 这里你可以看到,将当前 Worker 对象作为 Runnable
// 所以,后续线程启动之后,就会执行 Worker 的 run 方法
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() { return getState() != 0; }
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try { t.interrupt(); } catch (SecurityException ignore) {}
}
}
}
Worker 类是 ThreadPoolExecutor 的一个内部类,它实现了 Runnable 接口,所以本质上来说,它也是一个任务,只不过在这个任务里面要 不断执行用户提交的 Task。
并且,你要执行任务,至少应该有一个线程吧,所以在 Worker 中维护了实际执行任务的线程 thread。
最后,还有一个统计的变量 completedTasks,表示这个 Worker 执行的任务数量。
怎么创建 Worker
要了解 Worker 是怎么创建的,我们要将目光聚焦到 addWorker 方法上。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 原子的将 workerCount +1
// 如果出现并发,那么只会有一个线程原子成功,break 出去,其他失败的线程再次循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// 这里线程池状态发生了改变,需要跳到最外层循环执行,重新获取线程池状态 rs
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/**
* 这里就是在创建 Worker,并且将 firstTask 作为 Worker 的 firstTask
* 从上面 Worker 类我们也看到,在 Worker 的构造方法中,主要做三件事:
* 1. 设置 AQS state 为 -1,抑制中断直到调用 runWorker 方法
* 2. 将 firstTask 赋值给成员属性
* 3. 从 ThreadFactory 创建一个新线程,赋值给成员属性
*/
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将 worker 添加到 workers(HashSet) 中
workers.add(w);
// 主要是更新 largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 这里启动 Worker 内部的线程
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
/**
* 如果添加 Worker 失败,就会进入下面的方法,主要做下面几件事:
* 1. 将创建的 Worker 从 workers 中移除
* 2. 将 Worker 的数量减一
* 3. 尝试终止线程池,即调用 tryTerminate
*/
addWorkerFailed(w);
}
return workerStarted;
}
从代码里面,我们发现有三处与 SHUTDOWN
比较大小的地方,这是什么意思?
首先 SHUTDOWN
是线程池的一个状态,实际值为 0,其余状态如下:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 接受新的任务并且处理等待队列中的任务。
private static final int SHUTDOWN = 0 << COUNT_BITS; // 不接受新的任务,但是会处理已经提交的任务(包括队列中的任务)。
private static final int STOP = 1 << COUNT_BITS; // 不接受新的任务,不再处理等待队列中的任务,并且会中断正在执行的任务。
private static final int TIDYING = 2 << COUNT_BITS; // 所有的任务都已终止,workerCount 为0,线程池即将进入终态。
private static final int TERMINATED = 3 << COUNT_BITS; // 线程池已经完成了终止过程。
有了这个概念,我们再来分析下面的代码:
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
第一个条件很好理解,当 rs ≥ SHUTDOWN
时,表示线程池正在被关闭,或者已经关闭,此时不会再接受新的任务,所以此时是不能创建 Worker 的。
而第二个条件,就是说如果线程池处于 SHUTDOWN
状态,只有当有明确的任务给新线程执行(firstTask != null
),或者即使没有明确的任务,但是队列中有等待的任务时,才允许创建新的线程。
而 addWorker 方法的第二个参数 core 就决定了当前创建 Worker 数量的上限,如果调用方传 true,那么只能创建 corePoolSize 个 Worker,否则可以创建 maximumPoolSize 个 Worker。
从这里我们就可以假想出,当阻塞队列满了之后,传入的 core 就为 false,否则就是 true。
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
当 Worker 成功被添加后,就要启动 Worker 内部的 thread,即调用 t.start() 方法。
线程启动后跑什么代码
当调用 t.start() 之后,必然就要开始执行 run 方法了,那执行哪个 Runnable 类的 run 方法呢?
其实在前面你应该注意到 Worker 类的构造方法了:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 这里你可以看到,将当前 Worker 对象作为 Runnable
// 所以,后续线程启动之后,就会执行 Worker 的 run 方法
this.thread = getThreadFactory().newThread(this);
}
所以,我们接下来聚焦在 Worker 类的 run 方法中:
public void run() {
// 委托外部类的方法
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 这个 getTask() 就是从队列里面拿任务,如果能成功拿到,就会一直执行下去
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 扩展钩子
Throwable thrown = null;
try {
task.run(); // 这里就是真正在执行任务了
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 扩展钩子
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
在什么情况下会退出 while 循环呢?主要关注 getTask() 方法。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
// 如果拿到任务了,就直接返回就行了,在 runWorker 那里又会进入 while 循环执行任务
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
这里有两段逻辑是比较关键的,我们详细看看。
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
// 如果拿到任务了,就直接返回就行了,在 runWorker 那里又会进入 while 循环执行任务
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
就阻塞队列来说,poll 和 take 的区别是什么?
- 无参的 poll 会立即返回,不会阻塞,而这里带参数的 poll 表示只会阻塞 keepAliveTime 纳秒。
- 而 take 会一直阻塞,直到有返回结果。
而具体选择 poll 还是 take 呢?取决于 timed,这个 timed 是什么?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
当 allowCoreThreadTimeOut 参数为 true 或者 Worker 数量已经超过 corePoolSize,那么 timed 为 true。
所以,allowCoreThreadTimeOut 参数的含义,以及为什么超出 corePoolSize 数量的线程在超过 keepAliveTime 后会被“回收”我想你应该懂了。
那这个 timedOut 是干啥的?就和下面这段逻辑相关了:
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
首先 timedOut 为 true 仅当 timed 为 true,并且等待了 keepAliveTime 纳秒,并且没有拿到任务。
那么这种情况下,如果 timed 还是为 true,说明 Worker 数量是可以减少了,于是就有了上面的代码。
那么总的来说,getTask() 方法会返回一个 Runnable 对象或者一个 null 值,如果返回 null 值表示这个线程可以被“回收”了,否则就继续执行返回的 Runnable 任务。
我们再回到 runWorker 方法中,这里面,还有一个比较关键的变量 completedAbruptly,从代码结构可以看出来,如果线程从队列中没有拿到任务,就会正常退出 while 循环,自然 completedAbruptly 为 false,而一旦线程执行任务出现异常,就会跳过 completedAbruptly 的赋值,直接执行 finally 块的 processWorkerExit()。
所以,你需要记住,completedAbruptly 为 true 表示线程执行出现异常了,为 false 就表示线程没有拿到任务,可以被“回收”了。
带着这个理解,再来看 processWorkerExit 是怎么对线程进行善后的?
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 线程执行出现异常,Worker 数量要减一
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 所以,一旦进入这个方法,那么 Worker 一定是会被移除的
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
比较关键的逻辑是下面的代码,认真理解一下:
// 如果线程池还在运行中
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 如果线程是正常退出,是从阻塞队列中没有拿到任务,而不是执行异常
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
// 如果队列不为空,那么至少保留一个线程
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 添加一个新的 Worker,它没有 firstTask,只能从队列中拿任务
addWorker(null, false);
}
不难看出,这个 min 变量就表示这个线程池里面至少需要滞留的 Worker 数量,如果此时 Worker 的数量 ≥ min,那么直接返回,那么对应的 Worker 的 run 方法就执行完了,这个线程就被“回收”了。
而最后一行的 addWorker 相当于就是在增加一个 Worker 来替换当前必定“回收”的 Worker,那什么情况下需要新增 Worker 替换呢?有两个原因:一是 Worker 数量没有达到 min,二是线程执行出现异常。
为什么要继承 AQS
当然,在 Worker 类的继承体系上还有一个关键点:它继承了 AQS,我们知道 AQS 是用于实现同步器的,那 Worker 继承 AQS 有什么用呢?
我们先看看哪些地方用到了 lock 和 tryLock 方法。
对于 lock 方法,其实只在 runWorker 方法中用到了,对于 runWorker 这个方法,想必应该不陌生了,
final void runWorker(Worker w) {
// ...
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 执行 Task
} finally {
// ...
w.unlock();
}
}
completedAbruptly = false;
} finally { processWorkerExit(w, completedAbruptly); }
}
所以 Worker 在执行任务会获取锁,执行完毕之后就会释放。
而 tryLock 方法也只有一个地方使用到了,那就是 interruptIdleWorkers() 方法:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// w.tryLock()
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally { w.unlock(); }
}
if (onlyOne) break;
}
} finally { mainLock.unlock(); }
}
从这个方法名称,应该可以看出,这是在中断 Idle 的 Worker,Idle 的意思是“闲置的”,那么对 Worker 而言,其实就是阻塞在 poll 或者 take 上的线程,我想你应该还记得下面的代码:
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
那怎么才能知道哪些 Worker 正在阻塞呢?这就是 Worker 继承 AQS 的作用了,当 Worker 执行任务的时候会获取锁,而在 interruptIdleWorkers 方法中,需要通过 tryLock 拿到锁才能打断,如果一个 Worker 正在执行任务,那么注定 tryLock 会失败,自然就不会中断它,而如果一个 Worker 正在阻塞时,它肯定没有进行 lock,那么此时 tryLock 是可以成功的,继而就可以打断它。
所以,总结一句话,Worker 继承 AQS 的作用就是 保证执行任务的线程不会被中断。
事实真的是如此吗?我们再来看看 Lea 老头是怎么解释的。
下面是 Worker 类 doc 注释的几句比较核心的解释:
Class Worker mainly maintains interrupt control state for threads running tasks.
Worker 类主要是维护那些正在执行任务的线程的中断状态。
This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run.
这(Worker 继承 AQS)可以防止原本用于唤醒等待任务的 Worker 的 中断,意外 中断 了正在执行任务的 Worker。
有点绕,注意理解,第一个中断是名词,第二个是动词。
We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize.
这段话解释了 Lea 老头为什么要自己搞一个 AQS,而不是使用现成的 ReentrantLock。
我们实现了一个简单的不可重入的排他锁,而不是使用 ReentrantLock,这是因为我们不希望 Worker 在执行任务时再调用像 setCorePoolSize 这样的线程池控制方法时能够重新获取锁。
这段话是什么意思?先看 setCorePoolSize 方法:
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
// 如果 Worker 数量大于 corePoolSize,那么就要打断 Idle Worker
// 那些 Worker 被打断之后,没有从队列中拿到任务,而此时又更新了 corePoolSize,所以它们就可能被“回收”了
// interruptIdleWorkers 内部会通过 tryLock 拿锁
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
如果使用 ReentrantLock,那么假设线程执行的任务是这样的:
ThreadPoolExecutor executor =
new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
executor.execute(() -> executor.setCorePoolSize(2));
本来线程在执行任务,那么一定已经 lock 加锁了,而任务内调用 setCorePoolSize 又会调用 tryLock,锁又是可重入的,那不就把这个正在执行任务的线程中断了。
Additionally, to suppress interrupts until the thread actually starts running tasks, we initialize lock state to a negative value, and clear it upon start (in runWorker).
最后一句,就是说,为了 抑制中断直到线程真正开始执行任务,我们初始化锁状态为负数(-1),并在启动时(runWorker 中)清除它。
所以在 Worker 类的构造方法中,如下:
Worker(Runnable firstTask) {
// 初始化锁状态为 -1
setState(-1); // inhibit interrupts until runWorker
// ...
}
而在 runWorker 类,会看到这段代码:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 并在启动时清除它
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// ...
}
completedAbruptly = false;
} finally { processWorkerExit(w, completedAbruptly); }
}
最后,这句话如何理解?抑制中断直到线程真正开始执行任务。
在 Worker 类中,locked 状态的 state 值为 1,unlocked 状态的 state 值为 0,所以显然,tryLock 方法会 cas 将 state 从 0 改为 1。
那 state 为 -1,是不是说明 tryLock 一定会失败,那么就不会中断这个 Worker,这下应该理解了吧。
另一把锁 mainLock
前面,我们知道 Worker 类继承了 AQS,所以我们也可以将 Worker 理解为一把锁,而在 ThreadPoolExecutor 中还有另一把锁 mainLock。
从图中,我们看出,这把锁是一个 ReentrantLock,在 ThreadPoolExecutor 类中,像 workers、largestPoolSize、completedTaskCount 等都需要先持有 lock 才能访问。
我们以 workers 为例,分析一下。
首先,这个 workers 是一个 HashSet,存储了线程池中所有的 Worker,我们都知道 HashSet 是线程不安全的,所以需要锁来保证线程安全,但是你会不会有一个疑问,为什么不用一个线程安全的 Set 集合,比如 ConcurrentSkipListSet,而要用锁。
其实,这个问题 Lea 老头早就给你想好了,来看 mainLock 的 doc 注释:
While we could use a concurrent set of some sort, it turns out to be generally preferable to use a lock.
我们当然可以使用并发安全的 Set 集合,但事实证明,使用 lock 是更合适的。
接下来就是解释为什么使用 lock 更合适了。
Among the reasons is that this serializes interruptIdleWorkers, which avoids unnecessary interrupt storms, especially during shutdown. Otherwise exiting threads would concurrently interrupt those that have not yet interrupted.
It also simplifies some of the associated statistics bookkeeping of largestPoolSize etc.
We also hold mainLock on shutdown and shutdownNow, for the sake of ensuring workers set is stable while separately checking permission to interrupt and actually interrupting.
其中一个原因是它使 interruptIdleWorkers 方法串行执行,从而避免了不必要的“中断风暴”,特别是在 shutdown 时,否则,退出的线程会并发的中断那些尚未中断的线程。
这也简化了一些变量比如 largestPoolSize 等的统计维护工作。
在执行 shutdown 和 shutdownNow 时,我们也会持有 mainLock,用于确保在单独检查中断权限以及实际执行中断过程中,workers 集合的稳定性。
这里面比较难理解的是这个“中断风暴”的概念。
假设我们使用的是并发安全的 Set 集合,这时候有多个线程并发的执行 shutdown 方法,由于没有 mainLock,所以不存在阻塞,每个线程都会执行 interruptIdleWorkers 方法。
这会出现什么情况呢?假设线程池中有一个阻塞的线程 t,那么此时一个线程来对这个 t 发起了中断,此时 t 正在中断中,又来了一个线程对这个 t 发起了中断,这就出现了对正在中断的线程又发起了中断。
而执行 shutdown 方法的并发线程越多,就越有可能出现这种对正在中断的线程又发起了中断的情况,这就是所谓的“中断风暴”。
其他的倒没有什么说的,额外注意一点的就是,我们应该学习 Lea 老头的加、解锁的规范,比如下面一段代码:
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// ...
} finally {
mainLock.unlock();
}
相信大家一定知道 unlock 要写在 finally 块内,但是 lock 为什么要写在 try 上面呢?
假设在 lock 加锁时出现异常,那么进入 finally 解锁,此时 Thread.currentThread() 一定不是持锁线程,所以会抛出 IllegalMonitorStateException 异常。
// java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// ...
}
当然,抛这个异常并不是最严重的问题,最严重的是会导致异常堆栈丢失,比如我通过重写 ReentrantLock 来模拟 lock 异常:
import java.util.concurrent.locks.*
public class SupportedReentrantLock extends ReentrantLock {
final NonfairSync sync = new NonfairSync();
static final class NonfairSync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 7316153563782823691L;
void lock() {
if (true) { throw new RuntimeException("error!!!"); }
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
@Override
public void lock() { sync.lock(); }
public static void main(String[] args) {
Lock lock = new SupportedReentrantLock();
try {
lock.lock();
System.out.println("加锁了");
} finally {
lock.unlock();
}
}
}
来看执行情况:
可以看到,异常堆栈已经丢了,当然,一般情况下是不会出现这种问题的,但毕竟这是一种 良好的编码规范。
任务执行时抛异常
在线程池中,所有的 Worker 都有条不紊的执行着任务,但是如果 Worker,或者说线程在执行任务时出现异常,会发生什么呢?
线程执行任务的关键还是在 runWorker 方法,再来看看这个方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// ...
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
我们来模拟一下代码的执行流程,当 task.run 出现异常,那么首先由各个 catch 块进行捕获,将异常赋值给 thrown 变量,最后传递给 afterExecute 钩子方法。
接着执行第二个 finally 块进行解锁等操作,最后执行最后一个 finally 块中的 processWorkerExit 方法处理 Worker 的退出,我们前面其实提到过,这里 completedAbruptly 为 true(跳过对其赋值为 false)
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
// ...
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// ...
addWorker(null, false);
}
}
可以看到,如果此时线程池还在运行中,那么一定会添加一个新的 Worker 来顶替这个执行异常的线程。
当所有的 finally 执行完后,异常就会逐个向外抛。
在 Thread 类中,还有这样一个方法 dispatchUncaughtException,不知道你以前注意到没有。
private void dispatchUncaughtException(Throwable e) {
getUncaughtExceptionHandler().uncaughtException(this, e);
}
它的作用是什么呢?将未捕获的异常分发给 Handler,注意,这个方法只能被 JVM 调用。
而这里的 Handler 其实就是 UncaughtExceptionHandler,也是 Thread 内部的一个函数式接口:
@FunctionalInterface
public interface UncaughtExceptionHandler {
void uncaughtException(Thread t, Throwable e);
}
根据 java doc 来看,就是当一个线程因为未捕获的异常而即将终止时,JVM 将使用 Thread.getUncaughtExceptionHandler 获取设置的 UncaughtExceptionHandler 实例,并通过调用 uncaughtException 方法传递相关的异常信息,其实说的就是 dispatchUncaughtException 方法的作用。
一个线程默认的 UncaughtExceptionHandler 是其 ThreadGroup 对象,如下:
public UncaughtExceptionHandler getUncaughtExceptionHandler() {
return uncaughtExceptionHandler != null ? uncaughtExceptionHandler : group;
}
这个 ThreadGroup 是 UncaughtExceptionHandler 的子类,所以默认情况下,当线程由于未捕获的异常而即将终止时,就会调用到下面的方法:
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
@Override
public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
parent.uncaughtException(t, e);
} else {
Thread.UncaughtExceptionHandler ueh =
Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \""
+ t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}
}
如果我们设置了线程的 UncaughtExceptionHandler,如下:
import java.util.concurrent.*;
public class ExecutorTest {
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10), new ExceptionThreadFactory());
executor.execute(() -> {
throw new RuntimeException("error!!");
});
}
static class ExceptionThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
ExceptionThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
/**
* 设置 UncaughtExceptionHandler
*/
t.setUncaughtExceptionHandler((thread, e) -> {
System.out.println("出现异常");
e.printStackTrace(); // 打印堆栈
});
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
那么当线程由于未捕获的异常而即将终止时,就会执行我们自定义的 UncaughtExceptionHandler#uncaughtException 方法
上面这种方式仅仅是基于 execute 来提交任务,但是我们知道线程池还有一种提交任务的方式 — submit。
在 submit 方法中,每一个 Callable 对象都会被封装为一个 RunnableFuture,从名字可以看出来,这个类和 Runnable、Future 都有关系,事实上也是如此,RunnableFuture 继承了 Runnable 和 Future。
当然,RunnableFuture 只是一个接口,这里使用的具体实现是 FutureTask
所以,真正在线程池执行的是 FutureTask 的 run 方法:
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); // 调用 call
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran) // 如果 call 出现异常则 ran 为 false,不会设置 result,否则设置 result
set(result);
}
} finally {
// ...
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
可以看到,即使执行 call 方法出现异常,也会被捕获,然后将异常暂存到 FutureTask 的 outcome 属性中。
如果想要拿到 call 方法抛出的异常,怎么做呢?其实很简单,get 一下就行,就像下面这样:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ExecutorTest {
public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException,
ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10), new ExceptionThreadFactory());
Future<Object> future = executor.submit(() -> {
throw new RuntimeException("error!!");
});
System.out.println("get: ");
future.get(); // get 一下
}
static class ExceptionThreadFactory implements ThreadFactory {
// ...
public Thread newThread(Runnable r) {
// ...
t.setUncaughtExceptionHandler((thread, e) -> {
System.out.println("出现异常");
e.printStackTrace();
});
// ...
}
}
}
结果如下图:
那执行任务的线程呢?线程执行的是 FutureTask 的 run 方法,而在这个 run 方法里面,即使 call 时出现了异常,也会被捕获,那你说对整体的外部流程有影响吗?很明显没有啊,所以,这个执行任务的线程接着会调用 afterExecute 钩子,然后进入下一轮 while 循环(从队列里面继续拿任务),并不会被替换。
既然线程都不会退出,那 JVM 理所当然的不会调用 dispatchUncaughtException 方法进行异常分发了。
注意,上图中的第一段堆栈异常其实是 main 线程的,因为我们并没有对 future.get 做异常捕获,所以 JVM 会对 main 线程调用 dispatchUncaughtException。
最后,总结:
当使用 execute 提交任务时,如果任务执行(run 方法)出现异常,那么异常往外抛,异常堆栈被输出,同时执行任务的线程被“销毁”,但是还会创建一个替代的线程加入到线程池,而执行异常的线程由于未被捕获的异常而退出,那么对应的 UncaughtExceptionHandler 的 uncaughtException 方法会被 JVM 调用。
而当使用 submit 提交任务时,如果任务执行(call 方法)出现异常,那么异常被捕获,设置到 FutureTask 的 outcome 字段上,要想感知到异常,必须调用 get 方法,同时执行任务的线程也不会退出。
当然,不管是 execute 还是 submit,执行出现异常都不会影响线程池中其他线程执行任务,这是显而易见的,如果一个线程执行异常导致整个线程池崩了,那还得了。
最后的最后,给一个提交任务的建议,其实就是能用 execute 就不用 submit,非要用 submit 就必须 get 结果,才能避免异常丢失。
总结
这篇文章只是简单的分析了线程池内部的部分源码,更多的内容,还得靠你自己发现。