Contents

Apache Geode简介

1.概述

Apache Geode 是一个分布式内存数据网格,支持缓存和数据计算。

在本教程中,我们将介绍 Geode 的关键概念并使用其 Java 客户端运行一些代码示例。

2. 设置

首先,我们需要下载并安装 Apache Geode 并设置gfsh环境。为此,我们可以按照Geode 官方指南 中的说明进行操作。

其次,本教程将创建一些文件系统工件。因此,我们可以通过创建一个临时目录并从那里启动东西来隔离它们。

2.1. 安装和配置

从我们的临时目录中,我们需要启动一个Locator实例:

gfsh> start locator --name=locator --bind-address=localhost

**locator 负责 Geode Cluster的不同成员之间的协调,**我们可以通过 JMX 进一步管理它。

接下来,让我们启动一个Server实例来托管一个或多个数据Region

gfsh> start server --name=server1 --server-port=0

我们将*–server-port选项设置为 0,以便 Geode 选择任何可用端口。虽然如果我们忽略它,服务器将使用默认端口 40404。**服务器是集群的可配置成员,作为长期进程运行并负责管理数据region***。

最后,我们需要一个 Region

gfsh> create region --name=blogdemo --type=REPLICATE

region最终是我们存储数据的地方。

2.2. 确认

在我们继续之前,让我们确保一切正常。

首先,让我们检查一下我们是否有我们的 Server和我们的 Locator

gfsh> list members
##  Name   | Id
server1 | 192.168.0.105(server1:6119)<v1>:1024
locator | 127.0.0.1(locator:5996:locator)<ec><v0>:1024 [Coordinator]

接下来,我们有我们的 Region

gfsh> describe region --name=blogdemo
..........................................................
Name            : blogdemo
Data Policy     : replicate
Hosting Members : server1
Non-Default Attributes Shared By Hosting Members  
##  Type  |    Name     | Value
Region | data-policy | REPLICATE
       | size        | 0
       | scope       | distributed-ack

此外,我们的临时目录下的文件系统上应该有一些目录,称为“locator”和“server1”。

有了这个输出,我们知道我们已经准备好继续前进了。

3. Maven依赖

现在我们有了一个正在运行的 Geode,让我们开始查看客户端代码。

要在我们的 Java 代码中使用 Geode,我们需要将 Apache Geode Java 客户端 库添加到我们的 pom中:

<dependency>
     <groupId>org.apache.geode</groupId>
     <artifactId>geode-core</artifactId>
     <version>1.6.0</version>
</dependency>

让我们从在几个区域中简单地存储和检索一些数据开始。

4. 简单的存储和检索

让我们演示如何存储单个值、批量值以及自定义对象。

要开始在我们的“blogdemo”区域中存储数据,让我们使用定位器连接到它:

@Before
public void connect() {
    this.cache = new ClientCacheFactory()
      .addPoolLocator("localhost", 10334)
        .create();
    this.region = cache.<String, String> 
      createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
        .create("blogdemo");
}

4.1.保存单个值

现在,我们可以简单地在我们的区域中存储和检索数据:

@Test
public void whenSendMessageToRegion_thenMessageSavedSuccessfully() {
    this.region.put("A", "Hello");
    this.region.put("B", "Blogdemo");
    assertEquals("Hello", region.get("A"));
    assertEquals("Blogdemo", region.get("B"));
}

4.2. 一次保存多个值

我们还可以一次保存多个值,例如在尝试减少网络延迟时:

@Test
public void whenPutMultipleValuesAtOnce_thenValuesSavedSuccessfully() {
    Supplier<Stream<String>> keys = () -> Stream.of("A", "B", "C", "D", "E");
    Map<String, String> values = keys.get()
        .collect(Collectors.toMap(Function.identity(), String::toLowerCase));
    this.region.putAll(values);
    keys.get()
        .forEach(k -> assertEquals(k.toLowerCase(), this.region.get(k)));
}

4.3. 保存自定义对象

字符串很有用,但我们迟早需要存储自定义对象。

