Apache Curator 简介
1.简介
Apache Curator 是 Apache Zookeeper 的 Java 客户端,Apache Zookeeper 是分布式应用程序的流行协调服务。
在本教程中,我们将介绍 Curator 提供的一些最相关的功能:
- Connection Management——管理连接和重试策略
- Async ——通过添加异步功能和使用 Java 8 lambda 来增强现有客户端
- Configuration Management——对系统进行集中配置
- Strongly-Typed Models——使用类型模型
- Recipes ——实现领导者选举、分布式锁或计数器
2.先决条件
首先,建议快速了解一下Apache Zookeeper 及其功能。
对于本教程,我们假设已经有一个独立的 Zookeeper 实例在127.0.0.1:2181上运行;如果您刚刚开始,这里 有关于如何安装和运行它的说明。
首先,我们需要将curator-x-async 依赖项添加到我们的pom.xml中:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-async</artifactId>
<version>4.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
最新版本的 Apache Curator 4.XX 与 Zookeeper 3.5.X 存在硬依赖关系,目前仍处于测试阶段。
因此,在本文中,我们将使用当前最新的稳定版 Zookeeper 3.4.11 。
所以我们需要排除 Zookeeper 依赖,并将Zookeeper 版本的依赖 添加到我们的pom.xml中:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.11</version>
</dependency>
有关兼容性的更多信息,请参阅此链接 。
3.连接管理
Apache Curator 的基本用例是连接到正在运行的 Apache Zookeeper 实例。
该工具提供了一个工厂来使用重试策略建立与 Zookeeper 的连接:
int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy = new RetryNTimes(
maxRetries, sleepMsBetweenRetries);
CuratorFramework client = CuratorFrameworkFactory
.newClient("127.0.0.1:2181", retryPolicy);
client.start();
assertThat(client.checkExists().forPath("/")).isNotNull();
在这个快速示例中,我们将重试 3 次,并在重试之间等待 100 毫秒,以防出现连接问题。
使用CuratorFramework客户端连接到 Zookeeper后,我们现在可以浏览路径、获取/设置数据并与服务器进行交互。
4. 异步
**Curator Async 模块包装了上述CuratorFramework客户端,**以使用CompletionStage Java 8 API 提供非阻塞功能。
让我们看看前面的示例如何使用 Async 包装器:
int sleepMsBetweenRetries = 100;
int maxRetries = 3;
RetryPolicy retryPolicy
= new RetryNTimes(maxRetries, sleepMsBetweenRetries);
CuratorFramework client = CuratorFrameworkFactory
.newClient("127.0.0.1:2181", retryPolicy);
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
AtomicBoolean exists = new AtomicBoolean(false);
async.checkExists()
.forPath("/")
.thenAcceptAsync(s -> exists.set(s != null));
await().until(() -> assertThat(exists.get()).isTrue());
现在,*checkExists()操作在异步模式下工作,不会阻塞主线程。我们还可以使用thenAcceptAsync()*方法一个接一个地链接操作,该方法使用CompletionStage API 。
5. 配置管理
在分布式环境中,最常见的挑战之一是管理许多应用程序之间的共享配置。我们可以使用 Zookeeper 作为数据存储来保存我们的配置。
让我们看一个使用 Apache Curator 获取和设置数据的示例:
CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";
client.create().forPath(key);
async.setData()
.forPath(key, expected.getBytes());
AtomicBoolean isEquals = new AtomicBoolean();
async.getData()
.forPath(key)
.thenAccept(data -> isEquals.set(new String(data).equals(expected)));
await().until(() -> assertThat(isEquals.get()).isTrue());
在这个例子中,我们创建节点路径,在 Zookeeper 中设置数据,然后我们检查它的值是否相同来恢复它。key 字段可以是像/config/dev/my_key这样的节点路径。
5.1. 观察者
Zookeeper 中另一个有趣的特性是能够监视键或节点。它允许我们监听配置的变化并更新我们的应用程序,而无需重新部署。
让我们看看上面的例子在使用 watchers 时的样子:
CuratorFramework client = newClient()
client.start();
AsyncCuratorFramework async = AsyncCuratorFramework.wrap(client);
String key = getKey();
String expected = "my_value";
async.create().forPath(key);
List<String> changes = new ArrayList<>();
async.watched()
.getData()
.forPath(key)
.event()
.thenAccept(watchedEvent -> {
try {
changes.add(new String(client.getData()
.forPath(watchedEvent.getPath())));
} catch (Exception e) {
// fail ...
}});
// Set data value for our key
async.setData()
.forPath(key, expected.getBytes());
await()
.until(() -> assertThat(changes.size()).isEqualTo(1));
我们配置观察者,设置数据,然后确认被观察事件被触发。我们可以一次观察一个节点或一组节点。
6. 强类型模型
Zookeeper 主要处理字节数组,所以我们需要对我们的数据进行序列化和反序列化。这使我们能够灵活地处理任何可序列化的实例,但它可能难以维护。
为了在这里提供帮助,Curator 添加了类型化模型 的概念,它委托序列化/反序列化并允许我们直接使用我们的类型。让我们看看它是如何工作的。
首先,我们需要一个序列化器框架。Curator 建议使用 Jackson 实现,所以让我们将Jackson 依赖 项添加到我们的pom.xml中:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.0</version>
</dependency>
现在,让我们尝试持久化我们的自定义类HostConfig:
public class HostConfig {
private String hostname;
private int port;
// getters and setters
}
我们需要提供从HostConfig类到路径的模型规范映射,并使用 Apache Curator 提供的建模框架包装器:
ModelSpec<HostConfig> mySpec = ModelSpec.builder(
ZPath.parseWithIds("/config/dev"),
JacksonModelSerializer.build(HostConfig.class))
.build();
CuratorFramework client = newClient();
client.start();
AsyncCuratorFramework async
= AsyncCuratorFramework.wrap(client);
ModeledFramework<HostConfig> modeledClient
= ModeledFramework.wrap(async, mySpec);
modeledClient.set(new HostConfig("host-name", 8080));
modeledClient.read()
.whenComplete((value, e) -> {
if (e != null) {
fail("Cannot read host config", e);
} else {
assertThat(value).isNotNull();
assertThat(value.getHostname()).isEqualTo("host-name");
assertThat(value.getPort()).isEqualTo(8080);
}
});
读取路径*/config/dev时的whenComplete()方法将返回 Zookeeper 中的HostConfig*实例。
7. Recipes
Zookeeper 提供此指南 来实现高级解决方案或配方,例如领导者选举、分布式锁或共享计数器。
Apache Curator 为大多数这些配方提供了一个实现。要查看完整列表,请访问文档 。
所有这些食谱都在一个单独的模块中可用:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
让我们直接进入并通过一些简单的例子开始理解这些。
7.1. 领导节点选举
在分布式环境中,我们可能需要一个主节点或领导节点来协调一项复杂的工作。 这是Curator中领导人选举配方的用法:
CuratorFramework client = newClient();
client.start();
LeaderSelector leaderSelector = new LeaderSelector(client,
"/mutex/select/leader/for/job/A",
new LeaderSelectorListener() {
@Override
public void stateChanged(
CuratorFramework client,
ConnectionState newState) {
}
@Override
public void takeLeadership(
CuratorFramework client) throws Exception {
}
});
// join the members group
leaderSelector.start();
// wait until the job A is done among all members
leaderSelector.close();
当我们启动领导者选择器时,我们的节点会加入路径*/mutex/select/leader/for/job/A中的成员组。一旦我们的节点成为领导者,将调用takeLeadership*方法,我们作为领导者可以恢复工作。
7.2. 共享锁
共享锁配方 是关于拥有一个完全分布式的锁:
CuratorFramework client = newClient();
client.start();
InterProcessSemaphoreMutex sharedLock = new InterProcessSemaphoreMutex(
client, "/mutex/process/A");
sharedLock.acquire();
// do process A
sharedLock.release();
当我们获取锁时,Zookeeper 确保没有其他应用程序同时获取相同的锁。
7.3. 计数器
Counters 配方 在所有客户端之间协调一个共享的Integer :
CuratorFramework client = newClient();
client.start();
SharedCount counter = new SharedCount(client, "/counters/A", 0);
counter.start();
counter.setCount(counter.getCount() + 1);
assertThat(counter.getCount()).isEqualTo(1);
在此示例中,Zookeeper 将Integer值存储在路径*/counters/A中,如果尚未创建路径,则将该值初始化为0* 。