LOGI

Aggregate Operations

[tip type="info" title="备注"]
为了更好理解本章概念,请复习 Lambda ExpressionsMethod References
[/tip]

你为何使用容器?不是简单的为了存储对象吧。大多数情况下,我们使用容器检索存储在其中的元素。

回想 Lambda Expressions 描述的场景。假定你正在编写社交网络应用。你想创建一个可以让管理员执行任何操作的方法,例如,发送消息给满足特定条件的成员。

和之前一样,假定成员使用如下 Person 类表示:

public class Person {
    public enum Sex {
        MALE, FEMALE
    }

    String name;
    LocalDate birthday;
    Sex gender;
    String emailAddress;
    
    // ...

    public int getAge() {
        // ...
    }

    public String getName() {
        // ...
    }
}

如下代码使用 for-each 循环打印了 roster 容器中所有成员的姓名:

for (Person p: roster) {
    System.out.println(p.getName());
}

如下代码使用聚集操作 forEach 完成相同动作:

roster
    .stream()
    .forEach(e -> System.out.println(e.getName));

尽管本例中,聚集操作版本比普通循环长,但之后你会发现,处理复杂任务的大量数据,聚集更简洁。

本篇内容简介:

涉及的完整代码见 BulkDataOperationsExamples

Pipelines and Streams

pipeline 是一个聚集操作序列。下例使用由聚集操作 filterforEach 组成的 pipeline 打印 roster 容器中男性成员的姓名:

roster
    .stream()
    .filter(e -> e.getGender() == Person.Sex.MALE)
    .forEach(e -> System.out.println(e.getName()));

管道包含以下组件:

下例使用聚集操作管道 filter - mapToInt - average 计算 roster 中所有男性成员的平均年龄:

double average = roster
        .stream()
        .filter(p -> p.getGender() == Person.Sex.MALE)
        .mapToInt(Person::getAge)
        .average()
        .getAsDouble();

mapToInt 操作返回类型为 IntStream (只包含整型值)的流。操作将其参数指定的函数应用在特定流的每个元素上。本例中,函数是 Person::getAge,它是一个返回成员年龄的方法引用。(你也可使用 Lambda 表达式 e -> e.getAge()。)最终,本例的 mapToInt 操作返回了包含集合 roster 中所有男性年龄的流。

average 操作计算了 IntStream 流中元素的平均值。它返回一个 OptionalDouble 类型对象。如果流中没有元素,它返回一个空的 OptionalDouble 实例,这将导致 getAsDouble 抛出 NoSuchElementException。JDK 中包含许多 average 一样的终止操作,它们返回一个结合了流所有内容的值。这些操作叫 reduction operations。阅读 Reduction 了解更多信息。

Differences Between Aggregate Operations and Iterators

聚集操作(像 forEach)看上去像迭代器,但它们有几点本质不同:

Reduction

上节描述了以下操作管道,它计算 roster 容器所有男性的平均年龄:

double average = roster
        .stream()
        .filter(p -> p.getGender() == Person.Sex.MALE)
        .mapToInt(Person::getAge)
        .average()
        .getAsDouble();

JDK 包含许多终止操作(如 averagesumminmaxcount),它们返回一个结合了流所有内容的值。这些操作叫 reduction operations。JDK 也包含返回容器而非单值的规约操作。这些归约操作执行特定任务,如计算平均值或将元素分类。但 JDK 还给你提供了通用规约 reducecollect,我们将在下面详细介绍。

本节内容简介:

你可以在 ReductionExamples 查看完整代码。

The Stream.reduce Method

Steam.reduce 定义了一个通用规约操作。考虑如下计算 roster 容器中男性成员年龄总和的管道,它使用 Stream.sum 规约:

int totalAge = roster
        .stream()
        .mapToInt(Person::getAge)
        .sum();

下面使用 Stream.reduce 计算相同值,试比较异同:

Integer totalAgeReduce = roster
        .stream()
        .map(Person::getAge)
        .reduce(0, Integer::sum);

本例中的 reduce 操作接收两个参数:

(a, b) -> a + b

reduce 操作总是返回新值。处理流的每个元素后,累加器函数也会返回新值。如果你想将流的元素规约为更复杂的对象,例如容器,这可能降低应用性能。如果你的 rudece 操作涉及容器元素相加,那么每处理一个元素,累加器都会创建一个包含该元素的新容器,这很低效,更好的做法是更新已有容器。你可以使用 Steam.collect 方法达成上述目的,我们马上介绍。

