池化技术核心思想:减少每次获取资源的消耗,提高资源的利用率。
线程池:管理一系列线程的资源池。
线程池提供了限制和管理线程资源的方式,维护了基本的统计信息。
线程池的好处:
- 减少资源消耗:重复利用已创建的线程,降低创建新线程和销毁线程的损耗
- 提高响应速度:任务到达时可以不需要等待线程创建就能立即执行
- 便于线程的管理:统一的线程池调度、分配、调优和监控
线程池一般用于执行多个不相关联的耗时任务。使用线程池可以使得多个独立的任务同时执行。
Executor框架
JDK5之后,引入了Executor
框架,通过Executor
启动线程比使用Thread#start()
方法更好,除了更易管理、效率高之外,还有助于避免this逃逸问题。
this逃逸问题:在构造函数返回之前其他线程就持有了该对象的引用,调用尚未构造完全的对象方法可能引发不可知的错误。
任务 Runable / Callable
执行任务需要实现java.lang.Runnable
或者java.util.concurrent.Callable
接口。
执行 Executor
任务执行机制的核心接口java.util.concurrent.Executor
、继承自Executor
接口的java.util.concurrent.ExecutorService
接口。
java.util.concurrent.ThreadPoolExecutor
类和java.util.concurrent.ScheduledThreadPoolExecutor
类,这两个关键类都实现了Executor
接口。
异步计算结果 Future
java.util.concurrent.Future
接口及其实现类java.util.concurrent.FutureTask
都可以代表异步计算的结果。
将Runnable
或者Callable
接口的实现类交给ThreadPoolExecutor
或者ScheduledThreadPoolExecutor
类执行,调用submit()
方法就会返回一个FutureTask
对象。
「 Executor
框架使用示意图 」
主线程create
创建Runnable / Callable
任务对象
=>
将任务对象交给ExecutorService
执行【void execute(Runnable command);
或者 Future<?> submit(Runnable task);
或者
<T> Future<T> submit(Callable<T> task);
】
=> submit()
方法返回FutureTask
对象
=>
主线程调用FutureTask#get()
方法来等待任务执行结束获取返回结果,也可以调用FutureTask#cancel()
方法来取消任务的执行
ThreadPoolExecutor(重要)
ThreadPoolExecutor是Executor实现类中最核心的类
构造方法参数
经典面试题:线程池有什么参数,平时都是如何去设置线程池参数的?
核心的构造器:
public ThreadPoolExecutor(int corePoolSize, |
【参数详解】
corePoolSize
:核心线程数,任务队列未到达队列容量前,最大可以同时运行的线程数量。这些线程好比是公司的正式员工,在正常情况下,他们都是正常待命,等待处理任务的maximumPoolSize
:最大线程数,任务队列中存放的任务数达到队列容量时,当前可以同时运行的线程数量变为最大线程数。在极限情况下系统或者线程池能有多少个线程在工作。就算任务再多,你也只能雇佣这么多人,因为你需要考虑成本和资源问题keepAliveTime
:线程池中的线程数量大于corePoolSize
的时候,如果此时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁。这个参数决定当任务少的时候,临时雇佣的线程会等待多久才会被剔除。这个参数的设定是为了释放无用的线程资源,可以理解为多久之后会“解雇”没有任务做的临时工unit
:keepAliveTime
参数的时间单位workQueue
:新任务来的时候判断当前运行的线程数量是否达到corePoolSize
,如果达到,新任务就会被存放在队列中threadFactory
:线程工厂。它负责控制线程的生成,就像一个管理员,负责招聘、管理员工,比如设定员工的姓名、工资或者其他属性handler
:拒绝策略。当任务队列满的时候,我们应该如何处理新来的任务?抛出异常还是重新尝试入队还是其他策略?比如,我们可以设定任务的优先级,会员的任务优先级更高。如果产品中有会员业务,或者是有一些重要的业务需要保证不被打扰,可以考虑定义两个线程池或者两个任务队列,一个用于处理VIP/重要的任务,一个用于处理普通任务,保证他们不互相干扰,也就是资源隔离策略
ThreadPoolExecutor线程工厂
在使用ThreadPoolExecutor的时候,如何给线程池命名呢?
在初始化线程池时候显式的设置线程池名称前缀,有利于后期定位问题。默认情况下,线程池创建的线程名字类似于pool-1-thread-n
,没有实际的业务含义,不利于排查定位问题。
那么给线程池内部的线程命令就需要用到ThreadFactory
线程工厂,创建线程工厂有以下几种方式:
使用
guava
库的ThradFactoryBuilder
// 抽奖相关业务
String threadNamePrefix = "draw";
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(true).build();
// 作为参数传入ThreadPoolExecutor
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, wordQueue, threadFactory);
// 使用线程池 ...自主实现
ThreadFactory
接口public final class DrawThreadFactory implements ThreadFactory {
private final AtomicInteger threadNum = new AtomicInteger(1);
private final ThreadFactory delegate;
private final String name;
public DrawThreadFactory(ThreadFactory delegate, String name) {
this.delegate = delegate;
this.name = name;
}
public Thread newThread(Runnable r) {
Thread thread = delegate.newThread(r);
thread.setName(name + "-" + threadNum.incrementAndGet());
return thread;
}
}
设定线程池大小的正确姿势
首先需要明确一个概念:并不是线程池越大越好,越大的线程池意味着越多的上下文切换,越多的上下文切换意味着更大的切换成本,这都是在确定线程池大小的时候需要综合考虑的因素。
上下文切换
在多线程编程中,一般线程的数量大于CPU的核心数,而一个CPU核心在一个时刻只能执行一个线程的任务,这意味着在当前任务执行完CPU时间片需要切换到其他线程任务钱需要保存当前的执行状态,以便下次切换回当前线程时能够恢复现场继续执行,
=> 线程任务从保存到再加载的过程就是一次上下文切换。
上下文切换对于操作系统来说需要消耗大量的CPU时间,开销是昂贵的。
Linux系统相比于其他操作系统其中的一个优点就是Linux系统上下文切换和模式切换的时间消耗非常少。
设定线程池的大小
需要知道的是,线程池大小设置过大或者过小都会存在问题:
- 线程池大小设置过小:如果同一时刻有大量的任务需要处理,可能导致大量的任务在工作队列中排队等待执行,甚至出现工作队列积累满了无法处理的情况,亦或者是大量任务堆积在工作队列中出现OOM => CPU没有得到充分的利用
- 线程池大小设置过大:大量的线程在同一时刻争夺CPU资源,这样会导致大量的上下文切换,增加了操作系统的开销,增加了线程的执行时间,影响整体的执行效率
那么该如何设置线程池的大小呢?有一个经验公式:
- CPU幂集型任务 =>
N + 1
:这类型的任务主要是依赖CPU资源,可以将线程数设置为N(CPU核心数) + 1
。比CPU核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其他原因导致的任务暂停而带来的影响。一旦任务暂停,CPU就会处于空闲状态,这种情况下多出来的一个线程就可以充分利用CPU的空闲时间 - I/O密集型任务 =>
2N
:系统处理这类型的任务,会使用大部分时间来处理I/O
交互,而线程在处理I/O
的时间段内是不会占用CPU的,此时CPU就可以交给其他线程使用。因为在I/O
密集型任务的应用中,可以多配置一些线程,经验值是2N
那么如何判断当前应用是哪种类型的任务呢?
- CPU密集型:即利用CPU计算能力的任务,比如需要在内存中对大量数据进行排序
- I/O密集型:涉及网络读取、文件读取等任务,特点就是CPU计算时间耗时比等待I/O操作完成时间少的多
线程数更加严谨的计算方法:最佳线程数 = N * (1 + WT / ST)
- N:CPU核心数
- ST:线程运行时间
- WT:线程等待时间,
WT = 线程运行总时间 - ST
线程等待时间所占比例越高,就需要更多的线程;相反,线程计算时间比例越高,就需要更少的线程。
可以通过JDK自带的工具VisualVM来查看系统的WT/ST
比例:
- CPU密集型任务的
WT/ST
接近或者等于0,带入公式,可以算出最佳线程数为N,和上述说的N+1
差不多 - I/O密集型任务下几乎全是线程等待时间,因此
WT/ST
的结果比较大,那么上述的设置2N
的线程数大概率上是避免创建过多的线程
公式仅仅是为了参考,实际业务中需要根据实际运行情况来动态调整线程池参数。
动态调整线程池参数
核心思路是针对线程池的核心参数实现自定义参数配置,分别是:
corePoolSize
maximumPoolSize
workQueue
TheadPoolExecutor
中提供了下列设置参数的方法:
需要注意的是setCorePoolSize()
方法
public void setCorePoolSize(int corePoolSize) { |
线程池首先判断当前工作线程是否大于corePoolSize
,如果大于的话就会调用interruptIdleWords()
回收工作线程。
ThreadPoolExecutor
中没有提供动态制定工作队列长度的方法,美团技术团队给出的方案是自定义队列ResizableCapacityLinkedBlockIngQueue
,主要就是去除了LinkedBlockingQueue
中的capacity
字段的final
关键字,把它变成可变的。
ThreadPoolExecutor拒绝策略
如果当前同时运行的线程数量达到了最大线程数量并且任务队列也已经被放满了任务时,ThreadPoolExecutor
定义了一些拒绝策略:
ThreadPoolExecutor.CallerRunsPolicy
:调用执行自己的线程运行任务,也就是直接调用execute()
方法的线程中运行run()
被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低新任务提交速度,影响程序的整体性能。(如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,可以选择这个策略)ThreadPoolExecutor.AbortPolicy
:抛出RejectedExecutionException
异常,拒绝新任务的处理ThreadPoolExecutor.DiscardPolicy
:不处理新任务,直接丢弃ThreadPoolExecutor.DiscardOldestPolicy
:丢弃最早未处理的任务请求(最先加入任务队列的任务)
线程池的创建方式
方式一:通过ThradPoolExecutor
的构造函数来创建(推荐)
方式二:通过工具类java.util.concurrent.Executors
来创建
通过Executors
工具类可以创建出多种类型的ThreadPoolExecutor
:
FixedThreadPool
:该方法返回一个固定线程数量的线程池。当有一个新任务提交时,若此时线程池中有空闲线程,则立即执行;若没有,新的任务会被暂存一个任务队列中,待有空闲线程时,就会处理任务队列中的任务SingleThreadExecutor
:该方法返回一个只有一个线程的线程池。任务处理方式同上CachedThreadPool
:该方法返回一个可以根据实际情况调整线程数量的线程池,线程池的线程数量不确定(上限是Integer.MAX_VALUE
)。若有空闲线程可以使用,优先复用空闲的线程;若所有的线程均在工作,此时又有新的任务提交,则会创建新的线程处理任务。线程执行完当前任务后,返回线程池等待复用,最长等待时间为1min,超过一分钟还没被复用就会被回收销毁ScheduledThreadPool
:该方法返回一个用来在给定的延迟后运行任务或者定期执行任务的线程池
📢注意:《阿里巴巴Java开发手册》中提出:线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors创建的线程池对象的弊端:
FixedThreadPool
和SingleThreadPool
:允许的请求队列长度为Integer.MAX_VALUE
,可能会堆积大量的请求,从而导致 OOMCacheedThreadPool
:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOMScheduledThreadPool
和SingleThreadScheduledExecutor
: 使用的无界的延迟阻塞队列DelayedWorkQueue
,任务队列最大长度为Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM
阻塞队列
新任务到达线程池前,会先判断当前运行的线程数量是否到达核心线程数corePoolSize
,如果达到,新任务会被存放在阻塞队列中。不同的线程池需要选用的不同的阻塞队列:
1)无界队列
队列大小无限制,常用的为无界的LinkedBlockingQueue
,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。
2)有界队列
常用的有两类,
- 一类是遵循FIFO原则的队列如
ArrayBlockingQueue
与有界的LinkedBlockingQueue
- 一类是优先级队列如
PriorityBlockingQueue
。PriorityBlockingQueue中的优先级由任务的Comparator决定
使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。
3)同步移交队列
如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue
作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。
线程池工作原理
示例代码
CustomTask.java
package com.juzi.juc.thread; |
ThreadPoolExecutorDemo.java
package com.juzi.juc.thread; |
上述代码中我们指定了:
corePoolSize
: 核心线程数为 2maximumPoolSize
:最大线程数 4keepAliveTime
: 等待时间为 60Lunit
: 等待时间的单位为 TimeUnit.SECONDSworkQueue
:任务队列为ArrayBlockingQueue
,并且容量为 2handler
:饱和策略为CallerRunsPolicy
运行结果:
pool-1-thread-3 start. Time: 16:49:22.896 |
原理分析
为了搞明白线程池的原理,首先需要分析execute
方法。
在上述代码中,我们使用了executor.execute(task)
来提交任务到线程池中去。
private static int workerCountOf(int c) { return c & CAPACITY; } |
示意图:
核心方法addWork()
分析
// 全局锁 |
优秀博文:https://www.throwx.cn/2020/08/23/java-concurrency-thread-pool-executor/
图解
1)刚开始,没有任何的线程和任务
2)当有新任务进来,发现当前员工数量还未达到设定的正式员工数(corePoolSize
=2),则会直接招聘一名新员工来处理这个任务
3)又有新任务进来,发现当前员工数量还未达到设定的正式员工数(corePoolSize
=2),则会再招聘一名新员工来处理这个任务
4)又来了新任务,但是正式员工已经达到上限(当前线程数 =
corePoolSize
=
2),此时新任务被放到了等待队列中(最大长度workQueue.size()
=
2),而不是立即招聘新员工
5)又来了一个新任务,但是我们的任务队列已满(当前线程数 >
corePoolSize
= 2,队列中的任务数 =
队列长度workQueue.size()
=
2),此时将增设临时线程(最大线程数 maximumPoolSize
=
4)来处理任务,而不是丢弃任务
6)当达到7个任务时,由于我们的任务队列已满(当前线程数 >
corePoolSize
= 2,队列中的任务数 =
队列长度workQueue.size()
= 2),临时线程(当前线程数 =
maximumPoolSize
=
4)也满了,此时会采用拒绝策略RejectedExecutionHandler
来处理多余的新任务
7)如果当前线程数超过corePoolSize
,并且此时这些额外的线程没有新任务可以执行,那么在keepAliveTime
时间之后,这些额外的线程将会被释放
常见的对比
Runnable VS Callable
Runnable
在 Java 1.0 引入,Callable
在 Java
1.5 中引入,主要为了来处理Runnable
不支持的用例。
Runnable
接口不会返回结果或抛出检查异常,但是
Callable
接口可以。
=> 如果任务不需要返回结果或抛出异常推荐使用
Runnable
接口
工具类 Executors
可以实现将 Runnable
对象转换成 Callable
对象:
public static Callable<Object> callable(Runnable task) { |
execute()
VS
submit()
execute()
方法用于提交不需要返回值的任务,无法判断任务是否被线程池执行成功与否submit()
方法用于提交需要返回值的任务。线程池会返回一个Future
类型的对象,通过这个Future
对象可以判断任务是否执行成功,并且可以通过Future
的get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成;使用get(long timeout,TimeUnit unit)
方法,如果在timeout
时间内任务还没有执行完,就会抛出java.util.concurrent.TimeoutException
shutdown()
VS
shutdownNow()
shutdown()
:关闭线程池,线程池的状态变为SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕 => 优雅停机shutdownNow()
:关闭线程池,线程池的状态变为STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的任务列表List => 暴力停机
isTerminated()
VS
isShutdown()
isShutdown
当调用shutdown()
方法后返回为 trueisTerminated
当调用shutdown()
方法后,并且所有提交的任务完成后返回为 true
参考文章
- https://javaguide.cn/java/concurrent/java-thread-pool-summary.html
- https://blog.csdn.net/TZ845195485/article/details/109370106
- https://zhuanlan.zhihu.com/p/32867181
- https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html