数据流操作要么是衔接操作,要么是终止操作。当一个函数不修改数据流的底层数据源,它就是无干扰的。当一个函数的操作的执行是确定性的,它就是无状态的。
数据流可以从多种数据源创建,尤其是集合。可以有不同类型的数据流。
/**
* 从多种数据源创建数据流
*
* @author wenqy
* @date 2020年1月17日 上午11:00:15
*/
private void diffStreamType() {
System.out.println(“—–>diffStreamType—–>”);
Arrays.asList(“a1”, “a2”, “a3”)
.stream()
.findFirst()
.ifPresent(System.out::println); // a1
Stream.of(“a1”, “a2”, “a3”)
.findFirst()
.ifPresent(System.out::println); // a1
IntStream.range(1, 4) // 基本数据类型 // 1 2 3
.forEach(System.out::println);
}
基本数据流和对象数据流间也可以转换
/**
* 基本数据流操作
*
* @author wenqy
* @date 2020年1月17日 上午11:01:26
*/
private void baseStream() {
System.out.println(“—–>baseStream—–>”);
Arrays.stream(new int[] {1, 2, 3})
.map(n -> 2 * n + 1)
.average() // 终止操作,求平均值
.ifPresent(System.out::println); // 5.0
Stream.of(“a1”, “a2”, “a3”)
.map(s -> s.substring(1))
.mapToInt(Integer::parseInt) // 对象数据流转换为基本数据流
.max()
.ifPresent(System.out::println); // 3
IntStream.range(1, 4)
.mapToObj(i -> “a” + i) // 基本数据流转换为对象数据流
.forEach(System.out::println); // a1 a2 a3
}
数据流处理时,衔接操作的一个重要特性就是延迟性,在调用链上是垂直移动的,减少每个元素上所执行的实际操作数量。
/**
* 处理顺序
*
* @author wenqy
* @date 2020年1月17日 上午11:08:33
*/
private void streamHandleSort() {
System.out.println(“—–>streamHandleSort—–>”);
Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
.filter(s -> {
System.out.println(“filter: “ + s);
return true;
})
.forEach(s -> System.out.println(“forEach: “ + s));
// filter: d2 forEach: d2 … 每个元素在调用链上垂直移动
Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
.map(s -> {
System.out.println(“map: “ + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println(“anyMatch: “ + s);
return s.startsWith(“A”);
});
// map:d2 anyMatch:D2 map:a2 anyMatch:A2 anyMatch返回true时终止
Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
.map(s -> {
System.out.println(“map: “ + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println(“filter: “ + s);
return s.startsWith(“A”);
})
.forEach(s -> System.out.println(“forEach: “ + s));
// map和filter会对底层集合的每个字符串调用五次,而forEach只会调用一次
Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
.filter(s -> {
System.out.println(“filter: “ + s);
return s.startsWith(“a”);
})
.map(s -> {
System.out.println(“map: “ + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println(“forEach: “ + s));
// filter移动到调用链的顶端 map只会调用一次 执行更快
Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
.sorted((s1, s2) -> {
System.out.printf(“sort: %s; %s\n”, s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println(“filter: “ + s);
return s.startsWith(“a”);
})
.map(s -> {
System.out.println(“map: “ + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println(“forEach: “ + s));
// 排序是一类特殊的衔接操作。它是有状态的操作,因为你需要在处理中保存状态来对集合中的元素排序
Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
.filter(s -> {
System.out.println(“filter: “ + s);
return s.startsWith(“a”);
})
.sorted((s1, s2) -> {
System.out.printf(“sort: %s; %s\n”, s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println(“map: “ + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println(“forEach: “ + s));
// 重排调用链来优化性能,这个例子中sorted永远不会调用,极大提升性能
}
java8的数据流不能被复用。一旦你调用了任何终止操作,数据流就关闭了,要克服这个限制,我们需要为每个我们想要执行的终止操作创建新的数据流调用链。例如,我们创建一个数据流供应器,来构建新的数据流,并且设置好所有衔接操作。
/**
* 复用数据流
*
* @author wenqy
* @date 2020年1月17日 上午11:40:02
*/
private void streamReuse() {
System.out.println(“—–>streamReuse—–>”);
Stream<String> stream =
Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
.filter(s -> s.startsWith(“a”));
stream.anyMatch(s -> true); // ok
// stream.noneMatch(s -> true); // exception java.lang.IllegalStateException: stream has already been operated upon or closed
Supplier<Stream<String>> streamSupplier =
() -> Stream.of(“d2”, “a2”, “b1”, “b3”, “c”)
.filter(s -> s.startsWith(“a”));
// 每次对get()的调用都构造了一个新的数据流,我们将其保存来调用终止操作
streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok
}
collect
collect
是非常有用的终止操作,将流中的元素存放在不同类型的结果中,例如List、Set或者Map。collect
接受收集器(Collector
),它由四个不同的操作组成:供应器(supplier
)、累加器(accumulator
)、组合器(combiner
)和终止器(finisher
)。
/**
* collect是非常有用的终止操作,将流中的元素存放在不同类型的结果中,例如List、Set或者Map。
* collect接受收集器(Collector),它由四个不同的操作组成:
* 供应器(supplier)、累加器(accumulator)、组合器(combiner)和终止器(finisher)
*
* @author wenqy
* @date 2020年1月17日 上午11:45:18
*/
private void streamCollect() {
System.out.println(“—–>streamCollect—–>”);
List<Person> persons = getPersionList();
List<Person> filtered =
persons
.stream()
.filter(p -> p.getFirstName().startsWith(“P”))
.collect(Collectors.toList()); // 构造list
System.out.println(filtered); // [Person [firstName=Peter, lastName=null, age=23], Person [firstName=Pamela, lastName=null, age=23]]
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.getAge())); // 构造map key: age
personsByAge
.forEach((age, p) -> System.out.format(“age %s: %s\n”, age, p));
IntSummaryStatistics ageSummary =
persons
.stream()
.collect(Collectors.summarizingInt(p -> p.getAge()));
System.out.println(ageSummary); // 统计:简单计算最小年龄、最大年龄、算术平均年龄、总和和数量
String phrase = persons
.stream()
.filter(p -> p.getAge() >= 18)
.map(p -> p.getFirstName()) // 键必须是唯一的,否则会抛出IllegalStateException异常
.collect(Collectors.joining(” and “, “In China “, ” are of legal age.”));
System.out.println(phrase); // 所有人连接为一个字符串
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(” | “), // supplier
(j, p) -> j.add(p.getFirstName().toUpperCase()), // accumulator
(j1, j2) -> j1.merge(j2), // combiner
StringJoiner::toString); // finisher
String names = persons
.stream()
.collect(personNameCollector);
System.out.println(names); // MAX | PETER | PAMELA | DAVID
// 构建自己特殊收集器。将流中的所有人转换为一个字符串,包含所有大写的名称,并以|分割。
}
flatMap
我们已经了解了如何通过使用map操作,将流中的对象转换为另一种类型。map有时十分受限,因为每个对象只能映射为一个其它对象。但如何我希望将一个对象转换为多个或零个其他对象呢?flatMap
这时就会派上用场。
flatMap
将流中的每个元素,转换为其它对象的流。所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。这些流的内容之后会放进flatMap
所返回的流中。
/**
* flatMap将流中的每个元素,转换为其它对象的流。
* 所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。
* 这些流的内容之后会放进flatMap所返回的流中
*
* @author wenqy
* @date 2020年1月17日 下午1:38:09
*/
private void streamFlatMap() {
System.out.println(“—–>streamFlatMap—–>”);
List<Foo> foos = new ArrayList<>();
// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo(“Foo” + i)));
// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar(“Bar” + i + ” <- “ + f.name))));
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name)); // 将含有三个foo对象中的流转换为含有九个bar对象的流
IntStream.range(1, 4)
.mapToObj(i -> new Foo(“Foo” + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar(“Bar” + i + ” <- “ + f.name))
.forEach(f.bars::add)) // 简化为流式操作的单一流水线
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);
// 如果存在的话,每个flatMap的调用都会返回预期对象的Optional包装,
// 否则为null的Optional包装,避免潜在NullPointerException
}
reduce
归约操作将所有流中的元素组合为单一结果。Java8支持三种不同类型的reduce
方法。
/**
* 归约操作将所有流中的元素组合为单一结果
*
* @author wenqy
* @date 2020年1月17日 下午2:01:23
*/
private void streamReduce() {
System.out.println(“—–>streamReduce—–>”);
List<Person> persons = getPersionList();
persons
.stream()
.reduce((p1, p2) -> p1.getAge() > p2.getAge() ? p1 : p2)
.ifPresent(System.out::println); // 计算年龄最大的人 Pamela
Person result =
persons
.stream()
.reduce(new Person(“”, 0), (p1, p2) -> {
p1.setAge(p1.getAge() + p2.getAge());
p1.setFirstName(p1.getFirstName() + p2.getFirstName());
return p1; // 构造带有聚合后名称和年龄的新Person对象
});
// name=MaxPeterPamelaDavid; age=76
System.out.format(“name=%s; age=%s.\n”, result.getFirstName(), result.getAge());
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.getAge(), (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum); // 计算所有人的年龄总和 76
Integer ageSum2 = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format(“accumulator: sum=%s; person=%s\n”, sum, p);
return sum += p.getAge();
},
(sum1, sum2) -> {
System.out.format(“combiner: sum1=%s; sum2=%s\n”, sum1, sum2);
return sum1 + sum2;
});
System.out.println(ageSum2); // 输出调试信息,combiner并没有输出
Integer ageSum3 = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format(“accumulator: sum=%s; person=%s [%s]\n”, sum, p, Thread.currentThread().getName());
return sum += p.getAge();
},
(sum1, sum2) -> {
System.out.format(“combiner: sum1=%s; sum2=%s [%s]\n”, sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
System.out.println(ageSum3); // 并行方式
}
parallelStream
流可以并行执行,在大量输入元素上可以提升运行时的性能。并行流使用公共的ForkJoinPool
,由ForkJoinPool.commonPool()
方法提供。底层线程池的大小最大为五个线程 -- 取决于CPU的物理核数。
组合器函数只在并行流中调用,而不在串行流中调用
/**
* 并行流
*
* @author wenqy
* @date 2020年1月17日 下午2:31:57
*/
private void streamParallel() {
System.out.println(“—–>streamParallel—–>”);
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 底层线程池的大小 — 取决于CPU的物理核数 本机 默认 7
// 可用JVM参数增减 -Djava.util.concurrent.ForkJoinPool.common.parallelism=5
Arrays.asList(“a1”, “a2”, “b1”, “c2”, “c1”)
.parallelStream()
.filter(s -> {
System.out.format(“filter: %s [%s]\n”,
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format(“map: %s [%s]\n”,
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format(“forEach: %s [%s]\n”,
s, Thread.currentThread().getName()));
// 并行流使用了所有公共的ForkJoinPool中的可用线程来执行流式操作
Arrays.asList(“a1”, “a2”, “b1”, “c2”, “c1”)
.parallelStream()
.filter(s -> {
System.out.format(“filter: %s [%s]\n”,
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format(“map: %s [%s]\n”,
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format(“sort: %s <> %s [%s]\n”,
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format(“forEach: %s [%s]\n”,
s, Thread.currentThread().getName()));
// sort看起来只在主线程上串行执行。实际上,并行流上的sort在背后使用了Java8中新的方法Arrays.parallelSort()。
// 如javadoc所说,这个方法会参照数据长度来决定以串行或并行来执行,如果指定数据的长度小于最小粒度,它使用相应的Arrays.sort方法来排序
// 所有并行流操作都共享相同的JVM相关的公共ForkJoinPool。所以你可能需要避免实现又慢又卡的流式操作,因为它可能会拖慢你应用中严重依赖并行流的其它部分。
}
参考
https://github.com/winterbe/java8-tutorial java8教程
https://wizardforcel.gitbooks.io/modern-java/content/ 中文译站
本文由 wenqy 创作,采用 知识共享署名4.0
国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Nov 7,2020