The Stream.collect Method

不像 reduce,每处理一个元素都创建新值,collect 方法修改(modifies, or mutates)已有值。

考虑如何计算流元素的平均值。你需要两个数据:元素个数以及所有元素的和。然而,像 reduce 和所有其它规约方法,collect 也只返回一个值。你可以创建一个新类型,它的成员变量记录数据个数和所有数据的和,比如下面的 Averager 类:

public class Averager implements IntConsumer {
    private int total;
    private int count;

    public double average() {
        return count > 0 ? ((double) total) / count : 0;
    }

    @Override
    public void accept(int i) {
        total += i;
        count++;
    }

    public void combine(Averager other) {
        total += other.total;
        count += other.count;
    }
}

如下管道使用 Averager 类和 collect 方法计算所有男性成员的平均年龄:

Averager averageCollect = roster
        .stream()
        .filter(p -> p.getGender() == Person.Sex.MALE)
        .map(Person::getAge)
        .collect(Averager::new, Averager::accept, Averager::combine);

System.out.println("Average age of male members: " + averageCollect.average());

本例中的 collect 操作接收 3 个参数:

注意:

尽管 JDK 提供了计算流元素平均值的 average 操作,你可以使用 collect 和自定义类计算部分流元素的平均值。

collect 操作非常适合容器规约。下例将男性成员的姓名放入容器中:

List<String> namesOfMaleMemberCollect = roster
        .stream()
        .filter(p -> p.getGender() == Person.Sex.MALE)
        .map(Person::getName)
        .collect(Collectors.toList());

这一版本的 collect 接收一个 Collector 类型参数。该类封装了第一版 collect 的三个函数参数(supplier, saccumulator, and functions)。

Collectors 包含许多规约操作,比如将元素逐个加入容器,以及计算满足多条件元素的和。这些规约操作返回 Collector 类型实例,所以你可以把它们当作 collect 的参数。

本例使用 Collectors.toList 操作将流元素逐个放进 List 实例。和大多数 Collectors 操作相同,toList 操作的返回值是 Collector,而非 collection

下例将 roster 容器成员以性别分组:

Map<Person.Sex, List<Person>> byGender = roster
        .stream()
        .collect(Collectors.groupingBy(Person::getGender));

groupingBy 操作返回一个 map,它的 key 由 Lambda 表达式参数决定(叫做 classification function)。本例中,返回值 map 包含两个 key,Person.Sex.MALEPerson.Sex.FEMALE。相应的 value 是 List 实例,它包含分类函数处理过的与 key 对应的流元素。例如,Person.Sex.MALE 对应的 value 是包含所有男性成员的 List 实例。

下例对 roster 容器以性别分组检索成员的姓名:

Map<Person.Sex, List<String>> namesByGender = roster
        .stream()
        .collect(Collectors.groupingBy(Person::getGender,
                Collectors.mapping(Person::getName, Collectors.toList())));

本例中的 groupingBy 接收两个参数,一个分类函数和一个 Collector 实例。Collector 参数又叫 downstream collector,Java 运行时将其应用于另一个 Collector 的结果。所以,groupingBy 允许你把 collect 方法应用到 groupingBy 操作员创建的 List 上。本例使用 mapping collector,它将映射方法 Person::getName 运用到每个流元素上。因此,结果流只包含成员名称。像本例这样包含一到多个 downstream collector 的管道,称 multilevel reduction

下例检索每种性别的总年龄:

Map<Person.Sex, Integer> totalAgeByGender = roster
        .stream()
        .collect(Collectors.groupingBy(Person::getGender,
                Collectors.reducing(0, Person::getAge, Integer::sum)));

reducing 操作接收三个参数:

下例检索每种性别的平均年龄:

Map<Person.Sex, Double> averageAgeByGender = roster
        .stream()
        .collect(Collectors.groupingBy(Person::getGender,
                Collectors.averagingInt(Person::getAge)));

Parallelism

并行计算涉及划分子问题,同时解决子问题(子问题并行地运行于独立线程中),最后组合子问题的解。Java SE 提供的 fork/join framework 使你能够在应用中更容易地实现并行计算。然而使用它,你必须指定子问题如何划分(partitioned)。有了聚集操作,Java 运行时为你完成解决方案的划分与合并。

