Contents

Apache Beam简介

1.概述

在本教程中,我们将介绍 Apache Beam 并探索其基本概念。

我们将首先展示使用 Apache Beam 的用例和好处,然后我们将介绍基本概念和术语。之后,我们将通过一个简单的示例来说明 Apache Beam 的所有重要方面。

2. 什么是 Apache Beam?

**Apache Beam (Batch + strEAM) 是用于批处理和流式数据处理作业的统一编程模型。**它提供了一个软件开发工具包来定义和构建数据处理管道以及执行它们的运行器。

**Apache Beam 旨在提供可移植的编程层。**事实上,Beam Pipeline Runners 将数据处理管道转换为与用户选择的后端兼容的 API。目前,支持这些分布式处理后端:

  • Apache Apex
  • Apache Flink
  • Apache Gearpump (incubating)
  • Apache Samza
  • Apache Spark
  • Google Cloud Dataflow
  • Hazelcast Jet

3. 为什么选择 Apache Beam?

**Apache Beam 融合了批处理和流式数据处理,而其他人通常通过单独的 API 来实现。**因此,很容易将流式流程更改为批处理流程,反之亦然,例如,随着需求的变化。

**Apache Beam 提高了可移植性和灵活性。**我们专注于我们的逻辑而不是潜在的细节。此外,我们可以随时更改数据处理后端。

有适用于 Apache Beam 的 Java、Python、Go 和 Scala SDK。事实上,团队中的每个人都可以使用他们选择的语言来使用它。

4. 基本概念

使用 Apache Beam,我们可以构建工作流图(管道)并执行它们。编程模型中的关键概念是:

  • PCollection – 表示可以是固定批次或数据流的数据集
  • PTransform – 一种数据处理操作,采用一个或多个PCollection并输出零个或多个PCollection
  • Pipeline – 表示PCollection 和PTransform的有向无环图,因此封装了整个数据处理作业
  • PipelineRunner –在指定的分布式处理后端执行Pipeline

简单来说,一个PipelineRunner执行一个Pipeline,一个PipelinePCollection 和 PTransform组成。

5. 字数示例

现在我们已经了解了 Apache Beam 的基本概念,让我们设计和测试一个字数统计任务。

5.1.构建梁管道

设计工作流图是每个 Apache Beam 作业的第一步。让我们定义一个字数统计任务的步骤:

  1. 从来源阅读文本。
  2. 将文本拆分为单词列表。
  3. 小写所有单词。
  4. 修剪标点符号。
  5. 过滤停用词。
  6. 计算每个唯一的单词。

为此,我们需要使用PCollectionPTransform抽象将上述步骤转换为单个*Pipeline *。

5.2. 依赖项

在我们实现工作流图之前,我们应该将Apache Beam 的核心依赖 添加到我们的项目中:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
</dependency>

Beam Pipeline Runners 依靠分布式处理后端来执行任务。让我们添加DirectRunner 作为运行时依赖项:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
</dependency>

与其他 Pipeline Runners 不同,DirectRunner不需要任何额外的设置,这使其成为初学者的不错选择。

5.3. 执行

Apache Beam 利用 Map-Reduce 编程范式(与Java Streams 相同)。事实上,在我们继续之前,有一个reduce()filter()count()map()flatMap() 的基本概念是个好主意。

创建*Pipeline *是我们要做的第一件事:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

现在我们应用我们的六步字数统计任务:

PCollection<KV<String, Long>> wordCount = p
    .apply("(1) Read all lines", 
      TextIO.read().from(inputFilePath))
    .apply("(2) Flatmap to a list of words", 
      FlatMapElements.into(TypeDescriptors.strings())
      .via(line -> Arrays.asList(line.split("\\s"))))
    .apply("(3) Lowercase all", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> word.toLowerCase()))
    .apply("(4) Trim punctuations", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> trim(word)))
    .apply("(5) Filter stopwords", 
      Filter.by(word -> !isStopWord(word)))
    .apply("(6) Count words", 
      Count.perElement());

apply()的第一个(可选)参数是一个字符串,它只是为了提高代码的可读性。以下是上述代码中每个*apply()*的作用:

  1. 首先,我们使用TextIO 逐行读取输入文本文件。
  2. 用空格分割每一行,我们将它平面映射到一个单词列表。
  3. 字数不区分大小写,因此我们将所有单词小写。
  4. 早些时候,我们用空格分割行,最后得到像“word!”这样的词。和“单词?”,所以我们删除了标点符号。
  5. “is”和“by”等停用词几乎在每个英文文本中都很常见,因此我们将它们删除。
  6. 最后,我们使用内置函数*Count.perElement()*计算唯一词。

如前所述,管道在分布式后端进行处理。不可能在内存中迭代PCollection,因为它分布在多个后端。相反,我们将结果写入外部数据库或文件。

首先,我们将PCollection转换为String。然后,我们使用TextIO来编写输出:

wordCount.apply(MapElements.into(TypeDescriptors.strings())
    .via(count -> count.getKey() + " --> " + count.getValue()))
    .apply(TextIO.write().to(outputFilePath));

现在我们的Pipeline定义已经完成,我们可以运行和测试它。

5.4. 运行和测试

到目前为止,我们已经为字数统计任务定义了一个Pipeline。此时,让我们运行Pipeline

p.run().waitUntilFinish();

在这行代码中,Apache Beam 会将我们的任务发送到多个DirectRunner实例。因此,最后将生成几个输出文件。它们将包含以下内容:

...
apache --> 3
beam --> 5
rocks --> 2
...

在 Apache Beam 中定义和运行分布式作业就像这样简单而富有表现力。为了比较,Apache SparkApache FlinkHazelcast Jet 上也提供了字数统计实现。

6. 我们从这里走向何方?

我们成功地计算了输入文件中的每个单词,但我们还没有最常见单词的报告。当然,对PCollection进行排序是我们下一步要解决的好问题。

稍后,我们可以了解更多关于 Windowing、Triggers、Metrics 和更复杂的 Transforms 的知识。Apache Beam 文档 提供了深入的信息和参考资料。