Java并发学习之线程池ThreadPoolExecutor的小结
本篇博文将带着问题来回顾小结多线程池相关的知识点
- 线程池的几种创建方式
- 线程池的优点是什么
- 应用场景
- 如何使用
- 实现原理
- 异常状况怎么处理
- 线程池中任务的提交执行后,到线程执行,执行完成的整个流程逻辑
- 线程池中的线程回收机制
I. 什么是线程池
1. 通俗讲解
我们先举一个小例子来说一下什么是线程池,以及线程池的工作方式
首先在看一下线程池中提交一个任务的流程图
下面就是实际的case:基本上大家都去过银行,我们就以到银行的柜台上办理业务的流程来说明线程池,我们先假设这里有一个xx银行(这里是广告位,待租😉),总共有8个柜台,平时只开放4个柜台,大厅内总共有20个座位。
- 那么来一个办理业务的,如果开放的四个柜台上,有空的,直接上去办理业务即可
- 如果四个柜台都在处理业务了,那么办理业务则需要取一个号,到大厅的座位上等着叫号
- 如果大厅坐满了,银行经理决定开放所有的柜台,那么新来办理的人直接到新的柜台上处理
- 如果所有柜台都在处理,且大厅也满了,这个时候就告诉新来办理业务的现在已经满载了,你们到xxx地的银行去办理吧(或者回家等下午再来好了)
从流程上的对比来看,就很相似了,虽然实际上银行可不会因为人的太多来新增开放柜台的数量,下面简单的将上面的case映射到线程池的成员上
- 4个开放柜台 : 对应线程池的corePoolSize(核心工作线程数)
- 8个总柜台:对应线程池的maximumPoolSize(最大工作线程数)
- 20个座位:对应线程池的workQueue(任务队列)
所以线程池中提交一个任务时,优先看核心工作线程数是否已满,未满时,直接创建线程执行;已满,则丢入队列;如果队列也满了,则判断工作线程数是否超过最大数,没有则直接创建线程执行;否则直接“丢弃”这个任务了 (注意这个丢弃不是真的丢弃,其处理策略可以由你自己定义)
上面是基本流程,并没有涉及到工作线程的回收,线程池的状态(比如银行是否打烊了),任务的执行策略等
2. 线程池说明
线程池是一种多线程的处理机制,主要是为了减少线程的频繁创建和销毁,从而提升系统效率
使用线程池优点
- 减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
- 可以根据系统的承受能力,调整线程池中工作线线程的数量
使用线程池场景
我们将线程进行拆分,创建线程耗时T1, 线程执行耗时T2, 销毁线程耗时T3
如果你的场景中,提交线程执行的任务非常频繁,且具体的执行耗时较短,即 T1 + T3 > T2, 这种场景下使用线程池可以带来明显的性能提升
一般来说,如果不是你的任务只偶尔的运行几次,那么绝大部分场景都适合用线程池来处理
3. 线程池组成
类定义: java.util.concurrent.ThreadPoolExecutor
构造
1 | // 线程池构造方法 |
II. 线程池使用
1. 构造参数详解
构造参数较多,创建一个线程池,当然首先得搞清楚这些参数是干嘛用的
参数 | 含义 | 说明 |
---|---|---|
corePoolSize | 核心工作线程数 | 没有任务时,线程池中允许存在的最小空闲线程数 工作线程数 < corePoolSize 时,提交任务创建工作线程来执行任务 |
maximumPoolSize | 最大工作线程数 | 线程池中允许出现的最大工作线程数量 当 队列满 && 工作线程数 < maximumPoolSize 时,新的队列将创建线程来执行;如果 队列没有边界 ,那么这个参数没有意义 |
workQueue | 任务队列 | 保存待执行任务的阻塞队列; 当 (工作线程数 >= corePoolSize) && (任务数 < 任务队列长度) 时,任务会offer()入队等待 |
keepAliveTime | 工作线程最大空闲时间 | 当线程数 > corePoolSize 时,这个参数表示空闲线程存活时间; 超时的空闲线程,会被回收掉,直到 线程数==corePoolSzie ; 当 allowCoreThreadTimeOut=true 时,则超时的核心工作线程也会被回收 |
unit | 时间单位 | keepAliveTime的时间单位 |
threadFactory | 线程创建工厂 | 创建线程的工厂类,可以在这里指定创建线程的name,设置守护线程,异常case处理等 |
handler | 饱和策略执行器 | 线程池和队列都已满时,新提交任务的处理策略 默认是Abort(直抛Reject异常),包括Discard(LIFO规则丢弃)、DiscardOldest(LRU规则丢弃) 以及 CallerRuns(调用者线程执行),允许自定义执行器 |
2. 线程池的创建
直接调用构造方法创建
最直观的方式,直接构造方法new一个
1 | // 报警线程池 |
利用 Executors创建
jdk1.5+ 中提供了 java.util.concurrent.Executors
来创建常见的集中线程池方式
关于各种线程池的说明可以参考: Java并发学习之玩转线程池
固定大小线程池
1 | // 创建一个固定大小的线程池 |
工作窃取线程池
1 | public static ExecutorService newWorkStealingPool(int parallelism) { |
创建单线程池
1 | public static ExecutorService newSingleThreadExecutor() { |
缓存线程池
1 | public static ExecutorService newCachedThreadPool() { |
定时任务线程池
1 | public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { |
不可配置线程池
1 | public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { |
3. 提交任务
execute: 提交无须返回值的任务
submit(Runnable): 适用于提交需要返回值的任务
- 相比较于上面的,区别是这个会返回一个 Future
对象,通过调用future.get()可以获取线程的返回值, - 其中这个方程是线程阻塞的,直到返回了结果之后,才会继续执行下去
4. 关闭线程池
shutdown(): 有序地关闭线程池,已提交的任务会被执行(包含正在执行和任务队列中的),但会拒绝新任务
shutdownNow(): 立即(尝试)停止执行所有任务(包含正在执行和任务队列中的),并返回待执行任务列表
III. 线程池实现原理
1. 线程池状态
线程池状态流程如下:
RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED
每个状态含义
1 | //高3位111,低29位为0 该状态下线程池会接收新提交任务和执行队列任务 |
2. 任务提交逻辑
最开始的流图就说明了任务提交后的流程,针对流程块也就不继续细说,只提一个注意点
若实际工作线程数workers<核心工作线程数corePoolSize,则创建新工作线程来执行新任务execute(Runable)
若实际工作线程数workers>=核心工作线程数corePoolSize(核心工作线程们都在执行任务)且任务队列workQueue未满,则将任务加入到任务队列workQueue中
若任务队列workQueue已满,则创建新工作线程来执行任务execute()
若实际工作线程数workers>=最大工作线程数maximumPoolSize(所有线程都在执行任务),此时任务数已饱和,需要根据饱和拒绝策略rejectedExecutionHandler执行相对应的饱和拒绝操作
线程池的总体设计是基于性能考虑,尽可能避免获取全局锁:
由于创建新线程时都需要获取全局锁,因此步骤1和步骤3必须加锁
为了避免多次获取全局锁(性能伸缩瓶颈),当实际工作线程数>=核心工作线程数时,之后会执行步骤2(入队时无须获取全局锁)
线程池内线程回收策略
若实际工作线程数workers>核心工作线程数corePoolSize,回收空闲时间超过keepAliveTime的空闲的非核心线程(减少工作线程数直到<=核心工作线程数即可)
若设置allowCoreThreadTimeOut为true时,则超过keepAliveTime的空闲的核心工作线程也会被回收
3. 任务执行
说明,下面两段代码解析来自转载: 并发番@ThreadPoolExecutor
execute() - 提交任务
1 | /** |
上面的代码虽然非常少,但是逻辑还是比较多的,创建线程是根据 addWorker
方法来实现的,其主要逻辑为
1 | /** |
小问:快速检测线程状态时,情况1.2、2.1、2.3的意义是什么?
小答:在阐明这个问题之前,我们先明确两个知识点:
新增Worker的目的是处理任务,任务来源分初始任务和队列任务(即剩余的待处理任务)
线程池在非RUNNING状态下是不允许接收新任务的,换句话说您都要下班了,难道还想接新需求?
针对2.1 - > 线程池状态==SHUTDOWN,但firstTask!= null,不允许新增Worker
当线程池状态为SHUTDOWN时,由于不允许接收新任务,因此一旦firstTask!= null需要直接拒绝
针对2.2 - > 线程池状态==SHUTDOWN,且firstTask == null, 但队列为空, 不允许新增Worker
当firstTask为null时,说明调用addWorker()目的不是为了处理新增任务
那么其目的应该是为了处理剩余任务,即队列中的任务,而一旦队列为空,那也没必要新增Worker了
针对1.2 - > 若线程池状态==SHUTDOWN,必须满足firstTask为null且队列非空,才允许新增Worker
当线程池状态为SHUTDOWN时(调用shutdown()),此时不允许接收新任务,因此firstTask必须为null
但需要处理剩余任务,因此队列必须非空,否则新增的工作线程就无任务可做,那就没意义了
结论:传入一个空任务的目的是为了新增工作线程去处理任务队列中的剩余任务
3. Worker类详解
worker包装了任务的调度,用于封装工作线程和任务并管理工作线程的中断状态等功能
由于工作线程和worker实例是一对一的关系,因为可以简单的理解工作线程等价于worker,尤其是谈及数量时,比如创建工作线程实际上就是创建一个worker
线程在线程池执行任务的工作流程:
工作线程开始执行前,需先对worker加锁,任务完成解锁
任务执行前后分别执行beforeExecute()和afterExecute()方法
执行中遇到异常会向外抛出,线程是否死亡取决于您对于异常的处理
每个任务执行完后,当前工作线程任务完成数自增,同时会循环调用getTask()从任务队列中反复获取任务并执行,无任务可执行时线程会阻塞在该方法上
当工作线程因各种理由退出时,会执行processWorkerExit()回收线程(核心是将该worker从workers集合中移除,注意之前worker已经退出任务循环,因此已经不再做工了,从集合移除后就方便gc了)
问:worker中断如何控制的
当工作线程真正开始执行之前,不允许被中断
当工作线程正在执行任务时,不允许被中断
当工作线程正等待从任务队列中获取任务getTask()时才能被中断
调用interruptIdleWorkers()中断空闲线程时必须先获得worker锁
问:为什么Worker不被设计成可重入锁?
由于在动态控制方法中可能会中断线程,比如调用interruptIdleWorkers(),由此该方法在执行interrupt()之前会调用worker.tryLock(),若此时允许重入,就会导致线程被意外中断,这跟当工作线程正在执行任务时,不允许被中断准则是相违背的
IV. 问题解答
1. 如何创建线程池
直接根据构造方法创建
1 | java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, |
利用 Executors 创建线程池
1 | java.util.concurrent.Executors#newFixedThreadPool(int) |
2. 线程池的适用场景
优点
减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
可以根据系统的承受能力,调整线程池中工作线线程的数量
使用线程池场景
我们将线程进行拆分,创建线程耗时T1, 线程执行耗时T2, 销毁线程耗时T3
如果你的场景中,提交线程执行的任务非常频繁,且具体的执行耗时较短,即 T1 + T3 > T2, 这种场景下使用线程池可以带来明显的性能提升
一般来说,如果不是你的任务只偶尔的运行几次,那么绝大部分场景都适合用线程池来处理
3. 如何使用线程池
创建线程池,提交任务
- execute 适用于提交没有返回结果的任务
- submit 适用于提交有返回结果的任务, 返回一个Futrure的包装类
4. 线程池实现原理 & 任务提交后的流程
在实现原理中会穿插上任务提交后的流程,所以就放在一起了
首先从提交一个任务开始:
- 首先判断工作线程数是否小于核心工作线程数,是则直接创建工作线程执行
- 否,则将任务丢入任务队列中
- 若任务队列已满,且工作线程数 < 最大工作线程数,则直接创建工作线程执行任务
- 若队列满,且工作线程数达到最大值,则采用拒绝任务策略
其中上面的任务进队or创建线程执行,都需要关注线程池的状态,每个状态对应的原则
状态 | 说明 | 限制 |
---|---|---|
RUNNING | 运行状态 | 线程池会接收新提交任务和执行队列任务 |
SHUTDOWN | 关闭状态 | 线程池不再接收新任务,但还会继续执行队列任务 |
STOP | 停止状态 | 不再接收新任务,不会再执行队列任务,并会中断正在执行中的任务 |
TIDYING | 整理状态 | 所有任务都被终止,工作线程数为0,期间会调用钩子方法terminated() |
TERMINATED | 终止状态 | 线程池terminated()方法已经调用完成 |
接着上面,工作线程执行完毕之后,会尝试从任务队列中获取任务来执行,如果队列为空,则阻塞;此时工作线程空闲
根据工作线程的回收机制
- 允许回收核心工作线程时,将所有空闲时间大于keepAliveTime的线程回收掉
- 不允许回收核心工作线程,回收空闲时间大于keepAliveTime的线程,知道工作线程数量为核心工作线程数
5. 异常状况处理
submit()异常处理
- 异常会保存在Future对象的ExecutionException中,可以在调用get()使用try-catch方式捕获,有N个任务有异常就会抛出来N个异常,但不会终止当前工作线程
- 单独设置UncaughtExceptionHandler没卵用,但结合(3)使用就有效
- 允许在submit()方法内部用try-catch捕获该异常,同样不会终止当前线程
- 若想在内部处理异常,还可以重写afterExecute()方法,
execute()异常处理
- 默认会在execute()方法内部直接抛出异常,注意这不会中断线程池运行,但会终止当前工作线程,并重新创建新的工作线程执行该任务
- 允许在execute()方法内部用try-catch捕获该异常,好处是不会终止当前线程并重新创建一个新的线程了
- 重写afterExecute()方法
- 还可以设置UncaughtExceptionHandler
一个实例如下:
1 | ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new LinkedBlockingQueue(), |
6. 线程池关闭
关闭线程池主要有两种方式,两者的区别是:
- shutdown() : 队列剩余任务全部执行完毕再终止
- shutdownNow() : 放弃执行队列剩余任务,但会将它们返回
两者的共性在于:
- 正在执行中的任务会继续执行,不会被终止或放弃
- 新提交的任务会被直接拒绝
V. 其他
参考
个人博客: Z+|blog
基于hexo + github pages搭建的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛
声明
尽信书则不如,已上内容,纯属一家之言,因本人能力一般,见识有限,如发现bug或者有更好的建议,随时欢迎批评指正,我的微博地址: 小灰灰Blog