J.U.C--线程池ThreadPoolExecutor

    xiaoxiao2021-03-25  120

    这一篇博文主要讲解关于Java的线程池相关的内容,主要包括: (1) Executor接口以及其子接口 (2)Executor的生命周期 (3)Executors (4)任务拒绝策略 (5)线程池 ThreadPoolExecutor实现原理

    1. Executor接口以及其子接口

    首先来看一下线程池相关类与接口的体系结构图:

    上图很清晰显示了线程池中的常用类和接口之间关系,下面首先看看最基础的Executor接口:

    public interface Executor { /** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * 翻译: * 1.任务在未来某个时候被执行,即可能execute之后任务不是立刻执行; * 2.任务的执行是在一个新的线程中执行,这个新的线程可能是线程池中的,也可能是被调用的线程。这取决于对Executor的实现 */ void execute(Runnable command); }

    首先Executor接口的execute方法只是执行一个Runnable的任务。根据线程池的执行策略最后这个任务可能在新的线程中执行,或者线程池中的某个线程,甚至是Caller线程。

    Executor的子接口ExecutorService增加了一些方法用来更好的支持线程池的一些策略,我们在实际使用中也是使用ExecutorService居多,其中增加的最核心的方法如下:

    //当发生shutdown请求或则timeout或则当前线程被中断时,阻塞等待所有的任务结束。 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; //判断当前executor 是否已经shutdown boolean isShutdown(); //在shutdown之后如果所有的任务都已经结束了就返回TRUE, boolean isTerminated(); //调用该函数表示executor将不再接收新的任务,已经被提交的任务将被依然被执行。 void shutdown(); //尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行任务的列表 List<Runnable> shutdownNow(); //提交一个Runnable任务给executor,并返回一个Future。直到任务结束,否则Future的get方法将一直返回null Future<?> submit(Runnable task); //提交一个有返回值的Callable任务给executor,并返回一个Future代表线程执行的结果。直到任务结束,否则Future的get方法将一直返回null。 <T> Future<T> submit(Callable<T> task)

    对于submit提交的两种任务Runnable和Callable,都是向线程池中提交任务,它们的区别在于Runnable在执行完毕后没有结果,Callable执行完毕后有一个结果。这在多个线程中传递状态和结果是非常有用的。另外他们的相同点在于都返回一个Future对象。Future对象可以阻塞线程直到运行完毕(获取结果,如果有的话),也可以取消任务执行,当然也能够检测任务是否被取消或者是否执行完毕。在没有Future之前我们检测一个线程是否执行完毕通常使用Thread.join()或者用一个死循环加状态位来描述线程执行完毕。现在有了更好的方法能够阻塞线程,检测任务执行完毕甚至取消执行中或者未开始执行的任务。

    ExecutorService的子接口就是ScheduledExecutorService,ScheduledExecutorService描述的功能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。这包括延迟时间执行一次、延迟时间周期性执行以及固定延迟时间周期性执行等。当然了继承ExecutorService的ScheduledExecutorService拥有ExecutorService的全部特性。

    ThreadPoolExecutor是ExecutorService的默认实现,ThreadPoolExecutor并不是直接实现的ExecutorService接口,而是继承AbstractExecutorService抽象类。AbstractExecutorService抽象类实现了ExecutorService接口。下图可以比较清楚的显示:

    ScheduledThreadPoolExecutor是继承ThreadPoolExecutor和实现ScheduledExecutorService接口,具备周期性任务调度的实现。

    2.Executor的生命周期

    我们都知道在Java中线程有多种状态:初始化、就绪态、运行态、阻塞态、死亡态等。同样,作为线程管理器的线程池同样有多种状态,JVM会在所有线程(非后台daemon线程)全部终止后才退出,为了节省资源和有效释放资源,关闭一个线程池就显得很重要。有时候无法正确的关闭线程池,将会阻止JVM的结束。

    线程池Executor是异步的执行任务,因此任何时刻不能够直接获取提交的任务的状态。这些任务有可能已经完成,也有可能正在执行或者还在排队等待执行。因此关闭线程池可能出现一下几种情况:

    1)平缓关闭(shutdown):已经启动的任务全部执行完毕,同时不再接受新的任务。 2)立即关闭(terminate):取消所有正在执行和未执行的任务。

    另外关闭线程池后对于任务的状态应该有相应的反馈信息。

    下图显示了线程池的几种状态:

    1)线程池在构造前(new操作)是初始状态,一旦构造完成线程池就进入了执行状态RUNNING。严格意义上讲线程池构造完成后并没有线程被立即启动,只有进行“预启动”或者接收到任务的时候才会启动线程。这个会后面线程池的原理会详细分析。但是线程池是处于运行状态,随时准备接受任务来执行。

    2)线程池运行中可以通过shutdown()和shutdownNow()来改变运行状态。shutdown()是一个平缓的关闭过程,线程池停止接受新的任务,同时等待已经提交的任务执行完毕,包括那些进入队列还没有开始的任务,这时候线程池处于shutdown状态;shutdownNow()是一个立即关闭过程,线程池停止接受新的任务,同时线程池取消所有执行的任务和已经进入队列但是还没有执行的任务,这时候线程池处于STOP状态。

    3)一旦shutdown()或者shutdownNow()执行完毕,线程池就进入TERMINATED状态,此时线程池就结束了。

    4)isTerminating()描述的是SHUTDOWN和STOP两种状态。

    5)isShutdown()描述的是非RUNNING状态,也就是SHUTDOWN/STOP/TERMINATED三种状态。

    线程池的API如下:

    其中shutdownNow()会返回那些已经进入了队列但是还没有执行的任务列表。awaitTermination描述的是等待线程池关闭的时间,如果等待时间线程池还没有关闭将会抛出一个超时异常。

    对于关闭线程池期间发生的任务提交情况就会触发一个拒绝执行的操作。这是Java.util.concurrent.RejectedExecutionHandler描述的任务操作。下一个小结中将描述这些任务被拒绝后的操作。

    总结下这个小节:

    1)线程池有运行、关闭、停止、结束四种状态,结束后就会释放所有资源;

    2)平缓关闭线程池使用shutdown()

    3)立即关闭线程池使用shutdownNow(),同时得到未执行的任务列表

    4)检测线程池是否正处于关闭中,使用isShutdown()

    5)检测线程池是否已经关闭使用isTerminated()

    6)定时或者永久等待线程池关闭结束使用awaitTermination()操作

    以上内容是我上网找到的资料,但是实际上我自己看源码的时候发现线程池除了RUNNING、SHUTDOWN、STOP/TERMINATED四种状态,还有一个TIDYING状态。

    在ThreadPoolExecutor类源码中截取内容如下:

    /* The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * TERMINATED: terminated() has completed * * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * 当线程池和任务阻塞队列均为空 * STOP -> TIDYING * 当线程池为空 * TIDYING -> TERMINATED * When the terminated() hook method has completed * * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. * * Detecting the transition from SHUTDOWN to TIDYING is less * straightforward than you'd like because the queue may become * empty after non-empty and vice versa during SHUTDOWN state, but * we can only terminate if, after seeing that it is empty, we see * that workerCount is 0 (which sometimes entails a recheck -- see * below). */

    上面的英文比较简单,也比较好理解,这里就不翻译的,根据上面的意思,也就是说,线程池在SHUTDOWM/STOP到TERMINATED状态之间还存在一个TIDYING状态。这几个状态的变化是这样的,这里载贴一下,有兴趣的也可以去源码中看一看。

    /* RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * STOP -> TIDYING * When pool is empty * TIDYING -> TERMINATED * When the terminated() hook method has completed */

    小结:

    本节我们需要了解的就是线程池的生命周期,回顾下,线程池的状态有:

    1、RUNNING

    2、SHUNDOWN

    3、STOP

    4、TIDYING

    5、TERMINATED

    这几个状态的转化关系为:

    1、调用shundown()方法线程池的状态由RUNNING——>SHUTDOWN

    2、调用shutdowNow()方法线程池的状态由RUNNING——>STOP

    3、当任务队列和线程池均为空的时候 线程池的状态由STOP/SHUTDOWN——–>TIDYING

    4、当terminated()方法被调用完成之后,线程池的状态由TIDYING———->TERMINATED状态。

    3. Executors工具类

    关于这部分内容,可以参考以下这篇博客的第四节 http://blog.csdn.net/u010853261/article/details/54345448

    4. 任务拒绝策略

    首先来分析一下为什么会有任务拒绝策略:

    这里先假设一个前提:线程池有一个任务队列,用于缓存所有待处理的任务,正在处理的任务将从任务队列中移除。因此在任务队列长度有限的情况下就会出现对新任务拒绝处理的问题,需要有一种策略来处理应该加入任务队列却因为队列已满无法加入的情况。另外在线程池关闭的时候也需要对任务加入队列操作进行额外的协调处理。

    RejectedExecutionHandler提供了四种方式来处理任务拒绝策略

    1)直接丢弃(DiscardPolicy)

    2)丢弃队列中最老的任务(DiscardOldestPolicy)。

    3)直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略(AbortPolicy)

    4)将任务分给调用线程来执行(CallerRunsPolicy)

    这四种策略是独立无关的,是对任务拒绝处理的四种表现形式。最简单的方式就是直接丢弃任务。但是却有两种方式,到底是该丢弃哪一个任务,比如可以丢弃当前将要加入队列的任务本身(DiscardPolicy)或者丢弃任务队列中最老任务(DiscardOldestPolicy)。丢弃最旧任务也不是简单的丢弃最旧的任务,而是有一些额外的处理。除了丢弃任务还可以直接抛出一个异常(RejectedExecutionException),这是比较简单的方式。抛出异常的方式(AbortPolicy)尽管实现方式比较简单,但是由于抛出一个RuntimeException,因此会中断调用者的处理过程。除了抛出异常以外还可以不进入线程池执行,在这种方式(CallerRunsPolicy)中任务将有调用者线程去执行。

    5.ThreadPoolExecutor源码分析

    ThreadPoolExecutor是ExecutorService的一种实现,但是是间接实现。ThreadPoolExecutor是继承AbstractExecutorService。而AbstractExecutorService实现了ExecutorService接口。

    下面从几个方面来介绍ThreadPoolExecutor。

    1. 线程池的数据结构与构造器

    看看ThreadPoolExecutor类属性就不难分析出ThreadPoolExecutor的数据结构,下图描述了这种数据结构:

    根据源码和数据结构能够分析出其数据结构功能: 1)线程池需要支持多个线程并发执行,因此有一个线程集合Collection来执行线程任务;

    2)很显然在多个线程之间协调多个任务,那么就需要一个线程安全的任务集合,同时还需要支持阻塞、超时操作,那么BlockingQueue是必不可少的;

    3)既然是线程池,出发点就是提高系统性能同时降低资源消耗,那么线程池的大小就有限制,因此需要有一个核心线程池大小(线程个数)和一个最大线程池大小(线程个数),有一个计数用来描述当前线程池大小;

    4)如果是有限的线程池大小,那么长时间不使用的线程资源就应该销毁掉,这样就需要一个线程空闲时间的计数来描述线程何时被销毁;

    5)前面描述过线程池也是有生命周期的,因此需要有一个状态来描述线程池当前的运行状态;

    6)线程池的任务队列如果有边界,那么就需要有一个任务拒绝策略来处理过多的任务,同时在线程池的销毁阶段也需要有一个任务拒绝策略来处理新加入的任务;

    7)上面中的线程池大小、线程空闲实际数、线程池运行状态等等状态改变都不是线程安全的,因此需要有一个全局的锁(mainLock)来协调这些竞争资源;

    8)除了以上数据结构以外,ThreadPoolExecutor还有一些状态用来描述线程池的运行计数,例如线程池运行的任务数、曾经达到的最大线程数,主要用于调试和性能分析。

    对于ThreadPoolExecutor而言,一个线程就是一个Worker对象,它与一个线程绑定,当Worker执行完毕就是线程执行完毕,这个在后面详细讨论线程池中线程的运行方式。

    下面再看看线程池的构造方法:

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

    首先来分析一下构造器的参数(来自JDK文档)

    corePoolSize - 池中所保存的线程数,包括空闲线程。 maximumPoolSize - 池中允许的最大线程数。 keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 unit - keepAliveTime 参数的时间单位。 workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 threadFactory - 执行程序创建新线程时使用的工厂。 handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

    上面的构造器参数里面最不好理解的就是:corePoolSize 和 maximumPoolSize这两个参数了。所以特别做一个详细的解释: 1)池中线程数小于corePoolSize时,新任务都不排队而是直接添加新线程。 2)池中线程数大于等于corePoolSize时,workQueue未满,首选将新任务加入workQueue而不是添加新线程。 3)池中线程数大于等于corePoolSize时,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务。 4)池中线程数大于等于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize时,新任务被拒绝,使用handler处理被拒绝的任务。 5)如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。

    上面这4条规则非常重要。基本解释清楚了corePoolSize 和 maximumPoolSize这两个参数怎么用。

    2. 线程池中线程的创建方式

    在线程池中线程是通过线程工厂ThreadFactory来创建的,ThreadFactory是一个接口,在JDK中有一个默认的实现DefaultThreadFactory:

    static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1);//线程池标号 private final ThreadGroup group;//线程组 private final AtomicInteger threadNumber = new AtomicInteger(1);//线程标号 private final String namePrefix;// DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false);//所有线程都默认设置为非Daemon线程 //产生的所有线程优先级相同 if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }

    从上面代码中可以得到以下几点信息:

    1、在这个线程工厂中,同一个线程池的所有线程属于同一个线程组,也就是创建线程池的那个线程组, 同时线程池的名称都是“pool–thread-”, 其中poolNum是线程池的数量序号,threadNum是此线程池中的线程数量序号。

    2、另外对于线程池中的所有线程默认都转换为非后台线程,这样主线程退出时不会直接退出JVM,而是等待线程池结束。

    3、还有一点就是默认将线程池中的所有线程都调为同一个级别,这样在操作系统角度来看所有系统都是公平的,不会导致竞争堆积。

    3. 线程池中线程的生命周期

    一个线程Worker被构造出来以后就开始处于运行状态。下图给出一个运行过程:

    1)当提交一个任务时,如果需要创建一个线程(何时需要创建在下一节中探讨)时,就调用线程工厂创建一个线程,同时将线程绑定到Worker工作队列中。需要说明的是,Worker队列构造的时候带着一个任务Runnable,因此Worker创建时总是绑定着一个待执行任务。换句话说,创建线程的前提是有必要创建线程(任务数已经超出了线程或者强制创建新的线程,至于为何强制创建新的线程后面章节会具体分析),不会无缘无故创建一堆空闲线程等着任务。这是节省资源的一种方式。

    2)一旦线程池启动线程后(调用线程run()方法),那么线程工作队列Worker就从第1个任务开始执行(这时候发现构造Worker时传递一个任务的好处了),一旦第1个任务执行完毕,就从线程池的任务队列中取出下一个任务进行执行。循环如此,直到线程池被关闭或者任务抛出了一个RuntimeException。

    3)由此可见,线程池的基本原理其实也很简单,无非预先启动一些线程,线程进入死循环状态,每次从任务队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行某个任务发生异常而终止,那么重新创建一个新的线程就行。如此反复。

    其实,线程池原理看起来简单,但是复杂的是各种策略,例如何时该启动一个线程,何时该终止、挂起、唤醒一个线程,任务队列的阻塞与超时,线程池的生命周期以及任务拒绝策略等等。下一节将研究这些策略问题。

    4. execute()方法源码介绍

    下面通过分析execute()方法的源码来分析上面提到的各种策略:其实不管是对于execute()方法还是submit()方法来提交任务,最后都是执行execute()方法。

    在execute方法上有一段注释:

    /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. */

    这段注释的意思是: 1)任务在未来的某个时刻可能被执行,意味着也可能不被执行,为什么有可能不执行呢?在下面通过源码说明

    2)任务可能在一个新的线程中执行,也可能在线程池中已存在的线程中执行。

    3)如果任务不能被提交执行,可能是由于线程池已经shutdown或者是任务队列中已经满了。这种情况下任务将由此时的任务拒绝策略决定怎么来处理。

    那么为什么task在未来可能被执行也可能不被执行呢? 如果不被执行,可能是出现了上面所说的第3种的情况(shutdown或者是任务队列满了);在未来被执行即延时执行可能因为需要等待线程来处理。

    在分析execute方法的源码之前,我们首先还得搞清楚一件事情,线程池是怎么获取线程池状态 和 当前线程池中有效的线程数的。在ThreadPoolExecutor类里面的注释已经说明白了咋点:

    /** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * workerCount, indicating the effective number of threads * runState, indicating whether running, shutting down etc * * In order to pack them into one int, we limit workerCount to * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 * billion) otherwise representable. If this is ever an issue in * the future, the variable can be changed to be an AtomicLong, * and the shift/mask constants below adjusted. But until the need * arises, this code is a bit faster and simpler using an int. * * The workerCount is the number of workers that have been * permitted to start and not permitted to stop. The value may be * transiently different from the actual number of live threads, * for example when a ThreadFactory fails to create a thread when * asked, and when exiting threads are still performing * bookkeeping before terminating. The user-visible pool size is * reported as the current size of the workers set. */

    从上面的注释中我们可以知道: (1)通过一个原子整型变量ctl来包装workerCount 和 runState这两个状态。

    (2)workerCount表示有效(存活)的工作线程;runState表示线程池的运行状态。

    (3)ctl的低29位来表示workerCount, 高三位来表示runState状态,所以workerCount最大值是2^29-1,下面给出源码:

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 包装和解包 ctl变量 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } /* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */ private static boolean runStateLessThan(int c, int s) { return c < s; } private static boolean runStateAtLeast(int c, int s) { return c >= s; } private static boolean isRunning(int c) { return c < SHUTDOWN; } /** * Attempts to CAS-increment the workerCount field of ctl. */ private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } /** * Attempts to CAS-decrement the workerCount field of ctl. */ private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } /** * Decrements the workerCount field of ctl. This is called only on * abrupt termination of a thread (see processWorkerExit). Other * decrements are performed within getTask. */ private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); }

    分析完线程池对线程容量限制和状态维护,下面再来看execute()方法的源码分析了:

    public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /** * 通过三步来处理: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 翻译:如果池中正在运行线程数小于corePoolSize,那么就尝试为新任务开启一个新的线程, * 并将这个任务作为该线程的第一个运行任务。通过调用addWorker()方法来自动检测runState 和 * workerCount,防止本来不需要添加线程而要求添加线程的假警报 * * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 翻译:如果一个任务成功入队,这时候我们需要通过双检查来确定 是否我们应该添加一个 * 新的线程。 因为存在以下两种情况:在第一次检查时,池中有一个线程挂掉了或则是 * 线程池被shutdown了,这时我们就需要重新检查线程状态,如果线程池已经stop, * 就将任务回滚出队。 如果上诉两种情况都不存在就创建一个新的线程 * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 翻译:如果我们不能够将一个任务入队,这时我们就应该添加一个新的线程处理, * 如果添加失败,这时线程池就是出于shutdown或则队列已经满了,调用拒绝策略来 * 处理这个task。 */ //获取ctl的值 int c = ctl.get(); //获取线程池中线程数workerCount,当其小于corePoolSize时, 就直接为新的task创建一个线程执行,如果创建线程成功然后就直接返回。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //检测线程池是否处于工作状态 且 将任务成功添加到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { //重新检查线程池运行状态,如果线程池没有工作,则考虑将任务从队列中移除,如果移除成功则根据任务拒绝策略类进行处理 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); //如果工作线程数为0,则为task创建一个新的线程并运行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } 如果我们添加到队列失败,则尝试开启一个新的线程执行拒绝策略 else if (!addWorker(command, false)) reject(command); } //根据ctl变量判断线程池是否处于Running状态 private static boolean isRunning(int c) { return c < SHUTDOWN; }

    execute方法中的注释已经很明确的说明了execte()方法的流程。这里我再梳理如下:

    1、当我们提交一个任务,检测线程池中的工作线程是否小于核心线程数,如果小于,则进行2,否则进行3。

    2、尝试建一个新的线程来将任务作为第一个任务来执行,如果建立成功,则立即返回。否则进行3。

    3、检测线程池是否处于工作状态,如果正在工作,则进行4,否则进行10。

    4、将任务加入到任务队列中,如果加入成功,则进行5.否则,进行10。

    5、重新检测(双检测)线程池是否处于工作状态,如果不是,则进行6,如果是,则进行7。

    6、将任务从任务队列中移除,如果移除成功,则利用任务拒绝策略来处理该任务。否则进行7.

    7、检查工作线程数是否为零。如果不为零,则直接退出。否则进行8。

    8、添加一个新的线程且该线程的第一个任务为null。无论是否成功均退出。

    10、尝试建立一个新的线程来处理该任务,如果建立失败则利用任务拒绝策略来进行处理。

    上面的源码解释中调用了addWorker()函数实现:添加新的线程来处理新的任务。在分析addWorker()函数函数之前我们首先有必要来分析一下ThreadPoolExecutor类中的工作线程类。

    5. 线程类Worker

    在ThreadPoolExecutor中,Worker类其实就是就是线程池中的一个线程对象。也就是一个Worker对象对应于线程池中的一个线程。我们看看Worker类里面的属性就能得出这个结论。

    /** 当前worker对象绑定的线程,其实thread就是当前worker线程的引用,通过ThreadFactory创建线程,如果线程工厂创建失败时则为null,所以thread变量是final的 */ final Thread thread; /** 当前线程运行的初始任务,其实就是从阻塞队列(如果有)里面取出来的一个Runable,这个Runable将被Worker thread执行 */ Runnable firstTask; /** 当前线程已经运行结束的task任务数 */ volatile long completedTasks;

    Worker类继承了AQS同步器,用于在执行任务前后来获取和释放锁。这可防止中断,在等待一个任务时旨在唤醒工作线程而不是中断正在运行的任务。

    这里实现了一个简单的非重入独占锁而不是使用ReentrantLock。 主要是因为不想工作任务正在执行线程池中的控制方法时能够被再次锁住。 除此之外,为了禁止中断直到线程正在开始执行任务,我们将锁的初始状态设置为一个负数,当启动时将在runWorker中清除。

    下面给出Worker实现源代码:

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); //初始设置为-1,禁止中断直到调用runWorker()方法 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 将线程运行任务的循环委托给外部的runWorker()函数 */ public void run() { runWorker(this); } // 下面是加锁方法,实现的是一种独占锁。 // // The value 0 represents the unlocked state. // The value 1 represents the locked state. //从实现的AQS方法就可知是使用到了独占锁 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }

    当Worker类中的线程启动之后,则会调用Worker类中的run方法,而run方法调用了ThreadPoolExecutor的runWorker() 方法,因此,下面我们就看下runWorker方法:

    final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); //允许终端 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock();//加锁 //如果线程池已经stopping,确保线程是被中断的; //如果线程池没有stopping,确保线程没有被中断, if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock();//释放锁 } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

    这个runWorker()方法会让调用线程不停的循环执行任务。重复的从任务队列中取出任务并执行。

    从上面runWorker()方法我们还可以看出其维护了中断控制;并且在运行task的run()函数的时候前都对worker进行了加锁,在task执行完之后再对worker对象解锁,保证了每个task运行过程中的线程安全。

    6. addWorker()方法

    最后来分析addWorker(Runnable firstTask, boolean core)方法,该方法也就是针对这个firstTask创建一个线程来处理。

    此函数的功能是: (1)根据当前线程池的状态和给的边界条件来检测是否需要一个新的线程添加。

    (2)如果需要,则添加到线程队列中并调整工作线程数并启动线程执行第一个任务。

    (3)如果该方法检测到线程池处于STOP状态或者是察觉到将要停止,则返回false。

    (4)如果线程工厂创建线程失败(可能是由于发生了OOM异常)则也返回false。

    按照上面的描述来看addWorker具体代码的实现就相当好理解了。哈,已有一定的注释,这里就不再过多的介绍了。

    private boolean addWorker(Runnable firstTask, boolean core) { //先是各种检测来判断是否需要建立一个新的Worker对象 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))//大于边界就不创建 return false; if (compareAndIncrementWorkerCount(c)) break retry;//退出循环,即需要建立一个新的Worker对象 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //开始产生一个Worker对象,并将其添加到线程队列中去 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//添加到线程队列中 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//添加成功,则该启动线程 workerStarted = true; } } } finally { if (! workerStarted)//如果没有启动成功,执行addWorkerFailed方法 addWorkerFailed(w);//功能:从线程队列中将其移除。 } return workerStarted; } //从线程队列中移除 w。 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }

    7. 小结

    线程池的工作原理:当我们写程序中使用submit往线程池中提交新任务的时候,线程池首先会判断是否需要生成新的线程来执行这个任务。如果需要,则新建一个,然后启动此线程,接着就是一个一个的任务被启动的线程执行了。

    转载请注明原文地址: https://ju.6miu.com/read-22351.html

    最新回复(0)