重新认识 ThreadPoolExecutor
❓ 更深入的理解ThreadPoolExecutor原理,纠正想当然的理解误区
一个事故引发的拷问
或许这篇文章可以起名更惊悚一点,叫做:《不合理的线程池配置酿成的血案。。》。
事情要从一个造成生产事故的案例说起:
同事开发了一个kafka消费者程序,并发消费消息:
- 使用Semaphore控制每次并发处理m个消息
- 每一个消息处理后都会生成 n 个任务,为了加快单个消息的处理速率,子任务也采用了并发的方式执行(消息和子任务线程池是分开的)
- 外层消息处理等待所有子任务执行完成才算完成。
整个模型设计的目标很明确,那就是每次能并发处理m个消息,m*n个子任务,每组子任务全部执行完毕后才会开始下一组。
但真实的情况是,发布线上后,子任务线程池大面积触发Reject,导致消息处理几乎瘫痪,下游出现数据缺失以及高延迟。 在协同检查后,我也认为这个模型设计十分精确,不会存在子任务超量的情况。
问题复现
由于该模型在本地测试并没有触发异常,考虑到是数据量级的原因,我基于现有模型,缩放了任务量级和配置,简化出了以下模型并复现了问题:
假设有多个消费事件组,要求同时最多三个事件组异步消费,而每个事件组由3个子事件组成,子事件同样使用异步消费。
public static void main(String[] args) throws InterruptedException {
// @1 创建外部消费组线程池,固定大小3,队列197,一共200个消费组
ThreadPoolExecutor outter = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(197));
// @2 创建内部事件处理线程池,core=9,max=12 ,队列长度为1。
ThreadPoolExecutor inner = new ThreadPoolExecutor(9, 12, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1)); //2
for (int i = 0; i < 200; i++) {
int group = i;
// @3 消费组异步处理,由于outter固定大小为3,首次只有三个消费组消费事件。
outter.execute(() -> {
System.out.println("开始第" + group + " 组消费");
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int j = 0; j < 3; j++) {
int task = j;
// @4 3个事件异步消费,
inner.execute(() -> {
System.out.println(group + "组消费数据:" + task);
countDownLatch.countDown();
});
}
// @5 outter线程等待三个inner线程结束,当前组才消费完成
try {
countDownLatch.await(3, TimeUnit.SECONDS);
System.out.println("第" + group + " 消费完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
实现该模型的基本策略:
- 利用CountDownLatch保证outter线程在对应的三个inner线程执行完之后才执行完毕,即:保证批次执行;
- 利用outter固定线程池=3,保证每次最多只有三个outter线程执行;
- 同理,inner每次只能有 3(outter执行数量)* 3(事件数量) = 9 个线程在执行。所以理论上,inner的max=12是用不上的,队列也用不上。
执行结果:
...
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-3" Exception in thread "pool-1-thread-2" java.util.concurrent.RejectedExecutionException: Task com.review.string.Demo7$$Lambda$2/1186543288@67dad061 rejected from java.util.concurrent.ThreadPoolExecutor@67aa5a39[Running, pool size = 12, active threads = 3, queued tasks = 0, completed tasks = 14]
...
可发发现,子任务线程池容量竟然扩充到了12,达到了最大容量,按前面所想,子任务数量应该是控制在3*3=9个才对。
分析
猜测:countDownLatch没有控制住
假设实际执行的outter active数量 * 3 > 实际inner active线程数,从而导致inner线程达到最大线程数。
这里countDownLatch数量配置都是对的,增加inner消费处理时间,发现在reject前,outter线程都是在组内inner全部执行完才完成,没有失控的迹象。
如果你的猜测是这个,我觉得你可以多多使用juc中的工具,这个模型中,countDownLatch和线程池的配合使用是没有问题的。
猜测:线程调度的问题
假设outter线程o1执行完,o2、o3还在执行,开始执行o4,此时如果o1组内的inner线程 i1、i2、i3至少一个还没有及时归还到线程池中,这样o4组消费就会创建新的线程使得线程数>core,持续几轮,线程池撑满从而触发reject。
这个猜测并无根据,但依然可以验证一下:
countDownLatch.await();
TimeUnit.SECONDS.sleep(1); // outter强制等待inner结束
执行,仍然reject,可以排除该猜想
猜想都没有实际根据且都可以被证伪,我突然有些恐慌,比起在小同事这里翻车,我发现我根本没有理解线程池这件事才是超恐怖的。
我此时所认知的线程池:
- 活跃线程数 < corePoolSize 时,直接创建线程来执行任务。
- 活跃线程数 = corePoolSize 时,任务添加到队列,等待空闲线程处理。
- 队列满时,直接创建线程来执行任务。
- 活跃线程数 = maxPoolSize,继续添加任务,触发reject。
通过后续的研究,我才发现,这几条简短的概括,并不能说错误,但是因为简短,说明有些概念并不明确,或者很模糊,正是这种似是而非的认识,导致我对线程池的具体细节都是想当然。
终极武器,看源码实现:
直接从核心方法 execute 入手:
public void execute(Runnable command) {
...
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // @1
if (addWorker(command, true)) // @2
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // @3
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // @4
reject(command);
else if (workerCountOf(recheck) == 0) // @5
addWorker(null, false);
}
else if (!addWorker(command, false)) // @6
reject(command);
}
这里主要有6个不太明确作用的方法,我们逐个分析:
- workerCountOf
- isRunning
- addWorker
- workQueue.offer(command)
- remove(command)
- reject(command)
workerCountOf & isRunning
了解这两个方法是ThreadPoolExecutor类的基本方法,我们直接看ThreadPoolExecutor的源码:
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* The main pool control state, ctl, is an atomic integer packing two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* 译:状态ctl是一个包装了两个概念字段原子整数:
* workerCount 指示有效的线程数,
* runState 指示是否运行,关闭等
* ...
*
* The workerCount is the number of workers that have been
* permitted to start and not permitted to stop. The value may be
* transiently different from the actual number of live threads,
* for example when a ThreadFactory fails to create a thread when
* asked, and when exiting threads are still performing
* bookkeeping before terminating. The user-visible pool size is
* reported as the current size of the workers set.
*
* 译:workerCount是【已被允许启动且未被允许停止】的worder(即线程)数量。
* 该值可能与实际的活动线程数暂时不同:线程池创建线程时失败但又未完全注销,
* 此时workerCount 可能会小于线程池实际大小。
*
* 【已被允许启动且未被允许停止】 先简单理解为存活线程比较好理解
* ...
*
* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* 译:runState提供线程池生命周期的控制,有以下状态值:
* RUNNING:可接受新任务,可处理队列中的任务
* SHUTDOWN:不接受新任务,但可以处理队列中的任务
* STOP:不接受新任务,不处理队列中任务,并中断处理中的任务
* TIDYING:所有任务都已终止,workerCount(存活的线程数量)为0,往该状态过渡的线程将执行terminated()
* TERMINATED: terminated()方法结束,相当于TIDYING的终态。
* ...
*
* 后面还有部分注释,讲的是runState 各个状态之间转换条件,不作列出,有兴趣可自研
* /
// 状态和线程数集合包装字段ctl。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int runStateOf(int c) { return c & ~CAPACITY; } // 从 ctl 拆解出 runState
private static int workerCountOf(int c) { return c & CAPACITY; } // 从 crl 拆解出 workerCount
private static int ctlOf(int rs, int wc) { return rs | wc; } // 将 workerCount 和 runState 包装为 ctl
...
private static boolean isRunning(int c) { return c < SHUTDOWN;} // 当前线程池状态,是否处于Running状态
ctl包装和解析 runState 与 workCount 的算法并未理解,但并不影响理解其概念。
- workerCountOf方法 :获取存活线程数量
- isRunning方法 :判断线程池状态是否处于Running(Running状态时线程池可接受新任务)
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
//...(略掉一些状态校验)
int wc = workerCountOf(c); // 线程池实际容量(存活线程数量)
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false; // @1 判断是否达到容量上限
//...(略掉一些状态校验)
w = new Worker(firstTask); // @1 根据任务创建Worker对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // @2 将worker对象添加到集合workers中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // @3 添加成功,启动worker内置线程
workerStarted = true;
}
}
//...
}
addWorker在execute方法中有两处调用,区别是参数core传值不同,从源码可以看出是判断当前线程池容量上限是CorePoolSize还是MaxPoolSize
addWorker并不是直接创建线程,而是Worker对象,传入的Runnable任务对象作为其属性。
代码 @2 处 workers就是线程池的集合:
/**
* Set containing all worker threads in pool. Accessed only when holding mainLock.
*
* 译:一个包含线程池内所有工作线程的集合,仅在持有mainLock时可访问
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
Worker就是实际线程池中的“线程”:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread; // 当前worker实际运行的线程
Runnable firstTask; // 初始化的任务
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 创建线程,注意此处将worker作为Thread的构造参数
}
public void run() {
runWorker(this);
}
}
Worker作为Runnable实现类,并拥有一个Thread属性
从其构造器还可以发现,thread属性的target又是当前worker对象
上面addWorker方法的 @3 处显示Worker创建后会启动Worker内置的Thread对象,
这意味着,thread.start() 实际会调用worker.run(),而run内部又是调用runWorker方法,其源码为:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // @4 无限循环获取任务
...
try {
beforeExecute(wt, task); // @5 protected修饰的空方法,可用于线程池子类扩展
Throwable thrown = null;
try {
task.run(); // @6 调用实际Runnable实例的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // @7 protected修饰的空方法,可用于线程池子类扩展
}
} finally {
task = null; // @8 每个task执行完成后,置空变量,下次再拿到一个新的task
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
@5 、@7处是空的protected方法,很明显是用来提供子类扩展的
@4 、@6、@8 处揭露了Worker在启动之后,会无限循环通过getTask获取Runnable任务,并调用任务的run()方法。
getTask() 方法源码:
private Runnable getTask() {
...(省略若干行状态校验代码)
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
可以到,实际是调用队列的poll或take来获取任务,这两个方法都是阻塞的,没有取到任务就会阻塞住,区别只是pool有超时时间。
综上,可以总结出 addWorker方法作用:创建线程Worker对象,并启动线程,线程内会无限循环的从队列中带阻塞的获取执行任务。
再回到execute方法:
public void execute(Runnable command) {
...
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // @1
if (addWorker(command, true)) // @2
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // @3
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // @4
reject(command);
else if (workerCountOf(recheck) == 0) // @5
addWorker(null, false);
}
else if (!addWorker(command, false)) // @6
reject(command);
}
- @1:存活Worker数量是否小于corePoolSize。
- @2:创建并启动Worker,上限为corePoolSize,Worker执行完初始化时分配任务后,无限循环从队列有阻塞的获取任务。
- @3:存活Worker数量 = corePoolSize,线程池状态为Running(可接受新任务),且将新任务加入队列.
- @4:二次检查线程池状态,非Runing时,将加入的任务移除。
- @5:二次检查线程池状态依然为Running,且存活Worker数量 = 0(因为corePoolSize可能为0),创建并启动Worker,上限为MaxPoolSize
- @6: 线程池状态部位Running或队列添加失败(队列满),创建并启动Worker(内部有状态校验),上限为MaxPoolSize
结论
基于以上分析,发现误区有两点:
- 线程池添加超出corePoolSize线程时,依据活跃线程数量来判断,这个活跃线程和当前处理的任务数量没有任何关系,上面说到理解成存活Worker数量是比较贴切的,而Worker启动后是无限循环读取队列的,所以 活跃的线程数 = 存活线程数 != 活跃(处理中)任务数。
- 任务提交给线程池,如果线程池有空闲线程,那么新加入的任务是能够被空闲线程(未处理任务的线程)处理的,但容易先入为主的认为,任务直接分配给空闲线程的,实际除了创建线程时给的任务,其它的任务是先放到队列中的,线程和队列是一个生产者消费者模型。
案例复盘:
- 当有3个outter线程处理时,inner线程池累计创建了9(corePoolSize)个Worker
- 后续任务到达时,任务将直接入inner队列,由于队列大小为1,线程从队列消费任务及时性无法保证 ① ,队列入队可能失败
- 队列入队失败时,inner继续创建线程,知道总数量为12(maxPoolSize)
- 后续任务到达,再次入队,同步骤 2,此时一旦入队失败,就会触发reject
① 事实上,线程调用 queue.take() 和 外部调用queue.offer()间没有任何关联的,线程之间是由竞争的,无法保证及时消费队列中任务是可能的。
验证
既然已经明白可能的误区,针对误区简化案例即可
static class MyLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
public MyLinkedBlockingQueue(int capacity) {
super(capacity);
}
@Override
public boolean offer(E o) {
System.out.println("任务加入,当前队列数:" + this.size());
return super.offer(o);
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new MyLinkedBlockingQueue<>(1);
// 3个线程的线程池
ThreadPoolExecutor taskPoolExecutor = new ThreadPoolExecutor(3, 3, 30, TimeUnit.SECONDS, queue);
// 先将线程池拉满
for (int i = 0; i < 3; i++) {
final int finalI = i;
taskPoolExecutor.execute(() -> {
logger.info("{}", finalI);
});
}
// 等待全部任务执行完
Thread.sleep(1000);
// 再次执行任务,发现每一个任务都触发加入队列操作。
for (int i = 10; i < 12; i++) { // @1
for (int i = 10; i < 15; i++) { // @2
final int finalI = i;
taskPoolExecutor.execute(() -> {
// @3
/*
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
*/
System.out.println(finalI);
}
}
执行结果:
0
2
1
任务加入,当前队列数:0
任务加入,当前队列数:1
10
11
可以看到,线程池满了之后,哪怕线程全部空闲,新的任务也是放到队列中。
将上述代码中的@1 换成 @2 ,去掉 @3处的注释,使得每个任务耗时更久且要添加任务数大于corePoolSize,这样队列就会来不及出列从而触发reject。
执行结果:
2
0
1
任务加入,当前队列数:0
任务加入,当前队列数:1
任务加入,当前队列数:0
任务加入,当前队列数:1
任务加入,当前队列数:1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.review.string.Demo8$$Lambda$2/249515771@2f7a2457 rejected from java.util.concurrent.ThreadPoolExecutor@566776ad[Running, pool size = 3, active threads = 3, queued tasks = 1, completed tasks = 3]
完美验证!
总结
ThreadPoolExecutor其实并不神秘,就是一个生产消费模型,特殊点:
- 在线程池到达corePoolSize大小时,任务是直接分配给新线程的。
- 在线程池到达corePoolSize大小且添加队列满,任务可以直接分配给新线程,直到达到maxPoolSize大小,相当于生产者生产的事件满了之后,给消费者一次扩容的机会。
- 其它时候队列未满时,添加任务相当于生产事件到队列,线程从队列消费事件,当队列满时,触发reject。
所以实际配置中,除了线程池大小,队列大小也要参考并发量合理设置。