假设我们有一个要使用以下键类型存储的客户记录:

public class CustomerKey implements Serializable {
    private long id;
    private String country;
    
    // getters and setters
    // equals and hashcode
}

以及以下值类型:

public class Customer implements Serializable {
    private CustomerKey key;
    private String firstName;
    private String lastName;
    private Integer age;
    
    // getters and setters 
}

有几个额外的步骤可以存储这些:

首先,**他们应该实现 Serializable。**虽然这不是一个严格的要求,但通过使它们可序列化 ,Geode 可以更健壮地存储它们。

其次,它们需要位于我们应用程序的类路径以及 Geode Server的类路径中。

为了让它们进入服务器的类路径,让我们将它们打包,比如使用 mvn clean package

然后我们可以在新的启动服务器命令中引用生成的 jar  :

gfsh> stop server --name=server1
gfsh> start server --name=server1 --classpath=../lib/apache-geode-1.0-SNAPSHOT.jar --server-port=0

同样,我们必须从临时目录运行这些命令。

最后,让我们使用与创建“blogdemo”区域相同的命令在server上创建一个名为*“blogdemo-customers”*的新区域:

gfsh> create region --name=blogdemo-customers --type=REPLICATE

在代码中,我们将像以前一样访问定位器,指定自定义类型:

@Before
public void connect() {
    // ... connect through the locator
    this.customerRegion = this.cache.<CustomerKey, Customer> 
      createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
        .create("blogdemo-customers");
}

然后,我们可以像以前一样存储我们的客户:

@Test
public void whenPutCustomKey_thenValuesSavedSuccessfully() {
    CustomerKey key = new CustomerKey(123);
    Customer customer = new Customer(key, "William", "Russell", 35);
    this.customerRegion.put(key, customer);
    Customer storedCustomer = this.customerRegion.get(key);
    assertEquals("William", storedCustomer.getFirstName());
    assertEquals("Russell", storedCustomer.getLastName());
}

5. 区域类型

对于大多数环境,我们将拥有多个副本或多个分区,具体取决于我们的读写吞吐量要求。

到目前为止,我们已经使用了内存中的复制区域。让我们仔细看看。

5.1. 复制区域

顾名思义,**复制区域在多个server上维护其数据的副本。**让我们测试一下。

从工作目录中的gfsh控制台,让我们再向集群添加一个名为server2server

gfsh> start server --name=server2 --classpath=../lib/apache-geode-1.0-SNAPSHOT.jar --server-port=0

请记住,当我们制作“blogdemo”时,我们使用了*–type=REPLICATE*。因此,Geode 会自动将我们的数据复制到新服务器。

让我们通过停止server1 来验证这一点:

gfsh> stop server --name=server1

然后,让我们对“blogdemo”区域执行快速查询。

如果数据复制成功,我们将返回结果:

gfsh> query --query='select e.key from /blogdemo.entries e'
Result : true
Limit  : 100
Rows   : 5
## Result
C
B
A 
E
D

所以,看起来复制成功了!

向我们的区域添加副本可提高数据可用性。而且,因为不止一台服务器可以响应查询,我们也将获得更高的读取吞吐量。

但是,**如果他们俩都崩溃了怎么办?由于这些是内存区域,因此数据将丢失。**为此,我们可以改为使用 –type=REPLICATE_PERSISTENT,它还在复制时将数据存储在磁盘上。

5.2. 分区区域

对于更大的数据集,我们可以通过配置 Geode 将一个区域拆分为单独的分区或存储桶来更好地扩展系统。

让我们创建一个名为“blogdemo-partitioned”的分区区域:

gfsh> create region --name=blogdemo-partitioned --type=PARTITION

添加一些数据:

gfsh> put --region=blogdemo-partitioned --key="1" --value="one"
gfsh> put --region=blogdemo-partitioned --key="2" --value="two"
gfsh> put --region=blogdemo-partitioned --key="3" --value="three"

并快速验证:

gfsh> query --query='select e.key, e.value from /blogdemo-partitioned.entries e'
Result : true
Limit  : 100
Rows   : 3
key | value
--- | -----
2   | two
1   | one
3   | three

然后,为了验证数据是否已分区,让我们再次停止server1并重新查询:

gfsh> stop server --name=server1
gfsh> query --query='select e.key, e.value from /blogdemo-partitioned.entries e'
Result : true
Limit  : 100
Rows   : 1
key | value
--- | -----
2   | two

这次我们只取回了一些数据条目,因为那台服务器只有一个数据分区,所以当 server1掉线时,它的数据也丢失了。

**但是如果我们需要分区和冗余呢?**Geode 还支持许多其他类型 。以下三个很方便:

  • PARTITION_REDUNDANT在集群的不同成员之间分区和复制我们的数据
  • PARTITION_PERSISTENTPARTITION 一样对数据进行分区,但是到磁盘,并且
  • PARTITION_REDUNDANT_PERSISTENT为我们提供了所有三种行为。

6. 对象查询语言

Geode 还支持对象查询语言或 OQL,它比简单的键查找功能更强大。这有点像 SQL。

对于这个例子,让我们使用我们之前构建的“blogdemo-customer”区域。

如果我们再添加几个客户:

Map<CustomerKey, Customer> data = new HashMap<>();
data.put(new CustomerKey(1), new Customer("Gheorge", "Manuc", 36));
data.put(new CustomerKey(2), new Customer("Allan", "McDowell", 43));
this.customerRegion.putAll(data);

然后我们可以使用 QueryService查找名字为“Allan”的客户:

QueryService queryService = this.cache.getQueryService();
String query = 
  "select * from /blogdemo-customers c where c.firstName = 'Allan'";
SelectResults<Customer> results =
  (SelectResults<Customer>) queryService.newQuery(query).execute();
assertEquals(1, results.size());

7. 功能

内存数据网格更强大的概念之一是“将计算应用于数据”的想法。

简而言之,由于 Geode 是纯 Java,我们不仅可以轻松发送数据,还可以轻松地对这些数据执行逻辑。

这可能会让我们想起 PL-SQL 或 Transact-SQL 等 SQL 扩展的想法。

7.1. 定义函数

为了给 Geode 定义一个工作单元,我们实现了 Geode 的 Function接口。

例如,假设我们需要将所有客户的姓名都更改为大写。

我们可以只实现Function,而不是查询数据并让我们的应用程序完成工作:

public class UpperCaseNames implements Function<Boolean> {
    @Override
    public void execute(FunctionContext<Boolean> context) {
        RegionFunctionContext regionContext = (RegionFunctionContext) context;
        Region<CustomerKey, Customer> region = regionContext.getDataSet();
        for ( Map.Entry<CustomerKey, Customer> entry : region.entrySet() ) {
            Customer customer = entry.getValue();
            customer.setFirstName(customer.getFirstName().toUpperCase());
        }
        context.getResultSender().lastResult(true);   
    }
    @Override
    public String getId() {
        return getClass().getName();
    }
}

请注意,  getId必须返回一个唯一值,因此类名通常是一个不错的选择。

FunctionContext包含我们所有的区域数据,因此我们可以对其进行更复杂的查询,或者像我们在这里所做的那样,对其进行变异。

而且Function比这更强大,所以请查看官方手册 ,尤其是getResultSender 方法。

7.2. 部署功能

我们需要让 Geode 知道我们的函数才能运行它。就像我们对自定义数据类型所做的那样,我们将打包 jar。

但这一次,我们可以只使用 deploy命令:

gfsh> deploy --jar=./lib/apache-geode-1.0-SNAPSHOT.jar

7.3. 执行功能

现在,我们可以使用FunctionService 从应用程序中执行Function

@Test
public void whenExecuteUppercaseNames_thenCustomerNamesAreUppercased() {
    Execution execution = FunctionService.onRegion(this.customerRegion);
    execution.execute(UpperCaseNames.class.getName());
    Customer customer = this.customerRegion.get(new CustomerKey(1));
    assertEquals("GHEORGE", customer.getFirstName());
}