Executor框架

    xiaoxiao2021-03-25  127

    Executor框架:为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效地进行线程控制。它们都在java.util.concurrent包中,是JDK并发包的核心。其中有一个比较重要的类:Executors,他扮演这线程工厂的角色,我们通过Executors可以创建特定功能的线程池。

    Executors创建线程池方法:

    newFixedThreadPool(count)方法,该方法返回一个固定线程数量的线程池,该方法的线程数始终不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在一个任务队列中等待有空闲的线程去执行。(比较靠谱,队列是无界的,任务太多就放在队列,count个线程慢慢执行)newSingleThreadExecutor()方法,创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务列队中。(一个线程,效率不高)newCachedThreadPool()方法,返回一个可根据实际情况调整线程个数的线程池不限制最大线程数量

    若用空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动回收

    感觉有点不靠谱:采用synchronousQueue队列,没有容器空间,有多少任务创建多少线程

    newScheduledThreadPool()方法,

    可以有延迟,隔一个时间段执行的线程池,该方法返回一个SchededExecutorService对象,但该线程池可以指定线程的数量。

    采用DelayWorkQueue,无界队列

    Excutors方法-->submit和execute的区别: 1、submit可以传入实现Callable接口的实例对象, 

    2、submit方法有返回值Future

    public static void main(String args[]) throws Exception { /** * 返回一个固定数量的线程池 * 底层实现: * return new ThreadPoolExecutor(nThreads, nThreads, * 0L, TimeUnit.MILLISECONDS, * new LinkedBlockingQueue<Runnable>()); * */ ExecutorService fixedPool = Executors.newFixedThreadPool(5); /** * 简单讲相当于fixedPool传了一个1 */ ExecutorService singlePool = Executors.newSingleThreadExecutor(); /** * 返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量, * 若有空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动回收 * return new ThreadPoolExecutor(0, Integer.MAX_VALUE, * 60L, TimeUnit.SECONDS, * new SynchronousQueue<Runnable>()); */ ExecutorService cachePool = Executors.newCachedThreadPool(); /** * 有延迟功能的线程池,提供很多延迟机制,其他线程池一般只有直接执行execute * super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, * new DelayedWorkQueue()); */ ScheduledExecutorService schedulerPool = Executors.newScheduledThreadPool(1); /**--------------------方法-----------**/ /** * 方法:scheduleWithFixedDelay * Runnable command, long initialDelay, long delay, TimeUnit unit * 要执行的线程 * 延迟时间 * 每隔时间执行 * 时间单位 * * 延迟5秒开始执行线程,然后每隔1s执行 */ ScheduledFuture<?> scheduleTask = schedulerPool.scheduleWithFixedDelay (new Runnable() { @Override public void run() { System.out.println("run"); } }, 5, 1, TimeUnit.SECONDS); } 自定义线程池 若Executors工厂类无法满足我们的需求,可以自己去创建自定义的线程池, 其实Executors工厂类里面的创建线程方法其内部实现均是用了ThreadPoolExecutor这个类

    这个类可以自定义线程。构造方法如下:

    public ThreadPoolExecutor( //核心线程个数,一开始就创建 int corePoolSize, //最大线程个数,比如队列用的是有界队列,那么队列满了,会加线程个数,如果是无界队列,该参数无效 int maximumPoolSize, //空闲线程销毁时间,newCachedThreadPool是60秒,其他方法为0 long keepAliveTime, //时间单位 TimeUnit unit, //关键参数,任务的容器,阻塞队列 BlockingQueue<Runnable> workQueue, //线程工厂,可以不填,ThreadPoolExecutor有4个构造方法,可以直接忽略这个参数 ThreadFactory threadFactory, //拒绝策略,重要,默认用的是AbortPolicy--直接抛出异常组织系统正常工作 //当指定的任务容器是有界队列,而任务比较多,超出容器的大小+maximumPoolSize //就会出现异常,一般我们自定义实现RejectedExecutionHandler接口,用来处理 //这个多出的任务,一般处理方式是采用的是记录到log日志文件中,记录任务的重要信息, //以后系统有空闲了再读取log,做相应处理 RejectedExecutionHandler handler) {...}

    这个构造方法对于队列是什么类型的比较关键:

    ☞ 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,若大于corePoolSize,则会将任务加入队列,若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。☞无界的任务队列时:LinkedBlockingQueue。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。(miximumPoolSize的指定就无用)若后续仍有新的任务加入,而有没有空闲的线程资源,则任务直接进入队列等待。缺陷:若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。JDK拒绝策略:AbortPolicy:直接抛出异常组织系统正常工作(默认采用)CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务。DiscardPolicy:丢弃无法处理的任务,不给予任何处理。

    如果需要自定义拒绝策略可以实现RejectedExecutionHandler接口。

    public static void main(String[] args) throws Exception{ /** * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程, * 若大于corePoolSize,则会将任务加入队列, * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程, * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。 */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //corePoolSize 2, //maximumPoolSize 60, //60 TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), //指定一种队列 (有界队列) //new LinkedBlockingQueue<Runnable>() new MyRejected() //自定义方案,只有采用有界队列才会出现 //new DiscardOldestPolicy() --jdk提供方法,删除最久加进去的 ); MyTask mt1 = new MyTask(1, "任务1"); //implements Runnable,模拟延迟,执行完要5s MyTask mt2 = new MyTask(2, "任务2"); MyTask mt3 = new MyTask(3, "任务3"); MyTask mt4 = new MyTask(4, "任务4"); MyTask mt5 = new MyTask(5, "任务5"); MyTask mt6 = new MyTask(6, "任务6"); MyTask mt7 = new MyTask(7, "任务7"); pool.execute(mt1); //给corePoolSize创建的线程处理 pool.execute(mt2); //加入任务容器 pool.execute(mt3); //加入任务容器 pool.execute(mt4); //加入任务容器 pool.execute(mt5); //大于队列容器,新创建线程 //TimeUnit.SECONDS.sleep(6); //如果延迟一下,等之前的执行完,就能加进去 pool.execute(mt6); //大于队列容器,新创建线程,大于maximumPoolSize,把异常交给MyRejected pool.execute(mt7); //大于队列容器,新创建线程,大于maximumPoolSize,把异常交给MyRejected pool.shutdown(); //全部执行完则关闭 }

    package com.bjsxt.height.concurrent018; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class MyRejected implements RejectedExecutionHandler{ public MyRejected(){} @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("自定义处理-->当前被拒绝任务为:" + r.toString()); } }

    转载请注明原文地址: https://ju.6miu.com/read-7894.html

    最新回复(0)