Java线程池ThreadPoolExecutor

    xiaoxiao2021-03-25  203

    转载请保留出处: http://blog.csdn.net/xiaxl/article/details/60868573

    下面介绍ThreadPoolExecutor的实现原理。我想用我能说出来的最简单的话,介绍出ThreadPoolExecutor是怎么实现的,对源代码不进行详细说明。

    一、参数含义:

    corePoolSize 线程池中所保存的线程数,包括空闲线程。maximumPoolSize 线程池中允许的最大线程数(采用LinkedBlockingQueue时没有作用)。keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间,线程池维护线程所允许的空闲时间。unit keepAliveTime 参数的时间单位,线程池维护线程所允许的空闲时间的单位:秒workQueue 执行前用于保持任务的队列(缓冲队列)。此队列仅保持由execute 方法提交的 Runnable 任务。RejectedExecutionHandler 线程池对拒绝任务的处理策略(重试添加当前的任务,自动重复调用execute()方法)

    二、实现策略:

    当一个任务通过execute(Runnable)方法欲添加到线程池时:

    1、如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。2、如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。3、如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。4、如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过handler所指定的策略来处理此任务。5、当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

    处理任务的优先级为: 核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler策略处理被拒绝的任务。

    三、原理:

    当线程池中线程的数量小于 corePoolSize 的数量时,每加入一个任务,创建一个线程; 当线程池中线程的数量等于 corePoolSize 的数量时,每增加一个任务,将任务加入到任务队列中; 而线程中是一个wile循环,执行完成一个任务后,会在任务队列中再次取出一个任务进行执行,直到所有任务被执行结束。

    下边是代码跟踪:

    源码中有一个 Worker 类,该类是对任务runable的封装,作用就是可以在执行一个任务结束后,从任务队列中再次取出一个任务进行执行。 也可以这样理解,所有的线程执行的都是 Worker 这个runable任务,而Worker 的任务内容为,执行完成当前用户提交的任务后,到 workQueue 中取得下一个任务继续执行。

    ThreadPoolExecutor 中 execute的源码如下:
    public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

    跟踪 addWorker(command, true)

    private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

    从以上代码,我们知道 Worker 实际为一个线程的封装,真正的任务执行都是在Worker 中执行的。 跟踪Worker的 run 方法

    public void run() { runWorker(this); }

    跟踪 runWorker 方法

    final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

    Wile循环中,不断从任务队列中取出任务,并执行。如果取不到任务时,该线程执行结束。

    四、使用举例

    public class Test { private ThreadPoolExecutor threadpool; /** * corePoolSize 池中所保存的线程数,包括空闲线程。 * maximumPoolSize 池中允许的最大线程数(采用LinkedBlockingQueue时没有作用)。 * keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 * unit 参数的时间单位。 * workQueue 执行前用于保持任务的队列(缓冲队列)。此队列仅保持由execute 方法提交的 Runnable 任务。 * RejectedExecutionHandler -线程池对拒绝任务的处理策略(重试添加当前的任务,自动重复调用execute()方法) */ public Test() { /** * 当一个任务通过execute(Runnable)方法欲添加到线程池时: * 1.如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。 * 2.如果此时线程池中的数量等于corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。 * 3.如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。 * 4.如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过handler所指定的策略来处理此任务。 * 5.当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。 * * 处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 */ threadpool = new ThreadPoolExecutor(2, 10, 20, TimeUnit.SECONDS, new ArrayBlockingQueue(10), new ThreadPoolExecutor.DiscardOldestPolicy()); } // add task into thread pool public void submit(final int flag) { threadpool.execute(new Runnable() { public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(flag + " Hello"); } }); } /** * close thread pool */ public void shutdown() { threadpool.shutdown(); } /** * 主线程 * @param args */ public static void main(String[] args) { Test t = new Test(); for (int i = 0; i < 20; i++) { System.out.println("time:" + i); t.submit(i); } System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>"); } }

    ========== THE END ==========

    bjxiaxueliang 认证博客专家 Java OpenGL Android 一名普通Android程序员,专注Android、OpenGL ES、移动音视频等终端技术研发。希望通过该博客分享自己的Android Coding心路。
    转载请注明原文地址: https://ju.6miu.com/read-504.html

    最新回复(0)