Contents

Cassandra 简介

1. 概述

本教程是使用 Java 的Apache Cassandra 数据库的介绍性指南。

您将找到解释的关键概念,以及一个涵盖从 Java 连接和开始使用此 NoSQL 数据库的基本步骤的工作示例。

2. Cassandra

Cassandra 是一个可扩展的 NoSQL 数据库,可提供无单点故障的持续可用性,并能够以卓越的性能处理大量数据。 该数据库使用环形设计而不是使用主从架构。在环设计中,没有主节点——所有参与节点都是相同的,并且作为对等点相互通信。

这使得 Cassandra 成为一个水平可扩展的系统,允许在不需要重新配置的情况下增量添加节点。

2.1. 关键概念

让我们从对 Cassandra 的一些关键概念的简短调查开始:

  • **Cluster **——以环形架构排列的节点或数据中心的集合。必须为每个集群分配一个名称,该名称随后将由参与节点使用
  • **Keyspace **——如果你来自关系数据库,那么模式就是 Cassandra 中的相应键空间。键空间是 Cassandra 中数据的最外层容器。每个键空间设置的主要属性是Replication FactorReplica Placement StrategyColumn Families
  • **Column Family **——Cassandra 中的列族就像关系数据库中的表。每个 Column Family 包含由*Map<RowKey, SortedMap<ColumnKey, ColumnValue»*表示的行集合。密钥提供了一起访问相关数据的能力
  • **Column **——Cassandra 中的列是一种数据结构,其中包含列名、值和时间戳。与数据结构良好的关系数据库相比,每行中的列和列数可能会有所不同

3. 使用 Java 客户端

3.1. Maven 依赖

我们需要在pom.xml中定义以下 Cassandra 依赖项,最新版本可以在这里 找到:

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.1.0</version>
</dependency>

为了使用嵌入式数据库服务器测试代码,我们还应该添加cassandra-unit依赖项,最新版本可以在这里 找到:

<dependency>
    <groupId>org.cassandraunit</groupId>
    <artifactId>cassandra-unit</artifactId>
    <version>3.0.0.1</version>
</dependency>

3.2. 连接到 Cassandra

为了从 Java 连接到 Cassandra,我们需要构建一个Cluster对象。

需要提供节点的地址作为联系点。如果我们不提供端口号,将使用默认端口 (9042)。

这些设置允许驱动程序发现集群的当前拓扑。

public class CassandraConnector {
    private Cluster cluster;
    private Session session;
    public void connect(String node, Integer port) {
        Builder b = Cluster.builder().addContactPoint(node);
        if (port != null) {
            b.withPort(port);
        }
        cluster = b.build();
        session = cluster.connect();
    }
    public Session getSession() {
        return this.session;
    }
    public void close() {
        session.close();
        cluster.close();
    }
}

3.3. 创建键空间

让我们创建我们的“library”键空间:

public void createKeyspace(
  String keyspaceName, String replicationStrategy, int replicationFactor) {
  StringBuilder sb = 
    new StringBuilder("CREATE KEYSPACE IF NOT EXISTS ")
      .append(keyspaceName).append(" WITH replication = {")
      .append("'class':'").append(replicationStrategy)
      .append("','replication_factor':").append(replicationFactor)
      .append("};");
        
    String query = sb.toString();
    session.execute(query);
}

除了keyspaceName之外,我们还需要定义两个参数,replicationFactorreplicationStrategy。这些参数分别决定了副本的数量以及副本将如何分布在环上。

借助复制,Cassandra 通过将数据副本存储在多个节点中来确保可靠性和容错性。

此时我们可以测试我们的键空间是否已成功创建:

private KeyspaceRepository schemaRepository;
private Session session;
@Before
public void connect() {
    CassandraConnector client = new CassandraConnector();
    client.connect("127.0.0.1", 9142);
    this.session = client.getSession();
    schemaRepository = new KeyspaceRepository(session);
}
@Test
public void whenCreatingAKeyspace_thenCreated() {
    String keyspaceName = "library";
    schemaRepository.createKeyspace(keyspaceName, "SimpleStrategy", 1);
    ResultSet result = 
      session.execute("SELECT * FROM system_schema.keyspaces;");
    List<String> matchedKeyspaces = result.all()
      .stream()
      .filter(r -> r.getString(0).equals(keyspaceName.toLowerCase()))
      .map(r -> r.getString(0))
      .collect(Collectors.toList());
    assertEquals(matchedKeyspaces.size(), 1);
    assertTrue(matchedKeyspaces.get(0).equals(keyspaceName.toLowerCase()));
}

