使用线程池的好处
池化技术相比大家已经屡见不鲜了,线程池、数据库连接池、Http 连接池等等都是对这个思想的应用。池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。
线程池提供了一种限制和管理资源(包括执行一个任务)。 每个线程池还维护一些基本统计信息,例如已完成任务的数量。
这里借用《Java 并发编程的艺术》提到的来说一下使用线程池的好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
Executor框架
简介
任务(Runnable
/Callable
)
执行任务需要实现的 Runnable
接口 或 Callable
接口。Runnable
接口或 *Callable
接口 实现类都可以被 ThreadPoolExecutor
或 ScheduledThreadPoolExecutor
* 执行。
任务的执行(Executor
)
如下图所示,包括任务执行机制的核心接口 Executor
,以及继承自 Executor
接口的 ExecutorService
接口。ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
这两个关键类实现了 ExecutorService 接口。
这里提了很多底层的类关系,但是,实际上我们需要更多关注的是 ThreadPoolExecutor
这个类,这个类在我们实际使用线程池的过程中,使用频率还是非常高的。
注意: 通过查看
ScheduledThreadPoolExecutor
源代码我们发现ScheduledThreadPoolExecutor
实际上是继承了ThreadPoolExecutor
并实现了 ScheduledExecutorService ,而ScheduledExecutorService
又实现了ExecutorService
,正如我们下面给出的类关系图显示的一样。
ThreadPoolExecutor
类描述:
1 | //AbstractExecutorService实现了ExecutorService接口 |
ScheduledThreadPoolExecutor
类描述:
1 | //ScheduledExecutorService继承ExecutorService接口 |
异步计算的结果(Future
)
Future
接口以及 Future
接口的实现类 FutureTask
类都可以代表异步计算的结果。
当我们把 Runnable
接口 或 Callable
接口 的实现类提交给 ThreadPoolExecutor
或 ScheduledThreadPoolExecutor
执行。(调用 submit()
方法时会返回一个 FutureTask
对象)
Executor框架的使用示意图
ThreadPoolExecutor类介绍
线程池实现类 ThreadPoolExecutor
是 Executor
框架最核心的类。
ThreadPoolExecutor类分析
ThreadPoolExecutor
类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么),这里就不贴代码讲了,比较简单。
1 | /** |
ThreadPoolExecutor
3 个最重要的参数:
corePoolSize
: 核心线程数线程数定义了最小可以同时运行的线程数量。maximumPoolSize
: 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。workQueue
: 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor
其他常见参数:
keepAliveTime
:当线程池中的线程数量大于corePoolSize
的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
才会被回收销毁;unit
:keepAliveTime
参数的时间单位。threadFactory
:executor 创建新线程的时候会用到。handler
:饱和策略。关于饱和策略下面单独介绍一下。
ThreadPoolExecutor
饱和策略定义:
如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor
定义一些策略:
ThreadPoolExecutor.AbortPolicy
:抛出RejectedExecutionException
来拒绝新任务的处理。ThreadPoolExecutor.CallerRunsPolicy
:调用执行自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
: 不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy
: 此策略将丢弃最早的未处理的任务请求。
举个例子:
Spring 通过
ThreadPoolTaskExecutor
或者我们直接通过ThreadPoolExecutor
的构造函数创建线程池的时候,当我们不指定RejectedExecutionHandler
饱和策略的话来配置线程池的时候默认使用的是ThreadPoolExecutor.AbortPolicy
。在默认情况下,ThreadPoolExecutor
将抛出RejectedExecutionException
来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用ThreadPoolExecutor.CallerRunsPolicy
。当最大池被填满时,此策略为我们提供可伸缩队列。(这个直接查看ThreadPoolExecutor
的构造函数源码就可以看出,比较简单的原因,这里就不贴代码了。)
推荐使用ThreadPoolExecutor构造函数创建线程池
在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。
为什么呢?
使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
另外《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险
Executors 返回线程池对象的弊端如下:
FixedThreadPool
和SingleThreadExecutor
: 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。- CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。
方式一:通过ThreadPoolExecutor
构造函数实现(推荐)
方式二:通过 Executor 框架的工具类 Executors 来实现 我们可以创建三种类型的 ThreadPoolExecutor:
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
ThreadPoolExecutor 使用示例
示例代码:Runnable
+ TheadPoolExecutor
首先创建一个 Runnable
接口的实现类(当然也可以是 Callable
接口,我们上面也说了两者的区别。)
1 | MyRunnable.java |
编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor
构造函数自定义参数的方式来创建线程池。
1 | ThreadPoolExecutorDemo.java |
可以看到我们上面的代码指定了:
corePoolSize
: 核心线程数为 5。maximumPoolSize
:最大线程数 10keepAliveTime
: 等待时间为 1L。unit
: 等待时间的单位为 TimeUnit.SECONDS。workQueue
:任务队列为ArrayBlockingQueue
,并且容量为 100;handler
:饱和策略为CallerRunsPolicy
。
Output:
1 | pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020 |
线程池原理分析
承接 4.1 节,我们通过代码输出结果可以看出:线程首先会先执行 5 个任务,然后这些任务有任务被执行完的话,就会去拿新的任务执行。
现在,我们就分析上面的输出内容来简单分析一下线程池原理。
为了搞懂线程池的原理,我们需要首先分析一下 execute
方法。在 4.1 节中的 Demo 中我们使用 executor.execute(worker)
来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:
1 | // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) |
addWorker
这个方法主要用来创建新的工作线程,如果返回true说明创建和启动工作线程成功,否则的话返回的就是false。
1 | // 全局锁,并发操作必备 |
我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的5个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。
几个常见的对比
Runnable
vs Callable
Runnable
自 Java 1.0 以来一直存在,但Callable
仅在 Java 1.5 中引入,目的就是为了来处理Runnable
不支持的用例。Runnable
接口不会返回结果或抛出检查异常,但是Callable
接口可以。所以,如果任务不需要返回结果或抛出异常推荐使用 *Runnable
接口*,这样代码看起来会更加简洁。
工具类 Executors
可以实现 Runnable
对象和 Callable
对象之间的相互转换。(Executors.callable(Runnable task
)或 Executors.callable(Runnable task,Object resule)
)。
1 | Runnable.java |
execute()
vs submit()
execute()
方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;submit()
方法用于提交需要返回值的任务。线程池会返回一个Future
类型的对象,通过这个Future
对象可以判断任务是否执行成功,并且可以通过Future
的get()
方法来获取返回值,get()
方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)
方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
我们以AbstractExecutorService
接口中的一个 submit
方法为例子来看看源代码:
1 | public Future<?> submit(Runnable task) { |
上面方法调用的 newTaskFor
方法返回了一个 FutureTask
对象。
1 | protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { |
我们再来看看execute()
方法:
1 | public void execute(Runnable command) { |
shutdown()
VSshutdownNow()
shutdown()
:关闭线程池,线程池的状态变为SHUTDOWN
。线程池不再接受新任务了,但是队列里的任务得执行完毕。shutdownNow()
:关闭线程池,线程的状态变为STOP
。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。
isTerminated()
VS isShutdown()
isShutDown
当调用shutdown()
方法后返回为 true。isTerminated
当调用shutdown()
方法后,并且所有提交的任务完成后返回为 true
Callable
+ThreadPoolExecutor
示例代码
1 | MyCallable.java |
Output:
1 | Wed Nov 13 13:40:41 CST 2019::pool-1-thread-1 |
常见线程池详解
FixedThreadPool
介绍
FixedThreadPool
被称为可重用固定线程数的线程池。通过 Executors 类中的相关源代码来看一下相关实现:
1 | /** |
另外还有一个 FixedThreadPool
的实现方法,和上面的类似,所以这里不多做阐述:
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
从上面源代码可以看出新创建的 FixedThreadPool
的 corePoolSize
和 maximumPoolSize
都被设置为 nThreads,这个 nThreads 参数是我们使用的时候自己传递的。
执行任务过程介绍
FixedThreadPool
的 execute()
方法运行示意图(该图片来源:《Java 并发编程的艺术》):
上图说明:
- 如果当前运行的线程数小于 corePoolSize, 如果再来新任务的话,就创建新的线程来执行任务;
- 当前运行的线程数等于 corePoolSize 后, 如果再来新任务的话,会将任务加入
LinkedBlockingQueue
; - 线程池中的线程执行完 手头的任务后,会在循环中反复从
LinkedBlockingQueue
中获取任务来执行;
为什么不推荐使用FixedThreadPool
FixedThreadPool
使用无界队列 LinkedBlockingQueue
(队列的容量为 Intger.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响 :
- 当线程池中的线程数达到
corePoolSize
后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize; - 由于使用无界队列时
maximumPoolSize
将是一个无效参数,因为不可能存在任务队列满的情况。所以,通过创建FixedThreadPool
的源码可以看出创建的FixedThreadPool
的corePoolSize
和maximumPoolSize
被设置为同一个值。 - 由于 1 和 2,使用无界队列时
keepAliveTime
将是一个无效参数; - 运行中的
FixedThreadPool
(未执行shutdown()
或shutdownNow()
)不会拒绝任务,在任务比较多的时候会导致 OOM(内存溢出)。
SingleThreadExecutor详解
介绍
SingleThreadExecutor
是只有一个线程的线程池。下面看看SingleThreadExecutor 的实现:
1 | /** |
从上面源代码可以看出新创建的 SingleThreadExecutor
的 corePoolSize
和 maximumPoolSize
都被设置为 1.其他参数和 FixedThreadPool
相同。
执行任务过程介绍
上图说明;
- 如果当前运行的线程数少于 corePoolSize,则创建一个新的线程执行任务;
- 当前线程池中有一个运行的线程后,将任务加入
LinkedBlockingQueue
- 线程执行完当前的任务后,会在循环中反复从
LinkedBlockingQueue
中获取任务来执行;
为什么不推荐使用SingleThredExecutor
SingleThreadExecutor
使用无界队列 LinkedBlockingQueue
作为线程池的工作队列(队列的容量为 Intger.MAX_VALUE)。SingleThreadExecutor
使用无界队列作为线程池的工作队列会对线程池带来的影响与 FixedThreadPool
相同。说简单点就是可能会导致 OOM,
CachedThreadPool详解
介绍
CachedThreadPool
是一个会根据需要创建新线程的线程池。下面通过源码来看看 CachedThreadPool
的实现:
1 | /** |
CachedThreadPool
的corePoolSize
被设置为空(0),maximumPoolSize
被设置为 Integer.MAX.VALUE,即它是无界的,这也就意味着如果主线程提交任务的速度高于 maximumPool
中线程处理任务的速度时,CachedThreadPool
会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。
执行任务过程介绍
CachedThreadPool 的 execute()方法的执行示意图(该图片来源:《Java 并发编程的艺术》):
上图说明:
- 首先执行
SynchronousQueue.offer(Runnable task)
提交任务到任务队列。如果当前maximumPool
中有闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
,那么主线程执行 offer 操作与空闲线程执行的poll
操作配对成功,主线程把任务交给空闲线程执行,execute()
方法执行完成,否则执行下面的步骤 2; - 当初始
maximumPool
为空,或者maximumPool
中没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)
。这种情况下,步骤 1 将失败,此时CachedThreadPool
会创建新线程执行任务,execute 方法执行完成;
ScheduledThreadPoolExecutor详解
ScheduledThreadPoolExecutor
主要用来在给定的延迟后运行任务,或者定期执行任务。 这个在实际项目中基本不会被用到,因为有其他方案选择比如quartz
。大家只需要简单了解一下它的思想。关于如何在 Spring Boot 中 实现定时任务,可以查看这篇文章《5 分钟搞懂如何在 Spring Boot 中 Schedule Tasks》。
简介
ScheduledThreadPoolExecutor
使用的任务队列 DelayQueue
封装了一个 PriorityQueue
,PriorityQueue
会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask
的 time
变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask
的 squenceNumber
变量小的先执行)。
ScheduledThreadPoolExecutor
和 Timer
的比较:
Timer
对系统时钟的变化敏感,ScheduledThreadPoolExecutor
不是;Timer
只有一个执行线程,因此长时间运行的任务可以延迟其他任务。ScheduledThreadPoolExecutor
可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;- 在
TimerTask
中抛出的运行时异常会杀死一个线程,从而导致Timer
死机:-( …即计划任务将不再运行。ScheduledThreadExecutor
不仅捕获运行时异常,还允许您在需要时处理它们(通过重写afterExecute
方法ThreadPoolExecutor
)。抛出异常的任务将被取消,但其他任务将继续运行。
综上,在 JDK1.5 之后,你没有理由再使用 Timer 进行任务调度了。
备注: Quartz 是一个由 java 编写的任务调度库,由 OpenSymphony 组织开源出来。在实际项目开发中使用 Quartz 的还是居多,比较推荐使用 Quartz。因为 Quartz 理论上能够同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等。
运行机制
ScheduledThreadPoolExecutor
的执行主要分为两大部分:
- 当调用
ScheduledThreadPoolExecutor
的scheduleAtFixedRate()
方法或者scheduleWirhFixedDelay()
方法时,会向ScheduledThreadPoolExecutor
的DelayQueue
添加一个实现了RunnableScheduledFuture
接口的ScheduledFutureTask
。 - 线程池中的线程从
DelayQueue
中获取ScheduledFutureTask
,然后执行任务。
ScheduledThreadPoolExecutor
为了实现周期性的执行任务,对 ThreadPoolExecutor
做了如下修改:
- 使用
DelayQueue
作为任务队列; - 获取任务的方不同
- 执行周期任务后,增加了额外的处理
ScheduledThreadPoolExecutor 执行周期任务的步骤
- 线程 1 从
DelayQueue
中获取已到期的ScheduledFutureTask(DelayQueue.take())
。到期任务是指ScheduledFutureTask
的 time 大于等于当前系统的时间; - 线程 1 执行这个
ScheduledFutureTask
; - 线程 1 修改
ScheduledFutureTask
的 time 变量为下次将要被执行的时间; - 线程 1 把这个修改 time 之后的
ScheduledFutureTask
放回DelayQueue
中(DelayQueue.add()
)。
线程池大小确定
线程池数量的确定一直是困扰着程序员的一个难题,大部分程序员在设定线程池大小的时候就是随心而定。
很多人甚至可能都会觉得把线程池配置过大一点比较好!我觉得这明显是有问题的。就拿我们生活中非常常见的一例子来说:并不是人多就能把事情做好,增加了沟通交流成本。你本来一件事情只需要 3 个人做,你硬是拉来了 6 个人,会提升做事效率嘛?我想并不会。 线程数量过多的影响也是和我们分配多少人做事情一样,对于多线程这个场景来说主要是增加了上下文切换成本。不清楚什么是上下文切换的话,可以看我下面的介绍。
上下文切换:
多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少。
类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。
如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。
但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
有一个简单并且适用面比较广的公式:
- CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
- I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。