Contents

Apache Druid与事件驱动数据

1.简介

在本教程中,我们将了解如何使用事件数据和Apache Druid 。我们将介绍事件数据和 Druid 架构的基础知识。作为其中的一部分,我们将创建一个简单的数据管道,利用 Druid 的各种功能,涵盖各种数据摄取模式和查询准备数据的不同方式。

2. 基本概念

在深入了解 Apache Druid 的操作细节之前,我们先来了解一些基本概念。我们感兴趣的领域是大规模事件数据的实时分析。

因此,必须了解我们所说的事件数据的含义以及大规模实时分析它们需要什么。

2.1.什么是事件数据?

事件数据是指关于在特定时间点发生的变化的一条信息。事件数据在当今的应用程序中几乎无处不在。从经典的应用程序日志到由事物生成的现代传感器数据,它几乎无处不在。这些通常以大规模生成的机器可读信息为特征。

它们为预测、自动化、通信和集成等多种功能提供支持,仅举几例。此外,它们在事件驱动架构中也很重要。

2.2. 什么是Apache Druid?

Apache Druid 是一个实时分析数据库,旨在对面向事件的数据进行快速分析。Druid 于 2011 年启动,2012 年在 GPL 许可下开源,并于 2015 年迁移到 Apache 许可。它由 Apache 基金会管理,社区贡献了多个组织。它提供实时摄取、快速查询性能和高可用性。

Druid 这个名字指的是它的架构可以转变来解决不同类型的数据问题。它通常用于商业智能应用程序以分析大量实时和历史数据。

3. Druid架构

Druid 是一个用 Java 编写的面向列的分布式数据源。它能够摄取大量事件数据并在这些数据之上提供低延迟查询。此外,它提供了任意切片和切块数据的可能性。

了解 Druid 架构如何支持这些特性非常有趣。在本节中,我们将介绍 Druid 架构的一些重要部分。

3.1. 数据存储设计

了解 Druid 如何构建和存储其数据非常重要,这允许分区和分布。Druid在处理过程中默认对数据进行分区,并将它们存储到块和段中:

/uploads/apache_druid_event_driven_data/1.jpg

Druid**将数据存储在我们所知的“数据源”**中,这在逻辑上类似于关系数据库中的表。一个 Druid 集群可以并行处理多个数据源,从各种来源摄取。

每个数据源都是分区的——默认情况下基于时间,如果配置了其他属性,则进一步基于其他属性。数据的时间范围称为“块” ——例如,如果数据按小时分区,则为一小时的数据。

每个块进一步划分为一个或多个“段”,这些“段”是由多行数据组成的单个文件。一个数据源可能有从几个段到数百万个段的任何地方。

3.2. Druid进程

Druid 具有多进程和分布式架构。因此,每个过程都可以独立扩展,从而使我们能够创建灵活的集群。让我们了解作为 Druid 一部分的重要过程:

/uploads/apache_druid_event_driven_data/3.jpg

  • Coordinator:该进程主要负责段管理和分发,并与历史进程通信,根据配置加载或删除段
  • Overlord:这是负责接受任务、协调任务分发、围绕任务创建锁以及向调用者返回状态的主要进程
  • Broker:这是所有查询都发送到分布式集群中执行的进程;它从 Zookeeper 收集元数据并将查询路由到具有正确段的进程
  • Router:这是一个可选进程,可用于将查询路由到不同的代理进程,从而为更重要数据的查询提供查询隔离
  • Historical:这些是存储可查询数据的进程;他们与 Zookeeper 保持持续的联系,并监视他们必须加载和服务的分段信息
  • MiddleManager:这些是执行提交任务的工作进程;他们将任务转发给在不同 JVM 中运行的 Peons,从而提供资源和日志隔离

3.3. 外部依赖

除了核心进程之外,Druid 还依赖于几个外部依赖项才能使其集群按预期运行

让我们看看Druid集群是如何与核心进程和外部依赖一起形成的:

/uploads/apache_druid_event_driven_data/5.jpg

