java8数据流再识

in Java with 0 comment

数据流操作要么是衔接操作,要么是终止操作。当一个函数不修改数据流的底层数据源,它就是无干扰的。当一个函数的操作的执行是确定性的,它就是无状态的。

数据流可以从多种数据源创建,尤其是集合。可以有不同类型的数据流。

/**
     * 从多种数据源创建数据流
     * 
     * @author wenqy
     * @date 2020年1月17日 上午11:00:15
     */
    private void diffStreamType() {
        System.out.println(“—–>diffStreamType—–>”);
        Arrays.asList(“a1”, “a2”, “a3”)
            .stream()
            .findFirst()
            .ifPresent(System.out::println);  // a1
        Stream.of(“a1”, “a2”, “a3”)
            .findFirst()
            .ifPresent(System.out::println);  // a1
        IntStream.range(1, 4)   // 基本数据类型 // 1 2 3
            .forEach(System.out::println);
    }

基本数据流和对象数据流间也可以转换

/**
     * 基本数据流操作
     * 
     * @author wenqy
     * @date 2020年1月17日 上午11:01:26
     */
    private void baseStream() {
        System.out.println(“—–>baseStream—–>”);
        Arrays.stream(new int[] {1, 2, 3})
            .map(n -> 2 * n + 1)
            .average() // 终止操作,求平均值
            .ifPresent(System.out::println);  // 5.0
        Stream.of(“a1”, “a2”, “a3”)
            .map(s -> s.substring(1))
            .mapToInt(Integer::parseInt) // 对象数据流转换为基本数据流
            .max()
            .ifPresent(System.out::println);  // 3
        IntStream.range(1, 4)
            .mapToObj(i -> “a” + i) // 基本数据流转换为对象数据流
            .forEach(System.out::println); // a1 a2 a3
    }

数据流处理时,衔接操作的一个重要特性就是延迟性,在调用链上是垂直移动的,减少每个元素上所执行的实际操作数量。

/**
     * 处理顺序
     * 
     * @author wenqy
     * @date 2020年1月17日 上午11:08:33
     */
    private void streamHandleSort() {
        System.out.println(“—–>streamHandleSort—–>”);
        Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
            .filter(s -> {
                System.out.println(“filter: “ + s);
                return true;
            })
            .forEach(s -> System.out.println(“forEach: “ + s));
        // filter:  d2 forEach: d2 … 每个元素在调用链上垂直移动
        Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
            .map(s -> {
                System.out.println(“map: “ + s);
                return s.toUpperCase();
            })
            .anyMatch(s -> {
                System.out.println(“anyMatch: “ + s);
                return s.startsWith(“A”);
            });
        // map:d2 anyMatch:D2 map:a2 anyMatch:A2 anyMatch返回true时终止
        Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
            .map(s -> {
                System.out.println(“map: “ + s);
                return s.toUpperCase();
            })
            .filter(s -> {
                System.out.println(“filter: “ + s);
                return s.startsWith(“A”);
            })
            .forEach(s -> System.out.println(“forEach: “ + s));
        // map和filter会对底层集合的每个字符串调用五次,而forEach只会调用一次
        Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
            .filter(s -> {
                System.out.println(“filter: “ + s);
                return s.startsWith(“a”);
            })
            .map(s -> {
                System.out.println(“map: “ + s);
                return s.toUpperCase();
            })
            .forEach(s -> System.out.println(“forEach: “ + s));
        // filter移动到调用链的顶端  map只会调用一次 执行更快
        Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
            .sorted((s1, s2) -> {
                System.out.printf(“sort: %s; %s\n”, s1, s2);
                return s1.compareTo(s2);
            })
            .filter(s -> {
                System.out.println(“filter: “ + s);
                return s.startsWith(“a”);
            })
            .map(s -> {
                System.out.println(“map: “ + s);
                return s.toUpperCase();
            })
            .forEach(s -> System.out.println(“forEach: “ + s));
        // 排序是一类特殊的衔接操作。它是有状态的操作,因为你需要在处理中保存状态来对集合中的元素排序
        Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
            .filter(s -> {
                System.out.println(“filter: “ + s);
                return s.startsWith(“a”);
            })
            .sorted((s1, s2) -> {
                System.out.printf(“sort: %s; %s\n”, s1, s2);
                return s1.compareTo(s2);
            })
            .map(s -> {
                System.out.println(“map: “ + s);
                return s.toUpperCase();
            })
            .forEach(s -> System.out.println(“forEach: “ + s));
        // 重排调用链来优化性能,这个例子中sorted永远不会调用,极大提升性能
    }

java8的数据流不能被复用。一旦你调用了任何终止操作,数据流就关闭了,要克服这个限制,我们需要为每个我们想要执行的终止操作创建新的数据流调用链。例如,我们创建一个数据流供应器,来构建新的数据流,并且设置好所有衔接操作。

