背景: 线上现场池出现偶发性的RejectedExecutionException, 第一时间检查线程池配置后未发现有明显异常, 在尝试调大队列长度后问题得到解决, 本文记录了排故的心路历程以及故障的真实原因; 代码已脱敏.
问题点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| final static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(32, 32, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16));
public static void main(String[] args) { for (int i = 0; i < 100; i++) { CountDownLatch c = new CountDownLatch(16); for (int n = 0; n < 16; n++) { threadPoolExecutor.execute(() -> { try { } finally { c.countDown(); } }); } try { c.await(); } catch (InterruptedException ignore) {} System.err.println("============== task done, batch " + i + " =============="); } System.err.println("============== all done =============="); } }
|
代码描述了一个批量任务的执行过程, 小循环会每次提交 16 个任务给线程池,一共执行 100 次小循环; 因为 countDownLatch 的存在, 可以保证每一次都会执行完 16 个任务后再放 16 个
根据八股文中所描述的线程池执行逻辑, 首先由核心线程执行, 然后其他任务进入阻塞队列等待,如果队列满了, 启用非核心线程,数量由最大线程配置项控制, 如果最大线程满了,则触发拒绝策略。
代码中线程池配置核心线程数量等于32, 最大线程数量等于 32, 队列长度为 16, 因此根据八股理论, 这个线程池支持支持一次接收 32 + 16 = 48 个任务, 最多支持同时执行 32 个任务。
根据小学数学理论,我们知道 16 是小于 48 的 ,当前的线程池配置最多可以接收 3 批任务(1 批 16 个), 因此我认为正常不会触发线程池的拒绝策略, 起码不会在前3 批任务之前触发。
理论分析完毕, 上实操, idea, 启动!
第一查
执行到第二批时,线程池就满了,触发了拒绝策略, 我的第一直觉是,任务之间交接出了问题, 线程池执行完一批任务之后,未完全释放线程时,下一批任务已经进来了,这个交接的阶段, 线程池的动作如果慢一点或者放任务的动作快一点, 就会导致线程池被打满,从而触发拒绝策略;
所以我把代码改成了这样来验证我的想法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static void main(String[] args) { for (int i = 0; i < 100; i++) { try { Thread.sleep(100); } catch (InterruptedException ignore) {} CountDownLatch c = new CountDownLatch(20); for (int n = 0; n < 20; n++) { threadPoolExecutor.execute(() -> { try { } finally { c.countDown(); } }); } try { c.await(); } catch (InterruptedException ignore) {} System.err.println("============== task done, batch " + i + " =============="); } System.err.println("============== all done =============="); }
|
当每一个小循环执行完毕后,间隔 100ms 再执行下一个小循环, 通过这种方法控制了任务提交的频率,ok,启动!
结果是它并没有像我想象的那样奏效; 接着我尝试了扩大主线程的睡眠时间, 从 500ms 一直扩大到 2000ms, 结果还是一样, 在执行了几批之后, 就会触发线程池的拒绝策略, 这里就可以排除我之前的猜想, 我当时的猜想认为这个交接的间隙应该是很短的,可能就是几十毫秒到几百毫秒, 当我把主线程的睡眠时间延长到秒级别依旧会触发拒绝策略的时候,这个猜想就已经没有继续排查的必要了。
第二查
第二个排查点就是CountDownLatch了, 我尝试将CountDownLatch的去掉来验证,将代码改造成这样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void main(String[] args) { for (int i = 0; i < 100; i++) { for (int n = 0; n < 20; n++) { threadPoolExecutor.execute(() -> { try { } finally {
} }); } System.err.println("============== task done, batch " + i + " =============="); } System.err.println("============== all done =============="); }
|
启动验证, 跑到第 36 批的时候,依旧是触发了拒绝策略,CountDownLatch并没有起到设想中的作用, 反而去掉CountDownLatch之后,跑的批次还变多了, 这就很奇怪了。
尝试降低单批次提交任务的数量以及调高线程池的最大线程和阻塞队列长度,结果是, 后者起作用了
当我降低单批次提交任务的数量时 ,代码修改成了这样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void main(String[] args) { for (int i = 0; i < 100; i++) { for (int n = 0; n < 2; n++) { threadPoolExecutor.execute(() -> { try { } finally {
} }); } System.err.println("============== task done, batch " + i + " =============="); } System.err.println("============== all done =============="); }
|
每一批提交两个任务, 一共提交 100 批, 多次测试发现, 有些时候, 程序能够正常跑完, 有些时候又会触发拒绝策略;
调高线程池的最大线程和阻塞队列长度, 代码修改成了这样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| final static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(32, 32, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(16 * 10));
public static void main(String[] args) { for (int i = 0; i < 100; i++) { CountDownLatch c = new CountDownLatch(20); for (int n = 0; n < 20; n++) { threadPoolExecutor.execute(() -> { try {
} catch (InterruptedException ignore) { } finally { c.countDown(); } }); } try { c.await(); } catch (InterruptedException ignore) {} System.err.println("============== task done, batch " + i + " =============="); } System.err.println("============== all done =============="); }
|
结果是程序每次执行都能正常跑完, 不会再触发拒绝策略, 这里我的猜想又回到了第一查的时候, 我认为就是线程池有延时导致的,第一查中排除掉了任务提交间隙,线程池对上一批任务做收尾工作导致线程未及时释放的猜想, 那么线程池中还有哪些操作会产生延时或者说是间隙时间呢? 源码之下无秘密, 第三查,启动!
第三查
进入 ThreadPoolExecutor源码,我们通过 execute 方法向线程池提交了任务, execute方法有三个主要分支,
一、通过 ctl 变量判断了当前的工作线程数量是否小于核心线程数, 如果小于则会新建一个线程来执行当前的任务,然后返回;(ctl变量是一个AtomicInteger类型的原子整数,低29位表示线程池中当前活动的工作线程数(workerCount),高3位表示线程池的状态(RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED等))
二、判断线程池是否在运行, 并且将任务添加到队列中成功,则再次判断线程池的状态,如果线程池停止并且移除任务成功, 则触发拒绝策略;如果任务已加入到了队列中,但是没有工作线程的,则会创建一个非核心线程来从队列中获取任务来执行。
三、尝试创建非核心线程来执行任务,如果新增线程失败,则触发拒绝策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
|
为了简化验证过程,我们将业务代码中线程池的参数和任务量调小, 使得 debug 更加可控,将代码调整为如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| final static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
public static void main(String[] args) { for (int i = 0; i < 100; i++) { for (int n = 0; n < 2; n++) { threadPoolExecutor.execute(() -> { try {
} catch (InterruptedException ignore) { } }); } System.err.println("============== all done =============="); }
|
通过断点调试,可以发现在第一个 小循环执行完毕时, 线程池的工作线程就已经满了, 因此在第二个小循环执行时,会进入到execute 中第二个分支中,将任务放入队列中,这里的workQueue.offer()方法扮演了一个生产者的角色, 对应的workQueue.take()则是扮演的消费则的角色,take 方法从队列中取出并移除头元素,如果队列为空,则当前线程会被阻塞,直至有元素加入队列
1 2 3 4 5 6 7 8 9 10 11
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
|
图片是第二次小循环进入调试信息, 可以看到核心线程为 2,这是第一个小循环是创建的两个线程, 所以第二次小循环进入了第二个分支,调用了 workQueue 的 offer 方法将任务放入了队列, 此时 workQueue的 size = 1;
接着 take 方法会从队列中拿走任务,然后 offer 方法会继续放入任务, 发现问题了么? take 和 offer 两个方法之间, 是可能存在错位执行的可能性的, 而队列长度为 1, 如果发生了错位执行或者是时延, 那么队列长度就会超, 则就会触发拒绝策略, 这就是问题的触发点。
验证
验证方法很简单, 就是让 offer 方法睡一会,确保任务已被 take 方法拿走
将ArrayBlockingQueue复制一份,在 offer 方法中的 finally代码块中添加睡眠代码, 并将任务代码中的线程池队列设置为自定义的ArrayBlockingQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueueRyan<>(1));
public static void main(String[] args) { for (int i = 0; i < 100; i++) { try { Thread.sleep(100); } catch (InterruptedException ignore) { } for (int n = 0; n < 2; n++) { threadPoolExecutor.execute(() -> { try {
Thread.sleep(100); } catch (InterruptedException ignore) { } }); } } System.err.println("============== all done =============="); }
|
执行发现,不会再触发拒绝策略了
当将阻塞队列换成 没有改造过的ArrayBlockingQueue,则依旧会触发拒绝策略
至此,问题点已经可以确认了, 因为生产者消费者之间的错序,导致了队列中的任务没有被及时拿走,从而导致了积压,最后触发拒绝策略; 改造队列源码这种方法在开发中肯定是禁止的, 在日常开发中,我们可以通过调大队列的长度来解决这个问题。
总结
源码之下无秘密, 少看八股,多看源码, 别人总结的八股,看了背了,也总是似是而非,好像懂了又好像没懂,也无法运用自如。