Contents

Apache Curator 简介

1.简介

Apache Curator 是 Apache Zookeeper 的 Java 客户端,Apache Zookeeper 是分布式应用程序的流行协调服务。

在本教程中,我们将介绍 Curator 提供的一些最相关的功能:

  • Connection Management——管理连接和重试策略
  • Async ——通过添加异步功能和使用 Java 8 lambda 来增强现有客户端
  • Configuration Management——对系统进行集中配置
  • Strongly-Typed Models——使用类型模型
  • Recipes ——实现领导者选举、分布式锁或计数器

2.先决条件

首先,建议快速了解一下Apache Zookeeper 及其功能。

对于本教程,我们假设已经有一个独立的 Zookeeper 实例在127.0.0.1:2181上运行;如果您刚刚开始,这里 有关于如何安装和运行它的说明。

首先,我们需要将curator-x-async 依赖项添加到我们的pom.xml中:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-async</artifactId>
    <version>4.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

最新版本的 Apache Curator 4.XX 与 Zookeeper 3.5.X 存在硬依赖关系,目前仍处于测试阶段。

因此,在本文中,我们将使用当前最新的稳定版 Zookeeper 3.4.11

所以我们需要排除 Zookeeper 依赖,并将Zookeeper 版本的依赖 添加到我们的pom.xml中:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.11</version>
</dependency>

有关兼容性的更多信息,请参阅此链接

3.连接管理

Apache Curator 的基本用例是连接到正在运行的 Apache Zookeeper 实例

该工具提供了一个工厂来使用重试策略建立与 Zookeeper 的连接:

int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy = new RetryNTimes(
  maxRetries, sleepMsBetweenRetries);
CuratorFramework client = CuratorFrameworkFactory
  .newClient("127.0.0.1:2181", retryPolicy);
client.start();
 
assertThat(client.checkExists().forPath("/")).isNotNull();

在这个快速示例中,我们将重试 3 次,并在重试之间等待 100 毫秒,以防出现连接问题。

使用CuratorFramework客户端连接到 Zookeeper后,我们现在可以浏览路径、获取/设置数据并与服务器进行交互。

4. 异步

**Curator Async 模块包装了上述CuratorFramework客户端,**以使用CompletionStage Java 8 API 提供非阻塞功能

让我们看看前面的示例如何使用 Async 包装器:

int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy 
  = new RetryNTimes(maxRetries, sleepMsBetweenRetries);
CuratorFramework client = CuratorFrameworkFactory
  .newClient("127.0.0.1:2181", retryPolicy);
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
AtomicBoolean exists = new AtomicBoolean(false);
async.checkExists()
  .forPath("/")
  .thenAcceptAsync(s -> exists.set(s != null));
await().until(() -> assertThat(exists.get()).isTrue());

现在,*checkExists()操作在异步模式下工作,不会阻塞主线程。我们还可以使用thenAcceptAsync()*方法一个接一个地链接操作,该方法使用CompletionStage API

5. 配置管理

在分布式环境中,最常见的挑战之一是管理许多应用程序之间的共享配置。我们可以使用 Zookeeper 作为数据存储来保存我们的配置。

让我们看一个使用 Apache Curator 获取和设置数据的示例:

CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";
client.create().forPath(key);
async.setData()
  .forPath(key, expected.getBytes());
AtomicBoolean isEquals = new AtomicBoolean();
async.getData()
  .forPath(key)
  .thenAccept(data -> isEquals.set(new String(data).equals(expected)));
await().until(() -> assertThat(isEquals.get()).isTrue());

在这个例子中,我们创建节点路径,在 Zookeeper 中设置数据,然后我们检查它的值是否相同来恢复它。key 字段可以是像/config/dev/my_key这样的节点路径。

5.1. 观察者

Zookeeper 中另一个有趣的特性是能够监视键或节点。它允许我们监听配置的变化并更新我们的应用程序,而无需重新部署

让我们看看上面的例子在使用 watchers 时的样子:

CuratorFramework client = newClient()
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";
async.create().forPath(key);
List<String> changes = new ArrayList<>();
async.watched()
  .getData()
  .forPath(key)
  .event()
  .thenAccept(watchedEvent -> {
    try {
        changes.add(new String(client.getData()
          .forPath(watchedEvent.getPath())));
    } catch (Exception e) {
        // fail ...
    }});
