转载请保留出处: http://blog.csdn.net/xiaxl/article/details/60868573
下面介绍ThreadPoolExecutor的实现原理。我想用我能说出来的最简单的话,介绍出ThreadPoolExecutor是怎么实现的,对源代码不进行详细说明。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
1、如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。2、如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。3、如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。4、如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过handler所指定的策略来处理此任务。5、当线程池中的线程数量大于corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。处理任务的优先级为: 核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler策略处理被拒绝的任务。
当线程池中线程的数量小于 corePoolSize 的数量时,每加入一个任务,创建一个线程; 当线程池中线程的数量等于 corePoolSize 的数量时,每增加一个任务,将任务加入到任务队列中; 而线程中是一个wile循环,执行完成一个任务后,会在任务队列中再次取出一个任务进行执行,直到所有任务被执行结束。
源码中有一个 Worker 类,该类是对任务runable的封装,作用就是可以在执行一个任务结束后,从任务队列中再次取出一个任务进行执行。 也可以这样理解,所有的线程执行的都是 Worker 这个runable任务,而Worker 的任务内容为,执行完成当前用户提交的任务后,到 workQueue 中取得下一个任务继续执行。
跟踪 addWorker(command, true)
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }从以上代码,我们知道 Worker 实际为一个线程的封装,真正的任务执行都是在Worker 中执行的。 跟踪Worker的 run 方法
public void run() { runWorker(this); }跟踪 runWorker 方法
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }Wile循环中,不断从任务队列中取出任务,并执行。如果取不到任务时,该线程执行结束。