一文详解Stream流
简介
Stream支持元素流功能性操作的类,例如集合上的map-reduce
转换。 例如:
1 | int sum = widgets.stream().filter(b -> b.getColor() == RED).mapToInt(b -> b.getWeight()).sum(); |
这里我们使用widgets Collection<Widget>
作为流的源,然后在流上执行filter-map-reduce以获得红色小部件的权重之和。(求和是一个的例子reduction操作)。
这个包中引入的关键抽象是流。流与集合有以下几种不同:
- 没有存储。 流不是存储元素的数据结构; 相反,它通过计算操作的流水线传送诸如数据结构,数组,生成器功能或I/O通道的源的元件。
- 功能性质。 流上的操作产生结果,但不会修改其来源。 例如,过滤从Stream获得的Stream会生成旧的Stream不需要的元素,而不是从源集合中删除元素。
- 懒惰执行。 许多流操作(如过滤,映射或重复删除)可以懒惰地实现,从而暴露优化的机会。 例如,”找到具有三个连续元音的第一个String”,不需要检查所有的输入字符串。 流操作分为中间(Stream生产)操作和终端(价值或副作用生成)操作。 中间操作总是懒惰执行的。
- 可能无限。 虽然集合的大小有限,但流不需要。 诸如limit(n)或findFirst()之类的方法可以允许无限流上的计算在有限的时间内完成。
- 消耗性质。流的元素只能在流的一生中访问一次。 像Iterator一样 ,必须生成一个新流来重新访问源的相同元素。
流可以通过多种方式获得。 一些例子包括:
- 通过Collection的stream()或parallelStream()方法;
- 对于数组类可以通过Arrays.stream(Object[])获取流;
- 通过高级流类的静态方法,如Stream.of(Object[])、IntStream.range(int, int)、Stream.iterate(Object, UnaryOperator); ;
- 文件的行可以从BufferedReader.lines()获取;
- 文件路径的流可以从Files中的方法获得,如newDirectoryStream()、newInputStream()、newOutputStream();
- 随机数流可以从Random.ints()获得;
- 许多其它的数据流的方法的轴承在JDK中,包括BitSet.stream()、Pattern.splitAsStream(java.lang.CharSequence)、JarFile.stream();
流操作分为中间操作和终端操作,并且组合以形成pipelines。pipelines由源(例如Collection,数组,发生器功能或I/O通道)组成;其次是零个或多个中间操作,如Stream.filter或Stream.map;以及诸如Stream.forEach或Stream.reduce的终端操作。
中间操作返回一个新的流。 他们总是懒惰执行诸如filter()操作实际上不执行任何过滤,而是创建一个新的流,当被遍历时,它包含与给定谓词匹配的初始流的元素。在管道的终端操作被执行之前,管道源的遍历不会开始。
诸如Stream.forEach或IntStream.sum终端操作可以遍历流以产生结果或副作用。在执行终端操作之后,流管道被认为被消耗,并且不能再使用;如果再次遍历相同的数据源,则必须返回到数据源以获取新的流。 在几乎所有情况下,终端操作都是迫切的,在返回之前完成对数据源的遍历和处理。只有终端操作iterator()和spliterator()不是;在现有操作不足以满足任务的情况下,这些提供为”逃生舱“,以允许任意客户端控制的管道遍历。
处理流懒惰实现有着显著的效率提升; 在诸如上述的filter-map-reduce的流水线中,过滤、映射和求和可以融合到数据的单次传递中,具有最小的中间状态。懒惰也允许避免在没有必要时检查所有数据;对于诸如“找到长度超过1000个字符的第一个字符串”等操作,只需检查足够的字符串即可找到具有所需特性的字符串,而无需检查源中可用的所有字符串。(当输入流是无限大而不仅仅是大的时候,这种行为变得更加重要)
中间操作进一步分为无状态操作和有状态操作。无状态操作(例如filter和map)在处理新元素时不保留先前看到的元素的状态——每个元素可以独立于其他元素的操作进行处理。 诸如distinct和sorted有状态操作可以在处理新元件时结合先前看到的元素的状态。
在产生结果之前,有状态操作可能需要处理整个输入。例如,在流已经看到流的所有元素之前,不能产生排序流的任何结果。因此,在并行计算下,包含有状态中间操作的一些流水线可能需要对数据进行多遍,或者可能需要缓冲重要的数据。 包含无状态中间操作的流水线可以在一次通过中处理,无论是顺序还是并行,具有最少的数据缓冲。
此外,一些操作被认为是短路操作。如果在无限输入的情况下,中间运算可能会产生有限的流,这种情况就叫做短路。如果当无限输入呈现时,终端操作可能会在有限时间内终止。在流水线中进行短路操作是处理无限流在有限时间内正常终止的必要但不足够的条件。
特性
并行性
具有显式for循环的处理元素方式是串行的。流通过将计算重构为聚合操作的管道来促进并行执行,而不是作为每个单独元素的必要操作。所有流操作可以串行或并行执行。JDK中的流实现创建串行流,除非显式请求并行。 例如, Collection具有方法Collection.stream()和Collection.parallelStream(),其分别产生顺序和并行流;虽然IntStream.range(int, int)等其他流承载方法产生顺序流,但是可以通过调用它们的BaseStream.parallel()方法来有效地并行化这些流。要并行执行先前的“小部件权重总和”查询,我们可以这样做:
1 | int sumOfWeights = widgets.parallelStream().filter(b -> b.getColor() == RED) .mapToInt(b -> b.getWeight()).sum(); |
此示例的串行和并行版本之间的唯一区别是创建初始流,使用parallelStream()而不是stream()。当终端操作被启动时,根据被调用的流的方向,顺序或并行地执行流管道。流是否以串行或并行方式执行可以使用isParallel()方法确定,并且可以使用BaseStream.sequential()和BaseStream.parallel()操作修改流的方向。当终端操作被启动时,根据其被调用的流的模式,顺序地或并行地执行pipelines。
除了确定为非确定性的操作,如findAny(),流是顺序还是并行执行,不应该改变计算结果。
大多数流操作接受描述用户指定行为的参数,这些行为通常是lambda表达式。为了保持正确的行为,这些行为参数必须是非干扰的,在大多数情况下必须是无状态的。这样的参数总是functional interface的例子 ,如Function,并且通常是lambda表达式或方法引用。
非干扰
Streams能够对各种数据源执行可能并行的聚合操作,包括甚至非线程安全的集合,如ArrayList。只有当我们能够在流管道的执行过程中防止对数据源的干扰时,这才有可能。除了逃生舱操作iterator()和spliterator(),当调用终端操作时开始执行,终端操作完成时结束。对于大多数数据源,防止干扰意味着确保在pipelines的执行期间完全不修改数据源。其中值得注意的例外是其源是并发集合的流,它们专门用于处理并发修改。并发流源是指Spliterator报告CONCURRENT特征的流源。。
因此,源可能不是并发的流管道中的行为参数永远不应该修改流的数据源。如果行为参数修改或导致修改流的数据源,则它会干扰非并发数据源。不干涉的必要性适用于所有管道,而不仅仅是平行管道。除非流源是并发的,否则在流管道的执行过程中修改流的数据源可能会导致异常、错误回答或不一致行为。对于性能良好的流源,可以在终端操作开始之前修改源,并且这些修改将反映在所覆盖的元素中。例如,考虑以下代码:
1 | List<String> l = new ArrayList(Arrays.asList("one", "two")); |
首先创建一个由两个字符串组成的列表:“one”、“two”。 然后从该列表创建流。接下来,通过添加第三个字符串“three”修改列表。最后,流的元素被收集并连接在一起。由于列表在终端collect操作开始之前被修改,所以结果将是一个“one two three”的字符串。从JDK集合返回的所有流和大多数其他JDK类都以这种方式表现良好;对于其他库生成的流,请参阅Low-level stream construction,了解构建良好流的要求。
有状态
如果流操作的行为参数是有状态的,流管道结果可能不确定或不正确。有状态的lambda(或实现适当的功能接口的其他对象)是结果取决于在流管道的执行期间可能改变的任何状态。有状态的lambda的一个例子是map()的map()
:
1 | Set<Integer> seen = Collections.synchronizedSet(new HashSet<>()); |
这里,如果并行执行映射操作,由于线程调度的差异,相同输入的结果可能因运行而异,而使用无状态的lambda表达式,结果总是相同的。
还要注意,尝试从行为参数访问可变状态会带来安全和性能方面的不良选择;如果不同步对该状态的访问,则会有数据竞争,因此代码已损坏,但是如果同步访问该状态,则冒有争议的风险有可能破坏您正在寻求的并行性。最好的方法是避免有状态的行为参数完全流式传输;通常有一种重组流管道以避免状态的方法。
副作用
一般而言,流行为的行为参数的副作用是不鼓励的,因为它们经常会导致无意识地违反无状态要求以及其他线程安全危害。
如果行为参数确实有副作用,除非明确说明,有没有保证,而在visibility的那些副作用给其他线程,也没有任何保证相同的流管道内的“相同”的元素在不同的操作在同一个线程中执行。此外,这些效果的排序可能是令人惊讶的。 即使当管道被限制以产生与流源的遇到顺序一致的结果(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray()
必须产生[0, 2, 4, 6, 8]
)时,不保证将映射器功能应用于各个元件的顺序,或者在什么线程中为给定元素执行任何行为参数。
许多可能诱惑使用副作用的计算可以更安全有效地表达而无副作用,例如使用reduction代替可变累加器。但是,对于调试目的使用println()等副作用通常是无害的。少量流操作,如forEach()和peek() ,只能通过副作用进行操作;这些应谨慎使用。
作为如何将不正确地使用副作用的流管道转换为不适用的流管道的示例,以下代码搜索与给定正则表达式匹配的字符串,并将匹配放在列表中。
1 | ArrayList<String> results = new ArrayList<>(); |
此代码不必要地使用副作用。如果并行执行,ArrayList的非线程安全性将导致不正确的结果,并且添加所需的同步将导致争用,从而破坏并行性的好处。此外,这里使用副作用是完全不必要的;forEach()可以简单地替换为更安全、更高效、更适合并行化的collect操作:
1 | List<String> results = stream.filter(s -> pattern.matcher(s).matches()).collect(Collectors.toList()); // No side-effects! |
有序性
流可能有也可能没有定义的遇到顺序 。流是否有遇到顺序取决于源和中间操作。某些流源(如List或数组)本质上是有序的,而其他数据源(如HashSet )不是。一些中间操作(例如sorted())可以在其他无序流上施加遇到命令,而其他中间操作可以使有序流无序,例如BaseStream.unordered()。此外,一些终端操作可能会忽略遇到的顺序,如forEach() 。
如果一个流被操作,大多数操作被限制为在遇到的顺序中对元素进行操作; 如果流的源是List含有[1, 2, 3]
,然后执行的结果map(x -> x*2)
必须是[2, 4, 6]
。然而,如果源没有定义遇到顺序,则值[2, 4, 6]
任何[2, 4, 6]
将是有效结果。
对于顺序流,遇到顺序的存在或不存在不影响性能,仅影响确定性。如果流被排序,在相同的源上重复执行相同的流管线将产生相同的结果; 如果没有顺序,重复执行可能会产生不同的结果。
对于并行流,放宽排序约束有时可以实现更有效的执行。某些聚合操作,例如过滤重复(distinct())或组合减少(Collectors.groupingBy())可以更有效地实现,如果元素的排序不相关。类似地,本质上与遇到顺序相关的操作,如limit()可能需要缓冲以确保正确排序,从而破坏并行性的好处。在流具有相遇顺序,但用户并不特别关心该相遇顺序的情况下,使用unordered()方法显式地取消流的排序可以提高一些有状态或终端操作的并行性能。然而,大多数pipelines,如上面的“块的权重之和”示例,即使在排序约束下,仍然可以有效地并行化。
归约操作
归约操作(也称为折叠)采用一系列输入元素,并通过重复应用组合运算将它们组合成一个汇总结果,例如找到一组数字的和或最大值,或将元素累积到列表中。流类具有多种形式的通用归约运算,称为reduce()和collect(),以及多种专门的归约形式,如sum()、max()或count()。
当然,这样的操作可以容易使用简单的顺序循环实现,如:
1 | int sum = 0; |
然而,有一个很好的理由使用归约操作超过如上所述的突变积累。不仅减少“更抽象”——它在整个流上运行,而不是单个元素,但正确构造的归约操作本质上是可并行的,只要用于处理元素的功能是associative和stateless。例如,给定一个我们要找到的数字的数据流,我们可以写:
1 | int sum = numbers.stream().reduce(0, (x,y) -> x+y); |
要么:
1 | int sum = numbers.stream().reduce(0, Integer::sum); |
这些还原操作可以安全地并行运行,几乎没有修改:
1 | int sum = numbers.parallelStream().reduce(0, Integer::sum); |
归约可以很好地并行化,因为实现可以并行地对数据子集进行操作,然后将中间结果组合起来以获得最终的正确答案。(即使该语言有一个“每个并行”的构造,可变累积方法仍然需要开发人员为共享的累积变量sum提供线程安全的更新,然后所需的同步可能会消除并行带来的任何性能增益。)使用reduce()来消除并行化reduce操作的所有负担,并且该库可以提供高效的并行实现,而不需要额外的同步。
前面显示的“widgets”示例显示了如何将reduce与其他操作结合来替代大量操作的循环。如果widgets是Widget对象的集合,它们具有getWeight方法,我们可以找到最重的小部件:
1 | OptionalInt heaviest = widgets.parallelStream().mapToInt(Widget::getWeight).max(); |
在其更一般的形式中,reduce上类型的元素的操作<U>
需要三个参数:
1 | <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner); |
这里,identity既是减少的初始种子值,也是没有输入元素的默认结果。accumulator函数获得部分结果和下一个元素,并产生新的部分结果。combiner组合两个部分结果以产生新的部分结果。(组合器在并行归约中是必需的,其中输入被分割,为每个分区计算的部分累加,然后组合部分结果以产生最终结果。)
更正式地, identity值必须是组合器功能的标识。这意味着,对于所有u,combiner.apply(identity, u)等于u 。此外,combiner功能必须是associative,并必须兼容accumulator功能:对所有u和t,combiner.apply(u, accumulator.apply(identity, t))与accumulator.apply(u, t)的equals()结果必须为true。
三参数形式是双参数形式的泛化,将映射步骤纳入积累步骤。我们可以使用更一般的形式重写简单的权重示例,如下所示:
1 | int sumOfWeights = widgets.stream().reduce(0, (sum, b) -> sum + b.getWeight()), Integer::sum); |
显式的map-reduce
形式更具有可读性,因此通常应该是首选。为可以通过将映射和归约组合成单个函数来优化大量工作的情况提供了通用形式。。
可变归约
可变归约操作将输入元素累加到可变结果容器中,例如Collection或StringBuilder,因为它处理流中的元素。
如果我们想采取一串字符串并将它们连接成一个长的字符串,我们可以通过普通的reduction实现这一点:
1 | String concatenated = strings.reduce("", String::concat) |
我们会得到所需的结果,甚至可以并行工作。但是,我们可能不会对示例感到高兴!这样的实现将执行大量的字符串复制,并且运行时间将是字符数为O(n^2) 。更有效的方法是将结果累积到一个StringBuilder,它是一个用于累积字符串的可变容器。我们可以使用相同的技术并行化可变减少,就像我们用普通的减少一样。
可变归约操作称为collect() ,因为它将所需结果汇总到结果容器(如Collection。一个collect操作需要三个功能:供应商功能构造结果容器的新实例,将输入元素合并到结果容器中的累加器函数,以及将一个结果容器的内容合并到另一个中的组合函数。 这种形式与普通缩减的一般形式非常相似:
1 | <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner); |
与reduce()以抽象的方式表达collect的好处是可以直接适应并行化:只要积累和组合函数满足适当的要求,我们可以并行累积部分结果,然后将它们组合起来。例如,要将流中的元素的String表示形式收集到一个ArrayList,我们可以为每个形式写出明显的顺序:
1 | ArrayList<String> strings = new ArrayList<>(); |
或者我们可以使用可并行化的收集表单:
1 | ArrayList<String> strings = stream.collect(() -> new ArrayList<>(), (c, e) -> c.add(e.toString()), (c1, c2) -> c1.addAll(c2)); |
或者将绘图操作拉出累加器函数,我们可以更简洁地表达:
1 | List<String> strings = stream.map(Object::toString).collect(ArrayList::new, ArrayList::add, ArrayList::addAll); |
在这里,我们的供应商只是ArrayList constructor,累加器将一个字符串元素添加到一个ArrayList,组合器只需使用addAll将字符串从一个容器复制到另一个容器中。
collect- supplier、accumulator和combiner的三个方面紧密耦合。 我们可以使用Collector的抽象来捕获所有三个方面。 将字符串收集到List中的上述示例可以使用标准Collector:
1 | List<String> strings = stream.map(Object::toString).collect(Collectors.toList()); |
将可变减少包装到收集器中有另一个优点:可组合性。 Collectors
类包含了一些收集器的预定义工厂,包括将一个收集器转换成另一个收集器的组合器。例如,假设我们有一个收集器来计算员工流量的总和,如下所示:
1 | Collector<Employee, ?, Integer> summingSalaries = Collectors.summingInt(Employee::getSalary); |
(该? 。对于第二类参数仅仅表明我们不关心这个收集器所使用的中间表示)如果我们想创造一个收藏家制表工资由部门的总和,我们可以重用summingSalaries使用groupingBy:
1 | Map<Department, Integer> salariesByDept = employees.stream().collect(Collectors.groupingBy(Employee::getDepartment, summingSalaries)); |
与正常的归约操作一样,collect()操作只能在符合条件的情况下进行并行化。对于任何部分累积的结果,将其与空结果容器组合必须产生等效结果。也就是说,部分累加结果p即任何系列累加器和组合器的调用的结果,p必须等同于combiner.apply(p, supplier.get())。
此外,然而,计算是分裂的,它必须产生等效的结果。对于任何输入元素t1和t2 ,下面的计算中的结果r1和r2必须是等效的:
1 | A a1 = supplier.get(); |
这里,等效通常意味着根据Object.equals(Object) 。 但是在某些情况下,可以放松等价以解决顺序上的差异。
对于一些复杂的减少操作,例如一个collect() ,其产生Map,如:
1 | Map<Buyer, List<Transaction>> salesByBuyer = txns.parallelStream().collect(Collectors.groupingBy(Transaction::getBuyer)); |
实际上并行执行操作可能会适得其反。这是因为组合步骤(将一个Map合并成另一个按键)对于一些Map实现可能是昂贵的。
然而,假设在此减少中使用的结果容器是可同时修改的集合,例如ConcurrentHashMap。在这种情况下,累加器的并行调用实际上可以将它们的结果同时存入相同的共享结果容器中,从而无需组合器来合并不同的结果容器。 这可能会提高并行执行性能。 我们称之为并发减少。
Collector支持并发还原标有Collector.Characteristics.CONCURRENT特性。然而,并发收集也有缺点。如果多个线程将结果并入到共享容器中,则结果存入的顺序是非确定性的。 因此,只有对于正在处理的流的顺序不重要,并发减少才是可能的。在以下情况该Stream.collect(Collector)实施将只执行并发减少:
- 并发流;
- Collector具有Collector.Characteristics.CONCURRENT特征,
- 流是无序的,或者收集器具有Collector.Characteristics.UNORDERED的特征。
可以使用BaseStream.unordered()方法确保流无序 。例如:
1 | Map<Buyer, List<Transaction>> salesByBuyer = txns.parallelStream().unordered().collect(groupingByConcurrent(Transaction::getBuyer)); |
其中Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>)等同于groupingBy。
请注意,如果给定键的元素以它们在源中显示的顺序显示很重要,那么我们不能使用并发缩减,因为排序是并发插入的一种伤害。然后,我们将被限制为执行顺序减少或基于并行的并行还原。
低级流构造器
到目前为止,所有的流示例都使用了Collection.stream()或Arrays.stream的方法(Object[])来获取流。这些承载流的方法是如何实现的?
StreamSupport类有许多用于创建流的低级方法,所有这些方法都使用某种形式的Spliterator。一个分离器是一个迭代器的并行模拟;它描述了一个(可能是无限的)元素集合,支持顺序推进、批量遍历,并将输入的一部分拆分到另一个可以并行处理的拆分器中。在最低级别,所有流都由分离器驱动。
在实现拆分器时有许多实现选择,几乎所有这些都是在实现的简单性和使用该拆分器的流的运行时性能之间进行权衡。创建拆分器最简单但性能最低的方法是使用Spliterators.spliteratorUnknownSize(java.util.iterator,int)从迭代器创建一个。虽然这样的拆分器可以工作,但它可能会提供较差的并行性能,因为我们已经丢失了大小信息(底层数据集有多大),并且受制于简单的拆分算法。
更高质量的拆分器将提供均衡且已知的大小拆分、准确的大小信息以及拆分器的许多其他特性或数据,这些特性或数据可由实现用于优化执行。
用于可变数据源的分裂器还有一个额外的挑战;绑定到数据的时间,因为数据可能在创建分离器和执行流管道之间发生变化。理想情况下,流的分离器将报告IMUTABLE或CONCURRENT的特性;如果没有,那应该是后期绑定。如果源无法直接提供推荐的分离器,则可以使用供应商间接提供分离器,并通过接受stream()版本的供应商构建流。只有在流管道的终端操作开始后,才能从供应商处获得分离器。
这些要求显著减少了流源的突变和流管道的执行之间的潜在干扰范围。基于具有所需特性的分离器的流,或使用基于供应商的工厂形式的流,在终端操作开始前不受数据源修改的影响(前提是流操作的行为参数满足不干扰和无状态的要求标准)。有关详细信息,请参见无干扰。
实现
枚举
Enum | 描述 |
---|---|
Collector.Characteristics | 指示Collector的属性的 Collector ,可用于优化collect实现。 |
枚举常量
- IDENTITY_FINISH:表示整理器功能是身份功能,可以被删除。
- UNORDERED:表示收集操作不承诺保留输入元素的遇到顺序。
- CONCURRENT:表示此收集器是并发的,这意味着结果容器可以支持与多个线程相同的结果容器同时调用的累加器函数。
所有方法
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
static Collector.Characteristics | valueOf(String name) | 以指定的名称返回此类型的枚举常量。 |
static Collector.Characteristics[] | values() | 按照它们声明的顺序返回一个包含此枚举类型常量的数组。 |
接口
接口 | 描述 |
---|---|
BaseStream<T,S extends BaseStream<T,S>> | 流的基本界面,它们是支持顺序和并行聚合操作的元素序列。 |
Collector<T,A,R> | A mutable reduction operation将输入元素累加到可变结果容器中,可选地在所有输入元素被处理之后将累积结果转换成最终表示。 |
DoubleStream | 支持顺序和并行聚合操作的原始双值元素序列。 |
DoubleStream.Builder | DoubleStream可变构建器。 |
IntStream | 支持顺序和并行聚合操作的原始int值元素序列。 |
IntStream.Builder | IntStream可变构建器。 |
LongStream | 支持顺序和并行聚合操作的原始长值元素序列。 |
LongStream.Builder | LongStream可变构建器。 |
Stream |
支持顺序和并行聚合操作的一系列元素。 |
Stream.Builder |
Stream可变构建器。 |
实现类
类 | 描述 |
---|---|
Collectors | Collector的实现 ,实现各种有用的collect操作,例如将元素累积到集合中,根据各种标准汇总元素等。 |
StreamSupport | 用于创建和操纵流的低级实用程序方法。 |
示例
接口
BaseStream
查看Stream的类图:
BaseStream是Stream的父类,提供了一些基础功能,包括:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
void | close() | 关闭此流,导致此流管道的所有关闭处理程序被调用。 |
boolean | isParallel() | 返回此流是否要执行终端操作,将并行执行。 |
Iterator |
iterator() | 返回此流的元素的迭代器。 |
S | onClose(Runnable closeHandler) | 返回带有附加关闭处理程序的等效流。 |
S | parallel() | 返回并行的等效流。 |
S | sequential() | 返回顺序的等效流。 |
Spliterator |
spliterator() | 返回此流的元素的拼接器。 |
S | unordered() | 返回等效的流,即 unordered 。 |
AutoCloseable是BaseStream的父类,它只有一个方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
void | close() | 关闭此资源,放弃任何潜在资源。 |
AutoCloseable对象的close()方法在退出已在资源规范头中声明对象的try-with-resources块时自动调用。这种结构确保迅速释放,避免资源耗尽异常和可能发生的错误。
实际上,基类实现自动关闭是可能的,实际上是可行的,尽管不是所有的子类或实例都将保存可释放的资源。对于必须以完全一般性操作的代码,或者当知道AutoCloseable实例需要资源释放时,建议使用try资源结构。然而,使用设施,例如当Stream同时支持I/O基和非I/O基的形式,try-with-resources块是一般不必要使用非I/O形式。
测试使用:
1 | private static void test1(List<User> userList) { |
控制台输出:
1 | 是否为并行流:false |
查看unordered()源码,这里实现为ReferencePipeline类:
1 |
|
Collector
A mutable reduction operation将输入元素累加到可变结果容器中,可选地在所有输入元素被处理之后将累加结果转换成最终表示。还原操作可以顺序还是并行执行。
A mutable reduction operation的示例包括:将元素累加到一个Collection; 使用连接字符串StringBuilder;计算诸如sum,min,max或average之类的元素的摘要信息;计算“枢纽表”摘要,如“卖方最大价值交易”等。Collectors类提供了许多常见的可变减少的实现。
- 一个Collector由四个函数来指定,这四个函数一起工作以将条目累加到可变结果容器中,并且可选地对结果进行最终转换。他们是:
- 创建新的结果容器(
supplier()
) - 将新的数据元素并入结果容器(
accumulator()
) - 将两个结果容器组合成一个(
combiner()
) - 在容器上执行可选的最终变换(
finisher()
)
- 创建新的结果容器(
- 收集者还具有一系列特征,如
Collector.Characteristics.CONCURRENT
,它提供了可以通过减少实现来提供更好性能的提示。
Collector有以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
BiConsumer<A,T> | accumulator() | 将值折叠成可变结果容器的函数。 |
Set<Collector.Characteristics> | characteristics() | 返回一个Set的Collector.Characteristics表示该Collector的特征 |
BinaryOperator | combiner() | 一个接受两个部分结果并将其合并的函数。 |
Function<A,R> | finisher() | 执行从中间累积类型 A到最终结果类型 R的最终 R 。 |
static <T,A,R> Collector<T,A,R> | of(Supplier supplier, BiConsumer<A,T> accumulator, BinaryOperator combiner, Function<A,R> finisher, Collector.Characteristics… characteristics) | 返回一个新Collector由给定的描述 supplier , accumulator , combiner和 finisher功能。 |
static <T,R> Collector<T,R,R> | of(Supplier |
返回一个新 Collector由给定的描述 supplier , accumulator和 combiner功能。 |
Supplier | supplier() | 一个创建并返回一个新的可变结果容器的函数。 |
Collector接口只有一个实现java.util.stream.Collectors.CollectorImpl,CollectorImpl是Collectors的静态内部类:
1 | static class CollectorImpl<T, A, R> implements Collector<T, A, R> { |
Collectors的大部分方法都是依赖CollectorImpl实现的,例如:
1 | private static void test2(List<User> userList) { |
控制台输出:
1 | zhangsanlisiwangwuzhangsan |
查看Collectors的joining()方法:
1 | public static Collector<CharSequence, ?, String> joining() { |
实际上就是创建了一个CollectorImpl示例,实现了
- supplier
- accumulator
- combiner
- finisher
等方法,并传入了characteristics。
Stream类提供另一个重载的collect方法:
1 | <R> R collect(Supplier<R> supplier, |
我们可以自己实现一个将姓名放在一个集合中的功能:
1 | private static void test2(List<User> userList) { |
控制台输出:
1 | [zhangsan, lisi, wangwu, zhangsan] |
Collector还提供两个of函数,根据不同入参返回新Collector,最终返回的实现也是CollectorImpl。
1 | public static<T, R> Collector<T, R, R> of(Supplier<R> supplier, |
DoubleStream
DoubleStream支持顺序和并行聚合操作的原始双值元素序列,是double原始专长Stream。
DoubleStream有以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
boolean | allMatch(DoublePredicate predicate) | 返回此流的所有元素是否与提供的谓词匹配。 |
boolean | anyMatch(DoublePredicate predicate) | 返回此流的任何元素是否与提供的谓词匹配。 |
OptionalDouble | average() | 返回 OptionalDouble此流的元素的算术平均值的OptionalDouble,如果此流为空,则返回一个空的可选项。 |
Stream |
boxed() | 返回一个 Stream组成的这个流的元素,装箱到 Double 。 |
static DoubleStream.Builder | builder() | 返回一个 DoubleStream的生成器。 |
collect(Supplier |
对此流的元素执行 mutable reduction操作。 | |
static DoubleStream | concat(DoubleStream a, DoubleStream b) | 创建一个懒惰连接的流,其元素是第一个流的所有元素,后跟第二个流的所有元素。 |
long | count() | 返回此流中的元素数。 |
DoubleStream | distinct() | 返回由该流的不同元素组成的流。 |
static DoubleStream | empty() | 返回一个空的顺序 DoubleStream 。 |
DoubleStream | filter(DoublePredicate predicate) | 返回由与此给定谓词匹配的此流的元素组成的流。 |
OptionalDouble | findAny() | 返回描述流的一些元素的OptionalDouble如果流为空,则返回一个空的OptionalDouble 。 |
OptionalDouble | findFirst() | 返回描述此流的第一个元素的OptionalDouble如果流为空,则返回一个空的OptionalDouble 。 |
DoubleStream | flatMap(DoubleFunction<? extends DoubleStream> mapper) | 返回由通过将提供的映射函数应用于每个元素而产生的映射流的内容来替换该流的每个元素的结果的流。 |
void | forEach(DoubleConsumer action) | 对此流的每个元素执行操作。 |
void | forEachOrdered(DoubleConsumer action) | 对此流的每个元素执行一个操作,保证每个元素按遇到顺序处理,以便具有定义的遇到顺序的流。 |
static DoubleStream | generate(DoubleSupplier s) | 返回无限顺序无序流,其中每个元素由提供的 DoubleSupplier 。 |
static DoubleStream | iterate(double seed, DoubleUnaryOperator f) | 返回有序无限连续 DoubleStream由函数的迭代应用产生 f至初始元素 seed ,产生 Stream包括 seed , f(seed) , f(f(seed)) ,等 |
PrimitiveIterator.OfDouble | iterator() | 返回此流的元素的迭代器。 |
DoubleStream | limit(long maxSize) | 返回由此流的元素组成的流,截断长度不能超过 maxSize。 |
DoubleStream | map(DoubleUnaryOperator mapper) | 返回由给定函数应用于此流的元素的结果组成的流。 |
IntStream | mapToInt(DoubleToIntFunction mapper) | 返回一个 IntStream ,其中包含将给定函数应用于此流的元素的结果。 |
LongStream | mapToLong(DoubleToLongFunction mapper) | 返回一个 LongStream ,其中包含将给定函数应用于此流的元素的结果。 |
Stream | mapToObj(DoubleFunction<? extends U> mapper) | 返回一个对象值 Stream ,其中包含将给定函数应用于此流的元素的结果。 |
OptionalDouble | max() | 返回 OptionalDouble此流的最大元素的OptionalDouble,如果此流为空,则返回空的OptionalDouble。 |
OptionalDouble | min() | 返回 OptionalDouble此流的最小元素的OptionalDouble,如果此流为空,则返回空的OptionalDouble。 |
boolean | noneMatch(DoublePredicate predicate) | 返回此流的元素是否与提供的谓词匹配。 |
static DoubleStream | of(double… values) | 返回其元素是指定值的顺序排序流。 |
static DoubleStream | of(double t) | 返回包含单个元素的顺序 DoubleStream 。 |
DoubleStream | parallel() | 返回平行的等效流。 |
DoubleStream | peek(DoubleConsumer action) | 返回由该流的元素组成的流,另外在从生成的流中消耗元素时对每个元素执行提供的操作。 |
OptionalDouble | reduce(DoubleBinaryOperator op) | 使用 associative累积函数对此流的元素执行 reduction ,并返回描述减小值(如果有的话)的 OptionalDouble 。 |
double | reduce(double identity, DoubleBinaryOperator op) | 使用提供的身份值和 associative累积功能对此流的元素执行 reduction ,并返回减小的值。 |
DoubleStream | sequential() | 返回顺序的等效流。 |
DoubleStream | skip(long n) | 在丢弃流的第一个 n元素之后,返回由该流的 n元素组成的流。 |
DoubleStream | sorted() | 以排序顺序返回由该流的元素组成的流。 |
Spliterator.OfDouble | spliterator() | 返回此流的元素的拼接器。 |
double | sum() | 返回此流中元素的总和。 |
DoubleSummaryStatistics | summaryStatistics() | 返回一个 DoubleSummaryStatistics描述有关此流的元素的各种摘要数据。 |
double[] | toArray() | 返回一个包含此流的元素的数组。 |
我们可以生成一个DoubleStream:
1 | DoubleStream doubleStream = userList.stream().mapToDouble(User::getHeight); |
查看mapToDouble()方法:
1 |
|
实际返回的是一个StatelessOp类型,查看其类图:
StatelessOp继承了DoubleStream,DoubleStream又继承了AbstractPipeline,其实AbstractPipeline有很多常用的实现,比如:
- Stream对应的实现ReferencePipeline
- IntStream对应的实现IntPipeline
- LongStream对应的实现IntPipeline
也可以通过DoubleStream的静态方法of()来创建DoubleStream:
1 | DoubleStream doubleStream = DoubleStream.of(1, 2, 3, 4, 5); |
进入of()方法:
1 | public static DoubleStream of(double... values) { |
进入Arrays.stream()方法:
1 | public static DoubleStream stream(double[] array) { |
进入重载的stream()方法:
1 | public static DoubleStream stream(double[] array, int startInclusive, int endExclusive) { |
最终使用的是StreamSupport的doubleStream()创建DoubleStream,进入doubleStream()方法:
1 | public static DoubleStream doubleStream(Spliterator.OfDouble spliterator, |
新建了一个Head类,而Head又是DoublePipeline子类。
测试方法:
1 | private static void test3(List<User> userList) { |
控制台输出:
1 | 是否所有用户都小于50岁:true |
DoubleStream.Builder
DoubleStream.Builder是DoubleStream是构建器,DoubleStream.Builder具有生命周期,其从构建阶段开始,在该阶段期间可以添加元素,然后转换到内置阶段,之后可能不添加元素。构建阶段从调用build()方法开始,它创建一个有序流,其元素是添加到流构建器的元素,按照它们被添加的顺序。
DoubleStream.Builder提供以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
void | accept(double t) | 向要构建的流添加元素。 |
default DoubleStream.Builder | add(double t) | 向要构建的流添加元素。 |
DoubleStream | build() | 构建流,将此构建器转换为内置状态。 |
测试方法:
1 | private static void test4(List<User> userList) { |
控制台输出:
1 | 遍历通过builder()方法创建的DoubleStream元素: |
DoubleStream.Builderd的accept(double t)方法只在DoubleStreamBuilderImpl中有实现,DoubleStreamBuilderImpl在两个地方有用法:
- DoubleStream的builder()方法:
1 | /** |
- DoubleStream的of()方法:
1 | /** |
IntStream
IntStream支持顺序和并行聚合操作的原始int值元素序列,是int原始专长Stream。
IntStream有以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
boolean | allMatch(IntPredicate predicate) | 返回此流的所有元素是否与提供的谓词匹配。 |
boolean | anyMatch(IntPredicate predicate) | 返回此流的任何元素是否与提供的谓词匹配。 |
DoubleStream | asDoubleStream() | 返回一个DoubleStream由该流中的元素,转换为double 。 |
LongStream | asLongStream() | 返回一个LongStream由该流的元素组成,转换为long 。 |
OptionalDouble | average() | 返回OptionalDouble此流的元素的算术平均值的OptionalDouble,如果此流为空,则返回空的可选项。 |
Stream |
boxed() | 返回一个Stream组成的这个流的元素,每个盒装到一个Integer 。 |
static IntStream.Builder | builder() | 返回一个IntStream的生成器。 |
collect(Supplier |
对此流的元素执行mutable reduction操作。 | |
static IntStream | concat(IntStream a, IntStream b) | 创建一个懒惰连接的流,其元素是第一个流的所有元素,后跟第二个流的所有元素。 |
long | count() | 返回此流中的元素数。 |
IntStream | distinct() | 返回由该流的不同元素组成的流。 |
static IntStream | empty() | 返回一个空的顺序IntStream 。 |
IntStream | filter(IntPredicate predicate) | 返回由与此给定谓词匹配的此流的元素组成的流。 |
OptionalInt | findAny() | 返回一个描述流的一些元素的OptionalInt如果流为空,则返回一个空的OptionalInt 。 |
OptionalInt | findFirst() | 返回描述此流的第一个元素的OptionalInt如果流为空,则返回一个空的OptionalInt 。 |
IntStream | flatMap(IntFunction<? extends IntStream> mapper) | 返回由通过将提供的映射函数应用于每个元素而产生的映射流的内容来替换该流的每个元素的结果的流。 |
void | forEach(IntConsumer action) | 对此流的每个元素执行操作。 |
void | forEachOrdered(IntConsumer action) | 对此流的每个元素执行一个操作,保证每个元素按遇到顺序处理,以便具有定义的遇到顺序的流。 |
static IntStream | generate(IntSupplier s) | 返回无限顺序无序流,其中每个元素由提供的IntSupplier。 |
static IntStream | iterate(int seed, IntUnaryOperator f) | 返回有序无限连续IntStream由函数的迭代应用产生f至初始元素seed ,产生 Stream包括 seed , f(seed) , f(f(seed)) ,等 |
PrimitiveIterator.OfInt | iterator() | 返回此流的元素的迭代器。 |
IntStream | limit(long maxSize) | 返回由此流的元素组成的流,截断长度不能超过 maxSize 。 |
IntStream | map(IntUnaryOperator mapper) | 返回由给定函数应用于此流的元素的结果组成的流。 |
DoubleStream | mapToDouble(IntToDoubleFunction mapper) | 返回一个 DoubleStream ,其中包含将给定函数应用于此流的元素的结果。 |
LongStream | mapToLong(IntToLongFunction mapper) | 返回一个 LongStream ,其中包含将给定函数应用于此流的元素的结果。 |
Stream | mapToObj(IntFunction<? extends U> mapper) | 返回一个对象值 Stream ,其中包含将给定函数应用于此流的元素的结果。 |
OptionalInt | max() | 返回 OptionalInt此流的最大元素的OptionalInt,如果此流为空,则返回一个空的可选项。 |
OptionalInt | min() | 返回 OptionalInt此流的最小元素的OptionalInt,如果此流为空,则返回一个空的可选项。 |
boolean | noneMatch(IntPredicate predicate) | 返回此流的元素是否与提供的谓词匹配。 |
static IntStream | of(int… values) | 返回其元素是指定值的顺序排序流。 |
static IntStream | of(int t) | 返回一个包含单个元素的顺序IntStream。 |
IntStream | parallel() | 返回平行的等效流。 |
IntStream | peek(IntConsumer action) | 返回由该流的元素组成的流,另外在从生成的流中消耗元素时对每个元素执行提供的操作。 |
static IntStream | range(int startInclusive, int endExclusive) | 返回有序的顺序IntStream从startInclusive(含)至 endExclusive通过增量步骤(独家) 1 。 |
static IntStream | rangeClosed(int startInclusive, int endInclusive) | 从 startInclusive (含)的顺序排列 IntStream到 endInclusive(含),增量步长为 1 。 |
OptionalInt | reduce(IntBinaryOperator op) | 使用associative累积函数对此流的元素执行reduction ,并返回描述减小值的 OptionalInt (如果有)。 |
int | reduce(int identity, IntBinaryOperator op) | 使用提供的身份值和associative累积功能对此流的元素执行reduction ,并返回减小的值。 |
IntStream | sequential() | 返回顺序的等效流。 |
IntStream | skip(long n) | 在丢弃流的第一个 n元素后,返回由该流的 n元素组成的流。 |
IntStream | sorted() | 以排序顺序返回由该流的元素组成的流。 |
Spliterator.OfInt | spliterator() | 返回此流的元素的拼接器。 |
int | sum() | 返回此流中元素的总和。 |
IntSummaryStatistics | summaryStatistics() | 返回一个 IntSummaryStatistics描述有关此流的元素的各种摘要数据。 |
int[] | toArray() | 返回一个包含此流的元素的数组。 |
测试方法:
1 | private static void test5(List<User> userList) { |
控制台输出:
1 | 是否所有用户都小于50岁:true |
IntStream.Builder
IntStream.Builder是IntStream是构建器,IntStream.Builder具有生命周期,其从构建阶段开始,在该阶段期间可以添加元素,然后转换到内置阶段,之后可能不添加元素。构建阶段从调用build()方法开始,它创建一个有序流,其元素是添加到流构建器的元素,按照它们被添加的顺序。
IntStream.Builder提供以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
void | accept(int t) | 向要构建的流添加元素。 |
default IntStream.Builder | add(int t) | 向要构建的流添加元素。 |
IntStream | build() | 构建流,将此构建器转换为内置状态。 |
测试方法:
1 | private static void test6(List<User> userList) { |
控制台输出:
1 | 遍历通过builder()方法创建的IntStream元素: |
IntStream.Builderd的accept(double t)方法只在IntStreamBuilderImpl中有实现,IntStreamBuilderImpl在两个地方有用法:
- IntStream的builder()方法:
1 | /** |
- IntStream的of()方法:
1 | /** |
LongStream
LongStream支持顺序和并行聚合操作的原始long值元素序列,是long原始专长Stream。
LongStream有以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
boolean | allMatch(LongPredicate predicate) | 返回此流的所有元素是否与提供的谓词匹配。 |
boolean | anyMatch(LongPredicate predicate) | 返回此流的任何元素是否与提供的谓词匹配。 |
DoubleStream | asDoubleStream() | 返回一个 DoubleStream由该流中的元素,转换为 double 。 |
OptionalDouble | average() | 返回 OptionalDouble此流的元素的算术平均值的OptionalDouble,如果此流为空,则返回一个空的可选项。 |
Stream |
boxed() | 返回一个 Stream由该流的盒装到所述的元件,每个的 Long 。 |
static LongStream.Builder | builder() | 返回一个 LongStream的生成器。 |
collect(Supplier |
对此流的元素执行mutable reduction操作。 | |
static LongStream | concat(LongStream a, LongStream b) | 创建一个懒惰连接的流,其元素是第一个流的所有元素,后跟第二个流的所有元素。 |
long | count() | 返回此流中的元素数。 |
LongStream | distinct() | 返回由该流的不同元素组成的流。 |
static LongStream | empty() | 返回一个空的顺序 LongStream 。 |
LongStream | filter(LongPredicate predicate) | 返回由与此给定谓词匹配的此流的元素组成的流。 |
OptionalLong | findAny() | 返回描述流的一些元素的OptionalLong如果流为空,则返回一个空的OptionalLong 。 |
OptionalLong | findFirst() | 返回描述此流的第一个元素的OptionalLong如果流为空,则返回空的OptionalLong 。 |
LongStream | flatMap(LongFunction<? extends LongStream> mapper) | 返回由通过将提供的映射函数应用于每个元素而产生的映射流的内容来替换该流的每个元素的结果的流。 |
void | forEach(LongConsumer action) | 对此流的每个元素执行操作。 |
void | forEachOrdered(LongConsumer action) | 对此流的每个元素执行一个操作,保证每个元素按遇到顺序处理,以便具有定义的遇到顺序的流。 |
static LongStream | generate(LongSupplier s) | 返回无限顺序无序流,其中每个元素由提供的 LongSupplier 。 |
static LongStream | iterate(long seed, LongUnaryOperator f) | 返回有序无限连续 LongStream由函数的迭代应用产生 f至初始元素 seed ,产生 Stream包括 seed , f(seed) , f(f(seed)) ,等 |
PrimitiveIterator.OfLong | iterator() | 返回此流的元素的迭代器。 |
LongStream | limit(long maxSize) | 返回由此流的元素组成的流,截断长度不能超过 maxSize 。 |
LongStream | map(LongUnaryOperator mapper) | 返回由给定函数应用于此流的元素的结果组成的流。 |
DoubleStream | mapToDouble(LongToDoubleFunction mapper) | 返回一个 DoubleStream ,其中包含将给定函数应用于此流的元素的结果。 |
IntStream | mapToInt(LongToIntFunction mapper) | 返回一个 IntStream ,其中包含将给定函数应用于此流的元素的结果。 |
Stream | mapToObj(LongFunction<? extends U> mapper) | 返回一个对象值 Stream ,其中包含将给定函数应用于此流的元素的结果。 |
OptionalLong | max() | 返回 OptionalLong此流的最大元素的OptionalLong,如果此流为空,则返回一个空的可选项。 |
OptionalLong | min() | 返回 OptionalLong此流的最小元素的OptionalLong,如果此流为空,则返回一个空的可选项。 |
boolean | noneMatch(LongPredicate predicate) | 返回此流的元素是否与提供的谓词匹配。 |
static LongStream | of(long… values) | 返回其元素是指定值的顺序排序流。 |
static LongStream | of(long t) | 返回包含单个元素的顺序 LongStream 。 |
LongStream | parallel() | 返回平行的等效流。 |
LongStream | peek(LongConsumer action) | 返回由该流的元素组成的流,另外在从生成的流中消耗元素时对每个元素执行提供的操作。 |
static LongStream | range(long startInclusive, long endExclusive) 。 | 返回有序的顺序 LongStream从 startInclusive (含)至 endExclusive通过增量步骤(独家) 1 |
static LongStream | rangeClosed(long startInclusive, long endInclusive) | 返回有序顺序 LongStream从 startInclusive (含)至 endInclusive通过的递增步长(含) 1 。 |
OptionalLong | reduce(LongBinaryOperator op) | 使用 associative累积功能对此流的元素执行reduction ,并返回描述减小值的 OptionalLong (如果有)。 |
long | reduce(long identity, LongBinaryOperator op) | 使用提供的身份值和associative累积功能对此流的元素执行reduction ,并返回减小的值。 |
LongStream | sequential() | 返回顺序的等效流。 |
LongStream | skip(long n) | 在丢弃流的第一个 n元素后,返回由该流的 n元素组成的流。 |
LongStream | sorted() | 以排序顺序返回由该流的元素组成的流。 |
Spliterator.OfLong | spliterator() | 返回此流的元素的拼接器。 |
long | sum() | 返回此流中元素的总和。 |
LongSummaryStatistics | summaryStatistics() | 返回一个 LongSummaryStatistics描述有关此流的元素的各种摘要数据。 |
long[] | toArray() | 返回一个包含此流的元素的数组。 |
测试方法:
1 | private static void test7(List<User> userList) { |
控制台输出:
1 | 是否所有用户都小于50岁:true |
LongStream.Builder
LongStream.Builder是LongStream是构建器,LongStream.Builder具有生命周期,其从构建阶段开始,在该阶段期间可以添加元素,然后转换到内置阶段,之后可能不添加元素。构建阶段从调用build()方法开始,它创建一个有序流,其元素是添加到流构建器的元素,按照它们被添加的顺序。
LongStream.Builder提供以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
void | accept(long t) | 向要构建的流添加元素。 |
default LongStream.Builder | add(long t) | 向要构建的流添加元素。 |
LongStream | build() | 构建流,将此构建器转换为内置状态。 |
测试方法:
1 | private static void test8(List<User> userList) { |
控制台输出:
1 | 遍历通过builder()方法创建的LongStream元素: |
LongStream.Builderd的accept(double t)方法只在LongStreamBuilderImpl中有实现,LongStreamBuilderImpl在两个地方有用法:
- LongStream的builder()方法:
1 | /** |
- LongStream的of()方法:
1 | /** |
Stream
支持顺序和并行聚合操作的一系列元素。以下示例说明了使用Stream和IntStream的汇总操作:
1 | int sum = widgets.stream().filter(w -> w.getColor() == RED).mapToInt(w -> w.getWeight()).sum(); |
在这个例子中,widgets是Collection
除了Stream为对象引用的流,还存在IntStream、LongStream和DoubleStream。
为了执行计算,流operations被组合成pipelines。 pipelines由源(其可以是数组、集合、生成函数、I/O通道等)组成,零个或多个中间操作(其将流转换成另一个流,例如filter(Predicate))以及终端操作(产生结果或副作用,如count()或forEach(Consumer))。流具有懒惰性,源数据上的计算仅在终端操作启动时执行,源元素仅在需要时才被使用。
集合和流,具有一些表面上的相似之处,但具有不同的目标。集合主要关注其元素的有效管理和访问。相比之下,流不提供直接访问或操纵其元素的手段,而是关心描述其源和将在该源上进行聚合的计算操作。但是,如果提供的流操作不提供所需的功能,则可以使用BaseStream.iterator()和BaseStream.spliterator()操作来执行受控遍历。
pipelines,如上面的“小部件”示例,可以被视为流源上的查询 。除非源是明确设计用于并发修改(例如ConcurrentHashMap),否则在查询流源时可能会导致不可预测或错误的行为。
大多数流操作接受用户指定行为的参数,例如上面示例中传递给mapToInt的lambda表达式w->w.getWeight() 。为了保持正确的行为,这些行为参数 :
- 必须是non-interfering(他们不修改流源)。
- 在大多数情况下必须是stateless(它们的结果不应该取决于在pipelines的执行期间可能改变的任何状态)。
这些参数始终是functional interface的实例,例如Function,并且通常是lambda表达式或方法引用。除非另有说明,否则这些参数必须为非空值 。
一个流应该只操作一次(调用中间或终端流操作)。例如,这排除了“分叉”流,即同一个源提供两个或多个管道,或同一流的多次遍历。如果流实现检测到流正在被重用,它可能会抛出IllegalStateException。然而,由于一些流操作可能返回它们的接收器而不是新的流对象,因此不可能在所有情况下都检测到重用。
Streams有一个BaseStream.close()方法并实现了AutoCloseable,但几乎所有的流实例在使用后都不需要关闭。通常,只有源为IO通道的流(例如由Files.lines(Path,Charset)返回的流)才需要关闭。大多数流都由集合、数组或生成函数支持,这些函数不需要特殊的资源管理。(如果流确实需要关闭,则可以在try-with-resources语句中将其声明为资源。)
流管道可以按顺序执行,也可以并行执行。此执行模式是流的一个属性。流是通过顺序执行或并行执行的初始选择创建的。(例如,Collection.stream()创建一个顺序流,Collection.sparallelStream()创建并行流。)这种执行模式的选择可以通过BaseStream.sequential()或BaseStream.parallel()方法进行修改,也可以使用BaseStream.isParallel()判断是否为并行流。
Stream有以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
boolean | allMatch(Predicate<? super T> predicate) | 返回此流的所有元素是否与提供的谓词匹配。 |
boolean | anyMatch(Predicate<? super T> predicate) | 返回此流的任何元素是否与提供的谓词匹配。 |
static |
builder() | 返回一个Stream的构建器。 |
<R,A> R | collect(Collector<? super T,A,R> collector) | 使用Collector对此流的元素执行mutable reduction Collector 。 |
collect(Supplier |
对此流的元素执行mutable reduction操作。 | |
static |
concat(Stream<? extends T> a, Stream<? extends T> b) | 创建一个懒惰连接的流,其元素是第一个流的所有元素,后跟第二个流的所有元素。 |
long | count() | 返回此流中的元素数。 |
Stream |
distinct() | 返回由该流的不同元素(根据 Object.equals(Object) )组成的流。 |
static |
empty() | 返回一个空的顺序Stream 。 |
Stream |
filter(Predicate<? super T> predicate) | 返回由与此给定谓词匹配的此流的元素组成的流。 |
Optional |
findAny() | 返回描述流的一些元素的Optional如果流为空,则返回一个空的Optional 。 |
Optional |
findFirst() | 返回描述此流的第一个元素的Optional如果流为空,则返回一个空的Optional 。 |
flatMap(Function<? super T,? extends Stream<? extends R>> mapper) | 返回由通过将提供的映射函数应用于每个元素而产生的映射流的内容来替换该流的每个元素的结果的流。 | |
DoubleStream | flatMapToDouble(Function<? super T,? extends DoubleStream> mapper) | 返回一个DoubleStream ,其中包含将该流的每个元素替换为通过将提供的映射函数应用于每个元素而产生的映射流的内容的结果。 |
IntStream | flatMapToInt(Function<? super T,? extends IntStream> mapper) | 返回一个 IntStream ,其中包含将该流的每个元素替换为通过将提供的映射函数应用于每个元素而产生的映射流的内容的结果。 |
LongStream | flatMapToLong(Function<? super T,? extends LongStream> mapper) | 返回一个 LongStream ,其中包含将该流的每个元素替换为通过将提供的映射函数应用于每个元素而产生的映射流的内容的结果。 |
void | forEach(Consumer<? super T> action) | 对此流的每个元素执行操作。 |
void | forEachOrdered(Consumer<? super T> action) | 如果流具有定义的遇到顺序,则以流的遇到顺序对该流的每个元素执行操作。 |
static |
generate(Supplier |
返回无限顺序无序流,其中每个元素由提供的 Supplier 。 |
static |
iterate(T seed, UnaryOperator |
返回有序无限连续 Stream由函数的迭代应用产生 f至初始元素 seed ,产生 Stream包括seed,f(seed) ,f(f(seed)) ,等 |
Stream |
limit(long maxSize) | 返回由此流的元素组成的流,截短长度不能超过maxSize 。 |
map(Function<? super T,? extends R> mapper) | 返回由给定函数应用于此流的元素的结果组成的流。 | |
DoubleStream | mapToDouble(ToDoubleFunction<? super T> mapper) | 返回一个DoubleStream ,其中包含将给定函数应用于此流的元素的结果。 |
IntStream | mapToInt(ToIntFunction<? super T> mapper) | 返回一个IntStream ,其中包含将给定函数应用于此流的元素的结果。 |
LongStream | mapToLong(ToLongFunction<? super T> mapper) | 返回一个LongStream ,其中包含将给定函数应用于此流的元素的结果。 |
Optional |
max(Comparator<? super T> comparator) | 根据提供的 Comparator返回此流的最大元素。 |
Optional |
min(Comparator<? super T> comparator) | 根据提供的 Comparator返回此流的最小元素。 |
boolean | noneMatch(Predicate<? super T> predicate) | 返回此流的元素是否与提供的谓词匹配。 |
static |
of(T… values) | 返回其元素是指定值的顺序排序流。 |
static |
of(T t) | 返回包含单个元素的顺序Stream 。 |
Stream |
peek(Consumer<? super T> action) | 返回由该流的元素组成的流,另外在从生成的流中消耗元素时对每个元素执行提供的操作。 |
Optional |
reduce(BinaryOperator |
使用associative累积函数对此流的元素执行reduction,并返回描述减小值的Optional(如果有)。 |
T | reduce(T identity, BinaryOperator |
使用提供的身份值和associative累积功能对此流的元素执行 reduction,并返回减小的值。 |
U | reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator combiner) | 执行reduction在此流中的元素,使用所提供的身份,积累和组合功能。 |
Stream |
skip(long n) | 在丢弃流的第一个n元素后,返回由该流的n元素组成的流。 |
Stream |
sorted() | 返回由此流的元素组成的流,根据自然顺序排序。 |
Stream |
sorted(Comparator<? super T> comparator) | 返回由该流的元素组成的流,根据提供的Comparator进行排序。 |
Object[] | toArray() | 返回一个包含此流的元素的数组。 |
A[] | toArray(IntFunction<A[]> generator) | 使用提供的generator函数返回一个包含此流的元素的数组,以分配返回的数组,以及分区执行或调整大小可能需要的任何其他数组。 |
测试方法:
1 | private static void test9(List<User> userList) { |
控制台输出:
1 | 是否所有用户年龄都小于35岁且身高大于180cm: |
Stream.Builder
Stream.Builder是Stream是构建器,Stream.Builder具有生命周期,其从构建阶段开始,在该阶段期间可以添加元素,然后转换到内置阶段,之后可能不添加元素。构建阶段从调用build()方法开始,它创建一个有序流,其元素是添加到流构建器的元素,按照它们被添加的顺序。
Stream.Builder提供以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
void | accept(long t) | 向要构建的流添加元素。 |
default LongStream.Builder | add(long t) | 向要构建的流添加元素。 |
LongStream | build() | 构建流,将此构建器转换为内置状态。 |
测试方法:
1 | private static void test10(List<User> userList) { |
控制台输出:
1 | 遍历通过builder()方法创建的Stream元素: |
Stream.Builderd的accept(double t)方法只在StreamBuilderImpl中有实现,StreamBuilderImpl在两个地方有用法:
- Stream的builder()方法:
1 | /** |
- Stream的of()方法:
1 | /** |
类
Collectors
Collectors是一个工具类,其中实现了很多的Collect操作供我们使用。
Collectors提供以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
static |
averagingDouble(ToDoubleFunction<? super T> mapper) | 返回一个 Collector ,它产生应用于输入元素的双值函数的算术平均值。 |
static |
averagingInt(ToIntFunction<? super T> mapper) | 返回一个 Collector ,它产生应用于输入元素的整数值函数的算术平均值。 |
static |
averagingLong(ToLongFunction<? super T> mapper) | 返回一个 Collector ,它产生应用于输入元素的长值函数的算术平均值。 |
static <T,A,R,RR> Collector<T,A,RR> | collectingAndThen(Collector<T,A,R> downstream, Function<R,RR> finisher) | 适应 Collector进行额外的整理转换。 |
static |
counting() | 返回 Collector类型的接受元件 T计数输入元件的数量。 |
static <T,K> Collector<T,?,Map<K,List |
groupingBy(Function<? super T,? extends K> classifier) | 返回 Collector “由基团”上的类型的输入元件操作实现 T ,根据分类功能分组元素,并且在返回的结果 Map 。 |
static <T,K,A,D> Collector<T,?,Map<K,D>> | groupingBy(Function<? super T,? extends K> classifier, Collector<? super T,A,D> downstream) | 返回 Collector “由基团”上的类型的输入元件操作实现级联 T ,根据分类功能分组元素,然后使用下游的指定执行与给定键相关联的值的归约运算 Collector 。 |
static <T,K,D,A,M extends Map<K,D>> Collector<T,?,M> | groupingBy(Function<? super T,? extends K> classifier, Supplier |
返回 Collector “由基团”上的类型的输入元件操作实现级联 T ,根据分类功能分组元素,然后使用下游的指定执行与给定键相关联的值的归约运算 Collector 。 |
static <T,K> Collector<T,?,ConcurrentMap<K,List |
groupingByConcurrent(Function<? super T,? extends K> classifier) | 返回一个并发 Collector “由基团”上的类型的输入元件操作实现 T ,根据分类功能分组元素。 |
static <T,K,A,D> Collector<T,?,ConcurrentMap<K,D>> | groupingByConcurrent(Function<? super T,? extends K> classifier, Collector<? super T,A,D> downstream) | 返回一个并发 Collector “由基团”上的类型的输入元件操作实现级联 T ,根据分类功能分组元素,然后使用下游的指定执行与给定键相关联的值的归约运算 Collector 。 |
static <T,K,A,D,M extends ConcurrentMap<K,D>> Collector<T,?,M> | groupingByConcurrent(Function<? super T,? extends K> classifier, Supplier |
返回一个并发 Collector “由基团”上的类型的输入元件操作实现级联 T ,根据分类功能分组元素,然后使用下游的指定执行与给定键相关联的值的归约运算 Collector 。 |
static Collector<CharSequence,?,String> | joining() | 返回一个 Collector ,按照遇到的顺序将输入元素连接到一个 String中。 |
static Collector<CharSequence,?,String> | joining(CharSequence delimiter) | 返回一个 Collector ,按照遇到的顺序连接由指定的分隔符分隔的输入元素。 |
static Collector<CharSequence,?,String> | joining(CharSequence delimiter, CharSequence prefix, CharSequence suffix) | 返回一个 Collector ,它将按照指定的 Collector分隔的输入元素与指定的前缀和后缀进行连接。 |
static <T,U,A,R> Collector<T,?,R> | mapping(Function<? super T,? extends U> mapper, Collector<? super U,A,R> downstream) | 适应一个 Collector类型的接受元件 U至类型的一个接受元件 T通过积累前应用映射函数到每个输入元素。 |
static |
maxBy(Comparator<? super T> comparator) | 返回一个 Collector ,它根据给出的 Comparator产生最大元素,描述为 Optional |
static |
minBy(Comparator<? super T> comparator) | 返回一个 Collector ,根据给出的 Comparator产生最小元素,描述为 Optional |
static |
partitioningBy(Predicate<? super T> predicate) | 返回一个 Collector ,根据Predicate对输入元素进行 Predicate ,并将它们组织成 Map<Boolean, List |
static <T,D,A> Collector<T,?,Map<Boolean,D>> | partitioningBy(Predicate<? super T> predicate, Collector<? super T,A,D> downstream) | 返回一个 Collector ,它根据Predicate对输入元素进行 Predicate ,根据另一个 Collector减少每个分区的值,并将其组织成 Map<Boolean, D> ,其值是下游缩减的结果。 |
static |
reducing(BinaryOperator |
返回一个 Collector ,它在指定的 Collector下执行其输入元素的 BinaryOperator 。 |
static |
reducing(T identity, BinaryOperator |
返回 Collector执行下一个指定的减少其输入元件的 BinaryOperator使用所提供的身份。 |
static <T,U> Collector<T,?,U> | reducing(U identity, Function<? super T,? extends U> mapper, BinaryOperator op) | 返回一个 Collector ,它在指定的映射函数和 BinaryOperator下执行其输入元素的 BinaryOperator。 |
static |
summarizingDouble(ToDoubleFunction<? super T> mapper) | 返回一个 Collector , double生产映射函数应用于每个输入元素,并返回结果值的汇总统计信息。 |
static |
summarizingInt(ToIntFunction<? super T> mapper) | 返回一个 Collector , int生产映射函数应用于每个输入元素,并返回结果值的汇总统计信息。 |
static |
summarizingLong(ToLongFunction<? super T> mapper) | 返回一个 Collector , long生产映射函数应用于每个输入元素,并返回结果值的汇总统计信息。 |
static |
summingDouble(ToDoubleFunction<? super T> mapper) | 返回一个 Collector ,它产生应用于输入元素的双值函数的和。 |
static |
summingInt(ToIntFunction<? super T> mapper) | 返回一个 Collector ,它产生应用于输入元素的整数值函数的和。 |
static |
summingLong(ToLongFunction<? super T> mapper) | 返回一个 Collector ,它产生应用于输入元素的长值函数的和。 |
static <T,C extends Collection |
toCollection(Supplier |
返回一个 Collector ,按照遇到的顺序将输入元素累加到一个新的 Collection中。 |
static <T,K,U> Collector<T,?,ConcurrentMap<K,U>> | toConcurrentMap(Function<? super T,? extends K> keyMapper, Function<? super T,? extends U> valueMapper) | 返回一个并发的 Collector ,它将元素累加到 ConcurrentMap ,其键和值是将所提供的映射函数应用于输入元素的结果。 |
static <T,K,U> Collector<T,?,ConcurrentMap<K,U>> | toConcurrentMap(Function<? super T,? extends K> keyMapper, Function<? super T,? extends U> valueMapper, BinaryOperator mergeFunction) | 返回一个并发的 Collector ,它将元素累加到一个 ConcurrentMap ,其键和值是将提供的映射函数应用于输入元素的结果。 |
static <T,K,U,M extends ConcurrentMap<K,U>> Collector<T,?,M> | toConcurrentMap(Function<? super T,? extends K> keyMapper, Function<? super T,? extends U> valueMapper, BinaryOperator mergeFunction, Supplier |
返回一个并发的 Collector ,它将元素累加到一个 ConcurrentMap ,其键和值是将所提供的映射函数应用于输入元素的结果。 |
static |
toList() | 返回一个 Collector ,它将输入元素 List到一个新的 List。 |
static <T,K,U> Collector<T,?,Map<K,U>> | toMap(Function<? super T,? extends K> keyMapper, Function<? super T,? extends U> valueMapper) | 返回一个 Collector ,它将元素累加到一个 Map ,其键和值是将所提供的映射函数应用于输入元素的结果。 |
static <T,K,U> Collector<T,?,Map<K,U>> | toMap(Function<? super T,? extends K> keyMapper, Function<? super T,? extends U> valueMapper, BinaryOperator mergeFunction) | 返回一个 Collector ,它将元素累加到 Map ,其键和值是将提供的映射函数应用于输入元素的结果。 |
static <T,K,U,M extends Map<K,U>> Collector<T,?,M> | toMap(Function<? super T,? extends K> keyMapper, Function<? super T,? extends U> valueMapper, BinaryOperator mergeFunction, Supplier |
返回一个 Collector ,它将元素累加到一个 Map ,其键和值是将所提供的映射函数应用于输入元素的结果。 |
static |
toSet() | 返回一个 Collector ,将输入元素 Set到一个新的 Set。 |
测试方法:
1 | private static void test11(List<User> userList) { |
控制台输出:
1 | 求用户平均身高: |
StreamSupport
StreamSupport用于创建和操作流的低级实用程序方法。
这个类主要是为库编写人员提供数据结构的流视图,大多数面向最终用户的静态流方法都在各种stream类中。
StreamSupport提供以下方法:
方法类型及返回值 | 方法名称 | 描述 |
---|---|---|
static DoubleStream | doubleStream(Spliterator.OfDouble spliterator, boolean parallel) | 创建一个新的串行或并行 DoubleStream从 Spliterator.OfDouble。 |
static DoubleStream | doubleStream(Supplier<? extends Spliterator.OfDouble> supplier, int characteristics, boolean parallel) | 创建一个新的顺序或并行 DoubleStream从 Supplier的 Spliterator.OfDouble。 |
static IntStream | intStream(Spliterator.OfInt spliterator, boolean parallel) | 创建一个新的串行或并行 IntStream从 Spliterator.OfInt。 |
static IntStream | intStream(Supplier<? extends Spliterator.OfInt> supplier, int characteristics, boolean parallel) | 创建一个新的顺序或并行 IntStream从 Supplier的 Spliterator.OfInt。 |
static LongStream | longStream(Spliterator.OfLong spliterator, boolean parallel) | 创建一个新的串行或并行 LongStream从 Spliterator.OfLong。 |
static LongStream | longStream(Supplier<? extends Spliterator.OfLong> supplier, int characteristics, boolean parallel) | 创建一个新的顺序或并行 LongStream从 Supplier的 Spliterator.OfLong。 |
static |
stream(Spliterator |
创建一个新的串行或并行 Stream从 Spliterator。 |
static |
stream(Supplier<? extends Spliterator |
创建一个新的顺序或并行 Stream从 Supplier的 Spliterator。 |
以构建Steam为例,打开Stream.of()方法:
1 | public static<T> Stream<T> of(T t) { |
调用的是StreamSupport的stream()方法构造Stream实现。
进入StreamSupport的stream()方法:
1 | public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { |
发现最终返回的是一个ReferencePipeline的静态内部类Head。