java1.7集合源码赏析系列:线程池原理

    xiaoxiao2021-12-14  81

    Executors支持以下各种方法:

    创建并返回设置有常用配置字符串的 ExecutorService 的方法。 创建并返回设置有常用配置字符串的 ScheduledExecutorService 的方法。 创建并返回“包装的”ExecutorService 方法,它通过使特定于实现的方法不可访问来禁用重新配置。 创建并返回 ThreadFactory 的方法,它可将新创建的线程设置为已知的状态。 创建并返回非闭包形式的 Callable 的方法,这样可将其用于需要 Callable 的执行方法中。 上述是Executors类在api中的说明,我们可以很清楚的知道Executors其实是一个工厂类,它提供了各种不同参数的线程池给我们使用。

    相对来说newCachedThreadPool和newFixedThreadPool 用的较多,这是使用方法:传送门 接着让我们带着问题来看源码, 1.为什么要这么用? 2.使用不当会导致什么样的问题? 3.他们的实现有什么差异?

    ThreadPoolExecutor介绍

    ThreadPoolExecutor 是线程池的实现类,它继承了AbstractExecutorService,实现了ExecutorService。fixed和cache线程池都是由它实现,区别在于默认参数不同。下面看一下两者的代码。

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }

    实例化ThreadPoolExecutor类的第三个参数不同的,一个是60,一个是0。这是他们最大的区别。下面就ThreadPoolExecutor类的参数作出说明。

    ThreadPoolExecutor的参数

    //ThreadPoolExecutor的构造方法 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } /** corePoolSize - 池中所保存的线程数,包括空闲线程。 maximumPoolSize - 池中允许的最大线程数。 keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 unit - keepAliveTime 参数的时间单位。 workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。 */

    核心和最大池大小 当新任务在方法execute(java.lang.Runnable)中提交时,如果运行的线程少于corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

    创建新线程 使用 ThreadFactory 创建新线程。如果没有另外说明,则在同一个 ThreadGroup 中一律使用Executors.defaultThreadFactory()创建线程,并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。

    按需构造 默认情况下,即使核心线程最初只是在新任务到达时才创建和启动的,也可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 对其进行动态重写。如果构造带有非空队列的池,则可能希望预先启动线程。

    保持活动时间 如果池中当前有多于corePoolSize的线程,则这些多出的线程在空闲时间超过keepAliveTime时将会终止(参见getKeepAliveTime(java.util.concurrent.TimeUnit))。这提供了当池处于非活动状态时减少资源消耗的方法。如果池后来变得更为活动,则可以创建新的线程。也可以使用方法setKeepAliveTime(long, java.util.concurrent.TimeUnit) 动态地更改此参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在关闭前有效地从以前的终止状态禁用空闲线程。默认情况下,保持活动策略只在有多于 corePoolSizeThreads 的线程时应用。但是只要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程。

    排队 所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互: 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

    钩子 (hook) 方法 此类提供 protected 可重写的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,这两种方法分别在执行每个任务之前和之后调用。它们可用于操纵执行环境;例如,重新初始化 ThreadLocal、搜集统计信息或添加日志条目。此外,还可以重写方法 terminated() 来执行 Executor 完全终止后需要完成的所有特殊处理。 如果钩子 (hook) 或回调方法抛出异常,则内部辅助线程将依次失败并突然终止。 beforeExecute(Thread t, Runnable r) 在执行给定线程中的给定 Runnable 之前调用的方法. t - 将运行任务 r 的线程。 r - 将执行的任务。

    afterExecute(Runnable r, Throwable t) 基于完成执行给定 Runnable 所调用的方法。 r - 已经完成的 runnable 线程。 t - 导致终止的异常;如果执行正常结束,则为 null。

    terminated() 当 Executor 已经终止时调用的方法。

    以上钩子方法均实现不执行任何操作,在子类中定制

    //任务执行入口,Workder类 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { //没有任务时结束 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //任务执行前调用钩子方法 beforeExecute(wt, task); Throwable thrown = null; try { //执行任务run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任务执行后调用钩子方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //内部调用terminated(),其它线程结束入口也会调用terminated方法 processWorkerExit(w, completedAbruptly); } } //从队列中获取任务,如果设置了时间超时时返回null,如果没有设置时间会阻塞 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); . if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

    终止 程序 AND 不再引用的池没有剩余线程会自动 shutdown。如果希望确保回收取消引用的池(即使用户忘记调用 shutdown()),则必须安排未使用的线程最终终止:设置适当保持活动时间,使用0核心线程的下边界和/或设置allowCoreThreadTimeOut(boolean)。

    //任务提交 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } //校验线程池参数,添加任务,启动线程 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
    转载请注明原文地址: https://ju.6miu.com/read-964918.html

    最新回复(0)