在使用容器的应用中实现并行的一大难点是,容器不是线程安全的,这意味着无法在不引入 thread interferencememory consistency errors 的前提下多线程操纵容器。容器框架提供了 synchronization wrappers,它为任意容器添加了自动同步使其线程安全,但同步会导致 thread contention。你总是想避免线程竞争,因为它妨碍线程并行执行。聚集操作和并行流使你能在非线程安全容器上实现并发,只要你不在操作时修改容器。

注意并行不会自动比串行操作快,除非你有足够数据和多核心处理器。尽管聚集操作能让你更容易地实现并行,你也必须确定你的程序是否适合。

本节包容如下内容:

你可以在 ParallelismExamples 查看完整代码。

Executing Streams in Parallel

你既可以串行也可以并行执行流。当并行执行时,Java 运行时会将流划分为多个子流。聚集操作并行迭代和处理子流,最后组合它们的结果。

除非指定,你创建的都是串行流。要使用并行流,调用 Collection.parallelStream。作为替代,调用 BaseStream.parallel。例如,以下声明并行计算男性成员的平均年龄:

OptionalDouble average = roster
        .parallelStream()
        .filter(p -> p.getGender() == Person.Sex.MALE)
        .mapToInt(Person::getAge)
        .average();

Concurrent Reduction

重新考虑下例(它在 Reduction 小节介绍过),它将成员以性别分组。该例调用 collect 操作,将 roster 规约为 Map

Map<Person.Sex, List<Person>> byGender = roster
    .stream()
    .collect(Collectors.groupingBy(Person::getGender));

下例是等价的并行版本:

ConcurrentMap<Person.Sex, List<Person>> byGenderParallel = roster
        .parallelStream()
        .collect(Collectors.groupingByConcurrent(Person::getGender));

它叫做 concurrent reduction。对于包含 collect 的特定管道,当以下所有条件成立时,Java 运行时执行并发规约:

[tip type="info" title="备注"]
本例返回了 ConcurrentMap 而非 Map,并且调用了 groupingByConcurrent 而非 groupingby。(阅读 Concurrent Collections 章节了解有关 ConcurrentMap 的更多信息。)不像 groupingByConcurrent 操作,groupingBy 在并行流中效率很低。(因为它根据 key 合并两个 map,这是个计算密集型任务。)类似地,Collectors.toConcurrentMap 在并行流中的表现好于 Collectors.toMap
[/tip]

Ordering

管道处理流元素的顺序取决于流是串行还是并行执行,流的源,以及中间操作。例如,考虑如下示例,它使用 forEach 操作多次打印 ArrayList 实例的元素:

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8};
List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray));
System.out.println("listOfIntegers");
listOfIntegers.forEach(e -> System.out.print(e + " "));
System.out.println();

System.out.println("listOfIntegers sorted in reverse order:");
Comparator<Integer> normal = Integer::compare;
Comparator<Integer> reversed = normal.reversed();
listOfIntegers.sort(reversed);
listOfIntegers.forEach(e -> System.out.print(e + " "));
System.out.println();

System.out.println("Parallel stream:");
listOfIntegers.parallelStream().forEach(e -> System.out.print(e + " "));
System.out.println();

System.out.println("Another parallel stream:");
listOfIntegers.parallelStream().forEach(e -> System.out.print(e + " "));
System.out.println();

System.out.println("With forEachOrdered:");
listOfIntegers.parallelStream().forEachOrdered(e -> System.out.print(e + " "));
System.out.println();

示例包含 5 个管道,它的输入类似下面这样:

listOfIntegers
1 2 3 4 5 6 7 8 
listOfIntegers sorted in reverse order:
8 7 6 5 4 3 2 1 
Parallel stream:
3 4 6 7 1 8 2 5 
Another parallel stream:
3 8 7 5 6 2 4 1 
With forEachOrdered:
8 7 6 5 4 3 2 1 

其流程如下:

Side Effects

