Contents

Akka流简介

1.概述

在本文中,我们将研究建立在 Akka actor 框架之上的akka-streams 库,它遵循反应式流宣言Akka Streams API 允许我们从独立的步骤轻松组合数据转换流。

此外,所有处理都是以反应式、非阻塞和异步的方式完成的。

2. Maven依赖

首先,我们需要将*akka-stream * 和akka-stream-testkit 库添加到我们的pom.xml 中:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.11</artifactId>
    <version>2.5.2</version>
</dependency>
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-testkit_2.11</artifactId>
    <version>2.5.2</version>
</dependency>

3. Akka Streams API

要使用 Akka Streams,我们需要了解核心 API 概念:

  • Source —— akka-stream库中处理的入口点——我们可以从多个源创建这个类的实例;例如,如果我们想从单个StringSourcesingle()Iterable元素创建Source
  • Flow – 主要处理构建块– 每个Flow实例都有一个输入和一个输出值
  • Materializer——如果我们希望我们的Flow有一些副作用,比如记录或保存结果**,我们可以使用它;最常见的是,我们将NotUsed别名作为Materializer传递,以表示我们的Flow不应该有任何副作用
  • Sink 操作——当我们构建一个Flow 时,它不会执行,直到我们在其上注册一个Sink 操作**——它是一个终端操作,会触发整个Flow

4. 在 Akka Streams 中创建*Flow *

让我们从构建一个简单的示例开始,我们将在其中展示如何**创建和组合多个Flow **——以处理整数流并从流中计算整数对的平均移动窗口。

我们将解析一个以分号分隔的整数String作为输入,以创建示例的akka-stream 源。

4.1.使用*Flow *解析输入

首先,让我们创建一个DataImporter类,它将采用 ActorSystem 的一个实例,我们稍后将使用它来创建我们的Flow

public class DataImporter {
    private ActorSystem actorSystem;
    // standard constructors, getters...
}

接下来,让我们创建一个parseLine方法,该方法将从我们的分隔输入String生成一个Integer列表。请记住,我们在这里使用 Java Stream API 仅用于解析:

private List<Integer> parseLine(String line) {
    String[] fields = line.split(";");
    return Arrays.stream(fields)
      .map(Integer::parseInt)
      .collect(Collectors.toList());
}

我们的初始Flow会将parseLine应用于我们的输入,以创建一个输入类型为String且输出类型为IntegerFlow

private Flow<String, Integer, NotUsed> parseContent() {
    return Flow.of(String.class)
      .mapConcat(this::parseLine);
}

当我们调用parseLine()方法时,编译器知道该 lambda 函数的参数将是一个String——与我们的Flow的输入类型相同。

请注意,我们使用的是mapConcat()方法——等效于 Java 8 flatMap()方法——因为我们希望将parseLine()返回的Integer 列表转化为Integer 流,以便我们处理中的后续步骤不需要处理List

4.2. 使用Flow执行计算

至此,我们有了解析整数的Flow。现在,我们需要实现将所有输入元素组合成对并计算这些对的平均值的逻辑。 现在,我们将*创建一个Integer 流并使用*grouped()方法对它们进行分组。

接下来,我们要计算平均值。

由于我们对处理这些平均值的顺序不感兴趣,因此我们可以使用*mapAsyncUnordered()*方法使用多个线程并行计算平均值,并将线程数作为参数传递给该方法。

将作为 lambda 传递给Flow的操作需要返回CompletableFuture,因为该操作将在单独的线程中异步计算:

private Flow<Integer, Double, NotUsed> computeAverage() {
    return Flow.of(Integer.class)
      .grouped(2)
      .mapAsyncUnordered(8, integers ->
        CompletableFuture.supplyAsync(() -> integers.stream()
          .mapToDouble(v -> v)
          .average()
          .orElse(-1.0)));
}

我们正在计算八个并行线程的平均值。请注意,我们使用 Java 8 Stream API 来计算平均值。