3.4. 创建列族

现在,我们可以将第一个列族“books”添加到现有的键空间中:

private static final String TABLE_NAME = "books";
private Session session;
public void createTable() {
    StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
      .append(TABLE_NAME).append("(")
      .append("id uuid PRIMARY KEY, ")
      .append("title text,")
      .append("subject text);");
    String query = sb.toString();
    session.execute(query);
}

下面提供了测试列族是否已创建的代码:

private BookRepository bookRepository;
private Session session;
@Before
public void connect() {
    CassandraConnector client = new CassandraConnector();
    client.connect("127.0.0.1", 9142);
    this.session = client.getSession();
    bookRepository = new BookRepository(session);
}
@Test
public void whenCreatingATable_thenCreatedCorrectly() {
    bookRepository.createTable();
    ResultSet result = session.execute(
      "SELECT * FROM " + KEYSPACE_NAME + ".books;");
    List<String> columnNames = 
      result.getColumnDefinitions().asList().stream()
      .map(cl -> cl.getName())
      .collect(Collectors.toList());
        
    assertEquals(columnNames.size(), 3);
    assertTrue(columnNames.contains("id"));
    assertTrue(columnNames.contains("title"));
    assertTrue(columnNames.contains("subject"));
}

3.5. 更改列族

一本书也有一个出版商,但是在创建的表中找不到这样的列。我们可以使用以下代码来更改表并添加新列:

public void alterTablebooks(String columnName, String columnType) {
    StringBuilder sb = new StringBuilder("ALTER TABLE ")
      .append(TABLE_NAME).append(" ADD ")
      .append(columnName).append(" ")
      .append(columnType).append(";");
    String query = sb.toString();
    session.execute(query);
}

让我们确保已添加新的列publisher

@Test
public void whenAlteringTable_thenAddedColumnExists() {
    bookRepository.createTable();
    bookRepository.alterTablebooks("publisher", "text");
    ResultSet result = session.execute(
      "SELECT * FROM " + KEYSPACE_NAME + "." + "books" + ";");
    boolean columnExists = result.getColumnDefinitions().asList().stream()
      .anyMatch(cl -> cl.getName().equals("publisher"));
        
    assertTrue(columnExists);
}

3.6. 在列族中插入数据

现在已经创建了books表,我们准备开始向表中添加数据:

public void insertbookByTitle(Book book) {
    StringBuilder sb = new StringBuilder("INSERT INTO ")
      .append(TABLE_NAME_BY_TITLE).append("(id, title) ")
      .append("VALUES (").append(book.getId())
      .append(", '").append(book.getTitle()).append("');");
    String query = sb.toString();
    session.execute(query);
}

‘books’ 表中添加了一个新行,因此我们可以测试该行是否存在:

@Test
public void whenAddingANewBook_thenBookExists() {
    bookRepository.createTableBooksByTitle();
    String title = "Effective Java";
    Book book = new Book(UUIDs.timeBased(), title, "Programming");
    bookRepository.insertbookByTitle(book);
        
    Book savedBook = bookRepository.selectByTitle(title);
    assertEquals(book.getTitle(), savedBook.getTitle());
}

在上面的测试代码中,我们使用了不同的方法来创建一个名为booksByTitle 的表:

public void createTableBooksByTitle() {
    StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
      .append("booksByTitle").append("(")
      .append("id uuid, ")
      .append("title text,")
      .append("PRIMARY KEY (title, id));");
    String query = sb.toString();
    session.execute(query);
}

在 Cassandra 中,最佳实践之一是使用一个表查询模式。这意味着,对于不同的查询,需要不同的表。

在我们的示例中,我们选择按书名选择一本书。为了满足selectByTitle查询,我们使用列titleid创建了一个带有复合PRIMARY KEY的表。列title是分区键,而id列是集群键。

这样,您的数据模型中的许多表都包含重复数据。这不是这个数据库的缺点。相反,这种做法优化了读取的性能。

让我们看看当前保存在我们表中的数据:

public List<Book> selectAll() {
    StringBuilder sb = 
      new StringBuilder("SELECT * FROM ").append(TABLE_NAME);
    String query = sb.toString();
    ResultSet rs = session.execute(query);
    List<Book> books = new ArrayList<Book>();
    rs.forEach(r -> {
        books.add(new Book(
          r.getUUID("id"), 
          r.getString("title"),  
          r.getString("subject")));
    });
    return books;
}

