多线程的软件设计,确实可以最大限度的发挥现代多核处理器的计算能力,提高系统的吞吐量和性能。但是,若不加控制和管理的随意使用线程,对系统的性能反而会产生不利的影响。
为了避免系统频繁的创建和销毁线程,我们可以让创建的线程复用。如果你写过数据库方便的代码,那你肯定知道数据库连接池。或者是发送http请求时候的,http连接池。亦或者是发送mq消息时候的连接池。线程池于他们的作用是一样的,线程池中,总是有几个活跃的线程。当需要线程的时候,从线程池中拿一个空闲的来用,用完了再放进去。
为了能够更好的控制多线程,JDK提供了一套Executor框架,帮助开发人员进行有效的线程控制,其本质就是一个线程池。 Executor框架提供了各种类型的线程池,主要有一下几种:
newFixedThreadPool():该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终保持不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行,若没有,则新的任务被暂存在一个队列中,待有线程空闲时,便处理在任务队列中的任务。newSingleThreadExecutor():该方法返回只有一个线程的线程池,若有多于一个的任务,被提交到线程池时,任务将会被暂存在一个队列中,待线程池空闲时,按先入先出的顺序执行队列中的任务。newCachedThreadPool():该方法返回一个可根据实际情况调整线程数量的线程池。线程池的数量不确定,但若有空闲线程可以复用,则会优先使用可复用线程。若所有线程均在工作,又有新任务提交,则会创建新的线程处理任务。所有线程,在当前任务执行完毕后,将返回线程池进行复用。newSingleThreadScheduledExecutor() :该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上进行了扩展,在给定时间,执行某任务的功能,例如在固定的,某个延时之后执行,或者周期性执行某个任务。newScheduledThreadPool():该方法也返回一个ScheduledExecutorService对象,但该线程池,可以指定线程数量。1. 固定大小的线程池 这里以newFixedThreadPool()为例,简单展示下用法
package com.example.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by mazhenhua on 2017/3/9. */ public class ThreadPoolDemo { public static class MyTask implements Runnable{ @Override public void run() { System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(1000); } catch (InterruptedException e){ e.printStackTrace(); } } } public static void main(String[] args) { MyTask task = new MyTask(); ExecutorService es = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++){ es.submit(task); } } }输出结果:
从这个结果中,我们看到,一共是5个线程,共执行了10次,当打出前5个线程的时候,停顿了1秒,然后又打印出了5个,并且全部打印完,程序并没有结束,说明线程池还有活跃的线程。
2. 计划任务
另外一个值得注意的方法是newScheduledThreadPool(),它返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。
ScheduledExecutorService与其他几个线程池不同,它并不会立即安排执行任务。它其实是起到了定时任务的作用。它会在指定的时间,对任务进行调度。
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);schedule()方法会在指定的时间,对任务进行一次调度。scheduleAtFixedRate()和scheduleWithFixedDelay()方法,会对任务进行周期性的调用,两者的不同点是,FixedRate是从上一个任务开始后的period时间启动下一次任务,而FixedDelay则是从上一个线程执行结束后delay时间后启动下一个线程。一个是从上一个线程的开始时间计时,一个是从上一个线程的结束时间计时。
package com.example.thread; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * Created by mazhenhua on 2017/3/9. */ public class ScheduledExecutorServiceDemo { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(10); ses.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { Thread.sleep(1000); System.out.println(System.currentTimeMillis()/1000); } catch (InterruptedException e){ e.printStackTrace(); } } }, 10, 2, TimeUnit.SECONDS); } }执行上面代码可以发现,先停顿了10秒,然后每2秒打印一次。
上面的代码中,执行时sleep了1秒,所以周期为2秒一次的打印出来了,那么如果sleep5秒会怎么样呢? 这里不再啰嗦,直接给出结论,代码可以自己手动改,结论:会以5秒为周期的打印,任务并不会重叠而是在上次执行完毕后立即执行下一个任务。
如果是,scheduleWithFixedDelay方法就不一样了,因为他本身就是上一个任务执行完毕才开始计时的,所以,如果是scheduleWithFixedDelay方法的话,那就会是7秒一个周期的打印,
调度程序实际上并不能保证任务会无限期的被持续调用。如果任务本身抛出了异常,那么后续的所有执行都会被中断,因此,如果你想让你任务持续稳定的执行,那么一定要处理好异常。
3. 线程池的内部实现
对于核心的几个线程池,无论是newFixedThreadPool(),newSingleThreadExecutor()还是newCachedThreadPool(),虽然看起来创建线程有着不同的特点,但是其内部实现均使用了,ThreadPoolExecutor:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) corePoolSize:指定了线程池中的线程数量。maximumPoolSize:指定了线程池中的最大线程数量。keepAliveTime:当线程池数量超过corePoolSize时,对于的空闲线程的存活时间。即,超过corePoolSize的线程,在多长时间内会被销毁。unit:keepAliveTime的时间单位。workQueue:任务队列,被提交但尚未被执行的队列。threadFactory:线程工厂,用于创建线程,一般用默认的即可。handler:拒绝策略。当任务太多来不及处理时,如何拒绝任务。这里详细说明一下workQueue和handler两个参数,
参数workQueue指被提交,但未执行的任务队列,他是一个BlockingQueue接口对象,仅存放Runable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中,可以使用以下几种BlockingQueue。
直接提交队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue。SynchronousQueue没有容量,每一个插入操作,都要等待相应的删除操作,反之,每一个删除操作都要等待相应的插入操作。如果使用SynchronousQueue,提交的任务不会被真正的保存,而总是将新任务提交给线程执行,如果没有空线程,则尝试创建线程,如果线程数量已经达到最大值,则执行拒绝策略。因此使用SynchronousQueue队列,通常要设置maximumPoolSize值,否则很容执行拒绝策略。
有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新线程,若大于corePoolSize,则会将新任务加入等待队列,若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务,若大于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当任务队列装满时,才能将线程数提升到corePoolSize之上,换句话说,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize
无界的任务队列:无界任务队列可以通过LinkedBlockingQueue类来实现。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续又有任务继续提交,而有没有可用的线程时,则任务直接进入等待。若任务的创建和处理速度相差很大,无界队列会保持快速增长,知道耗尽系统内存。
优先任务队列:优先任务队列是带有执行优先顺序的队列。他通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序。他是一个特殊的无界队列。无论是有界队列ArrayBlockingQueue还是LinkedBlockingQueue无界队列,都是按照先进先出的算法处理任务的。而PriorityBlockingQueue则可以根据自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证。
newFixedThreadPool()方法返回了一个corePoolSize和maximumPoolSize一样大小的,并使用了LinkedBlockingQueue无边界的任务队列线程池。因为对于固定大小的线程池,不存在线程数量大小的变化。
newSingleThreadExecutor()与newFixedThreadPool()类似只是corePoolSize和maximumPoolSize设置为了1
newCachedThreadPool()方法返回一个corePoolSize为0,maximumPoolSize无限大的线程池,这意味着,在没有任务时,线程池内是没有线程的,当有线程提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入SynchronousQueue队列,SynchronousQueue队列是直接提交的队列,它总会破使线程池增加新的线程执行任务,当执行完毕后,由于corePoolSize为0,因此空线程会在指定的时间(60s)内被回收。
下面是ThreadPoolExecutor的一段执行的核心代码
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }代码中workerCountOf()方法获取了当前线程池的线程总数。当线程总数小于corePoolSize核心线程数时,会将任务通过addWorker()方法直接调度执行。否则在workQueue.offer(command)方法进入等待队列。如果等待队列失败(比如有界队列达到上限,或者使用SynchronousQueue队列)则会直接将任务提交给线程池,如果提交失败,则执行拒绝策略。
4. 拒绝策略
上面多次提到了拒绝策略,那么拒绝策略到底是什么呢,请往下看。
拒绝策略就是当任务数量超过系统实际承载能力时,该如何处理。 JDK内置提供了四种策略:
AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。DiscardPolicy策略:该策略,默默的丢弃无法处理的任务,不予任何处理。如果允许任务丢失,可以采用这个。DiscardOldestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试,再次提交这个任务。以上策略都实现了RejectedExecutionHandler接口,若上述策略无法满足需要,可以自己扩展RejectedExecutionHandler接口,
package com.example.thread; import java.util.concurrent.*; /** * Created by mazhenhua on 2017/3/13. */ public class RejectedThreadPoolDemo { public static class MyTask implements Runnable{ @Override public void run() { System.out.println(System.currentTimeMillis() + ": Thread ID:" + Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler(){ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + "is disCard"); } }); for (int i = 0; i < Integer.MAX_VALUE; i ++){ es.submit(task); Thread.sleep(10); } } }输出结果:
上述代码中,线程池中有5个常驻线程,并且最大线程数也是5个,但是他却拥有只有10个容量的等待队列。 MyTask的执行时间是100毫秒。所以当任务被源源不断的提交时,拒绝策略就开始生效了。
5. 自定义线程创建:ThreadFactory
看了那么多关于线程池的介绍,有没有考虑一个基本的问题,线程池中的线程是哪里来的?
线程池的作用是为了线程的复用,避免频繁的创建线程,但是最开始的线程是哪里开的呢?答案是:ThreadFactory
ThreadFactory接口只有一个方法创建线程Thread newThread(Runnable r),当需要创建线程时,就调用这个方法。自定义线程,可以帮助我们做不少事。我们可以追踪线程池在何时创建了多少个线程,也可以定义线程的组,优先级,名字等等。也可以任性的将所有线程都设置为守护线程。
package com.example.thread; import java.util.concurrent.*; /** * Created by mazhenhua on 2017/3/13. */ public class RejectedThreadPoolDemo { public static class MyTask implements Runnable{ @Override public void run() { //System.out.println(System.currentTimeMillis() + ": Thread ID:" + // Thread.currentThread().getId()); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } // 拒绝策略 /* public static void main(String[] args) throws InterruptedException { MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(10), Executors.defaultThreadFactory(), new RejectedExecutionHandler(){ @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + "is disCard"); } }); for (int i = 0; i < Integer.MAX_VALUE; i ++){ es.submit(task); Thread.sleep(10); } }*/ public static void main(String[] args) throws InterruptedException{ MyTask task = new MyTask(); ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); System.out.println("create : " + t); return t; } }); for (int i = 0; i < 5; i ++){ es.submit(task); } Thread.sleep(2000); } }执行结果:
从输出结果中可以看出,创建了5个守护线程,并且当主线程main结束后,所有的线程都结束了。
6. 扩展线程池
虽然JDK已经帮我们实现了这个稳定的高性能线程池。但有的时候我们需要对这个线程池做一些扩展,比如,想要监控每个任务的执行开始时间结束时间,或者打印一些关键参数等等。
ThreadPoolExecutor提供了beforeExecute(),afterExecute(),terminated()三个接口对线程池进行控制,有点类似于spring的AOP
package com.example.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Created by mazhenhua on 2017/3/14. */ public class ExtThreadPool { public static class MyTask implements Runnable { public String name; public MyTask(String name) { this.name = name; } @Override public void run() { System.out.println("正在执行:Thread ID:" + Thread.currentThread().getId() + ", Task Name = " + name); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) throws InterruptedException { ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()){ @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("正在准备执行:" + ((MyTask) r).name); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("执行完成:" + ((MyTask) r).name); } @Override protected void terminated() { System.out.println("线程池退出"); } }; for (int i = 0; i < 5; i ++){ MyTask task = new MyTask("TASK-" + i); es.execute(task); Thread.sleep(10); } es.shutdown(); } }输出结果: