这篇文章大部分都是直接摘抄自《实战Java高并发程序设计》,基本上就是一篇笔记,用于以后忘了的时候可以回顾。
框架提供的ExecutorService
Executors框架提供了各种类型的线程池,主要有以下工厂方法:
1 | public static ExecutorService newFixedThreadPool(int nThreads) |
newFixedThreadPool()方法. 该方法返回一个固定线程数量的线程池,该线程池中的线程数量始终不变,当有一个新任务时,线程池中若有空闲线程,则立即执行,若没有,则新任务会被暂时存在一个队列中,得有空闲线程时,便处理在任务队列中的任务
newSingleThreadExecutor()方法,改方法返回一个只有一个线程的线程池,若多余一个任务被提交到该线程池,任务会被保存在一个队伍队列,带线程空闲,按先入先出的顺序执行队列中的任务,
newCachedThreadPool()方法,该方法返回一个可根据实际情况调整线程数量的线程池.线程池数量是不确定的,但若有空闲线程可以复用,则会优先使用可以复用的线程,若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务,所有线程在当前任务执行完毕后,将返回线程池进行复用,
newSingleThreadScheduledExecutor()方法: 改方法返回一个ScheduledExecutorService对象,线程池大小为1 这个接口在ExecutorService接口之上拓展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务.
newScheduledThreadPool()方法:改方法也返回一个ScheduledExecutorService对象 但改线程池可以指定线程数量
前面三个工厂方法创建的ExecutorService只需要使用ExecutorService.execute()方法或者submit()方法将需要执行的任务传入即可,这里就不细讲了。关于这两个方法的差异我会在后面细说,这里也不展开讨论了。
后面两个工厂方法会创建ScheduledExecutorService。它有会多出下面三个schedule方法用于延迟执行任务:
1 | public ScheduledFuture<?> schedule(Runnable command, |
schedule()方法会在给定时间,对方法进行一次调度。scheduleAtFixedRate()方法和scheduleWithFixedDelay()会对任务进行周期性调度。但两者有一点小小的差别:
对于FixedRate方式来说,任务调度的频率是一样的。它是以上一个任务开始执行时间为起点,之后的period时间,调度下一次任务。而FixDelay则是在上一个任务结束后,再经过delay时间进行任务调度。
ThreadPoolExecutor
对于Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()、Executors.newCachedThreadPool()这几个方法虽然创建的线程池的功能特点完全不一样,但是他们其实都是使用了ThreadPoolExecutor实现:
1 | public static ExecutorService newFixedThreadPool(int nThreads) { |
ThreadPoolExecutor的最重要的构造函数如下:
1 | public ThreadPoolExecutor(int corePoolSize, |
- corePoolSize: 指定了线程池中的线程数量
- maximumPoolSize: 指定了线程池中的最大线程数量
- keepAliveTime: 当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。即,超过corePoolSize的空闲线程,在多长的时间内,会被销毁。
- unit: keepAliveTime的时间单位
- workQueue: 被提交但未被执行的任务
- threadFactory: 线程工厂,用于创建线程,一般用默认即可
- handler: 拒绝策略。但任务太多来不及处理,如何拒绝任务
以上参数中,大部分都很简单,只有workQueue和handler需要说一下。
内置的BlockingQueue有下面几种:
SynchronousQueue: 一个没有容量的队列。使用SynchronousQueue,提交的任务不会真正的被保存,而总是将新任务提交给线程执行。如果没有空闲线程,就创建新线程,如果线程数量已经到达最大值,则执行拒绝策略
ArrayBlockingQueue: 有界任务队列,若有新的任务需要执行,如果实际线程数少于corePoolSize则创建新的线程,如果大于corePoolSize,就会放入ArrayBlockingQueue中,如果ArrayBlockingQueue已满,在总线程数不大于maximumPoolSize的情况下会创建新线程,否则就执行拒绝策略
LinkedBlockingQueue: 无界任务队列,若有新的任务需要执行,如果实际线程数少于corePoolSize则创建新的线程,如果大于corePoolSize,就会放入LinkedBlockingQueue中等待
PriorityBlockingQueue: 它是一个特殊的无界队列,可以设定任务的优先级
而内置的拒绝策略又有下面几种:
- AbortPolicy策略: 该策略会直接抛出异常,阻止系统正常工作
- CallerRunsPolicy策略: 只要线程池没有关闭,该策略直接在调用者线程中运行被拒绝的任务。(使用这个策略可能导致在主线程执行耗时操作)
- DiscardOldestPolicy策略: 该策略丢弃一个最老的任务,并尝试重新提交任务
- DiscardPolicy策略: 该策略默默丢弃拒绝的任务,不做任何处理。
线程池任务调度的逻辑如下图所示:
execute和submit的区别
ExecutorService.execute()和ExecutorService.submit()都可以提交任务去异步执行,但是它们之间有什么区别呢?
1 | void execute(Runnable command); |
- 返回值
ExecutorService.execute()没有返回值,只能简单的提交Runnable给线程池去运行
ExecutorService.submit(),有返回值,可以获得一个Future
- 异常
ExecutorService.execute()的异常机制和普通线程的异常机制一样,必须用try、catch来捕获异常。如果没有捕获一些运行时异常,也会打印出堆栈信息:
1 | Executors.newCachedThreadPool().execute( |
1 | Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero |
ExecutorService.submit()的异常会被吃掉,下面的代码的异常会被默默吃掉,没有堆栈信息的打印:
1 | Executors.newCachedThreadPool().submit( |
但是我们可以调用Future.get()方法,这样当抛出异常的时候系统也会打印堆栈:
1 | Future future = Executors.newCachedThreadPool().submit( |
需要注意的是Future.get()是阻塞的,需要需要等待线程执行完毕才会返回,所以我们可以用这个方法获得Callable.call()的返回值:
1 | Future<Integer> future = Executors.newCachedThreadPool().submit( |