欢迎光临散文网 会员登陆 & 注册

Java 线程池ThreadPoolExecutor execute & addWorker源码分析

2022-03-09 21:06 作者:房顶上的铝皮水塔  | 我要投稿

本文参考自:从Java构建线程的方式 到 线程池ThreadPoolExecutor源码剖析

本文分成以下几个部分:

  1. 创建线程的方式

  2. ThreadPoolExecutor概述

  3. ThreadPoolExecutor#execute ThreadPoolExecutor#addWorker方法解析

创建线程的方式

继承自Thread

写一个类继承自Thread,并且调用start方法即可开启线程。这个是老生常谈的实现方法了,继承Thread的时候需要实现run,如果直接调用run的话是无法开启线程的。调用start最后会调用到native的start0方法,然后如果是Linux的话,底层使用的是Linux的api pthread。关于这点我在这篇文章中谈及过,并且说了一下sychronized锁的实现。有兴趣的同学可以看看~

通过FutureTask+Callable实现

然后将创建好的FutureTask的实例放到Thread中执行。Callable是有返回值的,我们可以通过get获取到。get是一个阻塞的,底层的实现我稍微看了一下是基于LockSupport#park实现的阻塞。

使用线程池实现

使用线程池主要类似于这样:

其实JDK也提供了一些默认的线程池创建方法,但是一般都不推荐使用,因为这些方法可能不符合我们常规的业务需求。所以一般都使用手动创建的方式实现。

ThreadExecutorPool概述

重要的常量

ctl这个AtomicInteger是基于自旋锁+CAS操作实现的自旋锁我在这篇文章也聊过。Executor本身也有生命周期,根据数值大小排序的生命周期状态是:

Running < Shutdown < Stop < Tidying < Terminated

  • Running:当前的线程池中的线程正常运行,而且线程池接受新的Runnable

  • Shutdown:在调用了shutdown方法会走到这个状态,并且此时不接收新的Runnable,但是会将阻塞队列中的Runnable处理完成

  • Stop:在调用shutdownNow之后会走到这个状态,不接收新的Runnable,同时会暂停正在执行的线程

  • Tidying:是一个中间的过渡状态,可能做一个内容的清理工作等等。

  • Terminated:在调用terminated方法之后会到这个状态,线程池结束

我直接拿过来课程中的图:

拒绝策略

AbortPolicy

直接会抛出异常

CallerRunsPolicy

这块直接调用了Runnable实现的run方法。run方法会在Excecutor所在的线程中执行,所以如果是耗时操作也会出现问题。

DiscardPolicy

直接会放弃,什么都不会做

DiscardOldestPolicy

会尝试获取ThreadPoolExecutor中的队列,然后将队列头,也就是最开始的一个出栈。  

源码分析

任务加入线程池是一个这样的过程:

首先会询问核心线程是否有没有分配到的,通常是和核心线程数进行比较。如果核心线程都满了,就会通过阻塞队列进行缓冲。如果阻塞队列都放慢了,就会看非核心线程是否到了最大的线程数,如果达到了最大线程,就会执行拒绝策略。

下面我们会通过看execute+addWorker的源码来还原这个过程。

ThreadPoolExecutor#execute

execute的代码中首先通过ctl进行位运算的分解获取当前的工作线程数,优先使用核心线程。然后下面会放入到阻塞队列中,如果阻塞队列中都放不下,再会看工作线程是否达到最大线程数。如果以上的执行都不能放入这个任务,就执行拒绝策略。

上面的代码中可以看到前面两步,也就是询问核心线程和询问阻塞队列是否放满,第三步看工作线程是否达到最大线程数是在addWorker中的

ThreadPoolExecutor#addWorker

part1 判断部分

addWorker的代码拆分成两部分来看,第一部分是进行一些条件判断:

特别是if中的第二个条件有点复杂,传入firstTask为空的情况是当阻塞队列中有任务,但是工作线程为0时,一般情况下firstTask肯定不为null。具体三种情况为什么需要返回false的原因我写在注释中了~

下面我们来看看执行的逻辑

part2 执行部分

总的来说addWorker除了进行Worker的构建和添加到Workers之外,还进行了Worker中线程的启动,这块是真正执行我们定义的逻辑的地方。

我们再来看看Worker:

因为Worker本身也是一个Runnable,所以当调用start的时候会执行Woker的run方法,Worker#run调用了runWorker。在runWorker中,我们重写的run会被执行。同时提供了两个钩子:beforeExecute和afterExecute,这两个方法本身是空实现,我们可以自行定义执行一些操作。

作为判断条件的代码我使用黄色底的字体标记出来了,具体的逻辑就是这样。

ThreadPoolExecutor#getTask

getTask其实就是从阻塞队列wokerQueue中获取task这样一件事情。

线程池的线程复用逻辑


这块直接上图,在addWorker中会执行Worker中的Thread#start,我们知道执行完成start之后就不能再次调用start。线程池与其说他是复用Thread,不如说他是不断地向Thread中填充新的Runnable,然后调用run,减少了创建Thread的开销。我们仔细看看addWorker的核心代码:

不断地从workerQueue中取出新的Task,然后执行run。如果wokerQueue为空,getTask就会阻塞,等到有了新的Task再执行。

总结时间

  1. JDK中提供了一些可以直接启动线程池的方式,但是我们最好自己写一个ThreadPoolExecutor进行调整参数。ThreadPoolExecutor有以下几个核心参数:核心线程数、最大线程数、线程存活时间、阻塞队列

  2. ThreadPoolExecutor是有5中状态的,Running,Shutdown,Stop,Tidying,terminated。

  3. execute比较好理解,我们使用Runnable添加到ThreadPoolExecutor之后,首先会创建核心线程,核心线程其实就是一个标志位为true的Worker。Worker内部有一个Thread,会在addWorker方法中启动(Thread#start)。但是ThreadPoolExecutor其实并不会立刻【放过】Worker中的Thread。如果后续有runnable被放到阻塞队列之后,会从阻塞队列中读取。这点其实也是复用机制的关键。


Java 线程池ThreadPoolExecutor execute & addWorker源码分析的评论 (共 条)

分享到微博请遵守国家法律