查询返回预期结果的测试:

@Test
public void whenSelectingAll_thenReturnAllRecords() {
    bookRepository.createTable();
        
    Book book = new Book(
      UUIDs.timeBased(), "Effective Java", "Programming");
    bookRepository.insertbook(book);
      
    book = new Book(
      UUIDs.timeBased(), "Clean Code", "Programming");
    bookRepository.insertbook(book);
        
    List<Book> books = bookRepository.selectAll(); 
        
    assertEquals(2, books.size());
    assertTrue(books.stream().anyMatch(b -> b.getTitle()
      .equals("Effective Java")));
    assertTrue(books.stream().anyMatch(b -> b.getTitle()
      .equals("Clean Code")));
}

到目前为止一切都很好,但必须意识到一件事。我们开始使用表books ,但与此同时,为了满足按title 列的select 查询,我们必须创建另一个名为booksByTitle 的表。

这两个表是相同的,包含重复的列,但我们只在booksByTitle表中插入了数据。因此,当前两个表中的数据不一致。

我们可以使用*batch *查询来解决这个问题,它包含两个插入语句,每个表一个。*batch *查询将多个 DML 语句作为单个操作执行。

提供了此类查询的示例:

public void insertBookBatch(Book book) {
    StringBuilder sb = new StringBuilder("BEGIN BATCH ")
      .append("INSERT INTO ").append(TABLE_NAME)
      .append("(id, title, subject) ")
      .append("VALUES (").append(book.getId()).append(", '")
      .append(book.getTitle()).append("', '")
      .append(book.getSubject()).append("');")
      .append("INSERT INTO ")
      .append(TABLE_NAME_BY_TITLE).append("(id, title) ")
      .append("VALUES (").append(book.getId()).append(", '")
      .append(book.getTitle()).append("');")
      .append("APPLY BATCH;");
    String query = sb.toString();
    session.execute(query);
}

我们再次测试批量查询结果,如下所示:

@Test
public void whenAddingANewBookBatch_ThenBookAddedInAllTables() {
    bookRepository.createTable();
        
    bookRepository.createTableBooksByTitle();
    
    String title = "Effective Java";
    Book book = new Book(UUIDs.timeBased(), title, "Programming");
    bookRepository.insertBookBatch(book);
    
    List<Book> books = bookRepository.selectAll();
    
    assertEquals(1, books.size());
    assertTrue(
      books.stream().anyMatch(
        b -> b.getTitle().equals("Effective Java")));
        
    List<Book> booksByTitle = bookRepository.selectAllBookByTitle();
    
    assertEquals(1, booksByTitle.size());
    assertTrue(
      booksByTitle.stream().anyMatch(
        b -> b.getTitle().equals("Effective Java")));
}

注意:从 3.0 版开始,提供了一个名为“Materialized Views”的新功能,我们可以使用它来代替*batch *查询。此处 提供了一个有据可查的“物化视图”示例。

3.7. 删除列族

下面的代码展示了如何删除一个表:

public void deleteTable() {
    StringBuilder sb = 
      new StringBuilder("DROP TABLE IF EXISTS ").append(TABLE_NAME);
    String query = sb.toString();
    session.execute(query);
}

选择键空间中不存在的表会导致InvalidQueryException: unconfigured table books

@Test(expected = InvalidQueryException.class)
public void whenDeletingATable_thenUnconfiguredTable() {
    bookRepository.createTable();
    bookRepository.deleteTable("books");
       
    session.execute("SELECT * FROM " + KEYSPACE_NAME + ".books;");
}

3.8. 删除键空间

最后,让我们删除键空间:

public void deleteKeyspace(String keyspaceName) {
    StringBuilder sb = 
      new StringBuilder("DROP KEYSPACE ").append(keyspaceName);
    String query = sb.toString();
    session.execute(query);
}

并测试密钥空间是否已被删除:

@Test
public void whenDeletingAKeyspace_thenDoesNotExist() {
    String keyspaceName = "library";
    schemaRepository.deleteKeyspace(keyspaceName);
    ResultSet result = 
      session.execute("SELECT * FROM system_schema.keyspaces;");
    boolean isKeyspaceCreated = result.all().stream()
      .anyMatch(r -> r.getString(0).equals(keyspaceName.toLowerCase()));
        
    assertFalse(isKeyspaceCreated);
}