常见的操作符
List<String> threehighCalorcDishNames = menu .stream() .filter(dish -> dish.getCalories() > 30) .map(Dish::getName).limit(3) .collect(Collectors.toList()); stream 是将集合转化为流;filter 接受 lambda,从流中排除某些元素;map 将元素转化为其他形式,Dish::getName 实际为 dish -> dish.getName(),返回一个 String;limit 截断流,使其元素不超过指定数量;collect 是终端操作,将流中的元素累积到一个汇总结果,这里的 toList 就是将流转化为列表的方案。前面的 filter、map 等都属于中间操作,可以连接起来作为一条流水线。
流和迭代器类似只能遍历一次。遍历完成这个流就已经被消费了
List<String> strings = Arrays.asList("Java8", "In", "Action"); Stream<String> stream = strings.stream(); stream.forEach(System.out::println); stream.forEach(System.out::println); 这个时候会抛出异常 java.lang.IllegalStateException: stream has already been operated upon or closed。 归纳的优势和并行问题:使用 reduce 的好处在于迭代被内部抽象掉了,内部实现因此可以并行执行归约操作;而如果在迭代中对共享变量做累加,就不容易并行化。在这里只需把 stream 换成 parallelStream: int sum = numbers.parallelStream().reduce(0, (a, b) -> a + b); 其他代码几乎不用换。
刚才的 reduce 有装箱的成本。这里我们使用 Java 8 的原始类型特化流接口 IntStream、DoubleStream 和 LongStream,分别对应流中的 int、double、long 元素,避免了装箱操作。
int sum = menu.stream().mapToInt(Dish::getCalories).sum(); 如果要转化为对象流 使用boxed IntStream intStream = menu.stream().mapToInt(Dish::getCalories); Stream<Integer> boxed = intStream.boxed(); 同时支持max min,average等操作Collectors 类是用来创建和使用收集器的 先看一个收集器的例子 将货币分组
Map<Currency, List<Transaction>> transactionsByCurrencies = new HashMap<>(); for (Transaction transaction : transactions) { Currency currency = transaction.getCurrency(); List<Transaction> transactionsForCurrency = transactionsByCurrencies.get(currency); if (transactionsForCurrency == null) { transactionsForCurrency = new ArrayList<>(); transactionsByCurrencies.put(currency, transactionsForCurrency); } transactionsForCurrency.add(transaction); } System.out.println(transactionsByCurrencies); //通过 Collectors 来收集前面的元素 Map<Currency, List<Transaction>> transactionsByCurrencies = transactions.stream() .collect(groupingBy(Transaction::getCurrency)); System.out.println(transactionsByCurrencies); 上述例子中,同样的操作,代码量和阅读难度差距很大,体现了收集器的优势:在函数式编程中,**我们希望表达的是做什么,而不是怎么做。** 收集器会对元素应用一个函数,并将其结果累积到一个数据结构中,从而产生输出。Collectors 提供三大功能:
将流元素归约和汇总为一个值;元素分组;元素分区。对于汇总到一个值:如求和提供了 summingInt,求平均值提供了 averagingInt;还提供了一个收集器 summarizingInt,它的结果类型是 IntSummaryStatistics,可以一次获取 count、max、min、sum、average。
Integer collect = menu.stream().collect(summingInt(Dish::getCalories)); Double collect = menu.stream().collect(averagingInt(Dish::getCalories)); IntSummaryStatistics collect = menu.stream().collect(summarizingInt(Dish::getCalories)); double average = collect.getAverage(); long count = collect.getCount(); ..... joining 收集器会对流中每个对象调用 toString 方法,并把得到的字符串拼接成一个字符串: String collect = menu.stream().map(Dish::getName).collect(joining()); System.out.print(collect); String collec1t = menu.stream().map(Dish::getName).collect(joining(", ")); System.out.println(collec1t); 后者用逗号把各个 String 分隔开。 Collectors.groupingBy 工厂方法返回一个 Map,根据分类函数的返回值作为 key:
Map<Dish.Type, List<Dish>> map=menu.stream().collect(groupingBy(Dish::getType)) Map<CaloricLevel, List<Dish>> map=menu.stream().collect( groupingBy(dish -> { if (dish.getCalories() <= 400) return CaloricLevel.DIET; else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; else return CaloricLevel.FAT; } ));多级分组
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> map=menu.stream().collect( groupingBy(Dish::getType, groupingBy((Dish dish) -> { if (dish.getCalories() <= 400) return CaloricLevel.DIET; else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; else return CaloricLevel.FAT; } ) ) );分区 分区是特殊的分组,分区函数返回的是Boolean的值
Map<Boolean,List<Dish>> partitionMenu=menu.stream().collect(partitioningBy(Dish::isVegetarian)); 通过 partitionMenu.get(true) 来获取数据。 将顺序流转化为并行流
//计算方法运行的时间 public static <T, R> long measurePerf(Function<T, R> f, T input) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); R result = f.apply(input); long duration = (System.nanoTime() - start) / 1_000_000; if (duration < fastest) fastest = duration; } return fastest; }同样的计算求和
//传统的for 循环 时间大概 3毫秒 public static long iterativeSum(long n) { long result = 0; for (long i = 0; i <= n; i++) { result += i; } return result; } //时间为128毫秒,顺序计算 public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get(); } //并行计算 98毫秒 原因在于 装箱问题和iterate 无法进行分流的操作 public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get(); } //正确的操作 耗时大概2毫秒 public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong(); }ArrayList 分解性极佳 LinkkedList分解性差 IntStream.range 极佳 Stream.iterate 差 HashSet 好 TreeSet 好
分支/合并(Fork/Join)框架的目的在于以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来。使用 RecursiveTask 时,要把任务提交到线程池(ForkJoinPool)中。
/**
 * Fork/join task that sums a slice of a {@code long[]}.
 *
 * <p>A slice longer than {@link #THRESHOLD} is cut in half. The left half is
 * forked so it can run asynchronously, the right half is computed in the
 * current thread, and the two partial sums are added. Shorter slices are
 * summed with a plain loop.
 */
