ThreadPoolExecutor简析

    xiaoxiao2021-03-25  70

    ThreadPoolExecutor是java线程池的实现类,主要作用有这么几点: 1. 解耦任务提交和执行,便于控制任务执行的环境,用于吞吐量、响应能力等系统指标的调优; 2. 控制线程数量,复用线程,避免大量的线程造成资源竞争激烈,内存消耗严重引发的吞吐量下降、服务器宕机等问题; 3. 使系统稳定,当大量任务提交时,超过线程限制的任务会进入队列等待,达到平缓降低性能的目的。 下面我们来看看具体的实现。

    线程池实现思路

    我们知道每个线程都有线程空间也就是线程栈,里面放着栈帧(操作数栈、局部变量表等),随着方法的运行结束,那么线程所占用的资源也将被系统回收。 如何保证线程不被系统回收,而是人为的控制是否被回收呢? 循环是一个非常好的思路,只要线程不跳出循环,就不会被回收。我们通过控制什么时候线程跳出循环,来达到关闭线程的目的。具体怎么控制,实现思路应该很多,例如在循环中wait,等待任务到来然后notify唤醒线程,或者通过判断线程中断位,来跳出循环等等。 看看java中的ThreadPoolExecutor是如何实现的 通过以下我修剪后的代码,可以看到,差不多就是上面说的那些思路,java中用的是阻塞队列的超时和挂起来控制线程的生命周期。

    //这个是运行任务的具体方法 final void runWorker(Worker w) { Runnable task = w.firstTask; while (task != null || (task = getTask()) != null) //当getTask返回null时,循环跳出,线程回收 task.run(); } //获取任务 private Runnable getTask() { boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //判断线程是否可以超时回收 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //通过keepAliveTime来控制线程空闲死亡时间 workQueue.take();//如果线程不允许回收,那么就会一直挂起 if (r != null) return r; }

    下面我们来分析下,如何控制线程数,毕竟线程池的目的之一就是避免线程数的失控。 ThreadPoolExecutor主要有三个参数来控制:corePoolSize、maximumPoolSize、workQueue,那么具体是如何控制的呢?

    当前线程总数小于corePoolSize时,不管有没有线程闲置,都会为新的任务创建新的线程,不会进行线程的复用;当线程数等于corePoolSize,新到来的任务会进入队列中等待,等待的任务会被将来空闲的线程执行;如果线程数等于corePoolSize且队列已经满了,会new新线程,直到线程数量等于maximumPoolSize;当上面条件都验证后,那么在到来的任务就会进入拒绝服务,是直接放弃,还是抛出异常等各种策略可以定制。

    首先我们看看实现具体的代码:

    public void execute(Runnable command) { int c = ctl.get(); //ctl是atomicInteger类型,通过不同的计算方式得到线程池状态和线程数量 if (workerCountOf(c) < corePoolSize) { //线程总数小于corePoolSize if (addWorker(command, true)) //为新的任务创建新的线程 return; } if (isRunning(c) && workQueue.offer(command)) { //不能新建线程,那么就入队列(offer会立刻返回,失败为false) if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //创建超过corePoolSize数量的线程 reject(command); }

    java提供的api使用

    通过Executors提供的静态工厂方法,我们可以简单的使用线程池。

    方法介绍newFixedThreadPool(int nThreads)固定线程的线程池,任务会堆积在队列等待执行newSingleThreadExecutor()一个线程的线程池,任务在队列中等待串行执行newCachedThreadPool()最大线程数为Integer.MAX_VALUE的线程池,没有队列缓冲,不能被现有线程接受,直接创建新线程

    以上线程池直接使用有没有问题?从上面介绍来看,好像问题很大,不管是任务过多的堆积,或是过多的创建线程。

    1. newCachedThreadPool线程池 使用的是SynchronousQueue队列,offer方法直接把任务交个等待的线程,如果没有空闲线程挂起在队列上,那么直接返回false,从这里可以看出,当任务量急剧增大时,这可能导致线程无限增长,而线程创建是需要内存保存线程上下文信息的,可能会直接导致服务器内存不足,造成内存泄漏,同时线程对cup激烈的竞争,会导致服务器性能急剧下降。 2. newFixedThreadPool和newSingleThreadExecutor线程池 线程最大数量固定,通过LinkedBlockingQueue(容量很大的阻塞队列)来缓冲不能处理的任务,能够很好的保证合适数量的线程处于忙碌状态,但是同样会使得任务过多的堆积,导致响应能力下降。 从上分析看来,好像我们并不能直接的使用java提供的线程池啊,如果我们自己去实例化ThreadPoolExecutor会不会更好呢,那么我们就可以更精确的控制线程数量和队列长度,来达到更优的平衡。

    个性化定制

    通过有界队列、线程数量和空闲死亡时间来控制线程池,能够构造具有很好伸缩性的线程池,但是在系统资源使用和响应能力之间进行平衡,进行调优,却是件非常困难的事情。

    使用大容量队列,小数量线程,虽然可以减少cup竞争,上下文切换,人为的控制过少的线程,同样会造成吞吐量的下降;使用小容量队列,大数量线程,会使得cup竞争激烈,上线文频繁切换,也会使吞吐量下降。

    参数的确定非常的困难,需要根据具体的场景,考虑任务的执行时间,响应要求进行合理的调整,这更是需要耐心测试的事情。 由此看来使用api直接提供的线程池也是一个选择,只有当确实出现性能瓶颈时,才需要更复杂的调优,而调优,却是高级技术。

    怎么关闭线程池

    我们知道,只要有非后台线程运行,那么虚拟机就不会关闭。只需要关闭每个线程,就能关闭线程池,不过,关闭线程池是个很有技巧的活。 ThreadPoolExecutor提供了2个方法

    方法介绍shutdown()平缓的关闭线程池shutdownNow()暴力关闭线程池

    何为平缓,何为暴力?分析前前我们列举下线程池状态,以及代表的意思(括号中的数字就可以认为就是状态的常量值):

    状态介绍RUNNING(-1)运行状态:接受新的任务或者运行队列中等待的任务SHUTDOWN(0)关闭状态:不接受新的任务但运行队列中等待的任务STOP(1)暴力关闭状态:不接受新任务且不执行队列中任,设置所有线程中断状态TIDYING(2)当所有的任务都完成,所有的线程都消亡TERMINATED(3)线程池完全关闭状态

    先看看shutdown()的源码:

    public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); //设置线程池状态 interruptIdleWorkers(); //中断所有线程 onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); //尝试终止线程池 }

    从源码可以看到,shutdown方法主要是通过中断线程来关闭线程池的,什么时候线程会响应中断,从前面线程执行的分析可以看到,响应中断的情况主要是获取任务的时候挂起在队列上的操作take或者poll,如果队列中有任务,那么线程并不会响应中断,会继续执行,只有队列中没有任务时,线程挂起才会响应中断,跳出循环,可以去看看runWorker(Worker w)方法,可以看到每一次跳出循环就代表线程死亡,都会尝试关闭线程池,但是正常的关闭需要满足两个条件:队列为空且线程池状态大于SHUTDOWN。 由此可以总结,shutdown方法关闭的流程是:拒绝新任务加入–>执行完成所有剩余的任务–>关闭线程池

    现在来看看 shutdownNow()的源码:

    public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); //线程池状态为stop interruptWorkers(); tasks = drainQueue(); //排干所有队列中等待的任务 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }

    对比shutdown的源码,我们发现状态变化为stop外,还多一个步骤,排干所有队列中的任务,让所有线程取不到任务,从而响应中断,快速地关闭线程池。这样的会存在很大的风险,但是风险的存在是看使用场景的,自己能够很好的把控风险,用这个方法也没有什么不可。

    转载请注明原文地址: https://ju.6miu.com/read-33639.html

    最新回复(0)