Java并发学习之线程池ThreadPoolExecutor的小结

文章目录
  1. Java并发学习之线程池ThreadPoolExecutor的小结
    1. I. 什么是线程池
      1. 1. 通俗讲解
      2. 2. 线程池说明
      3. 3. 线程池组成
    2. II. 线程池使用
      1. 1. 构造参数详解
      2. 2. 线程池的创建
        1. 直接调用构造方法创建
        2. 利用 Executors创建
      3. 3. 提交任务
      4. 4. 关闭线程池
    3. III. 线程池实现原理
      1. 1. 线程池状态
      2. 2. 任务提交逻辑
      3. 3. 任务执行
        1. execute() - 提交任务
      4. 3. Worker类详解
    4. IV. 问题解答
      1. 1. 如何创建线程池
      2. 2. 线程池的适用场景
      3. 3. 如何使用线程池
      4. 4. 线程池实现原理 & 任务提交后的流程
      5. 5. 异常状况处理
      6. 6. 线程池关闭
    5. V. 其他
      1. 参考
      2. 个人博客: Z+|blog
      3. 声明
      4. 扫描关注

Java并发学习之线程池ThreadPoolExecutor的小结

本篇博文将带着问题来回顾小结多线程池相关的知识点

  1. 线程池的几种创建方式
  2. 线程池的优点是什么
  3. 应用场景
  4. 如何使用
  5. 实现原理
  6. 异常状况怎么处理
  7. 线程池中任务的提交执行后,到线程执行,执行完成的整个流程逻辑
  8. 线程池中的线程回收机制

I. 什么是线程池

1. 通俗讲解

我们先举一个小例子来说一下什么是线程池,以及线程池的工作方式

首先在看一下线程池中提交一个任务的流程图

流程图

下面就是实际的case:基本上大家都去过银行,我们就以到银行的柜台上办理业务的流程来说明线程池,我们先假设这里有一个xx银行(这里是广告位,待租😉),总共有8个柜台,平时只开放4个柜台,大厅内总共有20个座位。

  • 那么来一个办理业务的,如果开放的四个柜台上,有空的,直接上去办理业务即可
  • 如果四个柜台都在处理业务了,那么办理业务则需要取一个号,到大厅的座位上等着叫号
  • 如果大厅坐满了,银行经理决定开放所有的柜台,那么新来办理的人直接到新的柜台上处理
  • 如果所有柜台都在处理,且大厅也满了,这个时候就告诉新来办理业务的现在已经满载了,你们到xxx地的银行去办理吧(或者回家等下午再来好了)

从流程上的对比来看,就很相似了,虽然实际上银行可不会因为人的太多来新增开放柜台的数量,下面简单的将上面的case映射到线程池的成员上

  • 4个开放柜台 : 对应线程池的corePoolSize(核心工作线程数)
  • 8个总柜台:对应线程池的maximumPoolSize(最大工作线程数)
  • 20个座位:对应线程池的workQueue(任务队列)

所以线程池中提交一个任务时,优先看核心工作线程数是否已满,未满时,直接创建线程执行;已满,则丢入队列;如果队列也满了,则判断工作线程数是否超过最大数,没有则直接创建线程执行;否则直接“丢弃”这个任务了 (注意这个丢弃不是真的丢弃,其处理策略可以由你自己定义)

上面是基本流程,并没有涉及到工作线程的回收,线程池的状态(比如银行是否打烊了),任务的执行策略等

2. 线程池说明

线程池是一种多线程的处理机制,主要是为了减少线程的频繁创建和销毁,从而提升系统效率

使用线程池优点

  1. 减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
  2. 可以根据系统的承受能力,调整线程池中工作线线程的数量

使用线程池场景

我们将线程进行拆分,创建线程耗时T1, 线程执行耗时T2, 销毁线程耗时T3

如果你的场景中,提交线程执行的任务非常频繁,且具体的执行耗时较短,即 T1 + T3 > T2, 这种场景下使用线程池可以带来明显的性能提升

一般来说,如果不是你的任务只偶尔的运行几次,那么绝大部分场景都适合用线程池来处理

3. 线程池组成

类定义: java.util.concurrent.ThreadPoolExecutor

