submit(ForkJoinTask)
ForkJoinTask.fork(ForkJoinTask are Futures) ForkJoinTask类:任务类的默认实现,实际一般使用的它的两个默认实现: RecurisiveAction :没有返回值的任务类抽象类。 RecurisiveTask :带有返回值的任务类抽象类。 ForkJoinTask的两个主要方法 fork 提交子任务,join 获取子任务执行结果 ※Fork-Join 框架任务的执行由ForkJoinTask类的对象之外,还可以使用一般的Callable和Runnable接口来表示任务。 异常处理 ForkJoinTask在执行的时候可能会抛出异常,但是没办法在主线程里直接捕获异常,所以ForkJoinTask提供了 i sCompletedAbnormally() 方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的 getException 方法获取异常。使用如下代码: if(task.isCompletedAbnormally()) { System.out.println(task.getException());} getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。 代码示例 示例1: RecursiveTask 有返回值任务类 简要模拟 Java 8 parallelStream并行数据流的sum方法(为了便于理解使用二分法实现分割,JDK中使用的是更加复杂的分割算法); 使用 RecursiveTask 分治有返回值的线程任务:并行计算List的sum值,在将其分割到小于阀值后再由子线程进行计算; //子任务类 public static class SumTask extends RecursiveTask<Long> { private static final int THRESHOLD = 100; //规定分割阀值 private List<Long>list; private long low; private long high; public SumTask(List<Long> list,long low,long high){ this.list = list; this.low = low; this.high = high; } @Override //覆盖compute接口,规定分治策略(采用二分法) protected Long compute() { long sum = 0; if(high - low +1 <= THRESHOLD){ sum = list.stream().skip(low).limit(high-low+1).mapToLong(x->x).sum(); //流长度小于阈值直接计算sum }else{ //流长度大于阈值,对流进行分治 long mid = (low + high) /2; SumTask leftSubTask = new SumTask(list,low,mid); SumTask rightSubTask = new SumTask(list,mid+1,high); //分发运行子线程 leftSubTask.fork(); rightSubTask.fork(); //或者使用 invoke(lefSubTask,rightSubTask);代替以上任务提交的两行; //获取子线程结果 sum = leftSubTask.join() + rightSubTask.join(); //或者以如下获取结果 /*try { sum = leftSubTask.get() + rightSubTask.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }*/ } return sum; } } //客户端线程池部分 public static void main(String[] args){ SumTask sumTask = new SumTask(list,0,list.size()-1); //创建并行运算任务 ForkJoinPool forkJoinPool = new ForkJoinPool(); //创建ForkJoinPool线程池 forkJoinPool.submit(sumTask); //向ForkJoinPool提交任务 forkJoinPool.shutdown(); //开始执行线程池 long result = sumTask.join(); //获取任务结果 System.out.println(result == rightResult); //验证结果是否正确 } 示例2:RecursiveAction 无返回值任务类 使用 RecursiveTask 分治有返回值的线程任务:并行将List中元素自增,在将其分割到小于阀值后再由子线程进行; //子任务类 public static class PrintTask extends RecursiveAction { private static final int THRESHOLD = 20; //规定分割阀值 private int[] array; private int low; private int high; public PrintTask(int[] array, int low,int high){ this.array = array; this.low = low; this.high = high; } @Override protected void compute() { if(high - low + 1 < THRESHOLD){ for(int i=low;i<=high;i++){ array[i] = array[i] + 1; } }else{ int mid = (high + low) / 2; PrintTask leftSubTask = new PrintTask(array,low,mid); PrintTask rightSubTask = new PrintTask(array,mid+1,high); leftSubTask.fork(); rightSubTask.fork(); // 或者使用 invokeAll(leftSubTask,rightSubTask); 代替以上两行的任务提交 } } } //ForkJoinPool线程池提交 public static void main(String[] args){ int[] array = new int[1000]; //创建测试数据 PrintTask printTask = new PrintTask(array,0,array.length-1); ForkJoinPool forkjoinPool = new ForkJoinPool(); forkjoinPool.execute(printTask); forkjoinPool.shutdown(); try { //等待forkjoinPool中的所有线程都结束或参数时间后,再执行本线程, forkjoinPool.awaitTermination(30,TimeUnit.SECONDS); System.out.println(Arrays.stream(array).allMatch(x->x == 1)); //验证结果 } catch (InterruptedException e) { e.printStackTrace(); } } 在 ForkJoinPool 运行 RecursiveTask 型任务时,在提交任务后可以使用以下两种方法阻塞客户端代码,直到 ForkJoinPool 中的任务类执行完毕,可以不用自己手动编写阻塞代码: ForkJoinPool forkjoinPool = new ForkJoinPool();forkjoinPool.execute(new Task()); forkjoinPool.shutdown(); //method 1 : 阻塞客户端线程直到 forkjoinPool中的任务执行完毕,或阻塞时间达到参数值, 会抛出InterruptedException异常;forkjoinPool.awaitTermination(60,TimeUnit.SECONDS); //method 2: 阻塞客户端线程直到 forkjoinPool中的任务执行完毕forkjoinPool.awaitQuiescence()