/**
     * 复用数据流
     * 
     * @author wenqy
     * @date 2020年1月17日 上午11:40:02
     */
    private void streamReuse() {
        System.out.println(“—–>streamReuse—–>”);
        Stream<String> stream =
                Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
                    .filter(s -> s.startsWith(“a”));
        stream.anyMatch(s -> true);    // ok
//      stream.noneMatch(s -> true);   // exception  java.lang.IllegalStateException: stream has already been operated upon or closed
        Supplier<Stream<String>> streamSupplier =
                () -> Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
                        .filter(s -> s.startsWith(“a”));
        // 每次对get()的调用都构造了一个新的数据流,我们将其保存来调用终止操作
        streamSupplier.get().anyMatch(s -> true);   // ok
        streamSupplier.get().noneMatch(s -> true);  // ok
    }

collect

collect是非常有用的终止操作,将流中的元素存放在不同类型的结果中,例如List、Set或者Map。collect接受收集器(Collector),它由四个不同的操作组成:供应器(supplier)、累加器(accumulator)、组合器(combiner)和终止器(finisher)。

/**
     * collect是非常有用的终止操作,将流中的元素存放在不同类型的结果中,例如List、Set或者Map。
     * collect接受收集器(Collector),它由四个不同的操作组成:
     *  供应器(supplier)、累加器(accumulator)、组合器(combiner)和终止器(finisher)
     * 
     * @author wenqy
     * @date 2020年1月17日 上午11:45:18
     */
    private void streamCollect() {
        System.out.println(“—–>streamCollect—–>”);
        List<Person> persons = getPersionList();
        List<Person> filtered =
            persons
                .stream()
                .filter(p -> p.getFirstName().startsWith(“P”))
                .collect(Collectors.toList()); // 构造list
        System.out.println(filtered);    // [Person [firstName=Peter, lastName=null, age=23], Person [firstName=Pamela, lastName=null, age=23]]
        Map<Integer, List<Person>> personsByAge = persons
            .stream()
            .collect(Collectors.groupingBy(p -> p.getAge())); // 构造map key: age
        personsByAge
            .forEach((age, p) -> System.out.format(“age %s: %s\n”, age, p));
        IntSummaryStatistics ageSummary =
            persons
                .stream()
                .collect(Collectors.summarizingInt(p -> p.getAge()));
        System.out.println(ageSummary); // 统计:简单计算最小年龄、最大年龄、算术平均年龄、总和和数量
        String phrase = persons
            .stream()
            .filter(p -> p.getAge() >= 18)
            .map(p -> p.getFirstName()) // 键必须是唯一的,否则会抛出IllegalStateException异常
            .collect(Collectors.joining(” and “, “In China “, ” are of legal age.”));
        System.out.println(phrase); // 所有人连接为一个字符串
        Collector<Person, StringJoiner, String> personNameCollector =
            Collector.of(
                () -> new StringJoiner(” | “),          // supplier
                (j, p) -> j.add(p.getFirstName().toUpperCase()),  // accumulator
                (j1, j2) -> j1.merge(j2),               // combiner
                StringJoiner::toString);                // finisher
        String names = persons
            .stream()
            .collect(personNameCollector);
        System.out.println(names);  // MAX | PETER | PAMELA | DAVID
        // 构建自己特殊收集器。将流中的所有人转换为一个字符串,包含所有大写的名称,并以|分割。
    }

flatMap

我们已经了解了如何通过使用map操作,将流中的对象转换为另一种类型。map有时十分受限,因为每个对象只能映射为一个其它对象。但如何我希望将一个对象转换为多个或零个其他对象呢?flatMap这时就会派上用场。

flatMap将流中的每个元素,转换为其它对象的流。所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。这些流的内容之后会放进flatMap所返回的流中。

/**
 * flatMap将流中的每个元素,转换为其它对象的流。
 * 所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。
 * 这些流的内容之后会放进flatMap所返回的流中
 * 
 * @author wenqy
 * @date 2020年1月17日 下午1:38:09
 */
private void streamFlatMap() {
    System.out.println(“—–>streamFlatMap—–>”);
    List<Foo> foos = new ArrayList<>();
    // create foos
    IntStream
        .range(1, 4)
        .forEach(i -> foos.add(new Foo(“Foo” + i)));
    // create bars
    foos.forEach(f ->
        IntStream
            .range(1, 4)
            .forEach(i -> f.bars.add(new Bar(“Bar” + i + ” <- “ + f.name))));
    foos.stream()
        .flatMap(f -> f.bars.stream())
        .forEach(b -> System.out.println(b.name)); // 将含有三个foo对象中的流转换为含有九个bar对象的流
    IntStream.range(1, 4)
        .mapToObj(i -> new Foo(“Foo” + i))
        .peek(f -> IntStream.range(1, 4)
            .mapToObj(i -> new Bar(“Bar” + i + ” <- “ + f.name))
            .forEach(f.bars::add)) // 简化为流式操作的单一流水线
        .flatMap(f -> f.bars.stream())
        .forEach(b -> System.out.println(b.name));
    Optional.of(new Outer())
        .flatMap(o -> Optional.ofNullable(o.nested))
        .flatMap(n -> Optional.ofNullable(n.inner))
        .flatMap(i -> Optional.ofNullable(i.foo))
        .ifPresent(System.out::println);
    // 如果存在的话,每个flatMap的调用都会返回预期对象的Optional包装,
    // 否则为null的Optional包装,避免潜在NullPointerException
}

