线程池java.util.concurrent.ThreadPoolExecutor总结

    xiaoxiao2021-03-26  15

    线程池还具有提高系统性能的优点,因为创建线程和清除线程的开销比较大。 

    有两种不同类型的线程池:一是固定线程数量的线程池;二是可变数量的线程池。 

     

    对于固定数量的线程池,可以使用Executors的静态方法 newFixedThreadPool 来创建 ExecutorService;或者利用 newSingleThreadPool来创建。 

          而 ExecutorService 实现了 Executor 接口,这个接口中有一个方法:Execute(Runnable command),也就是执行线程。 

          对于固定数量的线程池而言,如果需要执行的线程数量多于构造的数量,那么只能并发构造时的数量,剩下的线程就进入线程池的等待队列。 

          如果不需要使用该线程池了,则使用 ExecutorService 中的 shutDown 方法,此时,该线程池就不会接受执行新的线程任务了。

     

    对于可变数量的线程池,可用Executors的静态方法 newCachedThreadPool 来创建 ExecutorService,该线程池的大小是不定的,当执行任务时,会先选取缓存中的空闲线程来执行,如果没有空闲线程,则创建一个新的线程,而如果空闲线程的空闲状态超过60秒,则线程池删除该线程。

     

    还有一种线程池:延迟线程池 

    该线程池的创建有两个方法: Executors.newScheduledThreadPool(int corePoolSize);

                                            Executors.newSingleScheduledExecutor(); 

    创建之后,会获得一个 ScheduledExecutorService。 

    该对象的一个重要的方法就是: schedule(Runnable command, long delay, TimeUnit unit)

     该方法返回了一个 ScheduledFuture。

     

     

     

    JDK1.5中加入了许多对并发特性的支持,例如:线程池。

    一、简介 线程池类为 Java.util.concurrent.ThreadPoolExecutor,常用构造方法为:

    Java代码   ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)  

     

    corePoolSize:        线程池维护线程的最少数量 (core : 核心) maximumPoolSize:线程池维护线程的最大数量  keepAliveTime:     线程池维护线程所允许的空闲时间 unit:           线程池维护线程所允许的空闲时间的单位 workQueue: 线程池所使用的缓冲队列 handler:      线程池对拒绝任务的处理策略 一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。  当一个任务通过execute(Runnable)方法欲添加到线程池时: 如果线程池中运行的线程 小于corePoolSize ,即使线程池中的线程都处于空闲状态,也要 创建新的线程 来处理被添加的任务。

    如果线程池中运行的线程大于等于corePoolSize,但是缓冲队列 workQueue未满 ,那么任务被放入缓冲队列 。

    如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满(即无法将请求加入队列 ),并且线程池中的数量小于maximumPoolSize,建新的线程 来处理被添加的任务。

    如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize ,那么通过 handler 所指定的策略来处理此任务。 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止 。这样,线程池可以动态的调整池中的线程数。

    也就是:处理任务的优先级为: corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。 

    当然,如果用的是无界的缓冲队列,那么当线程等于corePoolSIze,小于maximumPoolSize,任务就会不停的添加到队列中,也不会创建新线程,也不会丢弃。

    Java代码   unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:   NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。      workQueue常用的是:          java.util.concurrent.ArrayBlockingQueue      handler有四个选择:   ThreadPoolExecutor.AbortPolicy()  //默认的处理方式       //抛出java.util.concurrent.RejectedExecutionException异常   ThreadPoolExecutor.CallerRunsPolicy()        //重试添加当前的任务,他会自动重复调用execute()方法   ThreadPoolExecutor.DiscardOldestPolicy()        //抛弃旧的任务   ThreadPoolExecutor.DiscardPolicy()  //推荐的处理方式     // 抛弃当前的任务  

    一个例子: 

    Java代码   package test;   import java.util.concurrent.ArrayBlockingQueue;   import java.util.concurrent.ThreadPoolExecutor;   import java.util.concurrent.TimeUnit;      public class TestThreadPool {          private static int produceTaskSleepTime = 2;       private static int produceTaskMaxNumber = 10;          /**       * 线程池执行的任务       */       public static class ThreadPoolTask implements Runnable{           // 保存任务所需要的数据           private Object threadPoolTaskData;              ThreadPoolTask(Object tasks) {               this.threadPoolTaskData = tasks;           }              public void run() {               // 处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句               System.out.println("start .." + threadPoolTaskData);               try {                   //便于观察,等待一段时间                   Thread.sleep(2000);               } catch (Exception e) {                   e.printStackTrace();               }               threadPoolTaskData = null;           }              public Object getTask() {               return this.threadPoolTaskData;           }       }                 public static void main(String[] args) {           // 构造一个线程池           ThreadPoolExecutor threadPool = new ThreadPoolExecutor(243,                   TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),                   new ThreadPoolExecutor.DiscardOldestPolicy());              for (int i = 1; i <= produceTaskMaxNumber; i++) {               try {                   // 产生一个任务,并将其加入到线程池                   String task = "task@ " + i;                   System.out.println("put " + task);                                      threadPool.execute(new ThreadPoolTask(task));                      // 便于观察,等待一段时间                   Thread.sleep(produceTaskSleepTime);               } catch (Exception e) {                   e.printStackTrace();               }           }       }          }    

    说明: 1、在这段程序中,一个任务就是一个Runnable类型的对象,也就是一个ThreadPoolTask类型的对象。

    2、一般来说任务除了处理方式外,还需要处理的数据,处理的数据通过构造方法传给任务。

    3、在这段程序中,main()方法相当于一个残忍的领导,他派发出许多任务,丢给一个叫 threadPool的任劳任怨的小组来做。

    4、这个小组里面队员至少有两个,如果他们两个忙不过来,任务就被放到任务列表里面。

    如果积压的任务过多,多到任务列表都装不下(超过3个)的时候,就雇佣新的队员来帮忙。但是基于成本的考虑,不能雇佣太多的队员,至多只能雇佣 4个。

    5、如果四个队员都在忙时,再有新的任务,这个小组就处理不了了,任务就会被通过一种策略来处理,我们的处理方式是不停的派发,直到接受这个任务为止(更残忍!呵呵)。

    因为队员工作是需要成本的,如果工作很闲,闲到 3 秒都没有新的任务了,那么有的队员就会被解雇了,但是,为了小组的正常运转,即使工作再闲,小组的队员也不能少于两个。

    本例来源:http://blog.csdn.net/senton/article/details/3528720

     

    Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。 线程池的类体系结构:  ExecutorService:         真正的线程池接口。 ScheduledExecutorService     能和Timer/TimerTask类似,解决那些需要任务重复执行的问题。 ThreadPoolExecutor         ExecutorService的默认实现。 ScheduledThreadPoolExecutor     继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。

     

    Executors 工厂方法: 

    Java代码   newSingleThreadExecutor:       //单个后台线程  (注意其缓冲队列是无界的,想想为什么)   创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。      newFixedThreadPool:       //固定大小线程池    (其缓冲队列是无界的)   创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。      newCachedThreadPool:        //无界线程池,可以进行自动线程回收   创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。      newScheduledThreadPool:       //创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。  

    ThreadPoolExecutor是Executors类的底层实现:

    ExecutorService newFixedThreadPool(int nThreads) 固定大小线程池

    Java代码   public static ExecutorService newFixedThreadPool(int nThreads) {           return new ThreadPoolExecutor(nThreads, nThreads,                                         0L, TimeUnit.MILLISECONDS,                                         new LinkedBlockingQueue<Runnable>());       }  

     corePoolSize和maximumPoolSize的大小是一样的(实际上,如果使用无界queue的话maximumPoolSize参数是没有意义的),BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的。

    一个例子:

    Java代码   public class ThreadPool {          private static void doSomething(int id) {           System.out.println("start do " + id + " task …");           try {               Thread.sleep(1000 * 2);           } catch (InterruptedException e) {               e.printStackTrace();           }           System.out.println("start do " + id + " finished.");       }              public static void main(String[] args) {           ExecutorService executorService = Executors.newFixedThreadPool(2);               //创建了一个线程池,里面有2个线程              //通过submit()方法,提交一个Runnable的实例,这个实例将交由线程池中空闲的线程执行。           executorService.submit(new Runnable() {               public void run() {                   doSomething(1);               }           });              executorService.execute(new Runnable() {               public void run() {                   doSomething(2);               }           });              executorService.shutdown();                     System.out.println(">>main thread end.");        }   }  

     

    ExecutorService newSingleThreadExecutor() 单线程

    Java代码   public static ExecutorService newSingleThreadExecutor() {           return new FinalizableDelegatedExecutorService               (new ThreadPoolExecutor(11,                                       0L, TimeUnit.MILLISECONDS,                                       new LinkedBlockingQueue<Runnable>()));       }  

     最大和最小都为1

     

    ExecutorService newCachedThreadPool() 无界线程池

    Java代码   public static ExecutorService newCachedThreadPool() {           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                         60L, TimeUnit.SECONDS,                                         new SynchronousQueue<Runnable>());       }  

    这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用 SynchronousQueue 。可能对于该BlockingQueue有些陌生,简单说:该QUEUE中, 每个插入操作必须等待另一个

    线程的对应移除操作。 比如,我先添加一个元素,接下来如果继续想尝试添加则会阻塞,直到另一个线程取走一个元素,反之亦然。(想到什么?就是缓冲区为1的生产者消费者模式^_^)

     

    以下为重要分析:

    到此如果有很多疑问,那是必然了(除非你也很了解了)

     

    先从BlockingQueue <Runnable > workQueue这个入参开始说起。在JDK中,其实已经说得很清楚了,一共有三种类型的queue。以下为引用:(我会稍微修改一下,并用红色突出显示)

      

    所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互: 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?如果当前运行的线程小于corePoolSize ,则任务根本不会存放,添加到queue中 ,而是直接 抄家伙(thread)开始运行 )如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列 ,而不添加新的线程 如果无法将请求加入队列,则创建新的线程 ,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。 先不着急举例子,因为首先需要知道队列的三种类型 。 排队有三种通用策略: 直接提交。 工作队列的默认选项是 SynchronousQueue ,它将任务直接提交给线程而不保持它们 。在此,如果不存在可用于立即运行任务的线程 ,则试图把任务加入队列将失败,因此会构造一个新的线程 。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务 。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。无界队列。 使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue )将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize 。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。有界队列。 当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue )有助于防止资源耗尽 ,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。  

     

    到这里,该了解的理论已经够多了,可以调节的就是corePoolSize和maximumPoolSizes 这对参数还有就是BlockingQueue的选择。

     

    例子一:使用直接提交策略,也即SynchronousQueue。

     

    首先SynchronousQueue是无界的,也就是说他存数任务的能力是没有限制的,但是由于该Queue本身的特性 ,在某次添加元素后必须等待其他线程取走后才能继续添加 。在这里不是核心线程便是新创建的线程,但是我们试想一样下,下面的场景。

     

    我们使用一下参数构造ThreadPoolExecutor:

    Java代码   new ThreadPoolExecutor(                 2330, TimeUnit.SECONDS,                  new <span style="white-space: normal;">SynchronousQueue</span><Runnable>(),                  new RecorderThreadFactory("CookieRecorderPool"),                  new ThreadPoolExecutor.CallerRunsPolicy());       Java代码   new ThreadPoolExecutor(               2330, TimeUnit.SECONDS,                new <span style="white-space: normal;">SynchronousQueue</span>                  <Runnable>(),                new RecorderThreadFactory("CookieRecorderPool"),                new ThreadPoolExecutor.CallerRunsPolicy());  

     当核心线程已经有2个正在运行.

     

    此时继续来了一个任务(A),根据前面介绍的“如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列 ,而不添加新的线程 。”,所以A被添加到queue中。又来了一个任务(B),且核心2个线程还没有忙完,OK,接下来首先尝试1中描述,但是由于使用的SynchronousQueue,所以一定无法加入进去。此时便满足了上面提到的“如果无法将请求加入队列,则创建新的线程 ,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。”,所以必然会新建一个线程来运行这个任务。暂时还可以,但是如果这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中无法插入,而线程数达到了maximumPoolSize,所以只好执行异常策略了。 所以在使用SynchronousQueue通常要求maximumPoolSize是无界的,这样就可以避免上述情况发生(如果希望限制就直接使用有界队列)。对于使用SynchronousQueue的作用jdk中写的很清楚:此策略可以避免在处理可能具有内部依赖性的请求集时出现锁 。 什么意思?如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中 例子二:使用无界队列策略,即LinkedBlockingQueue 这个就拿newFixedThreadPool 来说,根据前文提到的规则:  写道 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。  那么当任务继续增加,会发生什么呢?  写道   如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。

     OK,此时任务变加入队列之中了,那什么时候才会添加新线程呢?

     

     写道 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

    这里就很有意思了,可能会出现无法加入队列吗?不像SynchronousQueue那样有其自身的特点,对于无界队列来说,总是可以加入的(资源耗尽,当然另当别论)。换句说,永远也不会触发产生新的线程! corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。所以要防止任务疯长,比如任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,而且还不断增加,如果任务内存大一些,不一会儿就爆了,呵呵。

     

    可以仔细想想哈。

     

    例子三:有界队列,使用ArrayBlockingQueue。

     

    这个是最为复杂的使用,所以JDK不推荐使用也有些道理。与上面的相比,最大的特点便是可以防止资源耗尽的情况发生。

     

    举例来说,请看如下构造方法:

     

    Java代码   new ThreadPoolExecutor(                 2430, TimeUnit.SECONDS,                  new ArrayBlockingQueue<Runnable>(2),                  new RecorderThreadFactory("CookieRecorderPool"),                  new ThreadPoolExecutor.CallerRunsPolicy());      Java代码   new ThreadPoolExecutor(               2430, TimeUnit.SECONDS,                new ArrayBlockingQueue<Runnable>(2),                new RecorderThreadFactory("CookieRecorderPool"),                new ThreadPoolExecutor.CallerRunsPolicy());  

    假设,所有的任务都永远无法执行完。

     

    对于首先来的A,B来说直接运行,接下来,如果来了C,D,他们会被放到queu中,如果接下来再来E,F,则增加线程运行E,F。但是如果再来任务,队列无法再接受了,线程数也到达最大的限制了,所以就会使用拒绝策略来处理。

     

    总结:

    ThreadPoolExecutor的使用还是很有技巧的。使用无界queue可能会耗尽系统资源。使用有界queue可能不能很好的满足性能,需要调节线程数和queue大小线程数自然也有开销,所以需要根据不同应用进行调节。 通常来说对于静态任务可以归为: 数量大,但是执行时间很短数量小,但是执行时间较长数量又大执行时间又长除了以上特点外,任务间还有些内在关系 看完这篇问文章后,希望能够可以选择合适的类型了 http://rdc.gleasy.com/java-executorservice-重要bug两例.html

    [java] view plain copy print ? static class Task1 implements Runnable {          int id;          public Task1(int id) {              this.id = id;          }          @Override          public void run() {              try {                  System.out.println(Thread.currentThread() +" task " + id + " begin");                  Thread.sleep(5000);              } catch (InterruptedException e) {                                }              if (id < 5) {                  throw new RuntimeException(Thread.currentThread() + " task " +id+ " exception");              }          }      }            static ExecutorService pool = Executors.newFixedThreadPool(5);      public void test() throws Exception {                                    }                  public static void main(String[] args) throws Exception      {          for (int i = 0; i < 10; ++i) {              pool.execute(new Task1(i));          }                    ThreadPoolExecutor threads =(ThreadPoolExecutor) pool;                    for (int i= 0; i < 40; i++) {              System.out.println("pool size = " + threads.getPoolSize() + " Largest = " + threads.getLargestPoolSize() + " Max = " + threads.getMaximumPoolSize());              Thread.sleep(1000);          }          pool.shutdown();  //        ToyClient client = new ToyClient();  //        client.test();                          }   static class Task1 implements Runnable { int id; public Task1(int id) { this.id = id; } @Override public void run() { try { System.out.println(Thread.currentThread() +" task " + id + " begin"); Thread.sleep(5000); } catch (InterruptedException e) { } if (id < 5) { throw new RuntimeException(Thread.currentThread() + " task " +id+ " exception"); } } } static ExecutorService pool = Executors.newFixedThreadPool(5); public void test() throws Exception { } public static void main(String[] args) throws Exception { for (int i = 0; i < 10; ++i) { pool.execute(new Task1(i)); } ThreadPoolExecutor threads =(ThreadPoolExecutor) pool; for (int i= 0; i < 40; i++) { System.out.println("pool size = " + threads.getPoolSize() + " Largest = " + threads.getLargestPoolSize() + " Max = " + threads.getMaximumPoolSize()); Thread.sleep(1000); } pool.shutdown(); // ToyClient client = new ToyClient(); // client.test(); }起始线程池里有5个线程,前五个线程都抛出异常,这五个线程因为异常而死掉,线程池会再创建五个新线程

    所以建议用submit提交任务,如果异常在线程中发生,当前线程不会处理这个异常,当前线程也不会死掉

    如果任务实现的是callable接口,会在future.get();结果返回时抛出异常

    转自 http://blog.csdn.net/sunmenggmail/article/details/35230475
    转载请注明原文地址: https://ju.6miu.com/read-499995.html

    最新回复(0)