Druid 使用深度存储来存储已摄取到系统中的任何数据。这些不用于响应查询,而是用作数据备份和在进程之间传输数据。这些可以是从本地文件系统到分布式对象存储(如 S3 和 HDFS)的任何内容。

元数据存储用于保存共享系统元数据,如段使用信息和任务信息。但是,它从未用于存储实际数据。它是一个关系数据库,如 Apache Derby、PostgreSQL 或 MySQL。

Druid 使用 Apache Zookeeper 来管理当前的集群状态。它促进了 Druid 集群中的许多操作,例如协调者/霸主领导者选举、段发布协议和段加载/删除协议。

4. Druid设置

Druid 旨在部署为可扩展的容错集群。但是,建立生产级 Druid 集群并非易事。正如我们之前看到的,有许多流程和外部依赖项需要设置和配置。由于可以以灵活的方式创建集群,因此我们必须注意我们的要求,以适当地设置各个流程。

此外,Druid仅在类 Unix 环境中受支持,而不在 Windows 上受支持。此外,运行 Druid 进程需要 Java 8 或更高版本。有几种单服务器配置可用于在单台机器上设置 Druid 以运行教程和示例。但是,为了运行生产工作负载,建议设置一个具有多台机器的成熟 Druid 集群。

出于本教程的目的,我们将借助Docker Hub 上 发布的官方 Docker 镜像在单台机器上设置 Druid。这使我们也可以在 Windows 上运行 Druid,正如我们之前讨论的那样,它不受其他支持。有一个Docker compose 文件 可用,它为每个 Druid 进程及其外部依赖项创建一个容器。

我们必须将配置值作为环境变量提供给 Druid。实现这一点的最简单方法是在与 Docker compose 文件相同的目录中提供一个名为“environment”的文件

一旦我们有了 Docker compose 和环境文件,启动 Druid 就像在同一目录中运行命令一样简单:

docker-compose up

这将调出单机 Druid 设置所需的所有容器。我们必须小心为 Docker 机器提供足够的内存,因为 Druid 会消耗大量资源。

5. 摄取数据

使用 Druid 构建数据管道的第一步是将数据加载到 Druid 中。这个过程在 Druid 架构中称为数据摄取或索引。我们必须找到合适的数据集才能继续本教程。

现在,正如我们迄今为止所收集的那样,我们必须收集事件数据并具有一些时间性质,以充分利用 Druid 基础设施。

Druid 的官方指南使用简单而优雅的数据 ,其中包含特定日期的 Wikipedia 页面编辑。我们将继续在此处的教程中使用它。

5.1. 数据模型

让我们首先检查我们拥有的数据的结构。我们创建的大多数数据管道对数据异常非常敏感,因此有必要尽可能地清理数据。

尽管执行数据分析有复杂的方法和工具,但我们将从目视检查开始。快速分析表明,输入数据具有以 JSON 格式捕获的事件,其中单个事件包含典型属性

{
  "time": "2015-09-12T02:10:26.679Z",
  "channel": "#pt.wikipedia",
  "cityName": null,
  "comment": "Houveram problemas na última edição e tive de refazê-las, junto com as atualizações da página.",
  "countryIsoCode": "BR",
  "countryName": "Brazil",
  "isAnonymous": true,
  "isMinor": false,
  "isNew": false,
  "isRobot": false,
  "isUnpatrolled": true,
  "metroCode": null,
  "namespace": "Main",
  "page": "Catarina Muniz",
  "regionIsoCode": null,
  "regionName": null,
  "user": "181.213.37.148",
  "delta": 197,
  "added": 197,
  "deleted": 0
}

虽然定义此事件的属性有很多,但在使用 Druid 时,有一些是我们特别感兴趣的:

  • 时间戳
  • 维度
  • 指标

Druid 需要一个特定的属性来标识为时间戳列。在大多数情况下,Druid 的数据解析器能够自动检测出最佳候选者。但是我们总是有选择的余地,特别是如果我们的数据中没有合适的属性。

维度是 Druid 按原样存储的属性。我们可以将它们用于任何目的,例如分组、过滤或应用聚合器。我们可以在摄取规范中选择维度,我们将在本教程中进一步讨论。

与维度不同,**指标是默认以聚合形式存储的属性。**我们可以为 Druid 选择一个聚合函数,以便在摄取期间应用到这些属性。与启用汇总一起,这些可以导致紧凑的数据表示。

5.2. 摄取方法

现在,我们将讨论在 Druid 中执行数据摄取的各种方法。通常,事件驱动的数据本质上是流式传输的,这意味着它们会随着时间的推移以不同的速度生成,就像维基百科的编辑一样。

但是,我们可能会在一段时间内对数据进行批处理,其中数据本质上更加静态,就像去年发生的所有 Wikipedia 编辑一样。

我们可能还需要解决不同的数据用例,而 Druid 对其中的大多数都提供了出色的支持。让我们回顾一下在数据管道中使用 Druid 的两种最常见的方式:

  • 流式摄取
  • 批量摄取

在 Druid 中摄取数据的最常见方式是通过 Apache Streaming 服务,Druid 可以直接从 Kafka 读取数据。Druid 也支持 Kinesis 等其他平台。我们必须在 Overload 进程上启动主管,该进程创建和管理 Kafka 索引任务。我们可以通过 Overload 进程的 HTTP POST 命令提交作为 JSON 文件的主管规范来启动主管。

或者,我们可以批量摄取数据——例如,从本地或远程文件。它为基于 Hadoop 的批量摄取提供了一种选择,用于从 Hadoop 文件系统中以 Hadoop 文件格式摄取数据。更常见的是,我们可以选择顺序或并行的原生批量摄取。这是一种更方便、更简单的方法,因为它没有任何外部依赖项。

5.3. 定义任务规范

在本教程中,我们将为我们拥有的输入数据设置一个本机批量摄取任务。我们可以选择从 Druid 控制台配置任务,这为我们提供了直观的图形界面。或者,我们可以将任务规范定义为 JSON 文件,并使用脚本或命令行将其提交给霸主进程。

让我们首先定义一个简单的任务规范,用于在名为wikipedia-index.json的文件中摄取我们的数据:

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "dimensionsSpec" : {
        "dimensions" : [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "timestampSpec": {
        "column": "time",
        "format": "iso"
      },
      "metricsSpec" : [],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-12/2015-09-13"],
        "rollup" : false
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/tutorial/",
        "filter" : "wikiticker-2015-09-12-sampled.json.gz"
      },
      "inputFormat" : {
        "type": "json"
      },
      "appendToExisting" : false
    },
    "tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000,
      "maxRowsInMemory" : 25000
    }
  }
}

让我们根据我们在前面的小节中介绍的基础知识来理解这个任务规范:

  • 我们选择了index_parallel任务,它为我们提供了并行的本地批量摄取
  • 我们将在此任务中使用的数据源名称为“ wikipedia”
  • 我们数据的时间戳来自属性“时间”
  • 我们正在添加许多数据属性作为维度
  • 在当前任务中,我们没有为我们的数据使用任何指标
  • 此任务应禁用默认启用的汇总
  • 该任务的输入源是一个名为wikiticker-2015-09-12-sampled.json.gz的本地文件
  • 我们没有使用任何辅助分区,我们可以在tuneConfig中定义

此任务规范假设我们已下载数据文件 wikiticker-2015-09-12-sampled.json.gz并将其保存在运行 Druid 的本地计算机上。当我们将 Druid 作为 Docker 容器运行时,这可能会更棘手。幸运的是,Druid在quickstart/tutorial位置默认提供了这个示例数据

5.4. 提交任务规范

最后,我们可以使用curl之类的工具通过命令行将此任务规范提交给霸主进程:

curl -X 'POST' -H 'Content-Type:application/json' -d @wikipedia-index.json http://localhost:8081/druid/indexer/v1/task

通常,如果提交成功,**上述命令会返回任务的 ID 。**我们可以通过 Druid 控制台或执行查询来验证我们的摄取任务的状态,我们将在下一节中介绍。

5.5. 高级摄取概念

Druid 最适合我们需要处理大量数据的情况——当然不是我们在本教程中看到的那种数据!现在,要大规模启用功能,Druid 架构必须提供合适的工具和技巧。

