Java 线程池ThreadPoolExecutor execute & addWorker源码分析
本文参考自:从Java构建线程的方式 到 线程池ThreadPoolExecutor源码剖析

本文分成以下几个部分:
创建线程的方式
ThreadPoolExecutor概述
ThreadPoolExecutor#execute ThreadPoolExecutor#addWorker方法解析

创建线程的方式
继承自Thread
写一个类继承自Thread,并且调用start方法即可开启线程。这个是老生常谈的实现方法了,继承Thread的时候需要实现run,如果直接调用run的话是无法开启线程的。调用start最后会调用到native的start0方法,然后如果是Linux的话,底层使用的是Linux的api pthread。关于这点我在这篇文章中谈及过,并且说了一下sychronized锁的实现。有兴趣的同学可以看看~
通过FutureTask+Callable实现
然后将创建好的FutureTask的实例放到Thread中执行。Callable是有返回值的,我们可以通过get获取到。get是一个阻塞的,底层的实现我稍微看了一下是基于LockSupport#park实现的阻塞。
使用线程池实现
使用线程池主要类似于这样:
其实JDK也提供了一些默认的线程池创建方法,但是一般都不推荐使用,因为这些方法可能不符合我们常规的业务需求。所以一般都使用手动创建的方式实现。

ThreadExecutorPool概述
重要的常量
ctl这个AtomicInteger是基于自旋锁+CAS操作实现的自旋锁我在这篇文章也聊过。Executor本身也有生命周期,根据数值大小排序的生命周期状态是:
Running < Shutdown < Stop < Tidying < Terminated
Running:当前的线程池中的线程正常运行,而且线程池接受新的Runnable
Shutdown:在调用了shutdown方法会走到这个状态,并且此时不接收新的Runnable,但是会将阻塞队列中的Runnable处理完成
Stop:在调用shutdownNow之后会走到这个状态,不接收新的Runnable,同时会暂停正在执行的线程
Tidying:是一个中间的过渡状态,可能做一个内容的清理工作等等。
Terminated:在调用terminated方法之后会到这个状态,线程池结束
我直接拿过来课程中的图:

拒绝策略
AbortPolicy
直接会抛出异常
CallerRunsPolicy
这块直接调用了Runnable实现的run方法。run方法会在Excecutor所在的线程中执行,所以如果是耗时操作也会出现问题。
DiscardPolicy
直接会放弃,什么都不会做
DiscardOldestPolicy
会尝试获取ThreadPoolExecutor中的队列,然后将队列头,也就是最开始的一个出栈。

源码分析
任务加入线程池是一个这样的过程:
首先会询问核心线程是否有没有分配到的,通常是和核心线程数进行比较。如果核心线程都满了,就会通过阻塞队列进行缓冲。如果阻塞队列都放慢了,就会看非核心线程是否到了最大的线程数,如果达到了最大线程,就会执行拒绝策略。
下面我们会通过看execute+addWorker的源码来还原这个过程。
ThreadPoolExecutor#execute
execute的代码中首先通过ctl进行位运算的分解获取当前的工作线程数,优先使用核心线程。然后下面会放入到阻塞队列中,如果阻塞队列中都放不下,再会看工作线程是否达到最大线程数。如果以上的执行都不能放入这个任务,就执行拒绝策略。
上面的代码中可以看到前面两步,也就是询问核心线程和询问阻塞队列是否放满,第三步看工作线程是否达到最大线程数是在addWorker中的
ThreadPoolExecutor#addWorker
part1 判断部分
addWorker的代码拆分成两部分来看,第一部分是进行一些条件判断:
特别是if中的第二个条件有点复杂,传入firstTask为空的情况是当阻塞队列中有任务,但是工作线程为0时,一般情况下firstTask肯定不为null。具体三种情况为什么需要返回false的原因我写在注释中了~
下面我们来看看执行的逻辑
part2 执行部分
总的来说addWorker除了进行Worker的构建和添加到Workers之外,还进行了Worker中线程的启动,这块是真正执行我们定义的逻辑的地方。
我们再来看看Worker:
因为Worker本身也是一个Runnable,所以当调用start的时候会执行Woker的run方法,Worker#run调用了runWorker。在runWorker中,我们重写的run会被执行。同时提供了两个钩子:beforeExecute和afterExecute,这两个方法本身是空实现,我们可以自行定义执行一些操作。
作为判断条件的代码我使用黄色底的字体标记出来了,具体的逻辑就是这样。

ThreadPoolExecutor#getTask
getTask其实就是从阻塞队列wokerQueue中获取task这样一件事情。
线程池的线程复用逻辑

这块直接上图,在addWorker中会执行Worker中的Thread#start,我们知道执行完成start之后就不能再次调用start。线程池与其说他是复用Thread,不如说他是不断地向Thread中填充新的Runnable,然后调用run,减少了创建Thread的开销。我们仔细看看addWorker的核心代码:
不断地从workerQueue中取出新的Task,然后执行run。如果wokerQueue为空,getTask就会阻塞,等到有了新的Task再执行。
总结时间
JDK中提供了一些可以直接启动线程池的方式,但是我们最好自己写一个ThreadPoolExecutor进行调整参数。ThreadPoolExecutor有以下几个核心参数:核心线程数、最大线程数、线程存活时间、阻塞队列
ThreadPoolExecutor是有5中状态的,Running,Shutdown,Stop,Tidying,terminated。
execute比较好理解,我们使用Runnable添加到ThreadPoolExecutor之后,首先会创建核心线程,核心线程其实就是一个标志位为true的Worker。Worker内部有一个Thread,会在addWorker方法中启动(Thread#start)。但是ThreadPoolExecutor其实并不会立刻【放过】Worker中的Thread。如果后续有runnable被放到阻塞队列之后,会从阻塞队列中读取。这点其实也是复用机制的关键。