构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 线程池构造方法
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 存活时间
TimeUnit unit,
BlockingQueue<Runnable> workQueue, // 排队队列
ThreadFactory threadFactory, // 创建线程的工作类
RejectedExecutionHandler handler) // 线程数满,队列满时具体任务策略
{
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

II. 线程池使用

1. 构造参数详解

构造参数较多,创建一个线程池,当然首先得搞清楚这些参数是干嘛用的

参数 含义 说明
corePoolSize 核心工作线程数 没有任务时,线程池中允许存在的最小空闲线程数
工作线程数 < corePoolSize时,提交任务创建工作线程来执行任务
maximumPoolSize 最大工作线程数 线程池中允许出现的最大工作线程数量
队列满 && 工作线程数 < maximumPoolSize时,新的队列将创建线程来执行;
如果队列没有边界,那么这个参数没有意义
workQueue 任务队列 保存待执行任务的阻塞队列;
(工作线程数 >= corePoolSize) && (任务数 < 任务队列长度)时,任务会offer()入队等待
keepAliveTime 工作线程最大空闲时间 线程数 > corePoolSize时,这个参数表示空闲线程存活时间;
超时的空闲线程,会被回收掉,直到线程数==corePoolSzie;
allowCoreThreadTimeOut=true时,则超时的核心工作线程也会被回收
unit 时间单位 keepAliveTime的时间单位
threadFactory 线程创建工厂 创建线程的工厂类,可以在这里指定创建线程的name,设置守护线程,异常case处理等
handler 饱和策略执行器 线程池和队列都已满时,新提交任务的处理策略
默认是Abort(直抛Reject异常),包括Discard(LIFO规则丢弃)、DiscardOldest(LRU规则丢弃) 以及 CallerRuns(调用者线程执行),允许自定义执行器

2. 线程池的创建

直接调用构造方法创建

最直观的方式,直接构造方法new一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 报警线程池
ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(10),
new DefaultThreadFactory("test-thread"),
new ThreadPoolExecutor.CallerRunsPolicy());


// 线程创建工厂,主要设置为非守护线程,指定线程名,设置优先级
// 关于这个工厂类,推荐看netty的实现
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

public DefaultThreadFactory(String poolName) {
if (null == poolName) {
poolName = "pool";
}
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = poolName + poolNumber.getAndIncrement() + "-thread-";
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

利用 Executors创建

jdk1.5+ 中提供了 java.util.concurrent.Executors 来创建常见的集中线程池方式

关于各种线程池的说明可以参考: Java并发学习之玩转线程池

固定大小线程池

1
2
3
4
5
6
// 创建一个固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

工作窃取线程池

1
2
3
4
5
6
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

创建单线程池

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

缓存线程池

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

定时任务线程池

1
2
3
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

不可配置线程池

1
2
3
4
5
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}

3. 提交任务

execute: 提交无须返回值的任务

submit(Runnable): 适用于提交需要返回值的任务

  • 相比较于上面的,区别是这个会返回一个 Future 对象,通过调用future.get()可以获取线程的返回值,
  • 其中这个方程是线程阻塞的,直到返回了结果之后,才会继续执行下去

4. 关闭线程池

shutdown(): 有序地关闭线程池,已提交的任务会被执行(包含正在执行和任务队列中的),但会拒绝新任务

shutdownNow(): 立即(尝试)停止执行所有任务(包含正在执行和任务队列中的),并返回待执行任务列表

III. 线程池实现原理

1. 线程池状态

线程池状态流程如下:

RUNNING -> SHUTDOWN -> STOP -> TIDYING -> TERMINATED

每个状态含义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//高3位111,低29位为0 该状态下线程池会接收新提交任务和执行队列任务
private static final int RUNNING = -1 << COUNT_BITS;

//高3位000,低29位为0 该状态下线程池不再接收新任务,但还会继续执行队列任务
private static final int SHUTDOWN = 0 << COUNT_BITS;

//高3位001,低29位为0 该状态下线程池不再接收新任务,不会再执行队列任务,并会中断正在执行中的任务
private static final int STOP = 1 << COUNT_BITS;

