youyichannel

志于道,据于德,依于仁,游于艺!

0%

线程池实现线程复用的原理

在 Java 中,线程池会使用固定数量或者可变数量的线程来执行任务,但无论是固定数量还是可变数量的线程,其线程数量都远远小于任务数量,面对这种情况线程池可以通过线程复用让同一个线程去执行不同的任务,那么线程复用原理是什么呢?

线程复用原理

线程池可以把线程和任务进行解耦,在线程池中,同一个线程可以从 BlockingQueue 中不断提取新任务来执行,其核心原理在于线程池对 Thread 的封装,并不是每次执行都会调用 Thread#start() 来创建新县城,而是让每个线程去执行一个「循环任务」,在这个「循环任务」中,不断检查是否还有任务等待被执行,如果有则直接去执行这个任务,即调用任务的 run 方法,这相当于将 run 方法当做普通方法,把每个任务的 run 方法串联起来,因此线程数量并不增加。

ThreadPoolExecutor#execute

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

源码分析

这段代码短小精悍

1)首先前几行:

// 如果传入的 command 为空,抛出 NPE
if (command == null)
throw new NullPointerException();

execute 方法中通过判断 command ,也就是 Runnable 任务是否为空,如果为空就抛出 NPE 异常。

2)接着判断当前线程数是否小于核心线程数,如果小于核心线程数就调用 addWorker() 方法增加一个 Worker,此处的 Worker 可以理解成一个线程。

if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

其中 addWorker 方法的主要作用是在线程池中创建一个线程并执行第一个参数传入的任务,它的第二个参数是个布尔值,

  • 如果布尔值传入 true 代表增加线程时判断当前线程是否小于 corePoolSize,小于则增加新线程,大于等于则不增加;
  • 如果传入 false 代表增加线程时判断当前线程是否小于 maxPoolSize,小于则增加新线程,大于等于则不增加;

因此这里的布尔值的含义是以核心线程数为界限还是以最大线程数为界限进行是否新增线程的判断。addWorker() 方法如果返回 true 代表添加成功,如果返回 false 代表添加失败。

3)下一部分代码:

if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

如果代码执行到这里,说明当前线程数大于或等于核心线程数或者 addWorker 失败了,那么就需要通过 if (isRunning(c) && workQueue.offer(command)) 检查线程池状态是否为 Running

  • 如果线程池状态是 Running就把任务放入任务队列中,也就是 workQueue.offer(command)
  • 如果线程池已经不处于 Running 状态,说明线程池被关闭,那么就移除刚刚添加到任务队列中的任务,并执行拒绝策略;

也就是这段代码:

if (! isRunning(recheck) && remove(command))
reject(command);

下一个 else if 分支:

else if (workerCountOf(recheck) == 0)
addWorker(null, false);

能进入这个分支说明前面判断到线程池状态为 Running,那么当任务被添加进来之后就需要防止没有可执行线程的情况发生(比如之前的线程被回收了或意外终止了),所以此时如果检查当前线程数为 0,也就是 workerCountOf**(**recheck) == 0,那就执行 addWorker() 方法新建线程。

4)最后一个 else if 分支

else if (!addWorker(command, false))
reject(command);

代码执行到这个分支,说明线程池不是 Running 状态或线程数大于或等于核心线程数并且任务队列已经满了,根据规则,此时需要添加新线程,直到线程数达到「最大线程数」,所以此时就会再次调用 addWorker 方法并将第二个参数传入 false,即以 maxPoolSize 为上限创建新的 worker;addWorker 方法如果返回 true 代表添加成功,如果返回 false 代表任务添加失败,说明当前线程数已经达到 maxPoolSize,然后执行拒绝策略 reject 方法。如果执行到这里线程池的状态不是 Running,那么 addWorker 会失败并返回 false,所以也会执行拒绝策略 reject 方法。

总结

execute 方法中,多次调用 addWorker 方法将任务传入,该方法会添加并启动一个 Worker,这里的 Worker 可以理解为是对 Thread 的包装,Worker 内部有一个 Thread 对象,它是最终真正执行任务的线程,所以一个 Worker 就对应线程池中的一个线程,addWorker 就代表增加线程。

线程复用的逻辑实现主要在 Worker 类中的 run 方法里执行的 runWoker 方法中,

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

简化之后:

final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} finally {
task = null;
}
}
}

可以看出,实现线程复用的逻辑主要在一个 while 循环中:

  1. 通过取 Worker 的 firstTask 或者通过 getTask() 方法从 workQueue 中获取待执行的任务;
  2. 直接调用 task 的 run() 方法来执行具体的任务,而不是新建线程。

实现一个简易线程池 也是模仿这种方式实现的。