除了返回和产生值,如果它还修改计算状态,称方法和表达式具有副作用。已有的例子包括可变规约(使用 collect 的操作,见 Reduction 小节)和 System.out.println Lambda 表达式调用。JDK 可以很好地处理管道中的副作用。特别地,collect 被设计来以并行安全方式执行最常见的含副作用流操作。类似 forEachpeek 就是为了执行副作用;无返回值 Lambda 表达式,如调用 System.out.println,除了产生副作用不做任何事情。即便如此,你也应该谨慎使用 forEachpeek,如果你在并行流中使用它们,Java 运行时可能会在多线程中并发调用你为其指定的 Lambda 表达式参数。此外,永远不要传递有副作用的 Lambda 表达式给类似 filtermap 的操作。下面讨论 InterferenceStateful Lambda Expressions,它们都是副作用的源头,并且可能造成不一致和非预期结果,尤其是在并行流中。但是,我们首先介绍 Laziness,因为它对干扰有直接影响。

Laziness

所有中间操作都是 lazy 的。如果仅当需要时才被执行,称表达式,方法或算法是惰性的。(如果被立即执行或处理,称该算法是 eager 的。)中间操作之所以是惰性的,因为只有当终止操作开始,它们才开始处理流内容。惰性处理使得编译器和运行时能对流进行优化。例如,Aggregate Operations 小节介绍的 filter-mapToInt-average 管道,average 操作可以从 mapToInt 操作中获取开始的几个整数,而后者又是从 filter 中获取的。average 重复处理直到它获得了所有需要的元素,最后再计算平均值。

Interference

流操作中的 Lambda 表达式不应 interfere。在管道处理流的同时,如果源被修改,则竞争发生。例如,以下代码试图连接 List listOfStrings 内的字符串,但它抛出了 ConcurrentModificationException 异常:

try {
    List<String> listOfStrings = new ArrayList<>(Arrays.asList("one", "two"));

    // This will fail as the peek operation will attempt to add the
    // string "three" to the source after the terminal operation has
    // commenced.
    String concatenatedString = listOfStrings
            .stream()
            // Don't do this! Interference occurs here.
            .peek(s -> listOfStrings.add("three"))
            .reduce((a, b) -> a + " " + b)
            .get();
    System.out.println("Concatenated string: " + concatenatedString);
} catch (Exception e) {
    System.out.println("Exception caught: " + e.toString());
}

本例通过 reduce 操作连接 listOfStrings 内的字符串成为 Optional<String>,它是一个终止操作。但管道调用了 peek 中间操作,它试图向 listOfStrings 添加新元素。记住,所有中间操作都是惰性的,这意味着 get 被调用时管道开始执行,get 完成时管道结束。peek 操作的参数试图在管道执行过程中修改它的源,这导致 Java 运行时抛出 ConcurrentModificationException

Stateful Lambda Expressions

避免使用 stateful lambda expressions 作为流操作参数。有状态 Lambda 表达式的结果依赖于某个状态,它在管道执行过程中可能改变。下例使用 peek 中间操作将 List listOfIntegers 元素添加到新列表实例。代码先使用串行流,再使用并行流:

System.out.println("Serial stream:");
List<Integer> serialStorage = new ArrayList<>();
listOfIntegers
        .stream()
        // Don't do this! It uses a stateful lambda expression.
        .peek(serialStorage::add)
        .forEachOrdered(e -> System.out.print(e + " "));
System.out.println();
serialStorage.forEach(e -> System.out.print(e + " "));
System.out.println();

System.out.println("Parallel stream:");
List<Integer> parallelStorage = Collections.synchronizedList(new ArrayList<>());
listOfIntegers
        .parallelStream()
        .peek(parallelStorage::add)
        .forEachOrdered(e -> System.out.print(e + " "));
System.out.println();
parallelStorage.forEach(e -> System.out.print(e + " "));
System.out.println();

parallelStorage::add 是一个有状态 Lambda 表达式(方法引用),每次运行,它的结果都会变化。本例输出如下:

Serial stream:
8 7 6 5 4 3 2 1 
8 7 6 5 4 3 2 1 
Parallel stream:
8 7 6 5 4 3 2 1 
3 4 2 1 5 8 6 7 

forEachOrdered 以流指定顺序处理元素,无论它是串行还是并行。但是,如果流被并行执行,peek 获得的元素由 Java 运行时和编译器决定。因此,它添加元素的顺序每次都会改变。为了得到确定和可预测结果,确保流操作中作为参数的 Lambda 表达式不含状态。