//高3位010,低29位为0 该状态下线程池的所有任务都被终止,工作线程数为0,期间会调用钩子方法terminated()
private static final int TIDYING = 2 << COUNT_BITS;

//高3位011,低29位为0 该状态下表明线程池terminated()方法已经调用完成
private static final int TERMINATED = 3 << COUNT_BITS;

2. 任务提交逻辑

最开始的流图就说明了任务提交后的流程,针对流程块也就不继续细说,只提一个注意点

  1. 若实际工作线程数workers<核心工作线程数corePoolSize,则创建新工作线程来执行新任务execute(Runable)

  2. 若实际工作线程数workers>=核心工作线程数corePoolSize(核心工作线程们都在执行任务)且任务队列workQueue未满,则将任务加入到任务队列workQueue中

  3. 若任务队列workQueue已满,则创建新工作线程来执行任务execute()

  4. 若实际工作线程数workers>=最大工作线程数maximumPoolSize(所有线程都在执行任务),此时任务数已饱和,需要根据饱和拒绝策略rejectedExecutionHandler执行相对应的饱和拒绝操作

线程池的总体设计是基于性能考虑,尽可能避免获取全局锁:

  1. 由于创建新线程时都需要获取全局锁,因此步骤1和步骤3必须加锁

  2. 为了避免多次获取全局锁(性能伸缩瓶颈),当实际工作线程数>=核心工作线程数时,之后会执行步骤2(入队时无须获取全局锁)

线程池内线程回收策略

  1. 若实际工作线程数workers>核心工作线程数corePoolSize,回收空闲时间超过keepAliveTime的空闲的非核心线程(减少工作线程数直到<=核心工作线程数即可)

  2. 若设置allowCoreThreadTimeOut为true时,则超过keepAliveTime的空闲的核心工作线程也会被回收

3. 任务执行

说明,下面两段代码解析来自转载: 并发番@ThreadPoolExecutor

execute() - 提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/**
* 1.若实际工作线程数 < 核心工作线程数,会尝试创建一个工作线程去执行该
* 任务,即该command会作为该线程的第一个任务,即第一个firstTask
*
* 2.若任务入队成功,仍需要执行双重校验,原因有两点:
* - 第一个是去确认是否需要新建一个工作线程,因为可能存在
* 在上次检查后已经死亡died的工作线程
* - 第二个是可能在进入该方法后线程池被关闭了,
* 比如执行shutdown()
* 因此需要再次检查state状态,并分别处理以上两种情况:
* - 若线程池中已无可用工作线程了,则需要新建一个工作线程
* - 若线程池已被关闭,则需要回滚入队列(若有必要)
*
* 3.若任务入队失败(比如队列已满),则需要新建一个工作线程;
* - 若新建线程失败,说明线程池已停止或者已饱和,必须执行拒绝策略
*/
public void execute(Runnable command) {

//新任务不允许为空,空则抛出NPE
if (command == null)
throw new NullPointerException();

// ctl 为线程池状态控制器,用于保证线程池状态和工作线程数
// 低29位为工作线程数量,高3位为线程池状态
int c = ctl.get();

/**
* case1:当实际工作线程数 < 核心工作线程数时
* 执行方案:会创建一个新的工作线程去执行该任务
* 注意:此时即使有其他空闲的工作线程也还是会新增工作线程,
* 直到达到核心工作线程数为止
*/
if (workerCountOf(c) < corePoolSize) {

/**
* 新增工作线程,true表示要对比的是核心工作线程数
* 一旦新增成功就开始执行当前任务
* 期间也会通过自旋获取队列任务进行执行
*/
if (addWorker(command, true))
return;

/**
* 需要重新获取控制器状态,说明新增线程失败
* 线程失败的原因可能有两种:
* - 1.线程池已被关闭,非RUNNING状态的线程池是不允许接收新任务的
* - 2.并发时,假如都通过了workerCountOf(c) < corePoolSize校验,但其他线程
* 可能会在addWorker前先创建出线程,导致workerCountOf(c) >= corePoolSize,
* 即实际工作线程数 >= 核心工作线程数,此时需要进入case2
*/
c = ctl.get();
}

/**
* case2:当实际工作线程数>=核心线程数时,新提交任务需要入队
* 执行方案:一旦入队成功,仍需要处理线程池状态突变和工作线程死亡的情况
*/
if (isRunning(c) && workQueue.offer(command)) {

//双重校验
int recheck = ctl.get();

/**
* recheck的目的是为了防止线程池状态的突变 - 即被关闭
* 一旦线程池非RUNNING状态时,除了从队列中移除该任务(回滚)外
* 还需要执行任务拒绝策略处理新提交的任务
*/
if (!isRunning(recheck) && remove(command))
//执行任务拒绝策略
reject(command);

/**
* 若线程池还是RUNNING状态 或
* 队列移除失败(可能正好被一个工作线程拿到处理了)
* 此时需要确保至少有一个工作线程还可以干活
* 补充一句:之所有无须与核心工作线程数或最大线程数相比,而只是比较0的原因是
* 只要保证有一个工作线程可以干活就行,它会自动去获取任务
*/
else if (workerCountOf(recheck) == 0)
/**
* 若工作线程都已死亡,需要新增一个工作线程去干活
* 死亡原因可能是线程超时或者异常等等复杂情况
*
* 第一个参数为null指的是传入一个空任务,
* 目的是创建一个新工作线程去处理队列中的剩余任务
* 第二个参数为false目的是提示可以扩容到最大工作线程数
*/
addWorker(null, false);
}

/**
* case3:一旦线程池被关闭 或者 新任务入队失败(队列已满)
* 执行方案:会尝试创建一个新的工作线程,并允许扩容到最大工作线程数
* 注意:一旦创建失败,比如超过最大工作线程数,需要执行任务拒绝策略
*/
else if (!addWorker(command, false))
//执行任务拒绝策略
reject(command);
}

