如何从源码上分析JUC线程池ThreadPoolExecutor的实现原理,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。很早之前就打算看一次JUC线程池ThreadPoolExecutor
的源码实现,由于近段时间比较忙,一直没有时间整理出源码分析的文章。之前在分析扩展线程池实现可回调的Future
时候曾经提到并发大师Doug Lea
在设计线程池ThreadPoolExecutor
的提交任务的顶层接口Executor
只有一个无状态的执行方法:而ExecutorService
提供了很多扩展方法底层基本上是基于Executor#execute()
方法进行扩展。本文着重分析ThreadPoolExecutor#execute()
的实现,笔者会从实现原理、源码实现等角度结合简化例子进行详细的分析。ThreadPoolExecutor
的源码从JDK8
到JDK11
基本没有变化,本文编写的时候使用的是JDK11
。ThreadPoolExecutor
里面使用到JUC同步器框架AbstractQueuedSynchronizer
(俗称AQS
)、大量的位操作、CAS
操作。ThreadPoolExecutor
提供了固定活跃线程(核心线程)、额外的线程(线程池容量 – 核心线程数这部分额外创建的线程,下面称为非核心线程)、任务队列以及拒绝策略这几个重要的功能。ThreadPoolExecutor
里面使用到JUC同步器框架,主要用于四个方面:关于AQS
笔者之前写过一篇相关源码分析的文章:JUC同步器框架AbstractQueuedSynchronizer源码图文分析。这里先参考ThreadPoolExecutor
的实现并且进行简化,实现一个只有核心线程的线程池,要求如下:某次运行结果如下:设计此线程池的时候,核心线程是懒创建的,如果线程空闲的时候则阻塞在任务队列的take()
方法,其实对于ThreadPoolExecutor
也是类似这样实现,只是如果使用了keepAliveTime
并且允许核心线程超时(allowCoreThreadTimeOut
设置为true
)则会使用BlockingQueue#poll(keepAliveTime)
进行轮询代替永久阻塞。构建ThreadPoolExecutor
实例的时候,需要定义maximumPoolSize
(线程池最大线程数)和corePoolSize
(核心线程数)。当任务队列是有界的阻塞队列,核心线程满负载,任务队列已经满的情况下,会尝试创建额外的maximumPoolSize - corePoolSize
个线程去执行新提交的任务。当ThreadPoolExecutor
这里实现的两个主要附加功能是:先分析线程池的关键属性,接着分析其状态控制,最后重点分析ThreadPoolExecutor#execute()
方法。下面看参数列表最长的构造函数:可以自定义核心线程数、线程池容量(最大线程数)、空闲线程等待任务周期、任务队列、线程工厂、拒绝策略。下面简单分析一下每个参数的含义和作用:状态控制主要围绕原子整型成员变量ctl
:接下来分析一下线程池的状态变量,工作线程上限数量位的长度是COUNT_BITS
,它的值是Integer.SIZE - 3
,也就是正整数29:我们知道,整型包装类型Integer实例的大小是4 byte,一共32 bit,也就是一共有32个位用于存放0或者1。在ThreadPoolExecutor实现中,使用32位的整型包装类型存放工作线程数和线程池状态。其中,低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有2^3种。工作线程上限数量为2^29 – 1,超过5亿,这个数量在短时间内不用考虑会超限。接着看工作线程上限数量掩码COUNT_MASK
,它的值是(1 ,也就是1左移29位,再减去1,如果补全32位,它的位视图如下:
然后就是线程池的状态常量,这里只详细分析其中一个,其他类同,这里看RUNNING
状态:控制变量ctl
的组成就是通过线程池运行状态rs
和工作线程数wc
通过「或运算」得到的:那么我们怎么从ctl
中取出高3位?上面源码中提供的runStateOf()
方法就是提取运行状态:同理,取出低29位只需要把ctl
和COUNT_MASK
(000-11111111111111111111111111111
)做一次与运算即可。小结一下线程池的运行状态常量:这里有一个比较特殊的技巧,由于运行状态值存放在高3位,所以可以直接通过十进制值(「甚至可以忽略低29位,直接用ctl
进行比较,或者使用ctl
和线程池状态常量进行比较」)来比较和判断线程池的状态:RUNNING(-536870912) ❞
下面这三个方法就是使用这种技巧:最后是线程池状态的跃迁图:PS:线程池源码中有很多中间变量用了简单的单字母表示,例如c就是表示ctl、wc就是表示worker count、rs就是表示running status。线程池异步执行任务的方法实现是ThreadPoolExecutor#execute()
,源码如下:这里简单分析一下整个流程:「这里是一个疑惑点」:为什么需要二次检查线程池的运行状态,当前工作线程数量为0,尝试创建一个非核心线程并且传入的任务对象为null?这个可以看API注释:如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程的前提下新建一个工作线程。任务提交流程从调用者的角度来看如下:boolean addWorker(Runnable firstTask, boolean core)
方法的第一的参数可以用于直接传入任务实例,第二个参数用于标识将要创建的工作线程是否核心线程。方法源码如下:笔者发现了Doug Lea
大神十分喜欢复杂的条件判断,而且单行复杂判断不喜欢加花括号,像下面这种代码在他编写的很多类库中都比较常见:上面的分析逻辑中需要注意一点,Worker
实例创建的同时,在其构造函数中会通过ThreadFactory
创建一个Java线程Thread
实例,后面会加锁后二次检查是否需要把Worker
实例添加到工作线程集合workers
中和是否需要启动Worker
中持有的Thread
实例,只有启动了Thread
实例实例,Worker
才真正开始运作,否则只是一个无用的临时对象。Worker
本身也实现了Runnable
接口,它可以看成是一个Runnable
的适配器。线程池中的每一个具体的工作线程被包装为内部类Worker
实例,Worker
继承于AbstractQueuedSynchronizer(AQS)
,实现了Runnable
接口:Worker
的构造函数里面的逻辑十分重要,通过ThreadFactory
创建的Thread
实例同时传入Worker
实例,因为Worker
本身实现了Runnable
,所以可以作为任务提交到线程中执行。只要Worker
持有的线程实例w
调用Thread#start()
方法就能在合适时机执行Worker#run()
。简化一下逻辑如下:Worker
继承自AQS
,这里使用了AQS
的独占模式,这里有个技巧是构造Worker
的时候,把AQS
的资源(状态)通过setState(-1)
设置为-1,这是因为Worker
实例刚创建时AQS
中state
的默认值为0,此时线程尚未启动,不能在这个时候进行线程中断,见Worker#interruptIfStarted()
方法。Worker
中两个覆盖AQS
的方法tryAcquire()
和tryRelease()
都没有判断外部传入的变量,前者直接CAS(0,1)
,后者直接setState(0)
。接着看核心方法ThreadPoolExecutor#runWorker()
:这里重点拆解分析一下判断当前工作线程中断状态的代码:Thread.interrupted()
方法获取线程的中断状态同时会清空该中断状态,这里之所以会调用这个方法是因为在执行上面这个if
逻辑同时外部有可能调用shutdownNow()
方法,shutdownNow()
方法中也存在中断所有Worker
线程的逻辑,但是由于shutdownNow()
方法中会遍历所有Worker
做线程中断,有可能无法及时在任务提交到Worker
执行之前进行中断,所以这个中断逻辑会在Worker
内部执行,就是if
代码块的逻辑。这里还要注意的是:STOP
状态下会拒绝所有新提交的任务,不会再执行任务队列中的任务,同时会中断所有Worker
线程。也就是,「即使任务Runnable已经runWorker()
中前半段逻辑取出,只要还没走到调用其Runnable#run(),都有可能被中断」。假设刚好发生了进入if
代码块的逻辑同时外部调用了shutdownNow()
方法,那么if
逻辑内会判断线程中断状态并且重置,那么shutdownNow()
方法中调用的interruptWorkers()
就不会因为中断状态判断出现问题导致二次中断线程(会导致异常)。小结一下上面runWorker()
方法的核心流程:接下来分析一下从任务队列中获取任务的getTask()
方法和处理线程退出的后续工作的方法processWorkerExit()
。getTask()
方法是工作线程在while
死循环中获取任务队列中的任务对象的方法:这个方法中,有两处十分庞大的if
逻辑,对于第一处if
可能导致工作线程数减去1直接返回null
的场景有:对于第二处if
,逻辑有点复杂,先拆解一下:这段逻辑大多数情况下是针对非核心线程。在execute()
方法中,当线程池总数已经超过了corePoolSize
并且还小于maximumPoolSize
时,当任务队列已经满了的时候,会通过addWorker(task,false)
添加非核心线程。而这里的逻辑恰好类似于addWorker(task,false)
的反向操作,用于减少非核心线程,使得工作线程总数趋向于corePoolSize
。如果对于非核心线程,上一轮循环获取任务对象为null
,这一轮循环很容易满足timed && timedOut
为true,这个时候getTask()
返回null会导致Worker#runWorker()
方法跳出死循环,之后执行processWorkerExit()
方法处理后续工作,而该非核心线程对应的Worker
则变成“游离对象”,等待被JVM回收。当allowCoreThreadTimeOut
设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。那么可以总结出keepAliveTime
的意义:在一些特定的场景下,配置合理的keepAliveTime
能够更好地利用线程池的工作线程资源。processWorkerExit()
方法是为将要终结的Worker
做一次清理和数据记录工作(因为processWorkerExit()
方法也包裹在runWorker()
方法finally
代码块中,其实工作线程在执行完processWorkerExit()
方法才算真正的终结)。代码的后面部分区域,会判断线程池的状态,如果线程池是RUNNING
或者SHUTDOWN
状态的前提下,如果当前的工作线程由于抛出用户异常被终结,那么会新创建一个非核心线程。如果当前的工作线程并不是抛出用户异常被终结(正常情况下的终结),那么会这样处理:processWorkerExit()
执行完毕之后,意味着该工作线程的生命周期已经完结。每个工作线程终结的时候都会调用tryTerminate()
方法:这里有疑惑的地方是tryTerminate()
方法的第二个if
代码逻辑:工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程。方法API注释中有这样一段话:If otherwise eligible to terminate but workerCount is nonzero, interrupts an idle worker to ensure that shutdown signals propagate. 当满足终结线程池的条件但是工作线程数不为0,这个时候需要中断一个空闲的工作线程去确保线程池关闭的信号得以传播。下面将会分析的shutdown()
方法中会通过interruptIdleWorkers()
中断所有的空闲线程,这个时候有可能有非空闲的线程在执行某个任务,执行任务完毕之后,如果它刚好是核心线程,就会在下一轮循环阻塞在任务队列的take()
方法,如果不做额外的干预,它甚至会在线程池关闭之后永久阻塞在任务队列的take()
方法中。为了避免这种情况,每个工作线程退出的时候都会尝试中断工作线程集合中的某一个空闲的线程,确保所有空闲的线程都能够正常退出。interruptIdleWorkers()
方法中会对每一个工作线程先进行tryLock()
判断,只有返回true
才有可能进行线程中断。我们知道runWorker()
方法中,工作线程在每次从任务队列中获取到非null的任务之后,会先进行加锁Worker#lock()
操作,这样就能避免线程在执行任务的过程中被中断,保证被中断的一定是空闲的工作线程。线程池关闭操作有几个相关的变体方法,先看shutdown()
:接着看shutdownNow()
方法:shutdownNow()
方法会把线程池状态先更变为STOP
,中断AbstractQueuedSynchronizer
的state
值大于0的Worker
实例,也就是包括正在执行任务的Worker
和空闲的Worker
),然后遍历任务队列,取出(移除)所有任务存放在一个列表中返回。最后看awaitTermination()
方法:awaitTermination()
虽然不是shutdown()
方法体系,但是它的处理逻辑就是确保调用此方法的线程会阻塞到tryTerminate()
方法成功把线程池状态更新为TERMINATED
后再返回,可以使用在某些需要感知线程池终结时刻的场景。有一点值得关注的是:shutdown()
方法「只会中断空闲的工作线程」,如果工作线程正在执行任务对象Runnable#run()
,这种情况下的工作线程不会中断,而是等待下一轮执行getTask()
方法的时候通过线程池状态判断正常终结该工作线程。private final ReentrantLock mainLock = new ReentrantLock(); private final Condition termination = 香港云主机 mainLock.newCondition();先看了ThreadPoolExecutor
内部成员属性mainLock
的引用情况:归结一下mainLock
的使用场景:这里分析一下线程池如何通过可重入锁和条件变量实现相对优雅地关闭。先看shutdown()
方法:这里shutdown()
中除了tryTerminate()
,其他它方法都是包裹在锁里面执行,「确保工作线程集合稳定性以及关闭权限、确保状态变更串行化,中断所有工作线程并且避免工作线程”中断风暴”」(多次并发调用shutdown()
如果不加锁,会反复中断工作线程)。shutdownNow()
方法其实加锁的目的和shutdown()
差不多,不过多了一步:导出任务队列中的剩余的任务实例列表。awaitTermination()
方法中使用到前面提到过的条件变量termination
:awaitTermination()
方法的核心功能是:确保当前调用awaitTermination()
方法的线程阻塞等待对应的时间或者线程池状态变更为TERMINATED
,再退出等待返回结果,这样能够让使用者输入一个可以接受的等待时间进行阻塞等待,或者线程池在其他线程中被调用了shutdown()
方法状态变更为TERMINATED
就能正常解除阻塞。awaitTermination()
方法的返回值为布尔值,true
代表线程池状态变更为TERMINATED
或者等待了输入时间范围内的时间周期被唤醒,意味则线程池正常退出,结果为false
代表等待了超过输入时间范围内的时间周期,线程池的状态依然没有更变为TERMINATED
。线程池中的工作线程如何优雅地退出,不导致当前任务执行丢失、任务状态异常或者任务持有的数据异常,是一个很值得探讨的专题,以后有机会一定会分析一下这个专题。reject(Runnable command)
方法很简单:调用线程池持有的成员RejectedExecutionHandler
实例回调任务实例和当前线程池实例。到JDK11
为止,ThreadPoolExecutor
提供的钩子方法没有增加,有以下几个:其中onShutdown()
的方法修饰符为default
,其他三个方法的修饰符为protected
,必要时候可以自行扩展这些方法,可以实现监控、基于特定时机触发具体操作等等。线程池本身提供了大量数据统计相关的方法、扩容方法、预创建方法等等,这些方法的源码并不复杂,这里不做展开分析。「核心线程相关:」「线程池容量相关:」「线程存活周期相关:」「其他监控统计相关方法:」「任务队列操作相关方法:」有部分属性值的设置有可能影响到线程池中的状态或者工作线程的增减等,例如核心线程数改变,有可能会直接增减Worker
,这里就以ThreadPoolExecutor#setCorePoolSize()
为例:这里else if (delta > 0)后面的代码块中有一段描述,翻译一下:我们并不知道真正情况下”需要”多少新的工作线程。作为一种启发式处理方式,预先启动足够多的新的工作线程(直到数量为核心线程池大小)来处理队列中当前的任务,但如果在这样做时队列变为空,则停止创建新的工作线程。看完上述内容,你们掌握如何从源码上分析JUC线程池ThreadPoolExecutor的实现原理的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注开发云行业资讯频道,感谢各位的阅读!
相关推荐: tailf、tail -f、tail -F三者区别是什么
这篇文章主要介绍tailf、tail -f、tail -F三者区别是什么,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!tail -f 等同于–follow=descriptor,根据文件描述符进行追踪,当文件改名或被删除,追踪停止ta…
免责声明:本站发布的图片视频文字,以转载和分享为主,文章观点不代表本站立场,本站不承担相关法律责任;如果涉及侵权请联系邮箱:360163164@qq.com举报,并提供相关证据,经查实将立刻删除涉嫌侵权内容。