1. 为什么要使用线程池?
这点在注释最开始说的很清楚:
Thread pools address two different problems
- they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead,
- and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.
其中第一点也可以这样简单描述:
Reusing threads that have already been created instead of creating new ones (an expensive process)
Answer from Thread vs ThreadPool on Stackoverflow
2. Worst pool
在一切开始之前, 先思考一个问题: 仅就”重用线程”这个首要目标来说, 该如何实现一个线程池? 比较明确的几点是:
1. 一定创建固定数目的`Thread`, 任务由`Runnable`形式交给`Thread`执行.
2. 根据线程的生命周期, 一个线程执行完成即进入 *Dead* 状态, 由此可知`Thread`不能停, 需要一直维持 *Running* 状态. 也就是说, 需要在其中执行一个”死循环”.
于是最开始想到是这样的写法:
1 | public class PoolThread { |
这样实现需要在每个新任务到来时遍历PoolThread
, 判断其工作状态从而决定是否将任务提交给他. 而且弹性比较差, 当所有线程空闲时, 新任务无法处理.
这里提前剧透, 看了ThreadPoolExecutor
的实现, 是引入了一个生产-消费模型, Runnable
直接进入队列(实际上有的并不是, 后面再提), 而Thread从队列中消费.
有了这些信息, 已经可以写出一个辣鸡队列了, 姑且称之为WorstPool
. 代码如下:
1 | import java.util.concurrent.BlockingQueue; |
调用代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public static void main(String[] args) {
WorstPool pool = new WorstPool(2);
for (int i = 0; i < 6; i++) {
final int id = i;
pool.execute(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " DONE WITH " + id);
});
}
}
可以运行下看看结果~
3. ThreadPoolExecutor
ThreadPoolExecutor
和WorstPool
的主要模型是一致的. 了解后者之后, ThreadPoolExecutor
就比较容易切入. 先尝试将WorstPool
的成员对应到ThreadPoolExecutor
. 之后再根据ThreadPoolExecutor
的各种特性逐个了解.
3.1 找相同 - Worker & BlockingQueue
观察ThreadPoolExecutor
的类成员, 可以很容易找到和WorstPool
的对应.
- `BlockingQueue<Runnable> workQueue` - `taskQueue` in `WorstPool`
- 包含一个`Thread`成员的内部类`Worker` - `WorstPool`中没有做封装, 直接是一个`Thread`
3.1.1 Worker
Worker
成员较少, 可以先关注下面几个
- `Thread thread`, 用来执行任务的线程
- `Runnable task`, 在创建`Worker`会同时为其指定一个任务. 后续的任务将从队列中获取.
在ThreadPoolExecutor
中, Worker
不是在线程池初始化时创建的. 而是在提交任务时创建的, 即在客户端调用入口execute(Runnable cmd)
方法中. Worker
的创建以及执行任务流程大致如下:
addWorker
方法中, Worker
实例会被加入到HashSet<Worker> workers
. 用于线程池管理所有的Worker
.
Worker数量
Worker
数量可能不是固定的, 在execute()
中, 判断是否需要新建worker
主要看corePoolSize
和maximumPoolSize
. 注释讲的很清楚:
A ThreadPoolExecutor will automatically adjust the pool size according to the bounds set by corePoolSize and maximumPoolSize.
When a new task is submitted in method execute, and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
AbstractQueuedSynchronizer
Worker
本身继承了AbstractQueuedSynchronizer
, 相关方法有lock()
, tryLock()
,isLocked()
,unlock()
. 等后续单独介绍AbstractQueuedSynchronizer
, 暂时可以简单理解为, 内部维护了一个ReentrantLock
实例. 具体使用后面会提到~
Key Words
Worker workQueue corePoolSize maximumPoolSize AbstractQueuedSynchronizer thread workers
3.2 ctl
ThreadPoolExecutor
中有一个乍看起来有些令人困惑的成员 - AtomicInteger ctl
The main pool control state, ctl, is an atomic integer packing two conceptual fields: workerCount, runState.
实际上一些相关的位操作只是为了将两个变量封装到一个中: 高三位表示runState
. 剩下的表示workCount
WorkerCount
workerCount
可以先简单地根据字面理解为Worker
数量, 实际上却不大精准:
The workerCount is the number of workers that have been permitted to start and not permitted to stop.
// TODO
workerCountOf
, ctlOf
等几个方法都是很简单的二进制操作, 下面详细说明.
Bit操作详细说明
这里必须上源码了:
1 | int CAPACITY = (1 << (Integer.SIZE - 3)) - 1; |
CAPACITY
为1
左移29位. 它的二进制表示为:
0001 1111 1111 1111 1111 1111 1111 1111~CAPACITY
1110 0000 0000 0000 0000 0000 0000 0000
这样看就很好理解了. 两者一个是高位为1, 一个是低位为1, 这样在按位与操作时就会忽略其他为0的Bit, 达到封装两个值的效果. 可以看出workerCount
的长度受限于29个bit, 最大为 (2 ^ 29) - 1. 而runState
只有6种可能取值, 3位也够了.
KeyWords
ctl workerCount runState
3.3 生命周期
上文提到 ctl
包装的另一个值就是runState
. 它的作用是用来表示整个线程池的生命周期状态, 取值有如下几种:
The runState provides the main lifecyle control, taking on values:
- RUNNING: Accept new tasks and process queued tasks - SHUTDOWN: Don't accept new tasks, but process queued tasks - STOP: Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks - TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method - TERMINATED: terminated() has completed
下面几个方法都是对runState
简单的读写操作, 几乎可以根据名称判断用处
runStateLessThan(int c, int s)
runStateAtLeast(int c, int s)
isTerminated()
isTerminating()
isShutdown()
isRunning(int c) // 是否处于RUNNING状态
advanceRunState(int target) // 将runState设置为目标值
各种状态的转换说明如下:
The runState monotonically increases over time, but need not hit each state. The transitions are:
- RUNNING -> SHUTDOWN: On invocation of shutdown(), perhaps implicitly in finalize() - (RUNNING or SHUTDOWN) -> STOP: On invocation of shutdownNow() * SHUTDOWN -> TIDYING: When both queue and pool are empty * STOP -> TIDYING: When pool is empty * TIDYING -> TERMINATED: When the terminated() hook method has completed
可以看出来, 线程池初始化之后, 如果不调用shutdown
, shutdownNow
它是一直处于RUNNING状态的, 所以 生命周期的变化都始于这两个方法; 他们的作用都是试图停止线程池, 但是细节有所不同.
showdown
此操作调用之前提交的任务(即包含队列中的任务)都会被执行完, 但是不再接受新任务. 另外此方法的注释中提到: (shutdownNow
也是如此)
This method does not wait for previously submitted tasks to complete execution. Use awaitTermination awaitTermination
这句的意思是,shutdown
并不会阻塞当前线程, 从而等待所有任务执行完. 如果需要的话, 使用awaitTermination
. 下面的代码可以说明这一点:
1 | ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, Queues.newArrayBlockingQueue(1)); |
这里的”ShutDown OK”马上就打印了, 但是线程池中的任务还没有完成.
awaitTermination
的实现是通过一个Condition termination
成员的await
来实现的, 逻辑比较简单, 其中根据runState
是否达到TERMINATED
状态决定是否继续await
. 通常有await
就会有signal
, 在后面会提到.
shutdownNow
此方法试图终止当前运行的任务, 并将队列中的任务全部移除. 由于试图终止的方式为interrupt
, 所以实际上并不能保证一定成功.
shutdown & shutdownNow
来看一下两个方法的主要内容:
advanceRunState(state)
这里两个都有调用, 只不过目标值不同. 逻辑也仅仅为修改状态而已.interruptIdleWorkers
vsinterruptWorkers
:
两者都是试图对进行中的worker thread进行interrupt
. 不同的是, 前者会先调用tryLock()
. 而在runWorker
的循环中, 每次执行task会先调用worker.lock()
, 结束才后unlock
. 所以说非空闲的任务不会受到影响.onShutdown
留给子类的一个钩子, 学习ScheduledThreadPoolExecutor
再关注drainQueue
将队列中的元素抽取到另一个List中, 并移除此元素.tryTerminate
两者都有调用. 具体说明之前先回顾一下前面的runState
转换:- SHUTDOWN -> TIDYING: When both queue and pool are empty
- STOP -> TIDYING: When pool is empty
这个转换过程即为tryTerminate
做的事情.
查看tryTerminate()
, 其中值得注意的是, 当为SHUTDOWN
状态, 且队列不为空时, tryTerminate
方法是直接return
的. 而在showdown
过程中, 很可能正好处于这种情况, 此时shutdown
对tryTerminate
的调用是无效的. 但是在后续, tryTerminate
方法还会被调用一次, 即前面提到的processWorkerExit
所以一个ThreadPoolExecutor
的生命周期转换以及触发操作如下:
RUNNING (shutdown) -> SHUTDOWN -> TIDYING(tryTerminate) -> TERMINATED
RUNNING (shutdownNow) -> STOP -> TIDYING(tryTerminate) -> TERMINATED
RuntimePermission
3.4 Reject策略
回想workerCount
策略:
- 当
workerCount
小于corePoolSize
, 有新任务会会创建Worker
. - 如果达到了
corePoolSize
, 会将任务放到队列中. - 如果队列放不下了, 会尝试继续创建
Worker
还有一点:
- 如果
workerCount
即将超过maximumPoolSize
, 那么将对对应的task执行Reject策略.
这个策略的抽象即为RejectedExecutionHandler#rejectedExecution(Runnable r, ThreadPoolExecutor executor)
.
四种预定义策略比较简单:
AbortPolicy
(默认). 抛出一个异常(RejectedExecutionException
)CallerRunsPolicy
. 将提交的Task直接还给主线程同步执行.DiscardPolicy
. 非常简单, 直接放弃治疗DiscardOldestPolicy
. 抛弃队列头的任务, 重试执行.
3.5 KeepAliveTime & allowCoreThreadTimeOut
通过两个参数corePoolSize
, maximumPoolSize
来控制Worker数量, 目标为使线程池更具有弹性, 保证一段时间内的任务量骤增也可以承受. 而下面则关于任务量从峰值降下来后, 如何减少线程池Worker数量, 从而减少资源占用.
默认情况下, 当Worker数量超过了corePoolSize之后, 且有Worker空闲了一段时间, 会有部分Worker被回收, 但是数量不会小于corePoolSize.
举个栗子, 假设corePoolSize == 5, maximumPoolSize == 10;
. 之前任务很多, 所以创建了10个Worker, 而此时任务被处理完. 对于超过corePoolSize
数量的线程, 如果空闲时间超过了keepAliveTime
, 则会被回收.
默认情况下, 即便回收也是会保证活跃线程数量 >= corePoolSize
的. 如果想打破这里逻辑, 可以设置alloCoreThreadTimeOut
为true
.
对于超时时间的控制, 在getTask()
中, 且仍然基于阻塞队列的特性:
1 | boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; |
正常情况下, 使用take()
会一直阻塞在这里, 而符合超时判断条件时, 则最多等keepAliveTime
纳秒. 没有取到任务则timeOut
则被置为true
, 下次循环中会return null
, 则对应的worker就结束了.(参考前文流程图) .
可以通过以下代码debuggetTask()
方法1
2
3
4
5
6
7
8ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1, TimeUnit.SECONDS, Queues.newArrayBlockingQueue(1));
pool.setKeepAliveTime(3, TimeUnit.SECONDS);
for (int i = 0; i < 3; i++) {
pool.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
前三个任务, 两个直接交给了Worker
, 一个从队列中消费, 所以getTask()
的第一次调用可以忽略. 可以直接从第二次调用跟踪.
4. 整体概念回顾
5. Best Practice
5. 1 线程池大小应该设置多少合适
先给一个粗糙的结论
取决于程序为CPU密集/IO密集. 如果接近完全为CPU密集的程序, 线程数应设置为CPU内核数量. IO密集则需要增加线程数.
IO密集/CPU密集无法量化, 所以需要通过测试来决定.
// TODO 如何进行测试