reduce

归约操作将所有流中的元素组合为单一结果。Java8支持三种不同类型的reduce方法。

/**
     * 归约操作将所有流中的元素组合为单一结果
     * 
     * @author wenqy
     * @date 2020年1月17日 下午2:01:23
     */
    private void streamReduce() {
        System.out.println(“—–>streamReduce—–>”);
        List<Person> persons = getPersionList();
        persons
            .stream()
            .reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2)
            .ifPresent(System.out::println);    // 计算年龄最大的人 Pamela
        Person result =
            persons
                .stream()
                .reduce(new Person(“”, 0), (p1, p2) -> {
                    p1.setAge(p1.getAge() + p2.getAge());
                    p1.setFirstName(p1.getFirstName() + p2.getFirstName());
                    return p1;  // 构造带有聚合后名称和年龄的新Person对象
                });
        // name=MaxPeterPamelaDavid; age=76
        System.out.format(“name=%s; age=%s.\n”, result.getFirstName(), result.getAge());
        Integer ageSum = persons
            .stream()
            .reduce(0, (sum, p) -> sum += p.getAge(), (sum1, sum2) -> sum1 + sum2);
        System.out.println(ageSum);  // 计算所有人的年龄总和 76
        Integer ageSum2 = persons
            .stream()
            .reduce(0,
                (sum, p) -> {
                    System.out.format(“accumulator: sum=%s; person=%s\n”, sum, p);
                    return sum += p.getAge();
                },
                (sum1, sum2) -> {
                    System.out.format(“combiner: sum1=%s; sum2=%s\n”, sum1, sum2);
                    return sum1 + sum2;
                });
        System.out.println(ageSum2); // 输出调试信息,combiner并没有输出
        Integer ageSum3 = persons
            .parallelStream()
            .reduce(0,
                (sum, p) -> {
                    System.out.format(“accumulator: sum=%s; person=%s [%s]\n”, sum, p, Thread.currentThread().getName());
                    return sum += p.getAge();
                },
                (sum1, sum2) -> {
                    System.out.format(“combiner: sum1=%s; sum2=%s [%s]\n”, sum1, sum2, Thread.currentThread().getName());
                    return sum1 + sum2;
                });
        System.out.println(ageSum3); // 并行方式
    }

parallelStream

流可以并行执行,在大量输入元素上可以提升运行时的性能。并行流使用公共的ForkJoinPool,由ForkJoinPool.commonPool()方法提供。底层线程池的大小最大为五个线程 -- 取决于CPU的物理核数。

组合器函数只在并行流中调用,而不在串行流中调用

/**
     * 并行流
     * 
     * @author wenqy
     * @date 2020年1月17日 下午2:31:57
     */
    private void streamParallel() {
        System.out.println(“—–>streamParallel—–>”);
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        System.out.println(commonPool.getParallelism());    // 底层线程池的大小 — 取决于CPU的物理核数 本机 默认 7 
        // 可用JVM参数增减 -Djava.util.concurrent.ForkJoinPool.common.parallelism=5
        Arrays.asList(“a1”, “a2”, “b1”, “c2”, “c1”)
            .parallelStream()
            .filter(s -> {
                System.out.format(“filter: %s [%s]\n”,
                    s, Thread.currentThread().getName());
                return true;
            })
            .map(s -> {
                System.out.format(“map: %s [%s]\n”,
                    s, Thread.currentThread().getName());
                return s.toUpperCase();
            })
            .forEach(s -> System.out.format(“forEach: %s [%s]\n”,
                s, Thread.currentThread().getName()));
        // 并行流使用了所有公共的ForkJoinPool中的可用线程来执行流式操作
        Arrays.asList(“a1”, “a2”, “b1”, “c2”, “c1”)
            .parallelStream()
            .filter(s -> {
                System.out.format(“filter: %s [%s]\n”,
                    s, Thread.currentThread().getName());
                return true;
            })
            .map(s -> {
                System.out.format(“map: %s [%s]\n”,
                    s, Thread.currentThread().getName());
                return s.toUpperCase();
            })
            .sorted((s1, s2) -> {
                System.out.format(“sort: %s <> %s [%s]\n”,
                    s1, s2, Thread.currentThread().getName());
                return s1.compareTo(s2);
            })
            .forEach(s -> System.out.format(“forEach: %s [%s]\n”,
                s, Thread.currentThread().getName()));
        // sort看起来只在主线程上串行执行。实际上,并行流上的sort在背后使用了Java8中新的方法Arrays.parallelSort()。
        // 如javadoc所说,这个方法会参照数据长度来决定以串行或并行来执行,如果指定数据的长度小于最小粒度,它使用相应的Arrays.sort方法来排序
        // 所有并行流操作都共享相同的JVM相关的公共ForkJoinPool。所以你可能需要避免实现又慢又卡的流式操作,因为它可能会拖慢你应用中严重依赖并行流的其它部分。
    }

参考

https://github.com/winterbe/java8-tutorial java8教程

https://wizardforcel.gitbooks.io/modern-java/content/ 中文译站