一、什么是 Stream
在 Java 8
中增加了一个新的抽象接口 Stream API
,它支持声明式的处理数据。使用 Stream
操作集合似于使用 SQL 语句数据库查找数据类似,提供直观的方法进行操作。 同时 Stream API 让开发者能够快速写出干净、简洁的代码,提高开发者的开发效率。
Stream 将要处理的元素集合看作一种流, 流在管道中传输, 并且可以在管道传输过程中对流进行处理, 比如筛选、排序、聚合等操作。在经过一系列中间操作后形成最终的管道,得到处理的结果。
二、Stream 操作分类
Stream 操作分为 中间操作
(Intermediate operations)、终端操作
(Terminal operations),信息如下:
Stream 操作分类 | |||
---|---|---|---|
中间操作 (Intermediate operations) |
无状态 | unordered()、filter()、map()、mapToInt()、mapToLong() mapToDouble()、flatMap()、flatMapToInt()、flatMapToLong() flatMapToDouble()、peek() |
|
有状态 | distinct()、sorted()、limit()、skip() | ||
终端操作 (Terminal operations) |
非短路操作 | forEach()、forEachOrdered()、toArray()、reduce()、collect max()、min()、count() |
|
短路操作 | anyMatch()、allMatch()、noneMatch()、findFirst()、findAny() |
- 中间操作: 中间操作其实就是进行逻辑处理。这个操作可以有一个或者多个连续操作,将一个流转换成另一个流,这些操作不会消耗流,其目的是建立一个流水线,直到终端操作发生后,才会做数据的最终执行。
- 无状态: 指数据处理时,不受之前”中间操作”的影响;
- 有状态: 指数据处理时,受之前”中间操作”的影响,有状态的方法往往需要更大的性能开销;
- 终端操作: 一个 Stream 对象只能有一个终端操作(Terminal operations),这个操作一旦发生,就会真实处理数据,生成对应的处理结果。
- 非短路操作: 指必须处理所有元素才能得到最终结果;
- 短路操作: 指遇到某些符合条件的元素就可以得到最终结果;
三、Stream 特性
- 不存储数据: Stream 不对数据进行存储,而是按照特定的规则对数据进行处理;
- 不改变数据源: Stream 通常不会改变原有数据源,操作时一般是对原有的数据源创建副本,然后对副本进行处理,生成新的 Stream。
- 具有惰性化: Stream 很多操作是有向后延迟的,需要一直等到它弄清楚了最后需要多少数据才会开始。而中间操作(Intermediate operations)永远是惰性化的。
- 可以是无限的: 集合有固定大小,Stream 则不一定。limit(n) 和 findFirst() 这类的短路操作,可以对无限的 Stream 快速完成运算处理。
- 支持并行能力: Stream 是支持并行能力的,可以轻松执行并行化对数据进行处理。
- 可被消耗的: Stream 的生命周期中,Stream 的元素只被访问一次。与迭代器一样,必须生成新的 Stream 后, 才能重新访问与开始源中相同的元素。
四、数据转换为 Stream
一般我们需要获取 Stream
对象后才能对其进行操作,下面列出了一些数据转换为 Stream
的常用方法:
1 | public class StreamExample { |
五、Stream 转换得到指定类型数据
既然可以把集合或者数组转换成流,那么也就可以把流转换回去,使用 collect()
方法就能实现,不过一般还需要配合 Collectors
工具类一起使用,Collectors
类中内置了一系列收集器实现,如下:
- toList(): 将元素收集到一个新的 List 集合中;
- toSet(): 将元素收集到一个新的 Set 集合中;
- toCollection(): 将元素收集到一个新的 ArrayList 集合中;
- joining(): 将元素收集到一个可以用分隔符指定的字符串中;
下面再介绍下 Stream 转换指定类型的常用示例:
(1)、Stream 转换为数组
1 | public class StreamExample { |
(2)、Stream 转换位字符串
1 | public class StreamExample { |
(3)、Stream 转换为 List 列表
1 | public class StreamExample { |
(4)、Stream 转换为 Set 集合
1 | public class StreamExample { |
(5)、Stream 转换为 Map 集合
1 | public class StreamExample { |
六、Stream 对于基本类型的封装
因为 Java
的范型不支持基本类型,所以我们无法用 Stream<int>
这样的类型,如果写了类似代码在编译器中会提示编译错误。为了存储 int
,只能使用 Stream<Integer>
,但这样会产生频繁的装箱、拆箱操作。为了提高效率,对基本类型数据 int
、long
、double
进行了封装,分别为 IntSteam
、LongStream
、DoubleSteam
,使用方法和 Stream
类似,示例代码如下:
1 | public class StreamExample { |
七、Stream 的串行与并行
1、Stream 的并行介绍
在 Stream
中,最明显的特点就是存在并行操作,不过如果使用默认方式执行中间与终端操作,那么整个执行过程其实是个串行操作。如果想让 Stream
并行处理数据,那么需要 Stream
中调用 parallel()
或者集合中调用 parallelStream()
方法来开启并行执行。
1 | List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); |
或者:
1 | List<String> list = Arrays.asList("One", "Two", "Three", "Four", "Five"); |
其中 Stream
底层使用的是 ForkJoinTask
实现 Stream
的并行处理,充分利用 CPU
的多核能力,Stream
的 API
将底层复杂实现完全屏蔽了,开发者仅需调用一个方法即可实现并行计算。
2、Stream 并行示例
1 | public class StreamExample { |
执行结果:
1 | 并行执行耗时:8317 |
可以看到使用并行执行所花费的时间远低于串行所花费的时间,不过在使用并行执行时一定要先考虑好使用情况,考虑执行数据是否需要顺序执行,是否涉及线程安全,是否涉及使用网络等,并不是全部情况都能使用并行执行完成处理逻辑的,所以在使用之前一定要慎重,使用不好很可能会带来性能的不升反降。
一般情况下,如机器学习和数据处理等比较适合使用并行处理数据任务,其它方便需要使用者自己衡量进行测试,是否该使用并行执行任务。
八、Stream 中间操作(有状态)常用 API
1、distinct
保证输出的流中包含唯一的元素,通常用于数据去重。
- 接口定义:
-
Stream<T> distinct();
-
- 方法描述:
- 在
distinct
接口定义中不接收任何参数,该方法的作用是根据hashCode()
和equals()
方法来获取不同的元素,因此我们的元素必须实现这两个方法。如果distinct()
正在处理有序流,那么 对于重复元素将保留处理元素时的顺序。而在处理无序流的情况下,则不一定保证元素的顺序。在有序流的并行执行情况下,保持distinct()
的顺序性是需要高昂的缓冲开销。如果我们在处理元素时,不需要保证元素的顺序性,那么我们可以使用unordered()
方法实现无序流。
- 在
使用示例:
1 | public class StreamExample { |
2、sorted
对数据进行排序,不过对于有序流,排序是稳定的,而对于非有序流,不保证排序稳定。
- 接口定义:
-
Stream<T> sorted();
-
Stream<T> sorted(Comparator<? super T> comparator);
-
- 方法描述:
- 使用
Stream<T> sorted();
方法时,它会将Stream
中的元素按照自然排序
方式对元素进行排序。等到将全部元素处理完成后,将元素组成新的Stream
返回。 - 使用
Stream<T> sorted(Comparator<? super T> comparator);
方法时,它接收的是一个Comparator
类型参数,在Lambda
表达式中Comparator<T>
一般是用于比较两个参数,设置一个算法逻辑,执行完后返回一个整数,可以是负数、零、正整数,它的的不同的值表示两个值比较的不同,一般排序会按这个比较结果进行排序。等到将全部元素处理完成后,将元素组成新的Stream
返回。
- 使用
使用示例:
1 | public class StreamExample { |
3、skip
根据指定数值,从指定位置跳过流中某些元素。
- 接口定义:
-
Stream<T> skip(long n);
-
- 方法描述:
- 在
skip
接口定义中是接收long
类型参数,指定要跳过前n
个元素,将第n
个后的元素组成新的流返回。
- 在
使用示例:
1 | public class StreamExample { |
4、limit
根据指定数值,限制只能访问流中最大访问的个数。这是一个有状态的、短路方法。
- 接口定义:
-
Stream<T> limit(long maxSize);
-
- 方法描述:
- 在
limit
接口定义中是接收long
类型参数,指定限制maxSize
个元素,即将maxSize
和它之前的元素组成新的流返回。
- 在
使用示例:
1 | public class StreamExample { |
九、Stream 中间操作(无状态)常用 API
1、map
对流中的元素进行处理,然后返回,返回值的类型可以和原来的元素的类型不同。
- 接口定义:
-
Stream<R> map(Function<? super T, ? extends R> mapper);
-
- 方法描述:
- 在
map
接口定义中是接收Function
类型参数,了解Lambda
表达式就可以知道Function<T,R>
是接收一个T
返回处理后的值R
。所以,这里map
方法就是对流中的元素进行处理,然后返回一个新的元素。等到将全部元素处理完成后将元素组成新的流返回。
- 在
使用示例:
1 | public class StreamExample { |
2、peek
对流中的每个元素进行操作处理,返回的流和原来的流保存一样的元素。
- 接口定义:
-
Stream<T> peek(Consumer<? super T> action);
-
- 方法描述:
- 在
peek
接口定义中是接收Consumer
类型参数,了解Lambda
表达式就可以知道Consumer<T>
是接收一个T
返回处理后不返回值。所以,这里peek
方法就是对流中的元素进行处理而不返回值。等到将全部元素处理完成后将元素组成新的流返回。
- 在
使用示例:
1 | public class StreamExample { |
3、filter
主要用于数据过滤,过滤不符合 predicate 条件的元素,保留过滤后的元素。
- 接口定义:
-
Stream<T> filter(Predicate<? super T> predicate);
-
- 方法描述:
- 在
filter
接口定义中是接收Predicate
类型参数,了解Lambda
表达式就可以知道Predicate<T>
是接收一个T
进行验证,返回布尔值。所以,这里filter
方法就是设置验证条件,对流中的元素进行验证,返回符合条件的全部元素组成新的流。
- 在
使用示例:
1 | public class StreamExample { |
4、mapToInt
主要用于对流中元素进行一对一转换为 int 整数,然后可以进行一些求和、平均值、最大最小值等处理。
- 接口定义:
-
IntStream mapToInt(ToIntFunction<? super T> mapper);
-
- 方法描述:
- 在
mapToInt
接口定义中可知,它接收ToIntFunctio
类型参数,在Lambda
中ToIntFunction<T>
函数的作用为接受一个输入参数,返回一个int
类型结果,根据这点很容易了解到mapToInt
方法就是将Stream
中原有的元素类型转换为int
类型到新的Stream
中。
- 在
使用示例:
1 | public class StreamExample { |
5、mapToDouble
主要用于对流中元素进行一对一转换为 double 双精度浮点型,然后可以进行一些求和、平均值、最大最小值等处理。
- 接口定义:
-
IntStream mapToInt(ToDoubleFunction<? super T> mapper);
-
- 方法描述:
- 在
mapToDouble
接口定义中可知,它接收ToDoubleFunction
类型参数,在Lambda
中ToDoubleFunction<T>
函数的作用为接受一个输入参数,返回一个int
类型结果,根据这点很容易了解到ToDoubleFunction
方法就是将Stream
中原有的元素类型转换为double
类型到新的Stream
中。
- 在
使用示例:
1 | public class StreamExample { |
6、flatMap
把几个小的list中的所有元素合并到一个大的list。
- 接口定义:
-
<R> Stream<R> flatMap(Function<? super T,? extends Stream<? extends R>> mapper)
-
- 方法描述:
-
Stream.flatMap()
返回流,该流将包含通过映射函数替换源流的每个元素而获得的元素,并展平结果。映射函数将生成流,并且在应用映射后关闭每个映射流。在对象流上应用统计函数是有用的,并且可以在一行中进行编码。
-
使用示例:
1 | public static void main(String args[]) { |
十、Stream 终端操作(短路操作)常用 API
1、anyMatch
判断数据列表中是否存在任意一个元素符合设置的 predicate 条件,如果是就返回 true,否则返回 false。
- 接口定义:
-
boolean anyMatch(Predicate<? super T> predicate);
-
- 方法描述:
- 在
anyMatch
接口定义中是接收Predicate
类型参数,在Lambda
表达式中Predicate<T>
是接收一个T
类型参数,然后经过逻辑验证返回布尔值结果。这里anyMatch
表示,判断的条件里,任意一个元素符合条件,就返回true
值。
- 在
使用示例:
1 | iimport java.util.stream.Stream; |
2、allMatch
只有数据列表中全部元素都符合设置的 predicate 条件时,才返回 true,否则 flase,流为空时总是返回 true。
- 接口定义:
-
boolean allMatch(Predicate<? super T> predicate);
-
- 方法描述:
- 在
allMatch
接口定义中是接收Predicate
类型参数,在 Lambda 表达式中Predicate<T>
是接收一个 T 类型参数,然后经过逻辑验证返回布尔值结果。这里allMatch
表示,判断的条件里,全部元素符合条件,就返回true
值。
- 在
使用示例:
1 | import java.util.stream.Stream; |
3、noneMatch
只有数据列表中全部元素都不符合设置的 predicate 条件时,才返回 true,否则 flase,流为空时总是返回 true。
- 接口定义:
-
boolean noneMatch(Predicate<? super T> predicate);
-
- 方法描述:
- 在
noneMatch
接口定义中是接收Predicate
类型参数,在Lambda
表达式中Predicate<T>
是接收一个T
类型参数,然后经过逻辑验证返回布尔值结果。这里noneMatch
表示与allMatch
相反,判断条件里的元素,所有的元素都不符合,返回true
值。
- 在
使用示例:
1 | import java.util.stream.Stream; |
4、findFirst
返回第一个元素,如果流为空,返回空的 Optional 对象。
- 接口定义:
-
Optional<T> findFirst();
-
- 方法描述:
- 在
findFirst
方法的作用是返回集合中的第一个对象。
- 在
使用示例:
1 | import java.util.Optional; |
5、findAny
返回任意一个元素,如果流为空,返回空的 Optional 对象。
- 接口定义:
-
Optional<T> findAny();
-
- 方法描述:
- 在
findAny
方法的作用是返回集合中的任何一个对象。
- 在
使用示例:
1 | import java.util.Optional; |
十一、Stream 终端操作(非短路操作)常用 API
1、max
返回流中所有元素的最大值。
- 接口定义:
-
Optional<T> max(Comparator<? super T> comparator);
-
- 方法描述:
- 在
max
接口定义中是接收 Comparator 类型参数,Lambda
常用函数的Consumer<T>
一般是用于比较两个参数,设置一个算法逻辑,执行完后返回一个整数,可以是负数、零、正整数,根据返回的值结果进行排序筛选出最大值。
- 在
使用示例:
1 | import java.util.Optional; |
2、min
返回流中所有元素的最小值。
- 接口定义:
-
Optional<T> min(Comparator<? super T> comparator);
-
- 方法描述:
- 在
max
接口定义中是接收Comparator
类型参数,Lambda
常用函数的Consumer<T>
一般是用于比较两个参数,设置一个算法逻辑,执行完后返回一个整数,可以是负数、零、正整数,根据返回的值结果进行排序筛选出最小值。
- 在
使用示例:
1 | import java.util.Optional; |
3、count
统计流中所有元素的数目。
- 接口定义:
-
long count();
-
- 方法描述:
- 该方法用于统计
Stream
中元素个数,返回long
类型结果。
- 该方法用于统计
使用示例:
1 | import java.util.Optional; |
4、reduce
主要用于对数据进行合并处理。
- 接口定义:
-
T reduce(T identity, BinaryOperator<T> accumulator);
-
Optional<T> reduce(BinaryOperator<T> accumulator);
-
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
-
- 方法描述:
- 当使用
Optional<T> reduce(BinaryOperator<T> accumulator);
方法时操作Stream
中的数据时,通过累加器accumulator
迭代计算,最终得到一个T
类型的 Optional 对象。 - 当使用
T reduce(T identity, BinaryOperator<T> accumulator);
方法时,给定一个初始值identity
,通过累加器accumulator
迭代计算,得到一个同Stream
中数据同类型的结果。 - 当使用
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
方法时,给定一个初始值identity
,通过累加器accumulator
迭代计算,得到一个identity
类型的结果,第三个参数用于使用并行流时,进行合并结果。
- 当使用
使用示例:
1 | public class StreamExample { |
5、forEach
主要用于对数据遍历整个流中元素,执行指定逻辑,在并发执行时不保证顺序。
- 接口定义:
-
void forEach(Consumer<? super T> action);
-
- 方法描述:
- 对
Stream
中的每个元素都执一段逻辑代码,例如,循环打印输出元素的值,但是需要注意的是,在并发执行时无法保证执行顺序。
- 对
使用示例:
1 | public class StreamExample { |
6、forEachOrdered
主要用于对数据遍历整个流中元素,执行指定逻辑,在并发执行时保证顺序。
- 接口定义:
-
void forEachOrdered(Consumer<? super T> action);
-
- 方法描述:
- 作用和
forEach
作用类似,但是其在并发执行时以保证执行顺序。
- 作用和
使用示例:
1 | public class StreamExample { |
7、toArray
将 Stream 流中的数据存储到数组中。
- 接口定义:
-
Object[] toArray();
-
<A> A[] toArray(IntFunction<A[]> generator);
-
- 方法描述:
- 将
Stream
中的元素,存储到Object
数组。
- 将
使用示例:
1 | public class StreamExample { |
8、collect
常用的聚合方法,能将数据进行聚合操作。不仅如此,它还能与 Collector 配合使用,对聚合后的数据进行处理。
- 接口定义:
-
<R> R collect(Supplier<R> supplier,BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
-
<R, A> R collect(Collector<? super T, A, R> collector);
-
- 方法描述: 。
- 当使用
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
方法时,它第一个参数supplier
为结果存放容器,第二个参数accumulator
为结果如何添加到容器的操作,第三个参数combiner
则为多个容器的聚合策略。 - 当使用
<R, A> R collect(Collector<? super T, A, R> collector);
方法时,这种是接收Collector
类型参数,它使collect
操作更加强大,对于绝大部分操作可以分解为以下主要步骤:提供初始容器
->加入元素到容器
->并发下多容器聚合
->对聚合后结果进行操作
。同时Collector
接口又提供了of
静态方法帮助你最大化的定制自己的操作,官方也提供了Collectors
这个类封装了大部分的常用收集操作。
- 当使用
使用示例:
1 | public class StreamExample { |
十二、Java 9 改进的 Stream API
Java 9 改进的 Stream API 添加了一些便利的方法,使流处理更容易,并使用收集器编写复杂的查询。
Java 9 为 Stream 新增了几个方法:dropWhile、takeWhile、ofNullable,为 iterate 方法新增了一个重载方法。
1、takeWhile 方法
语法default Stream<T>takeWhile(Predicate<? super T>predicate)
takeWhile() 方法使用一个断言作为参数,返回给定 Stream 的子集直到断言语句第一次返回 false。如果第一个值不满足断言条件,将返回一个空的 Stream。
takeWhile() 方法在有序的 Stream 中,takeWhile 返回从开头开始的尽量多的元素;在无序的 Stream 中,takeWhile 返回从开头开始的符合 Predicate 要求的元素的子集。
实例
1 | import java.util.stream.Stream; |
以上实例 takeWhile 方法在碰到空字符串时停止循环输出,执行输出结果为:
1 | abc |
2、dropWhile 方法
语法default Stream<T>dropWhile(Predicate<? super T>predicate)
dropWhile 方法和 takeWhile 作用相反的,使用一个断言作为参数,直到断言语句第一次返回 false 才返回给定 Stream 的子集。
实例
1 | import java.util.stream.Stream; |
以上实例 dropWhile 方法在碰到空字符串时开始循环输出,执行输出结果为:
1 | ef |
3.iterate 方法
语法static <T> Stream<T>iterate(T seed, Predicate<? super T>hasNext, UnaryOperator<T> next)
方法允许使用初始种子值创建顺序(可能是无限)流,并迭代应用指定的下一个方法。 当指定的 hasNext 的 predicate 返回 false 时,迭代停止。
实例
1 | java.util.stream.IntStream; |
执行输出结果为:
1 | 3 |
4、ofNullable 方法
语法static <T> Stream<T>ofNullable(T t)
ofNullable 方法可以预防 NullPointerExceptions 异常, 可以通过检查流来避免 null 值。
如果指定元素为非 null,则获取一个元素并生成单个元素流,元素为 null 则返回一个空流。
实例
1 | import java.util.stream.Stream; |
执行输出结果为:
1 | 1 |