[tip type="info" title="备注"]
本例调用了 synchronizedList,所以 List parallelStorage 是线程安全的。记住,普通容器不是线程安全的,这意味着多线程不应同时访问某一容器。假定你没有使用 synchronizedList

List<Integer> parallelStorage = new ArrayList<>();

它将产生不定行为,因为多线程访问和修改 parallelStorage 时,没有类似同步的机制调度哪个线程可以访问 List 实例。最终,它的输出类型下面这样:

Parallel stream:
8 7 6 5 4 3 2 1
null 3 5 4 7 8 1 2

[/tip]

Questions and Exercises

Questions

1. 聚集操作序列被称为 _

管道(Pipeline)。

2. 每个管道包含零到多个 _ 操作。

中间(Intermediate)。

3. 管道以 _ 操作结束。

终止(Terminal)。

4. 哪种操作产生新流?

中间(Intermediate)。

5. 列举一种 forEach 聚集操作与增强型 for 或迭代器的不同。

forEach 聚集操作让系统决定如何迭代,使用聚集操作让你专注于做什么,而非怎么做。

6. (是非题)流类似于容器,也是存储元素的数据结构。

错。流不存储数据,它通过管道从源搬运数据。

7. 标出以下代码的中间和终止操作:

double average = roster
        .stream()
        .filter(p -> p.getGender() == Person.Sex.MALE)
        .mapToInt(Person::getAge)
        .average()
        .getAsDouble();

中间操作:filter, mapToInt;终止操作:average。终止操作 average 返回 OptionalDoublegetAsDouble 从它上面获取 double。查看 API Specification 是识别中间还是终止操作的最好办法。

8. p -> p.getGender() == Person.Sex.MALE 是什么的示例?

Lambda 表达式。

9. Person::getAge 是什么示例?

方法引用。

10. 合并流内容并返回一个值的终止操作叫做什么?

规约操作。

11. 说出 Stream.reduceStream.collect 的一点不同。

处理元素时,前者总是创建新值,后者修改(modifies or mutates)已有值。

12. 如果你想处理姓名流,提取男性姓名,将它们放到新 ListStream.reduceStream.collect 是最合适的操作吗?

collect 最合适于将元素收集到 List

例如:

List<String> namesOfMaleMembersCollect = roster
    .stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .map(Person::getName)
    .collect(Collectors.toList())

13. (是非题)聚集操作可以在非线程安全容器上完成并行操作。

对,只要你不在操作时修改(modify or mutate)底层容器。

14. 除非指定,流总是串行的。你怎样指定一个并行流?

调用 parallelStream() 而非 stream() 可以获取并行流。

Exercises

1. 使用 Lambda 表达式将以下增强型 for 声明改写成管道。提示:使用 filter 中间操作和 forEach 终止操作。

for (Person p: roster) {
    if (p.getGender == Person.Sex.MALE) {
        System.out.println(p.getName());
    }
}

Answer:

roster
    .stream()
    .filter(p -> p.getGender == Person.Sex.MALE)
    .forEach(p -> System.out.println(p.getName()));

2. 将以下嵌套 for 循环改写成使用 Lambda 表达式的聚集操作。提示:使用 filter - sorted - collect 管道。

List<Album> favs = new ArrayList<>();
for (Album a: albums) {
    boolean hasFavorite = false;
    for (Trace t: a.tracks) {
        if (t.rating >= 4) {
            hasFavorite = true;
            break;
        }
    }
    if (hasFavorite) {
        favs.add(a);
    }
}
Collections.sort(favs, new Comparator<Album>() {
                            public int compare(Album a, Album b) {
                                return a.name.compareTo(b.name);
                            }
                        });

Answer:

List<Album> sortedFavs = albums
                            .stream()
                            .filter(a -> a.tracks.anyMatch(t -> t.rating >= 4))
                            .sorted(Comparator.comparing(a -> a.name))
                            .collect(Collectors.toList());

我们使用流操作简化三个主要步骤 —— 识别任何包含高于 4 分曲目的专辑(anyMatch),排序,最后将符合条件的专辑放入 ListComparator.comparing() 方法接收一个用于提取 Comparable 排序关键字的函数,返回一个比较该关键字的 Comparator

本文译自 Aggregate Operations,译者 LOGI。

当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »