Apache Avro简介
1. 概述
数据序列化是一种将数据转换为二进制或文本格式的技术。有多种系统可用于此目的。Apache Avro 就是其中一种数据序列化系统。
Avro 是一个独立于语言、基于模式的数据序列化库。它使用模式来执行序列化和反序列化。此外,Avro 使用 JSON 格式来指定数据结构,使其更强大。
在本教程中,我们将探索更多关于 Avro 设置、执行序列化的 Java API 以及 Avro 与其他数据序列化系统的比较。
我们将主要关注作为整个系统基础的模式创建。
2. Avro
Avro 是一个独立于语言的序列化库。为此,Avro 使用了作为核心组件之一的模式。它将模式存储在文件中以供进一步的数据处理。
Avro 最适合大数据处理。它在 Hadoop 和 Kafka 世界中非常流行,因为它的处理速度更快。
Avro 创建一个数据文件,它将数据与模式一起保存在其元数据部分中。最重要的是,它提供了丰富的数据结构,使其比其他类似的解决方案更受欢迎。
要使用 Avro 进行序列化,我们需要按照下面提到的步骤进行操作。
3. 问题陈述
让我们从定义一个名为 AvroHttRequest的类开始 ,我们将在示例中使用它。该类包含原始类型和复杂类型属性:
class AvroHttpRequest {
private long requestTime;
private ClientIdentifier clientIdentifier;
private List<String> employeeNames;
private Active active;
}
在这里,requestTime是一个原始值。ClientIdentifier是另一个代表复杂类型的类。我们还有employeeName,它又是一个复杂的类型。Active是一个枚举,用于描述给定的员工列表是否处于活动状态。
我们的目标是使用 Apache Avro序列化和反序列化 AvroHttRequest类。
4. Avro 数据类型
在继续之前,让我们讨论一下 Avro 支持的数据类型。
Avro 支持两种类型的数据:
-
原始类型:Avro 支持所有原始类型。我们使用原始类型名称来定义给定字段的类型。例如,保存String的值应在 Schema 中声明为 {“type”: “string”}
-
复杂类型:Avro 支持六种复杂类型:记录、枚举、数组、映射、联合和固定
例如,在我们的问题陈述中,ClientIdentifier是一条记录。
在这种情况下,ClientIdentifier的架构应如下所示:
{
"type":"record",
"name":"ClientIdentifier",
"namespace":"com.blogdemo.avro",
"fields":[
{
"name":"hostName",
"type":"string"
},
{
"name":"ipAddress",
"type":"string"
}
]
}
5. 使用 Avro
首先,让我们将需要的 Maven 依赖项添加到pom.xml文件中。
我们应该包括以下依赖项:
- Apache Avro – 核心组件
- 编译器——用于 Avro IDL 和 Avro 特定 Java APIT 的 Apache Avro 编译器
- 工具——包括 Apache Avro 命令行工具和实用程序
- 用于 Maven 项目的 Apache Avro Maven 插件
我们在本教程中使用 1.8.2 版本。 但是,始终建议在Maven Central 上找到最新版本:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-compiler</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
</dependency>
添加 maven 依赖项后,接下来的步骤将是:
- 架构创建
- 在我们的程序中读取模式
- 使用 Avro 序列化我们的数据
- 最后,反序列化数据
6. 模式创建
Avro 使用 JSON 格式描述其 Schema。给定的 Avro Schema 主要有四个属性:
- type —— 描述 Schema 的类型,无论是复杂类型还是原始值
- namespace —— 描述给定 Schema 所属的命名空间
- name —— 架构的名称
- fields —— 说明与给定模式关联的字段。字段可以是原始类型,也可以是复杂类型。
创建模式的一种方法是编写 JSON 表示,正如我们在前几节中看到的那样。 我们还可以使用SchemaBuilder创建模式,这无疑是一种更好、更有效的创建方法。
6.1. SchemaBuilder实用程序
org.apache.avro.SchemaBuilder类对于创建 Schema 很有用。
首先,让我们为ClientIdentifier 创建架构:
Schema clientIdentifier = SchemaBuilder.record("ClientIdentifier")
.namespace("com.blogdemo.avro")
.fields().requiredString("hostName").requiredString("ipAddress")
.endRecord();
现在,让我们用它来创建一个 avroHttpRequest模式:
Schema avroHttpRequest = SchemaBuilder.record("AvroHttpRequest")
.namespace("com.blogdemo.avro")
.fields().requiredLong("requestTime")
.name("clientIdentifier")
.type(clientIdentifier)
.noDefault()
.name("employeeNames")
.type()
.array()
.items()
.stringType()
.arrayDefault(null)
.name("active")
.type()
.enumeration("Active")
.symbols("YES","NO")
.noDefault()
.endRecord();
需要注意的是,我们已将 clientIdentifier 指定为clientIdentifier 字段的type。在这种情况下,用于定义类型的clientIdentifier与我们之前创建的模式相同。
稍后我们可以应用 toString方法来获取Schema的JSON结构。
**架构文件使用 .avsc 扩展名保存。让我们将生成的模式保存到 *“src/main/resources/avroHttpRequest-schema.avsc”*文件中。
7. 阅读架构
读取模式或多或少是关于为给定模式创建 Avro 类。一旦创建了 Avro 类,我们就可以使用它们来序列化和反序列化对象。 有两种方法可以创建 Avro 类:
- 以编程方式生成 Avro 类:可以使用SchemaCompiler生成类。我们可以使用几个 API 来生成 Java 类。我们可以在 GitHub 上找到生成类的代码。
- 使用 Maven 生成类
我们确实有一个 Maven 插件可以很好地完成这项工作。我们需要包含插件并运行mvn clean install。 让我们将插件添加到我们的pom.xml文件中:
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
8. 使用 Avro 进行序列化和反序列化
当我们完成生成模式后,让我们继续探索序列化部分。
Avro 支持两种数据序列化格式:JSON 格式和二进制格式。
首先,我们将关注 JSON 格式,然后我们将讨论二进制格式。
在继续之前,我们应该通过几个关键接口。我们可以使用下面的接口和类进行序列化:
DatumWriter<T>:我们应该使用它在给定的 Schema 上写入数据。我们将在示例中使用 SpecificDatumWriter实现,但是,DatumWriter也有其他实现。其他实现有 GenericDatumWriter、Json.Writer、ProtobufDatumWriter、ReflectDatumWriter、ThriftDatumWriter。
Encoder:编码器用于或定义前面提到的格式。EncoderFactory提供两种类型的编码器,二进制编码器和 JSON 编码器。
DatumReader<D>:用于反序列化的单一接口。同样,它有多个实现,但我们将在示例中使用SpecificDatumReader。其他实现是 - GenericDatumReader、Json.ObjectReader、Json.Reader、ProtobufDatumReader、ReflectDatumReader、ThriftDatumReader。
Decoder:在反序列化数据时使用解码器。Decoderfactory提供两种解码器:二进制解码器和 JSON 解码器。
接下来,让我们看看在 Avro 中序列化和反序列化是如何发生的。
8.1. 序列化
我们将以AvroHttpRequest类为例,并尝试使用 Avro 对其进行序列化。
首先,让我们将其序列化为 JSON 格式:
public byte[] serealizeAvroHttpRequestJSON(
AvroHttpRequest request) {
DatumWriter<AvroHttpRequest> writer = new SpecificDatumWriter<>(
AvroHttpRequest.class);
byte[] data = new byte[0];
ByteArrayOutputStream stream = new ByteArrayOutputStream();
Encoder jsonEncoder = null;
try {
jsonEncoder = EncoderFactory.get().jsonEncoder(
AvroHttpRequest.getClassSchema(), stream);
writer.write(request, jsonEncoder);
jsonEncoder.flush();
data = stream.toByteArray();
} catch (IOException e) {
logger.error("Serialization error:" + e.getMessage());
}
return data;
}
让我们看一下这个方法的一个测试用例:
@Test
public void whenSerialized_UsingJSONEncoder_ObjectGetsSerialized(){
byte[] data = serealizer.serealizeAvroHttpRequestJSON(request);
assertTrue(Objects.nonNull(data));
assertTrue(data.length > 0);
}
在这里,我们使用了 jsonEncoder 方法并将模式传递给它。 如果我们想使用二进制编码器,我们需要将 jsonEncoder() 方法替换为 binaryEncoder():
Encoder jsonEncoder = EncoderFactory.get().binaryEncoder(stream,null);
8.2. 反序列化
为此,我们将使用上述DatumReader和Decoder接口。
正如我们使用EncoderFactory获取 Encoder一样,同样我们将使用DecoderFactory获取Decoder对象。
让我们使用 JSON 格式反序列化数据:
public AvroHttpRequest deSerealizeAvroHttpRequestJSON(byte[] data) {
DatumReader<AvroHttpRequest> reader
= new SpecificDatumReader<>(AvroHttpRequest.class);
Decoder decoder = null;
try {
decoder = DecoderFactory.get().jsonDecoder(
AvroHttpRequest.getClassSchema(), new String(data));
return reader.read(null, decoder);
} catch (IOException e) {
logger.error("Deserialization error:" + e.getMessage());
}
}
让我们看看测试用例:
@Test
public void whenDeserializeUsingJSONDecoder_thenActualAndExpectedObjectsAreEqual(){
byte[] data = serealizer.serealizeAvroHttpRequestJSON(request);
AvroHttpRequest actualRequest = deSerealizer
.deSerealizeAvroHttpRequestJSON(data);
assertEquals(actualRequest,request);
assertTrue(actualRequest.getRequestTime()
.equals(request.getRequestTime()));
}
同样,我们可以使用二进制解码器:
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);