public class ForkJoinSumCalculator extends RecursiveTask<Long> {

    /** Pool that {@link #forkJoinSum(long)} submits the root task to. */
    public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();

    /** Slices of at most this many elements are no longer split into subtasks. */
    public static final long THRESHOLD = 10_000;

    /** The array whose elements are summed. */
    private final long[] numbers;

    /** Index of the first element of this task's slice (inclusive). */
    private final int start;

    /** Index just past the last element of this task's slice (exclusive). */
    private final int end;

    /**
     * Creates the root task, covering the whole array.
     *
     * @param numbers the values to sum
     */
    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    /** Private constructor used to create subtasks over {@code [start, end)}. */
    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    /**
     * Sums this task's slice, splitting it into two subtasks when it is larger
     * than {@link #THRESHOLD}.
     *
     * @return the sum of {@code numbers[start..end)}
     */
    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        int middle = start + length / 2;

        // fork() schedules the left half on the pool so it can run asynchronously.
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, middle);
        leftTask.fork();

        // The right half is computed directly in the current thread.
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, middle, end);
        long rightResult = rightTask.compute();

        // join() blocks the caller until the task has produced its result, so it
        // is called only after the computation of both halves has been started.
        // Joining earlier would give a more complex version of the sequential
        // algorithm, because each subtask would wait for the other to finish
        // before it could start.
        long leftResult = leftTask.join();

        return leftResult + rightResult;
    }

    /** Sums the slice with a plain loop; used once a task is not split any further. */
    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    /**
     * Sums the integers 1..n by materializing them into an array and running a
     * {@code ForkJoinSumCalculator} over it on {@link #FORK_JOIN_POOL}.
     *
     * @param n the inclusive upper bound
     * @return the sum of all integers from 1 to {@code n}
     */
    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return FORK_JOIN_POOL.invoke(task);
    }
}
// Next topic: a Spliterator is used to traverse the elements of a data source,
// like an iterator, but it can do so in parallel.
public interface Spliterator<T>{ boolean tryAdvance(Consumer<? super T> action); Spliterator<T> trySplit(); long estimateSize(); int characteristics(); } //tryAdvance 方法类似于普通的 Iterator,按顺序逐个消费 Spliterator 中的元素,如果还有其他元素要遍历就返回 true //trySplit 可以把一部分元素划出去分给第二个 Spliterator,让它们并行处理 //estimateSize 估计还剩下多少元素要遍历 //characteristics 返回一个 int,代表 Spliterator 本身特性集的编码: ORDERED 元素是有序的;DISTINCT 对于任意一对遍历过的元素 x 和 y,x.equals(y) 返回 false;SORTED 遍历的元素按照一个预定义的顺序排序;SIZED 该 Spliterator 由一个已知大小的源建立;NONNULL 保证遍历的元素不为 null;IMMUTABLE 源数据不会被修改;CONCURRENT 源数据可以被其他线程同时修改而无需同步;SUBSIZED 该 Spliterator 和它所拆分出的 Spliterator 都是 SIZED。 以分解字符串为例
/**
 * Spliterator over the characters of a string that splits only at whitespace.
 *
 * <p>Usage example from the notes ({@code s} is the text and {@code WordCounter}
 * is defined elsewhere):
 *
 * <pre>{@code
 * Spliterator<Character> spliterator = new WordCounterSpliterator(s);
 * Stream<Character> stream = StreamSupport.stream(spliterator, true);
 * int num = stream.reduce(new WordCounter(0, true),
 *                         WordCounter::accumulate,
 *                         WordCounter::combine).getCounter();
 * }</pre>
 */