上面的代码虽然非常少,但是逻辑还是比较多的,创建线程是根据 addWorker方法来实现的,其主要逻辑为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
/**
* 新增工作线程需要遵守线程池控制状态规定和边界限制
*
* @param core core为true时允许扩容到核心工作线程数,否则为最大工作线程数
* @return 新增成功返回true,失败返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {

//重试标签
retry:

/***
* 外部自旋 -> 目的是确认是否能够新增工作线程
* 允许新增线程的条件有两个:
* 1.满足线程池状态条件 -> 条件一
* 2.实际工作线程满足数量边界条件 -> 条件二
* 不满足条件时会直接返回false,表示新增工作线程失败
*/
for (;;) {

//读取原子控制量 - 包含workerCount(实际工作线程数)和runState(线程池状态)
int c = ctl.get();

//读取线程池状态
int rs = runStateOf(c);

/**
* 条件一.判断是否满足线程池状态条件
* 1.只有两种情况允许新增线程:
* 1.1 线程池状态==RUNNING
* 1.2 线程池状态==SHUTDOWN且firstTask为null同时队列非空
*
* 2.线程池状态>=SHUTDOWN时不允许接收新任务,具体如下:
* 2.1 线程池状态>SHUTDOWN,即为STOP、TIDYING、TERMINATED
* 2.2 线程池状态==SHUTDOWN,但firstTask非空
* 2.3 线程池状态==SHUTDOWN且firstTask为空,但队列为空
* 补充:针对1.2、2.2、2.3的情况具体请参加后面的"小问答"环节
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;

/***
* 内部自旋 -> 条件二.判断实际工作线程数是否满足数量边界条件
* -数量边界条件满足会对尝试workerCount实现CAS自增,否则新增失败
* -当CAS失败时会再次重新判断是否满足新增条件:
* 1.若此期间线程池状态突变(被关闭),重新判断线程池状态条件和数量边界条件
* 2.若此期间线程池状态一致,则只需重新判断数量边界条件
*/
for (;;) {

//读取实际工作线程数
int wc = workerCountOf(c);

/**
* 新增工作线程会因两种实际工作线程数超标情况而失败:
* 1.实际工作线程数 >= 最大容量
* 2.实际工作线程数 > 工作线程比较边界数(当前最大扩容数)
* -若core = true,比较边界数 = 核心工作线程数
* -若core = false,比较边界数 = 最大工作线程数
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;

/**
* 实际工作线程计数CAS自增:
* 1.一旦成功直接退出整个retry循环,表明新增条件都满足
* 2.因并发竞争导致CAS更新失败的原因有三种:
* 2.1 线程池刚好已新增一个工作线程
* -> 计数增加,只需重新判断数量边界条件
* 2.2 刚好其他工作线程运行期发生错误或因超时被回收
* -> 计数减少,只需重新判断数量边界条件
* 2.3 刚好线程池被关闭
* -> 计数减少,工作线程被回收,
* 需重新判断线程池状态条件和数量边界条件
*/
if (compareAndIncrementWorkerCount(c))
break retry;

//重新读取原子控制量 -> 原因是在此期间可能线程池被关闭了
c = ctl.get();

/**
* 快速检测是否发生线程池状态突变
* 1.若状态突变,重新判断线程池状态条件和数量边界条件
* 2.若状态一致,则只需重新判断数量边界条件
*/
if (runStateOf(c) != rs)
continue retry;
}
}

