Contents

使用Apache Kafka进行数据建模

1.概述

在本教程中,我们将使用 Apache Kafka 进入事件驱动架构的数据建模领域。

2. 设置

Kafka 集群由多个注册到 Zookeeper 集群的 Kafka 代理组成。为了简单起见,我们将使用由 Confluent 发布的 **现成 Docker 镜像和Docker Compose **配置。

首先,让我们为 3 节点 Kafka 集群下载docker-compose.yml  :

$ BASE_URL="https://raw.githubusercontent.com/confluentinc/cp-docker-images/5.3.3-post/examples/kafka-cluster"
$ curl -Os "$BASE_URL"/docker-compose.yml

接下来,让我们启动 Zookeeper 和 Kafka 代理节点:

$ docker-compose up -d

最后,我们可以验证所有 Kafka 代理都已启动:

$ docker-compose logs kafka-1 kafka-2 kafka-3 | grep started
kafka-1_1      | [2020-12-27 10:15:03,783] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
kafka-2_1      | [2020-12-27 10:15:04,134] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
kafka-3_1      | [2020-12-27 10:15:03,853] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)

3. 基础

在我们开始为事件驱动系统进行数据建模之前,我们需要了解一些概念,例如事件、事件流、生产者-消费者和主题。

3.1. 事件

Kafka 世界中的事件是域世界中发生的事情的信息日志。它通过将信息记录为键值对消息以及一些其他属性(例如时间戳、元信息和标头)来实现这一点。

假设我们正在模拟一个国际象棋游戏;那么一个事件可能是一个动作:

/uploads/apache_kafka_data_modeling/1.png

我们可以注意到,事件保存了参与者、动作和发生时间的关键信息。在这种情况下,Player1是演员,动作是在12/2020/25 00:08:30将车从单元格a1移动到a5

3.2. 消息流

Apache Kafka 是一种流处理系统,可将事件捕获为消息流。在我们的国际象棋游戏中,我们可以将事件流视为玩家下棋的日志。

在每个事件发生时,板的快照将代表其状态。使用传统的表模式存储对象的最新静态状态通常很常见。

另一方面,事件流可以帮助我们以事件的形式捕捉两个连续状态之间的动态变化。如果我们播放一系列这些不可变的事件,我们可以从一种状态转换到另一种状态。这就是事件流和传统表之间的关系,通常称为流表对偶性

让我们可视化棋盘上只有两个连续事件的事件流:

/uploads/apache_kafka_data_modeling/3.png

4. 话题

在本节中,我们将学习如何对通过 Apache Kafka 路由的消息进行分类。

4.1.分类

在 Apache Kafka 这样的消息传递系统中,任何产生事件的东西通常都称为生产者。而阅读和消费这些消息的人称为消费者。

在现实世界的场景中,每个生产者都可以生成不同类型的事件,所以如果我们期望消费者过滤与他们相关的消息而忽略其余的消息,那将是浪费大量精力。

为了解决这个基本问题,Apache Kafka 使用的主题本质上是属于一起的消息组。因此,消费者在使用事件消息时可以更有效率。

在我们的棋盘示例中,可以使用一个主题将所有移动分组到chess-moves主题下:

$ docker run \
  --net=host --rm confluentinc/cp-kafka:5.0.0 \
  kafka-topics --create --topic chess-moves \
  --if-not-exists \
  --partitions 1 --replication-factor 1 \
  --zookeeper localhost:32181
Created topic "chess-moves".

4.2. 生产者-消费者

现在,让我们看看生产者和消费者如何使用 Kafka 的主题进行消息处理。我们将使用Kafka 发行版附带的kafka-console-producerkafka-console-consumer实用程序来演示这一点。

让我们启动一个名为kafka-producer的容器 ,我们将在其中调用 producer 实用程序:

$ docker run \
--net=host \
--name=kafka-producer \
-it --rm \
confluentinc/cp-kafka:5.0.0 /bin/bash
# kafka-console-producer --broker-list localhost:19092,localhost:29092,localhost:39092 \
--topic chess-moves \
--property parse.key=true --property key.separator=:

同时,我们可以启动一个名为kafka-consumer的容器,我们将在其中调用消费者实用程序:

$ docker run \
--net=host \
--name=kafka-consumer \
-it --rm \
confluentinc/cp-kafka:5.0.0 /bin/bash
# kafka-console-consumer --bootstrap-server localhost:19092,localhost:29092,localhost:39092 \
--topic chess-moves --from-beginning \
--property print.key=true --property print.value=true --property key.separator=:

现在,让我们通过制作人记录一些游戏动作:

>{Player1 : Rook, a1->a5}

当消费者处于活动状态时,它将使用作为Player1的键接收此消息:

{Player1 : Rook, a1->a5}

5. 分区

接下来,让我们看看如何使用分区创建进一步的消息分类并提高整个系统的性能。

5.1. 并发

我们可以将一个主题划分为多个分区,调用多个消费者来消费来自不同分区的消息。通过启用这种并发行为,可以提高系统的整体性能。

默认情况下,在创建主题期间支持–bootstrap-server选项的 Kafka 版本将创建主题的单个分区,**除非在创建主题时明确指定。但是,对于预先存在的主题,我们可以增加分区的数量。让我们将chess-moves主题的分区号设置为3

$ docker run \
--net=host \
--rm confluentinc/cp-kafka:5.0.0 \
bash -c "kafka-topics --alter --zookeeper localhost:32181 --topic chess-moves --partitions 3"
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

5.2. 分区键

在一个主题中,Kafka 使用分区键跨多个分区处理消息。一方面,生产者隐式使用它来将消息路由到其中一个分区。另一方面,每个消费者都可以从特定分区读取消息。

默认情况下,生产者将生成键的哈希值,后跟带有分区数的模数。然后,它会将消息发送到计算出的标识符所标识的分区。

让我们使用kafka-console-producer实用程序创建新的事件消息,但这次我们将记录两个玩家的动作:

# kafka-console-producer --broker-list localhost:19092,localhost:29092,localhost:39092 \
--topic chess-moves \
--property parse.key=true --property key.separator=:
>{Player1: Rook, a1 -> a5}
>{Player2: Bishop, g3 -> h4}
>{Player1: Rook, a5 -> e5}
>{Player2: Bishop, h4 -> g3}

现在,我们可以有两个消费者,一个从分区 1 读取,另一个从分区 2 读取:

# kafka-console-consumer --bootstrap-server localhost:19092,localhost:29092,localhost:39092 \
--topic chess-moves --from-beginning \
--property print.key=true --property print.value=true \
--property key.separator=: \
--partition 1
{Player2: Bishop, g3 -> h4}
{Player2: Bishop, h4 -> g3}

我们可以看到 Player2 的所有移动都被记录到分区 1 中。以同样的方式,我们可以检查 Player1 的移动是否被记录到 partition-0 中。

6. 缩放

我们如何概念化主题和分区对于水平扩展至关重要。一方面,主题更像是数据的预定义分类。另一方面,分区是动态发生的数据的动态分类。

此外,我们可以在一个主题中配置多少个分区是有实际限制的。这是因为每个分区都映射到代理节点的文件系统中的一个目录。当我们增加分区数量时,我们也会增加操作系统上打开文件句柄的数量。

根据经验,Confluent 的专家建议 将每个代理的分区数限制为100 x b x r,其中b是 Kafka 集群中代理的数量,r是复制因子。