虽然我们不会在本教程中使用它们,但让我们快速讨论汇总和分区。

事件数据很快就会增长到海量,这会影响我们可以实现的查询性能。在许多情况下,我们可能会随着时间的推移汇总数据。这就是我们在 Druid 中所熟知的 roll-up。启用汇总后,Druid 会在摄取期间努力汇总具有相同维度和时间戳的行。虽然它可以节省空间,但roll-up确实会导致查询精度的损失,因此我们必须合理使用它。

面对不断增长的数据量,实现更好性能的另一种潜在方法是分配数据,从而分配工作负载。默认情况下,Druid根据时间戳将数据划分为包含一个或多个段的时间块。此外,我们可以决定使用自然维度进行二次分区以提高数据局部性。此外,Druid 首先按时间戳对每个段内的数据进行排序,然后按我们配置的其他维度进行排序。

6. 查询数据

一旦我们成功地执行了数据摄取,它应该可以供我们查询了。在 Druid 中有多种查询数据的方法。在 Druid 中执行查询的最简单方法是通过 Druid 控制台。但是,我们也可以通过发送 HTTP 命令或使用命令行工具来执行查询。

在 Druid 中构造查询的两种突出方式是原生查询和类似 SQL 的查询。我们将以这两种方式构建一些基本查询,并使用curl通过 HTTP 发送它们。让我们看看如何对我们之前在 Druid 中摄取的数据创建一些简单的查询。

6.1. 本机查询

Druid 中的原生查询使用 JSON 对象,我们可以将其发送到代理或路由器进行处理。我们可以通过 HTTP POST 命令发送查询,以及其他方式来执行相同的操作。

让我们创建一个名为simple_query_native.json的 JSON 文件:

{
  "queryType" : "topN",
  "dataSource" : "wikipedia",
  "intervals" : ["2015-09-12/2015-09-13"],
  "granularity" : "all",
  "dimension" : "page",
  "metric" : "count",
  "threshold" : 10,
  "aggregations" : [
    {
      "type" : "count",
      "name" : "count"
    }
  ]
}

这是一个简单的查询,用于获取 2019 年 9 月 12 日至 13 日期间页面编辑次数最多的前十个页面。 让我们使用curl通过 HTTP 发布:

curl -X 'POST' -H 'Content-Type:application/json' -d @simple_query_native.json http://localhost:8888/druid/v2?pretty

此响应包含 JSON 格式的前十页的详细信息:

[ {
  "timestamp" : "2015-09-12T00:46:58.771Z",
  "result" : [ {
    "count" : 33,
    "page" : "Wikipedia:Vandalismusmeldung"
  }, {
    "count" : 28,
    "page" : "User:Cyde/List of candidates for speedy deletion/Subpage"
  }, {
    "count" : 27,
    "page" : "Jeremy Corbyn"
  }, {
    "count" : 21,
    "page" : "Wikipedia:Administrators' noticeboard/Incidents"
  }, {
    "count" : 20,
    "page" : "Flavia Pennetta"
  }, {
    "count" : 18,
    "page" : "Total Drama Presents: The Ridonculous Race"
  }, {
    "count" : 18,
    "page" : "User talk:Dudeperson176123"
  }, {
    "count" : 18,
    "page" : "Wikipédia:Le Bistro/12 septembre 2015"
  }, {
    "count" : 17,
    "page" : "Wikipedia:In the news/Candidates"
  }, {
    "count" : 17,
    "page" : "Wikipedia:Requests for page protection"
  } ]
} ]

6.2. Druid SQL

Druid 有一个内置的 SQL 层,它为我们提供了以熟悉的类似 SQL 的结构构建查询的自由。它利用 Apache Calcite 来解析和规划查询。但是,Druid SQL 将 SQL 查询转换为查询代理上的本机查询,然后再将它们发送到数据进程。

让我们看看如何创建与以前相同的查询,但使用 Druid SQL。和以前一样,我们将创建一个名为simple_query_sql.json的 JSON 文件:

{
  "query":"SELECT page, COUNT(*) AS counts FROM wikipedia WHERE \"__time\" /
    BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' /
    GROUP BY page ORDER BY Edits DESC LIMIT 10"
}