4.3. 将多个Flow组合成一个Flow

Flow API 是一个流畅的抽象,它允许我们组合多个Flow实例来实现我们的最终处理目标。我们可以有细粒度的流程,例如,一个正在解析JSON,另一个正在做一些转换,另一个正在收集一些统计信息。

这样的粒度将帮助我们创建更多可测试的代码,因为我们可以独立地测试每个处理步骤。

我们在上面创建了两个可以相互独立工作的流程。现在,我们想将它们组合在一起。

首先,我们要解析我们的输入String,接下来,我们要计算元素流的平均值。

我们可以使用*via()*方法来组合我们的流程:

Flow<String, Double, NotUsed> calculateAverage() {
    return Flow.of(String.class)
      .via(parseContent())
      .via(computeAverage());
}

我们创建了一个输入类型为StringFlow和两个其他流。parseContent()流接受一个String输入并返回一个Integer作为输出。computeAverage() 流采用该Integer并计算返回Double作为输出类型的平均值。

5. 向流中添加Sink

正如我们所提到的,到目前为止,整个Flow还没有执行,因为它是惰性的。要开始执行Flow,我们需要定义一个Sink。例如,Sink操作可以将数据保存到数据库中,或将结果发送到某些外部 Web 服务。

假设我们有一个带有以下save()方法的AverageRepository类,它将结果写入我们的数据库:

CompletionStage<Double> save(Double average) {
    return CompletableFuture.supplyAsync(() -> {
        // write to database
        return average;
    });
}

现在,我们要创建一个使用此方法保存Flow处理结果的Sink操作。要创建我们的Sink,我们首先需要创建一个Flow,它将我们的处理结果作为输入类型。接下来,我们要将所有结果保存到数据库中。

同样,我们不关心元素的顺序,因此我们可以使用*mapAsyncUnordered()方法**并行执行save()操作。*

要从Flow中创建Sink,我们需要使用Sink.ignore()作为第一个参数和Keep.right()作为第二个参数调用toMat() ,因为我们想要返回处理的状态:

private Sink<Double, CompletionStage<Done>> storeAverages() {
    return Flow.of(Double.class)
      .mapAsyncUnordered(4, averageRepository::save)
      .toMat(Sink.ignore(), Keep.right());
}

6. 定义Flow来源

我们需要做的最后一件事是**从输入String创建一个Source。我们可以使用*via()方法将calculateAverage()*流应用到这个源。

然后,要将Sink添加到处理中,我们需要调用runWith()方法并传递我们刚刚创建的storeAverages() Sink

CompletionStage<Done> calculateAverageForContent(String content) {
    return Source.single(content)
      .via(calculateAverage())
      .runWith(storeAverages(), ActorMaterializer.create(actorSystem))
      .whenComplete((d, e) -> {
          if (d != null) {
              System.out.println("Import finished ");
          } else {
              e.printStackTrace();
          }
      });
}

请注意,当处理完成时,我们正在添加*whenComplete()*回调,我们可以在其中根据处理的结果执行一些操作。

7. 测试Akka

我们可以使用akka-stream-testkit 测试我们的处理。

测试处理的实际逻辑的最佳方法是测试所有Flow逻辑并使用TestSink触发计算并对结果进行验证。

在我们的测试中,我们正在创建我们想要测试的Flow ,接下来,我们将从测试输入内容创建一个Source

@Test
public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults() {
    // given
    Flow<String, Double, NotUsed> tested = new DataImporter(actorSystem).calculateAverage();
    String input = "1;9;11;0";
    // when
    Source<Double, NotUsed> flow = Source.single(input).via(tested);
    // then
    flow
      .runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
      .request(4)
      .expectNextUnordered(5d, 5.5);
}

我们正在检查我们是否期望四个输入参数,并且两个平均值的结果可以以任何顺序到达,因为我们的处理是以异步和并行方式完成的。