public class WordCounterSpliterator implements Spliterator<Character> {

    /** The text being traversed. */
    private final String string;

    /** Index of the next character to hand out. */
    private int currentChar = 0;

    /**
     * Creates a spliterator positioned at the first character of {@code string}.
     *
     * @param string the text to traverse
     */
    public WordCounterSpliterator(String string) {
        this.string = string;
    }

    /**
     * Passes the current character to {@code action} and moves to the next one.
     *
     * @param action the consumer that receives the character
     * @return {@code true} if a character was consumed, {@code false} if none
     *         remained (in which case {@code action} is not called)
     */
    @Override
    public boolean tryAdvance(Consumer<? super Character> action) {
        // Guard first: an empty or exhausted spliterator must report false
        // rather than read past the end of the string.
        if (currentChar >= string.length()) {
            return false;
        }
        action.accept(string.charAt(currentChar++));
        return true;
    }

    /**
     * Tries to hand off the first part of the remaining text to a new spliterator.
     *
     * <p>The search starts at the midpoint of the remaining characters and moves
     * forward to the first whitespace character. The text before that position
     * goes to the returned spliterator, and this one continues from the
     * whitespace.
     *
     * @return a spliterator over the split-off prefix, or {@code null} when fewer
     *         than 10 characters remain or no whitespace is found from the
     *         midpoint onward
     */
    @Override
    public Spliterator<Character> trySplit() {
        int currentSize = string.length() - currentChar;
        if (currentSize < 10) {
            return null;
        }
        for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
                Spliterator<Character> prefix =
                        new WordCounterSpliterator(string.substring(currentChar, splitPos));
                currentChar = splitPos;
                return prefix;
            }
        }
        return null;
    }

    /**
     * Reports how many characters are left to traverse.
     *
     * @return the number of characters not yet consumed
     */
    @Override
    public long estimateSize() {
        return string.length() - currentChar;
    }

    /**
     * Reports the characteristics of this spliterator.
     *
     * @return {@code ORDERED}, {@code SIZED}, {@code SUBSIZED}, {@code NONNULL}
     *         and {@code IMMUTABLE} combined into one bit set
     */
    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED | NONNULL | IMMUTABLE;
    }
}