/**
* 这里是addWorker方法的一个分割线
* 前面的代码的作用是决定了线程池接受还是拒绝新增工作线程
* 后面的代码的作用是真正开始新增工作线程并封装成Worker接着执行后续操作
* PS:虽然笔者觉得这个方法其实可以拆分成两个方法的(在break retry的位置)
*/

//记录新增的工作线程是否开始工作
boolean workerStarted = false;

//记录新增的worker是否成功添加到workers集合中
boolean workerAdded = false;

Worker w = null;
try {

//将新提交的任务和当前线程封装成一个Worker
w = new Worker(firstTask);

//获取新创建的实际工作线程
final Thread t = w.thread;

/**
* 检测是否有可执行任务的线程,即是否成功创建了新的工作线程
* 1.若存在,则选择执行任务
* 2.若不存在,则需要执行addWorkerFailed()方法
*/
if (t != null) {

/**
* 新增工作线程需要加全局锁
* 目的是为了确保安全更新workers集合和largestPoolSize
*/
final ReentrantLock mainLock = this.mainLock;

mainLock.lock();
try {

/**
* 获得全局锁后,需再次检测当前线程池状态
* 原因在于预防两种非法情况:
* 1.线程工厂创建线程失败
* 2.在锁被获取之前,线程池就被关闭了
*/
int rs = runStateOf(ctl.get());

/**
* 只有两种情况是允许添加work进入works集合的
* 也只有进入workers集合后才是真正的工作线程,并开始执行任务
* 1.线程池状态为RUNNING(即rs<SHUTDOWN)
* 2.线程池状态为SHUTDOWN且传入一个空任务
* (理由参见:小问答之快速检测线程池状态?)
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {

/**
* 若线程处于活动状态时,说明线程已启动,需要立即抛出"线程状态非法异常"
* 原因是线程是在后面才被start的,已被start的不允许再被添加到workers集合中
* 换句话说该方法新增线程时,而线程是新的,本身应该是初始状态(new)
* 可能出现的场景:自定义线程工厂newThread有可能会提前启动线程
*/
if (t.isAlive())
throw new IllegalThreadStateException();

//由于加锁,所以可以放心的加入集合
workers.add(w);
int s = workers.size();

//更新最大工作线程数,由于持有锁,所以无需CAS
if (s > largestPoolSize)
largestPoolSize = s;

//确认新建的worker已被添加到workers集合中
workerAdded = true;
}
} finally {

//千万不要忘记主动解锁
mainLock.unlock();
}

/**
* 一旦新建工作线程被加入工作线程集合中,就意味着其可以开始干活了
* 有心的您肯定发现在线程start之前已经释放锁了
* 原因在于一旦workerAdded为true时,说明锁的目的已经达到
* 根据最小化锁作用域的原则,线程执行任务无须加锁,这是种优化
* 也希望您在使用锁时尽量保证锁的作用域最小化
*/
if (workerAdded) {

/**
* 启动线程,开始干活啦
* 若您看过笔者的"并发番@Thread一文通"肯定知道start()后,
* 一旦线程初始化完成便会立即调用run()方法
*/
t.start();

//确认该工作线程开始干活了
workerStarted = true;
}
}
} finally {

//若新建工作线程失败或新建工作线程后没有成功执行,需要做新增失败处理
if (!workerStarted)
addWorkerFailed(w);
}