请注意,为了便于阅读,查询已分成多行,但它应该出现在单行上。同样,和以前一样,我们将通过 HTTP 发布此查询,但发送到不同的端点:

curl -X 'POST' -H 'Content-Type:application/json' -d @simple_query_sql.json http://localhost:8888/druid/v2/sql

输出应该与我们之前使用本机查询实现的非常相似。

6.3. 查询类型

在前面的部分中,我们看到了一种查询类型,我们根据间隔获取度量“计数”的前十个结果。这只是 Druid 支持的一种查询类型,称为TopN查询。当然,我们可以通过使用过滤器和聚合使这个简单的TopN查询更有趣。但这不在本教程的范围内。但是,我们可能会对 Druid 中的其他几个查询感兴趣。

一些流行的包括 Timeseries 和 GroupBy。

Timeseries查询返回一个 JSON 对象数组,其中每个对象表示时间序列查询中描述的一个值——例如,过去一个月的一个维度的每日平均值。

GroupBy查询返回一个 JSON 对象数组,其中每个对象代表一个分组,如 group-by 查询中所述。例如,我们可以查询一个维度在过去一个月中按另一个维度分组的日平均值。

还有其他几种查询类型,包括ScanSearchTimeBoundarySegmentMetadataDatasourceMetadata

6.4. 高级查询概念

Druid 提供了一些复杂的方法来创建复杂的查询,以创建有趣的数据应用程序。这些包括各种对数据进行切片和切块的方法,同时仍然能够提供令人难以置信的查询性能。

虽然对它们的详细讨论超出了本教程的范围,但让我们讨论一些重要的,例如连接和查找、多租户和查询缓存

Druid 支持两种加入数据的方式。第一个是连接运算符,第二个是查询时查找。但是,为了获得更好的查询性能,建议避免查询时连接。

多租户是指在同一个 Druid 基础设施上支持多个租户的特性,同时仍然为它们提供逻辑隔离。在 Druid 中,可以通过每个租户的单独数据源或租户的数据分区来实现这一点。

最后,查询缓存是数据密集型应用程序性能的关键。Druid 支持分段和查询结果级别的查询结果缓存。此外,缓存数据可以驻留在内存中或外部持久存储中。

7. 语言绑定

尽管 Druid 对在 JSON 中创建摄取规范和定义查询具有出色的支持,但有时在 JSON 中定义这些查询可能很乏味,尤其是在查询变得复杂时。不幸的是,Druid 没有提供任何特定语言的客户端库来帮助我们在这方面。但是社区已经开发了相当多的语言绑定。一个这样的客户端库也可用于 Java。

我们将快速了解如何使用 Java 中的这个客户端库构建我们之前使用的TopN查询。

让我们首先在 Maven 中定义所需的依赖项

<dependency>
    <groupId>in.zapr.druid</groupId>
    <artifactId>druidry</artifactId>
    <version>2.14</version>
</dependency>

在此之后,我们应该能够使用客户端库并创建我们的TopN查询:

DateTime startTime = new DateTime(2015, 9, 12, 0, 0, 0, DateTimeZone.UTC);
DateTime endTime = new DateTime(2015, 9, 13, 0, 0, 0, DateTimeZone.UTC);
Interval interval = new Interval(startTime, endTime);
Granularity granularity = new SimpleGranularity(PredefinedGranularity.ALL);
DruidDimension dimension = new SimpleDimension("page");
TopNMetric metric = new SimpleMetric("count");
DruidTopNQuery query = DruidTopNQuery.builder()
  .dataSource("wikipedia")
  .dimension(dimension)
  .threshold(10)
  .topNMetric(metric)
  .granularity(granularity)
  .filter(filter)
  .aggregators(Arrays.asList(new LongSumAggregator("count", "count")))
  .intervals(Collections.singletonList(interval)).build();

在此之后,我们可以简单地生成所需的 JSON 结构,我们可以在 HTTP POST 调用中使用它:

ObjectMapper mapper = new ObjectMapper();
String requiredJson = mapper.writeValueAsString(query);