// Set data value for our key
async.setData()
  .forPath(key, expected.getBytes());
await()
  .until(() -> assertThat(changes.size()).isEqualTo(1));

我们配置观察者,设置数据,然后确认被观察事件被触发。我们可以一次观察一个节点或一组节点。

6. 强类型模型

Zookeeper 主要处理字节数组,所以我们需要对我们的数据进行序列化和反序列化。这使我们能够灵活地处理任何可序列化的实例,但它可能难以维护。

为了在这里提供帮助,Curator 添加了类型化模型 的概念,它委托序列化/反序列化并允许我们直接使用我们的类型。让我们看看它是如何工作的。

首先,我们需要一个序列化器框架。Curator 建议使用 Jackson 实现,所以让我们将Jackson 依赖 项添加到我们的pom.xml中:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.0</version>
</dependency>

现在,让我们尝试持久化我们的自定义类HostConfig

public class HostConfig {
    private String hostname;
    private int port;
    // getters and setters
}

我们需要提供从HostConfig类到路径的模型规范映射,并使用 Apache Curator 提供的建模框架包装器:

ModelSpec<HostConfig> mySpec = ModelSpec.builder(
  ZPath.parseWithIds("/config/dev"), 
  JacksonModelSerializer.build(HostConfig.class))
  .build();
CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async 
  = AsyncCuratorFramework.wrap(client);
ModeledFramework<HostConfig> modeledClient 
  = ModeledFramework.wrap(async, mySpec);
modeledClient.set(new HostConfig("host-name", 8080));
modeledClient.read()
  .whenComplete((value, e) -> {
     if (e != null) {
          fail("Cannot read host config", e);
     } else {
          assertThat(value).isNotNull();
          assertThat(value.getHostname()).isEqualTo("host-name");
          assertThat(value.getPort()).isEqualTo(8080);
     }
   });

读取路径*/config/dev时的whenComplete()方法将返回 Zookeeper 中的HostConfig*实例。

7. Recipes

Zookeeper 提供此指南 来实现高级解决方案或配方,例如领导者选举、分布式锁或共享计数器。

Apache Curator 为大多数这些配方提供了一个实现。要查看完整列表,请访问文档

所有这些食谱都在一个单独的模块中可用:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

让我们直接进入并通过一些简单的例子开始理解这些。

7.1. 领导节点选举

在分布式环境中,我们可能需要一个主节点或领导节点来协调一项复杂的工作。 这是Curator中领导人选举配方的用法:

CuratorFramework client = newClient();
client.start();
LeaderSelector leaderSelector = new LeaderSelector(client, 
  "/mutex/select/leader/for/job/A", 
  new LeaderSelectorListener() {
      @Override
      public void stateChanged(
        CuratorFramework client, 
        ConnectionState newState) {
      }
      @Override
      public void takeLeadership(
        CuratorFramework client) throws Exception {
      }
  });
// join the members group
leaderSelector.start();
// wait until the job A is done among all members
leaderSelector.close();

当我们启动领导者选择器时,我们的节点会加入路径*/mutex/select/leader/for/job/A中的成员组。一旦我们的节点成为领导者,将调用takeLeadership*方法,我们作为领导者可以恢复工作。

7.2. 共享锁

共享锁配方 是关于拥有一个完全分布式的锁:

CuratorFramework client = newClient();
client.start();
InterProcessSemaphoreMutex sharedLock = new InterProcessSemaphoreMutex(
  client, "/mutex/process/A");
sharedLock.acquire();
// do process A
sharedLock.release();

当我们获取锁时,Zookeeper 确保没有其他应用程序同时获取相同的锁。

7.3. 计数器

Counters 配方 在所有客户端之间协调一个共享的Integer

CuratorFramework client = newClient();
client.start();
SharedCount counter = new SharedCount(client, "/counters/A", 0);
counter.start();
counter.setCount(counter.getCount() + 1);
assertThat(counter.getCount()).isEqualTo(1);

在此示例中,Zookeeper 将Integer值存储在路径*/counters/A中,如果尚未创建路径,则将该值初始化为0* 。