Java多线程

    xiaoxiao2022-06-30  66

    这里使用的方法是实现Callable接口。

    首先要定义一个线程池来跑管理你的线程,不过注意这个线程池大小得注意一下,大小是有讲究的,有个公式就是: 最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目 比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)*8=32; 比如说调DUBBO接口执行的多线程则是反过来(0.5/1.5+1)*8 = 10

    是否使用线程池就一定比使用单线程高效呢? 答案是否定的,比如Redis就是单线程的,但它却非常高效,基本操作都能达到十万量级/s。从线程这个角度来看,部分原因在于:多线程带来线程上下文切换开销,单线程就没有这种开销;锁。

    线程池的配置:private ExecutorService service可以通过Executors提供的静态方法获得线程池; 分别是。newCachedThreadPool、newFixedThreadPool、newScheduledThreadPool、newSingleThreadExecutor。但是通过查询源码可以发现其实底层都是通过new ThreadPoolExector来实现的。

    newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。 newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }

    那么问题来了:ThreadPoolExecutor构造函数的几个参数是什么意思呢? 先看一下ThreadPoolExecutor的构造函数

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }

    -corePoolSize :线程池中所保存的核心线程数。线程池启动后默认是空的,只有任务来临时才会创建线程以处理请求。prestartAllCoreThreads方法可以在线程池启动后即启动所有核心线程以等待任务。(这个就是我们一般配的线程池的大小)

    -maximumPoolSize:线程池允许创建的最大线程数。当workQueue使用无界队列时(如:LinkedBlockingQueue),则此参数无效。(一般采用无界队列新建线程池,这时候集合会拆成很多个线程一起放入线程池里。)

    -keepAliveTime:当前线程池线程总数大于核心线程数时,终止多余的空闲线程的时间。(线程池里的线程很多的时候这个参数当然 要早点结束啦,设为0)

    -unit:keepAliveTime参数的时间单位。

    -workQueue:工作队列,如果当前线程池达到核心线程数时(corePoolSize),且当前所有线程都处于活动状态,则将新加入的任务放到此队列中。下面仅列几个常用的:

    ArrayBlockingQueue:基于数组结构的有界队列,此队列按FIFO原则对任务进行排序。如果队列满了还有任务进来,则调用拒绝策略。 LinkedBlockingQueue:  基于链表结构的无界队列,此队列按FIFO(先入先出)原则对任务进行排序。因为它是无界的,根本不会满,所以采用此队列后线程池将忽略拒绝策略(handler)参数;同时还将忽略最大线程数(maximumPoolSize)等参数。(这个用的多点) SynchronousQueue:直接将任务提交给线程而不是将它加入到队列,实际上此队列是空的。每个插入的操作必须等到另一个调用移除的操作;如果新任务来了线程池没有任何可用线程处理的话,则调用拒绝策略。其实要是把maximumPoolSize设置成无界(Integer.MAX_VALUE)的,加上SynchronousQueue队列,就等同于Executors.newCachedThreadPool()。 PriorityBlockingQueue: 具有优先级的队列的有界队列,可以自定义优先级;默认是按自然排序,可能很多场合并不合适。

    -handler:拒绝策略,当线程池与workQueue队列都满了的情况下,对新加任务采取的策略。

    AbortPolicy: 拒绝任务,抛出RejectedExecutionException异常。默认值。 CallerRunsPolicy: DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。这样的结果是最后加入的任务反而有可能被执行到,先前加入的都被抛弃了。 DiscardPolicy:加不进的任务都被抛弃了,同时没有异常抛出。

    实际应用代码:

    private ExecutorService poolExecutorService = Executors.newFixedThreadPool(10);

    定义一个监控线程数量的值,用来保证每个线程都跑完了

    CountDownLatch begin = new CountDownLatch(size);

    然后用Future 接收每个线程返回的数据

    Future future = poolExecutorService.submit(new ProcessSingleTask(begin,list.get(i),client)); ServiceResponse<ScoreUpdateResponse> response = (ServiceResponse<ScoreUpdateResponse>)future.get(); scoreUpdateResponseList.add(response.getData());

    最后一定要写上begin.await();这个就是之前用来监控线程是否跑完的代码,直到begin为0的时候才会认为线程池全部结束。

    package com.galaxy.customer.business.impl.score; import com.galaxy.customer.model.ServiceResponse; import com.galaxy.customer.model.score.ScoreUpdateRequest; import com.galaxy.customer.model.score.ScoreUpdateResponse; import com.galaxy.foundation.model.Client; import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * Created by fengyiming on 2016/9/13. */ public class ScoreUpdateThread { private ScoreUpdateManagerImpl scoreUpdateManager; private ExecutorService poolExecutorService = Executors.newFixedThreadPool(10); private Logger logger = Logger.getLogger(ScoreUpdateThread.class); public ServiceResponse<List<ScoreUpdateResponse>> updateForBatch(List<ScoreUpdateRequest> list,Client client) throws ExecutionException, InterruptedException{ List<ScoreUpdateResponse> scoreUpdateResponseList = new ArrayList<ScoreUpdateResponse>(list.size()); try { logger.error("------update data batch start----"); int size = list.size(); CountDownLatch begin = new CountDownLatch(size); for(int i = 0; i < size; i++) { logger.error("------" + i + " start------------------------"); Future future = poolExecutorService.submit(new ProcessSingleTask(begin,list.get(i),client)); ServiceResponse<ScoreUpdateResponse> response = (ServiceResponse<ScoreUpdateResponse>)future.get(); scoreUpdateResponseList.add(response.getData()); logger.error("------" + i + " end-------------------------"); } begin.await(); logger.error("------update data batch end----"); }catch (Exception e){ logger.error("--------thread error--------",e); } return ServiceResponse.succeess(scoreUpdateResponseList); } public void setScoreUpdateManager(ScoreUpdateManagerImpl scoreUpdateManager) { this.scoreUpdateManager = scoreUpdateManager; } /** * 带异步回调功能 */ public class ProcessSingleTask implements Callable<ServiceResponse<ScoreUpdateResponse>> { private CountDownLatch keepCount; private ScoreUpdateRequest request; private Client client; public ProcessSingleTask (CountDownLatch count,ScoreUpdateRequest request,Client client){ this.keepCount = count; this.request = request; this.client = client; } public ServiceResponse<ScoreUpdateResponse> call() throws Exception { try { return scoreUpdateManager.updateUserScoreByManual(request,client); }catch (Exception e){ logger.error("------update data error----",e); }finally { this.keepCount.countDown(); } return null; } } }
    转载请注明原文地址: https://ju.6miu.com/read-1126122.html

    最新回复(0)