//返回结果表明新建的工作线程是否已启动执行
return workerStarted;
}

小问:快速检测线程状态时,情况1.2、2.1、2.3的意义是什么?

小答:在阐明这个问题之前,我们先明确两个知识点:

  1. 新增Worker的目的是处理任务,任务来源分初始任务和队列任务(即剩余的待处理任务)

  2. 线程池在非RUNNING状态下是不允许接收新任务的,换句话说您都要下班了,难道还想接新需求?

针对2.1 - > 线程池状态==SHUTDOWN,但firstTask!= null,不允许新增Worker
当线程池状态为SHUTDOWN时,由于不允许接收新任务,因此一旦firstTask!= null需要直接拒绝

针对2.2 - > 线程池状态==SHUTDOWN,且firstTask == null, 但队列为空, 不允许新增Worker
当firstTask为null时,说明调用addWorker()目的不是为了处理新增任务
那么其目的应该是为了处理剩余任务,即队列中的任务,而一旦队列为空,那也没必要新增Worker了

针对1.2 - > 若线程池状态==SHUTDOWN,必须满足firstTask为null且队列非空,才允许新增Worker
当线程池状态为SHUTDOWN时(调用shutdown()),此时不允许接收新任务,因此firstTask必须为null
但需要处理剩余任务,因此队列必须非空,否则新增的工作线程就无任务可做,那就没意义了
结论:传入一个空任务的目的是为了新增工作线程去处理任务队列中的剩余任务

3. Worker类详解

worker包装了任务的调度,用于封装工作线程和任务并管理工作线程的中断状态等功能

由于工作线程和worker实例是一对一的关系,因为可以简单的理解工作线程等价于worker,尤其是谈及数量时,比如创建工作线程实际上就是创建一个worker

线程在线程池执行任务的工作流程:

  1. 工作线程开始执行前,需先对worker加锁,任务完成解锁

  2. 任务执行前后分别执行beforeExecute()和afterExecute()方法

  3. 执行中遇到异常会向外抛出,线程是否死亡取决于您对于异常的处理

  4. 每个任务执行完后,当前工作线程任务完成数自增,同时会循环调用getTask()从任务队列中反复获取任务并执行,无任务可执行时线程会阻塞在该方法上

  5. 当工作线程因各种理由退出时,会执行processWorkerExit()回收线程(核心是将该worker从workers集合中移除,注意之前worker已经退出任务循环,因此已经不再做工了,从集合移除后就方便gc了)

问:worker中断如何控制的

  1. 当工作线程真正开始执行之前,不允许被中断

  2. 当工作线程正在执行任务时,不允许被中断

  3. 当工作线程正等待从任务队列中获取任务getTask()时才能被中断

  4. 调用interruptIdleWorkers()中断空闲线程时必须先获得worker锁

问:为什么Worker不被设计成可重入锁?

由于在动态控制方法中可能会中断线程,比如调用interruptIdleWorkers(),由此该方法在执行interrupt()之前会调用worker.tryLock(),若此时允许重入,就会导致线程被意外中断,这跟当工作线程正在执行任务时,不允许被中断准则是相违背的

IV. 问题解答

1. 如何创建线程池

直接根据构造方法创建

1
2
3
4
5
java.util.concurrent.ThreadPoolExecutor#ThreadPoolExecutor(int, int, long,
java.util.concurrent.TimeUnit,
java.util.concurrent.BlockingQueue<java.lang.Runnable>,
java.util.concurrent.ThreadFactory,
java.util.concurrent.RejectedExecutionHandler)

利用 Executors 创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
java.util.concurrent.Executors#newFixedThreadPool(int)

java.util.concurrent.Executors#newWorkStealingPool(int)

java.util.concurrent.Executors#newSingleThreadExecutor()

java.util.concurrent.Executors#newCachedThreadPool()

java.util.concurrent.Executors#newSingleThreadScheduledExecutor()

java.util.concurrent.Executors#newScheduledThreadPool(int)

java.util.concurrent.Executors#unconfigurableExecutorService

2. 线程池的适用场景

优点

减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
可以根据系统的承受能力,调整线程池中工作线线程的数量

使用线程池场景

