一、什么是 Stream
在 Java 8 中增加了一个新的抽象接口 Stream API,它支持声明式的处理数据。使用 Stream 操作集合似于使用 SQL 语句数据库查找数据类似,提供直观的方法进行操作。 同时 Stream API 让开发者能够快速写出干净、简洁的代码,提高开发者的开发效率。
Stream 将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道传输过程中对流进行处理, 比如筛选、排序、聚合等操作。在经过一系列中间操作后形成最终的管道,得到处理的结果。
二、Stream 操作分类
Stream 操作分为 中间操作(Intermediate operations)、终端操作(Terminal operations),信息如下:
| Stream 操作分类 | |||
|---|---|---|---|
| 中间操作 (Intermediate operations) |
无状态 | unordered()、filter()、map()、mapToInt()、mapToLong() mapToDouble()、flatMap()、flatMapToInt()、flatMapToLong() flatMapToDouble()、peek() |
|
| 有状态 | distinct()、sorted()、limit()、skip() | ||
| 终端操作 (Terminal operations) |
非短路操作 | forEach()、forEachOrdered()、toArray()、reduce()、collect max()、min()、count() |
|
| 短路操作 | anyMatch()、allMatch()、noneMatch()、findFirst()、findAny() | ||
- 中间操作: 中间操作其实就是进行逻辑处理。这个操作可以有一个或者多个连续操作,将一个流转换成另一个流,这些操作不会消耗流,其目的是建立一个流水线,直到终端操作发生后,才会做数据的最终执行。
- 无状态: 指数据处理时,不受之前”中间操作”的影响;
- 有状态: 指数据处理时,受之前”中间操作”的影响,有状态的方法往往需要更大的性能开销;
- 终端操作: 一个 Stream 对象只能有一个终端操作(Terminal operations),这个操作一旦发生,就会真实处理数据,生成对应的处理结果。
- 非短路操作: 指必须处理所有元素才能得到最终结果;
- 短路操作: 指遇到某些符合条件的元素就可以得到最终结果;
三、Stream 特性
- 不存储数据: Stream 不对数据进行存储,而是按照特定的规则对数据进行处理;
- 不改变数据源: Stream 通常不会改变原有数据源,操作时一般是对原有的数据源创建副本,然后对副本进行处理,生成新的 Stream。
- 具有惰性化: Stream 很多操作是有向后延迟的,需要一直等到它弄清楚了最后需要多少数据才会开始。而中间操作(Intermediate operations)永远是惰性化的。
- 可以是无限的: 集合有固定大小,Stream 则不一定。limit(n) 和 findFirst() 这类的短路操作,可以对无限的 Stream 快速完成运算处理。
- 支持并行能力: Stream 是支持并行能力的,可以轻松执行并行化对数据进行处理。
- 可被消耗的: Stream 的生命周期中,Stream 的元素只被访问一次。与迭代器一样,必须生成新的 Stream 后, 才能重新访问与开始源中相同的元素。
四、数据转换为 Stream
一般我们需要获取 Stream 对象后才能对其进行操作,下面列出了一些数据转换为 Stream 的常用方法:
1 | public class StreamExample { |
五、Stream 转换得到指定类型数据
既然可以把集合或者数组转换成流,那么也就可以把流转换回去,使用 collect() 方法就能实现,不过一般还需要配合 Collectors 工具类一起使用,Collectors 类中内置了一系列收集器实现,如下:
- toList(): 将元素收集到一个新的 List 集合中;
- toSet(): 将元素收集到一个新的 Set 集合中;
- toCollection(): 将元素收集到一个新的 ArrayList 集合中;
- joining(): 将元素收集到一个可以用分隔符指定的字符串中;
下面再介绍下 Stream 转换指定类型的常用示例:
(1)、Stream 转换为数组
1 | public class StreamExample { |
(2)、Stream 转换位字符串
1 | public class StreamExample { |
(3)、Stream 转换为 List 列表
1 | public class StreamExample { |
(4)、Stream 转换为 Set 集合
1 | public class StreamExample { |
(5)、Stream 转换为 Map 集合
1 | public class StreamExample { |
六、Stream 对于基本类型的封装
因为 Java 的范型不支持基本类型,所以我们无法用 Stream<int> 这样的类型,如果写了类似代码在编译器中会提示编译错误。为了存储 int,只能使用 Stream<Integer>,但这样会产生频繁的装箱、拆箱操作。为了提高效率,对基本类型数据 int、long、double 进行了封装,分别为 IntSteam、LongStream、DoubleSteam,使用方法和 Stream 类似,示例代码如下:
1 | public class StreamExample { |
七、Stream 的串行与并行
1、Stream 的并行介绍
在 Stream 中,最明显的特点就是存在并行操作,不过如果使用默认方式执行中间与终端操作,那么整个执行过程其实是个串行操作。如果想让 Stream 并行处理数据,那么需要 Stream 中调用 parallel() 或者集合中调用 parallelStream() 方法来开启并行执行。
1 | List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); |
或者:
1 | List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); |
其中 Stream 底层使用的是 ForkJoinTask 实现 Stream 的并行处理,充分利用 CPU 的多核能力,Stream 的 API 将底层复杂实现完全屏蔽了,开发者仅需调用一个方法即可实现并行计算。
2、Stream 并行示例
1 | public class StreamExample { |
执行结果:
1 | 并行执行耗时:8317 |
可以看到使用并行执行所花费的时间远低于串行所花费的时间,不过在使用并行执行时一定要先考虑好使用情况,考虑执行数据是否需要顺序执行,是否涉及线程安全,是否涉及使用网络等,并不是全部情况都能使用并行执行完成处理逻辑的,所以在使用之前一定要慎重,使用不好很可能会带来性能的不升反降。
一般情况下,如机器学习和数据处理等比较适合使用并行处理数据任务,其它方便需要使用者自己衡量进行测试,是否该使用并行执行任务。
八、Stream 中间操作(有状态)常用 API
1、distinct
保证输出的流中包含唯一的元素,通常用于数据去重。
- 接口定义:
-
Stream<T> distinct();
-
- 方法描述:
- 在
distinct接口定义中不接收任何参数,该方法的作用是根据hashCode()和equals()方法来获取不同的元素,因此我们的元素必须实现这两个方法。如果distinct()正在处理有序流,那么 对于重复元素将保留处理元素时的顺序。而在处理无序流的情况下,则不一定保证元素的顺序。在有序流的并行执行情况下,保持distinct()的顺序性是需要高昂的缓冲开销。如果我们在处理元素时,不需要保证元素的顺序性,那么我们可以使用unordered()方法实现无序流。
- 在
使用示例:
1 | public class StreamExample { |
2、sorted
对数据进行排序,不过对于有序流,排序是稳定的,而对于非有序流,不保证排序稳定。
- 接口定义:
-
Stream<T> sorted(); -
Stream<T> sorted(Comparator<? super T> comparator);
-
- 方法描述:
- 使用
Stream<T> sorted();方法时,它会将Stream中的元素按照自然排序方式对元素进行排序。等到将全部元素处理完成后,将元素组成新的Stream返回。 - 使用
Stream<T> sorted(Comparator<? super T> comparator);方法时,它接收的是一个Comparator类型参数,在Lambda表达式中Comparator<T>一般是用于比较两个参数,设置一个算法逻辑,执行完后返回一个整数,可以是负数、零、正整数,它的的不同的值表示两个值比较的不同,一般排序会按这个比较结果进行排序。等到将全部元素处理完成后,将元素组成新的Stream返回。
- 使用
使用示例:
1 | public class StreamExample { |
3、skip
根据指定数值,从指定位置跳过流中某些元素。
- 接口定义:
-
Stream<T> skip(long n);
-
- 方法描述:
- 在
skip接口定义中是接收long类型参数,指定要跳过前n个元素,将第n个后的元素组成新的流返回。
- 在
使用示例:
1 | public class StreamExample { |
4、limit
根据指定数值,限制只能访问流中最大访问的个数。这是一个有状态的、短路方法。
- 接口定义:
-
Stream<T> limit(long maxSize);
-
- 方法描述:
- 在
limit接口定义中是接收long类型参数,指定限制maxSize个元素,即将maxSize和它之前的元素组成新的流返回。
- 在
使用示例:
1 | public class StreamExample { |
九、Stream 中间操作(无状态)常用 API
1、map
对流中的元素进行处理,然后返回,返回值的类型可以和原来的元素的类型不同。
- 接口定义:
-
Stream<R> map(Function<? super T, ? extends R> mapper);
-
- 方法描述:
- 在
map接口定义中是接收Function类型参数,了解Lambda表达式就可以知道Function<T,R>是接收一个T返回处理后的值R。所以,这里map方法就是对流中的元素进行处理,然后返回一个新的元素。等到将全部元素处理完成后将元素组成新的流返回。
- 在
使用示例:
1 | public class StreamExample { |
2、peek
对流中的每个元素进行操作处理,返回的流和原来的流保存一样的元素。
- 接口定义:
-
Stream<T> peek(Consumer<? super T> action);
-
- 方法描述:
- 在
peek接口定义中是接收Consumer类型参数,了解Lambda表达式就可以知道Consumer<T>是接收一个T返回处理后不返回值。所以,这里peek方法就是对流中的元素进行处理而不返回值。等到将全部元素处理完成后将元素组成新的流返回。
- 在
使用示例:
1 | public class StreamExample { |
3、filter
主要用于数据过滤,过滤不符合 predicate 条件的元素,保留过滤后的元素。
- 接口定义:
-
Stream<T> filter(Predicate<? super T> predicate);
-
- 方法描述:
- 在
filter接口定义中是接收Predicate类型参数,了解Lambda表达式就可以知道Predicate<T>是接收一个T进行验证,返回布尔值。所以,这里filter方法就是设置验证条件,对流中的元素进行验证,返回符合条件的全部元素组成新的流。
- 在
使用示例:
1 | public class StreamExample { |
4、mapToInt
主要用于对流中元素进行一对一转换为 int 整数,然后可以进行一些求和、平均值、最大最小值等处理。
- 接口定义:
-
IntStream mapToInt(ToIntFunction<? super T> mapper);
-
- 方法描述:
- 在
mapToInt接口定义中可知,它接收ToIntFunctio类型参数,在Lambda中ToIntFunction<T>函数的作用为接受一个输入参数,返回一个int类型结果,根据这点很容易了解到mapToInt方法就是将Stream中原有的元素类型转换为int类型到新的Stream中。
- 在
使用示例:
1 | public class StreamExample { |
5、mapToDouble
主要用于对流中元素进行一对一转换为 double 双精度浮点型,然后可以进行一些求和、平均值、最大最小值等处理。
- 接口定义:
-
IntStream mapToInt(ToDoubleFunction<? super T> mapper);
-
- 方法描述:
- 在
mapToDouble接口定义中可知,它接收ToDoubleFunction类型参数,在Lambda中ToDoubleFunction<T>函数的作用为接受一个输入参数,返回一个int类型结果,根据这点很容易了解到ToDoubleFunction方法就是将Stream中原有的元素类型转换为double类型到新的Stream中。
- 在
使用示例:
1 | public class StreamExample { |
6、flatMap
把几个小的list中的所有元素合并到一个大的list。
- 接口定义:
-
<R> Stream<R> flatMap(Function<? super T,? extends Stream<? extends R>> mapper)
-
- 方法描述:
-
Stream.flatMap()返回流,该流将包含通过映射函数替换源流的每个元素而获得的元素,并展平结果。映射函数将生成流,并且在应用映射后关闭每个映射流。在对象流上应用统计函数是有用的,并且可以在一行中进行编码。
-
使用示例:
1 | public static void main(String args[]) { |
十、Stream 终端操作(短路操作)常用 API
1、anyMatch
判断数据列表中是否存在任意一个元素符合设置的 predicate 条件,如果是就返回 true,否则返回 false。
- 接口定义:
-
boolean anyMatch(Predicate<? super T> predicate);
-
- 方法描述:
- 在
anyMatch接口定义中是接收Predicate类型参数,在Lambda表达式中Predicate<T>是接收一个T类型参数,然后经过逻辑验证返回布尔值结果。这里anyMatch表示,判断的条件里,任意一个元素符合条件,就返回true值。
- 在
使用示例:
1 | iimport java.util.stream.Stream; |
2、allMatch
只有数据列表中全部元素都符合设置的 predicate 条件时,才返回 true,否则 flase,流为空时总是返回 true。
- 接口定义:
-
boolean allMatch(Predicate<? super T> predicate);
-
- 方法描述:
- 在
allMatch接口定义中是接收Predicate类型参数,在 Lambda 表达式中Predicate<T>是接收一个 T 类型参数,然后经过逻辑验证返回布尔值结果。这里allMatch表示,判断的条件里,全部元素符合条件,就返回true值。
- 在
使用示例:
1 | import java.util.stream.Stream; |
3、noneMatch
只有数据列表中全部元素都不符合设置的 predicate 条件时,才返回 true,否则 flase,流为空时总是返回 true。
- 接口定义:
-
boolean noneMatch(Predicate<? super T> predicate);
-
- 方法描述:
- 在
noneMatch接口定义中是接收Predicate类型参数,在Lambda表达式中Predicate<T>是接收一个T类型参数,然后经过逻辑验证返回布尔值结果。这里noneMatch表示与allMatch相反,判断条件里的元素,所有的元素都不符合,返回true值。
- 在
使用示例:
1 | import java.util.stream.Stream; |
4、findFirst
返回第一个元素,如果流为空,返回空的 Optional 对象。
- 接口定义:
-
Optional<T> findFirst();
-
- 方法描述:
- 在
findFirst方法的作用是返回集合中的第一个对象。
- 在
使用示例:
1 | import java.util.Optional; |
5、findAny
返回任意一个元素,如果流为空,返回空的 Optional 对象。
- 接口定义:
-
Optional<T> findAny();
-
- 方法描述:
- 在
findAny方法的作用是返回集合中的任何一个对象。
- 在
使用示例:
1 | import java.util.Optional; |
十一、Stream 终端操作(非短路操作)常用 API
1、max
返回流中所有元素的最大值。
- 接口定义:
-
Optional<T> max(Comparator<? super T> comparator);
-
- 方法描述:
- 在
max接口定义中是接收 Comparator 类型参数,Lambda常用函数的Consumer<T>一般是用于比较两个参数,设置一个算法逻辑,执行完后返回一个整数,可以是负数、零、正整数,根据返回的值结果进行排序筛选出最大值。
- 在
使用示例:
1 | import java.util.Optional; |
2、min
返回流中所有元素的最小值。
- 接口定义:
-
Optional<T> min(Comparator<? super T> comparator);
-
- 方法描述:
- 在
max接口定义中是接收Comparator类型参数,Lambda常用函数的Consumer<T>一般是用于比较两个参数,设置一个算法逻辑,执行完后返回一个整数,可以是负数、零、正整数,根据返回的值结果进行排序筛选出最小值。
- 在
使用示例:
1 | import java.util.Optional; |
3、count
统计流中所有元素的数目。
- 接口定义:
-
long count();
-
- 方法描述:
- 该方法用于统计
Stream中元素个数,返回long类型结果。
- 该方法用于统计
使用示例:
1 | import java.util.Optional; |
4、reduce
主要用于对数据进行合并处理。
- 接口定义:
-
T reduce(T identity, BinaryOperator<T> accumulator); -
Optional<T> reduce(BinaryOperator<T> accumulator); -
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
-
- 方法描述:
- 当使用
Optional<T> reduce(BinaryOperator<T> accumulator);方法时操作Stream中的数据时,通过累加器accumulator迭代计算,最终得到一个T类型的 Optional 对象。 - 当使用
T reduce(T identity, BinaryOperator<T> accumulator);方法时,给定一个初始值identity,通过累加器accumulator迭代计算,得到一个同Stream中数据同类型的结果。 - 当使用
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);方法时,给定一个初始值identity,通过累加器accumulator迭代计算,得到一个identity类型的结果,第三个参数用于使用并行流时,进行合并结果。
- 当使用
使用示例:
1 | public class StreamExample { |
5、forEach
主要用于对数据遍历整个流中元素,执行指定逻辑,在并发执行时不保证顺序。
- 接口定义:
-
void forEach(Consumer<? super T> action);
-
- 方法描述:
- 对
Stream中的每个元素都执一段逻辑代码,例如,循环打印输出元素的值,但是需要注意的是,在并发执行时无法保证执行顺序。
- 对
使用示例:
1 | public class StreamExample { |
6、forEachOrdered
主要用于对数据遍历整个流中元素,执行指定逻辑,在并发执行时保证顺序。
- 接口定义:
-
void forEachOrdered(Consumer<? super T> action);
-
- 方法描述:
- 作用和
forEach作用类似,但是其在并发执行时以保证执行顺序。
- 作用和
使用示例:
1 | public class StreamExample { |
7、toArray
将 Stream 流中的数据存储到数组中。
- 接口定义:
-
Object[] toArray(); -
<A> A[] toArray(IntFunction<A[]> generator);
-
- 方法描述:
- 将
Stream中的元素,存储到Object数组。
- 将
使用示例:
1 | public class StreamExample { |
8、collect
常用的聚合方法,能将数据进行聚合操作。不仅如此,它还能与 Collector 配合使用,对聚合后的数据进行处理。
- 接口定义:
-
<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner); -
<R, A> R collect(Collector<? super T, A, R> collector);
-
- 方法描述: 。
- 当使用
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);方法时,它第一个参数supplier为结果存放容器,第二个参数accumulator为结果如何添加到容器的操作,第三个参数combiner则为多个容器的聚合策略。 - 当使用
<R, A> R collect(Collector<? super T, A, R> collector);方法时,这种是接收Collector类型参数,它使collect操作更加强大,对于绝大部分操作可以分解为以下主要步骤:提供初始容器->加入元素到容器->并发下多容器聚合->对聚合后结果进行操作。同时Collector接口又提供了of静态方法帮助你最大化的定制自己的操作,官方也提供了Collectors这个类封装了大部分的常用收集操作。
- 当使用
使用示例:
1 | public class StreamExample { |
十二、Java 9 改进的 Stream API
Java 9 改进的 Stream API 添加了一些便利的方法,使流处理更容易,并使用收集器编写复杂的查询。
Java 9 为 Stream 新增了几个方法:dropWhile、takeWhile、ofNullable,为 iterate 方法新增了一个重载方法。
1、takeWhile 方法
语法default Stream<T>takeWhile(Predicate<? super T>predicate)
takeWhile() 方法使用一个断言作为参数,返回给定 Stream 的子集直到断言语句第一次返回 false。如果第一个值不满足断言条件,将返回一个空的 Stream。
takeWhile() 方法在有序的 Stream 中,takeWhile 返回从开头开始的尽量多的元素;在无序的 Stream 中,takeWhile 返回从开头开始的符合 Predicate 要求的元素的子集。
实例
1 | import java.util.stream.Stream; |
以上实例 takeWhile 方法在碰到空字符串时停止循环输出,执行输出结果为:
1 | abc |
2、dropWhile 方法
语法default Stream<T>dropWhile(Predicate<? super T>predicate)
dropWhile 方法和 takeWhile 作用相反的,使用一个断言作为参数,直到断言语句第一次返回 false 才返回给定 Stream 的子集。
实例
1 | import java.util.stream.Stream; |
以上实例 dropWhile 方法在碰到空字符串时开始循环输出,执行输出结果为:
1 | ef |
3.iterate 方法
语法static <T> Stream<T>iterate(T seed, Predicate<? super T>hasNext, UnaryOperator<T> next)
方法允许使用初始种子值创建顺序(可能是无限)流,并迭代应用指定的下一个方法。 当指定的 hasNext 的 predicate 返回 false 时,迭代停止。
实例
1 | java.util.stream.IntStream; |
执行输出结果为:
1 | 3 |
4、ofNullable 方法
语法static <T> Stream<T>ofNullable(T t)
ofNullable 方法可以预防 NullPointerExceptions 异常, 可以通过检查流来避免 null 值。
如果指定元素为非 null,则获取一个元素并生成单个元素流,元素为 null 则返回一个空流。
实例
1 | import java.util.stream.Stream; |
执行输出结果为:
1 | 1 |