我们将线程进行拆分,创建线程耗时T1, 线程执行耗时T2, 销毁线程耗时T3

如果你的场景中,提交线程执行的任务非常频繁,且具体的执行耗时较短,即 T1 + T3 > T2, 这种场景下使用线程池可以带来明显的性能提升

一般来说,如果不是你的任务只偶尔的运行几次,那么绝大部分场景都适合用线程池来处理

3. 如何使用线程池

创建线程池,提交任务

  • execute 适用于提交没有返回结果的任务
  • submit 适用于提交有返回结果的任务, 返回一个Futrure的包装类

4. 线程池实现原理 & 任务提交后的流程

在实现原理中会穿插上任务提交后的流程,所以就放在一起了

首先从提交一个任务开始:

  • 首先判断工作线程数是否小于核心工作线程数,是则直接创建工作线程执行
  • 否,则将任务丢入任务队列中
  • 若任务队列已满,且工作线程数 < 最大工作线程数,则直接创建工作线程执行任务
  • 若队列满,且工作线程数达到最大值,则采用拒绝任务策略

其中上面的任务进队or创建线程执行,都需要关注线程池的状态,每个状态对应的原则

状态 说明 限制
RUNNING 运行状态 线程池会接收新提交任务和执行队列任务
SHUTDOWN 关闭状态 线程池不再接收新任务,但还会继续执行队列任务
STOP 停止状态 不再接收新任务,不会再执行队列任务,并会中断正在执行中的任务
TIDYING 整理状态 所有任务都被终止,工作线程数为0,期间会调用钩子方法terminated()
TERMINATED 终止状态 线程池terminated()方法已经调用完成

接着上面,工作线程执行完毕之后,会尝试从任务队列中获取任务来执行,如果队列为空,则阻塞;此时工作线程空闲

根据工作线程的回收机制

  • 允许回收核心工作线程时,将所有空闲时间大于keepAliveTime的线程回收掉
  • 不允许回收核心工作线程,回收空闲时间大于keepAliveTime的线程,知道工作线程数量为核心工作线程数

5. 异常状况处理

submit()异常处理

  1. 异常会保存在Future对象的ExecutionException中,可以在调用get()使用try-catch方式捕获,有N个任务有异常就会抛出来N个异常,但不会终止当前工作线程
  2. 单独设置UncaughtExceptionHandler没卵用,但结合(3)使用就有效
  3. 允许在submit()方法内部用try-catch捕获该异常,同样不会终止当前线程
  4. 若想在内部处理异常,还可以重写afterExecute()方法,

execute()异常处理

  1. 默认会在execute()方法内部直接抛出异常,注意这不会中断线程池运行,但会终止当前工作线程,并重新创建新的工作线程执行该任务
  2. 允许在execute()方法内部用try-catch捕获该异常,好处是不会终止当前线程并重新创建一个新的线程了
  3. 重写afterExecute()方法
  4. 还可以设置UncaughtExceptionHandler

一个实例如下:

1
2
3
4
5
6
7
8
9
10
11
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 3, TimeUnit.SECONDS, new LinkedBlockingQueue(), 
//我们自定义一个线程工厂和重写线程的setUncaughtExceptionHandler方法
new ThreadFactory() {
final AtomicInteger threadNumber = new AtomicInteger(1);
public Thread newThread(Runnable r) {
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), r, "thread-"
+ (threadNumber.getAndIncrement()));
thread.setUncaughtExceptionHandler((t,e) -> System.out.println(e));
return thread;
}
});

6. 线程池关闭

关闭线程池主要有两种方式,两者的区别是:

  • shutdown() : 队列剩余任务全部执行完毕再终止
  • shutdownNow() : 放弃执行队列剩余任务,但会将它们返回

两者的共性在于:

  • 正在执行中的任务会继续执行,不会被终止或放弃
  • 新提交的任务会被直接拒绝

V. 其他

参考

个人博客: Z+|blog

基于hexo + github pages搭建的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

声明

尽信书则不如,已上内容,纯属一家之言,因本人能力一般,见识有限,如发现bug或者有更好的建议,随时欢迎批评指正,我的微博地址: 小灰灰Blog

扫描关注

QrCode

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×