Cassandra 简介
1. 概述
本教程是使用 Java 的Apache Cassandra 数据库的介绍性指南。
您将找到解释的关键概念,以及一个涵盖从 Java 连接和开始使用此 NoSQL 数据库的基本步骤的工作示例。
2. Cassandra
Cassandra 是一个可扩展的 NoSQL 数据库,可提供无单点故障的持续可用性,并能够以卓越的性能处理大量数据。 该数据库使用环形设计而不是使用主从架构。在环设计中,没有主节点——所有参与节点都是相同的,并且作为对等点相互通信。
这使得 Cassandra 成为一个水平可扩展的系统,允许在不需要重新配置的情况下增量添加节点。
2.1. 关键概念
让我们从对 Cassandra 的一些关键概念的简短调查开始:
- **Cluster **——以环形架构排列的节点或数据中心的集合。必须为每个集群分配一个名称,该名称随后将由参与节点使用
- **Keyspace **——如果你来自关系数据库,那么模式就是 Cassandra 中的相应键空间。键空间是 Cassandra 中数据的最外层容器。每个键空间设置的主要属性是Replication Factor、Replica Placement Strategy和Column Families
- **Column Family **——Cassandra 中的列族就像关系数据库中的表。每个 Column Family 包含由*Map<RowKey, SortedMap<ColumnKey, ColumnValue»*表示的行集合。密钥提供了一起访问相关数据的能力
- **Column **——Cassandra 中的列是一种数据结构,其中包含列名、值和时间戳。与数据结构良好的关系数据库相比,每行中的列和列数可能会有所不同
3. 使用 Java 客户端
3.1. Maven 依赖
我们需要在pom.xml中定义以下 Cassandra 依赖项,最新版本可以在这里 找到:
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.1.0</version>
</dependency>
为了使用嵌入式数据库服务器测试代码,我们还应该添加cassandra-unit依赖项,最新版本可以在这里 找到:
<dependency>
<groupId>org.cassandraunit</groupId>
<artifactId>cassandra-unit</artifactId>
<version>3.0.0.1</version>
</dependency>
3.2. 连接到 Cassandra
为了从 Java 连接到 Cassandra,我们需要构建一个Cluster对象。
需要提供节点的地址作为联系点。如果我们不提供端口号,将使用默认端口 (9042)。
这些设置允许驱动程序发现集群的当前拓扑。
public class CassandraConnector {
private Cluster cluster;
private Session session;
public void connect(String node, Integer port) {
Builder b = Cluster.builder().addContactPoint(node);
if (port != null) {
b.withPort(port);
}
cluster = b.build();
session = cluster.connect();
}
public Session getSession() {
return this.session;
}
public void close() {
session.close();
cluster.close();
}
}
3.3. 创建键空间
让我们创建我们的“library”键空间:
public void createKeyspace(
String keyspaceName, String replicationStrategy, int replicationFactor) {
StringBuilder sb =
new StringBuilder("CREATE KEYSPACE IF NOT EXISTS ")
.append(keyspaceName).append(" WITH replication = {")
.append("'class':'").append(replicationStrategy)
.append("','replication_factor':").append(replicationFactor)
.append("};");
String query = sb.toString();
session.execute(query);
}
除了keyspaceName之外,我们还需要定义两个参数,replicationFactor和replicationStrategy。这些参数分别决定了副本的数量以及副本将如何分布在环上。
借助复制,Cassandra 通过将数据副本存储在多个节点中来确保可靠性和容错性。
此时我们可以测试我们的键空间是否已成功创建:
private KeyspaceRepository schemaRepository;
private Session session;
@Before
public void connect() {
CassandraConnector client = new CassandraConnector();
client.connect("127.0.0.1", 9142);
this.session = client.getSession();
schemaRepository = new KeyspaceRepository(session);
}
@Test
public void whenCreatingAKeyspace_thenCreated() {
String keyspaceName = "library";
schemaRepository.createKeyspace(keyspaceName, "SimpleStrategy", 1);
ResultSet result =
session.execute("SELECT * FROM system_schema.keyspaces;");
List<String> matchedKeyspaces = result.all()
.stream()
.filter(r -> r.getString(0).equals(keyspaceName.toLowerCase()))
.map(r -> r.getString(0))
.collect(Collectors.toList());
assertEquals(matchedKeyspaces.size(), 1);
assertTrue(matchedKeyspaces.get(0).equals(keyspaceName.toLowerCase()));
}
3.4. 创建列族
现在,我们可以将第一个列族“books”添加到现有的键空间中:
private static final String TABLE_NAME = "books";
private Session session;
public void createTable() {
StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
.append(TABLE_NAME).append("(")
.append("id uuid PRIMARY KEY, ")
.append("title text,")
.append("subject text);");
String query = sb.toString();
session.execute(query);
}
下面提供了测试列族是否已创建的代码:
private BookRepository bookRepository;
private Session session;
@Before
public void connect() {
CassandraConnector client = new CassandraConnector();
client.connect("127.0.0.1", 9142);
this.session = client.getSession();
bookRepository = new BookRepository(session);
}
@Test
public void whenCreatingATable_thenCreatedCorrectly() {
bookRepository.createTable();
ResultSet result = session.execute(
"SELECT * FROM " + KEYSPACE_NAME + ".books;");
List<String> columnNames =
result.getColumnDefinitions().asList().stream()
.map(cl -> cl.getName())
.collect(Collectors.toList());
assertEquals(columnNames.size(), 3);
assertTrue(columnNames.contains("id"));
assertTrue(columnNames.contains("title"));
assertTrue(columnNames.contains("subject"));
}
3.5. 更改列族
一本书也有一个出版商,但是在创建的表中找不到这样的列。我们可以使用以下代码来更改表并添加新列:
public void alterTablebooks(String columnName, String columnType) {
StringBuilder sb = new StringBuilder("ALTER TABLE ")
.append(TABLE_NAME).append(" ADD ")
.append(columnName).append(" ")
.append(columnType).append(";");
String query = sb.toString();
session.execute(query);
}
让我们确保已添加新的列publisher:
@Test
public void whenAlteringTable_thenAddedColumnExists() {
bookRepository.createTable();
bookRepository.alterTablebooks("publisher", "text");
ResultSet result = session.execute(
"SELECT * FROM " + KEYSPACE_NAME + "." + "books" + ";");
boolean columnExists = result.getColumnDefinitions().asList().stream()
.anyMatch(cl -> cl.getName().equals("publisher"));
assertTrue(columnExists);
}
3.6. 在列族中插入数据
现在已经创建了books表,我们准备开始向表中添加数据:
public void insertbookByTitle(Book book) {
StringBuilder sb = new StringBuilder("INSERT INTO ")
.append(TABLE_NAME_BY_TITLE).append("(id, title) ")
.append("VALUES (").append(book.getId())
.append(", '").append(book.getTitle()).append("');");
String query = sb.toString();
session.execute(query);
}
‘books’ 表中添加了一个新行,因此我们可以测试该行是否存在:
@Test
public void whenAddingANewBook_thenBookExists() {
bookRepository.createTableBooksByTitle();
String title = "Effective Java";
Book book = new Book(UUIDs.timeBased(), title, "Programming");
bookRepository.insertbookByTitle(book);
Book savedBook = bookRepository.selectByTitle(title);
assertEquals(book.getTitle(), savedBook.getTitle());
}
在上面的测试代码中,我们使用了不同的方法来创建一个名为booksByTitle 的表:
public void createTableBooksByTitle() {
StringBuilder sb = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
.append("booksByTitle").append("(")
.append("id uuid, ")
.append("title text,")
.append("PRIMARY KEY (title, id));");
String query = sb.toString();
session.execute(query);
}
在 Cassandra 中,最佳实践之一是使用一个表查询模式。这意味着,对于不同的查询,需要不同的表。
在我们的示例中,我们选择按书名选择一本书。为了满足selectByTitle查询,我们使用列title和id创建了一个带有复合PRIMARY KEY的表。列title是分区键,而id列是集群键。
这样,您的数据模型中的许多表都包含重复数据。这不是这个数据库的缺点。相反,这种做法优化了读取的性能。
让我们看看当前保存在我们表中的数据:
public List<Book> selectAll() {
StringBuilder sb =
new StringBuilder("SELECT * FROM ").append(TABLE_NAME);
String query = sb.toString();
ResultSet rs = session.execute(query);
List<Book> books = new ArrayList<Book>();
rs.forEach(r -> {
books.add(new Book(
r.getUUID("id"),
r.getString("title"),
r.getString("subject")));
});
return books;
}
查询返回预期结果的测试:
@Test
public void whenSelectingAll_thenReturnAllRecords() {
bookRepository.createTable();
Book book = new Book(
UUIDs.timeBased(), "Effective Java", "Programming");
bookRepository.insertbook(book);
book = new Book(
UUIDs.timeBased(), "Clean Code", "Programming");
bookRepository.insertbook(book);
List<Book> books = bookRepository.selectAll();
assertEquals(2, books.size());
assertTrue(books.stream().anyMatch(b -> b.getTitle()
.equals("Effective Java")));
assertTrue(books.stream().anyMatch(b -> b.getTitle()
.equals("Clean Code")));
}
到目前为止一切都很好,但必须意识到一件事。我们开始使用表books ,但与此同时,为了满足按title 列的select 查询,我们必须创建另一个名为booksByTitle 的表。
这两个表是相同的,包含重复的列,但我们只在booksByTitle表中插入了数据。因此,当前两个表中的数据不一致。
我们可以使用*batch *查询来解决这个问题,它包含两个插入语句,每个表一个。*batch *查询将多个 DML 语句作为单个操作执行。
提供了此类查询的示例:
public void insertBookBatch(Book book) {
StringBuilder sb = new StringBuilder("BEGIN BATCH ")
.append("INSERT INTO ").append(TABLE_NAME)
.append("(id, title, subject) ")
.append("VALUES (").append(book.getId()).append(", '")
.append(book.getTitle()).append("', '")
.append(book.getSubject()).append("');")
.append("INSERT INTO ")
.append(TABLE_NAME_BY_TITLE).append("(id, title) ")
.append("VALUES (").append(book.getId()).append(", '")
.append(book.getTitle()).append("');")
.append("APPLY BATCH;");
String query = sb.toString();
session.execute(query);
}
我们再次测试批量查询结果,如下所示:
@Test
public void whenAddingANewBookBatch_ThenBookAddedInAllTables() {
bookRepository.createTable();
bookRepository.createTableBooksByTitle();
String title = "Effective Java";
Book book = new Book(UUIDs.timeBased(), title, "Programming");
bookRepository.insertBookBatch(book);
List<Book> books = bookRepository.selectAll();
assertEquals(1, books.size());
assertTrue(
books.stream().anyMatch(
b -> b.getTitle().equals("Effective Java")));
List<Book> booksByTitle = bookRepository.selectAllBookByTitle();
assertEquals(1, booksByTitle.size());
assertTrue(
booksByTitle.stream().anyMatch(
b -> b.getTitle().equals("Effective Java")));
}
注意:从 3.0 版开始,提供了一个名为“Materialized Views”的新功能,我们可以使用它来代替*batch *查询。此处 提供了一个有据可查的“物化视图”示例。
3.7. 删除列族
下面的代码展示了如何删除一个表:
public void deleteTable() {
StringBuilder sb =
new StringBuilder("DROP TABLE IF EXISTS ").append(TABLE_NAME);
String query = sb.toString();
session.execute(query);
}
选择键空间中不存在的表会导致InvalidQueryException: unconfigured table books:
@Test(expected = InvalidQueryException.class)
public void whenDeletingATable_thenUnconfiguredTable() {
bookRepository.createTable();
bookRepository.deleteTable("books");
session.execute("SELECT * FROM " + KEYSPACE_NAME + ".books;");
}
3.8. 删除键空间
最后,让我们删除键空间:
public void deleteKeyspace(String keyspaceName) {
StringBuilder sb =
new StringBuilder("DROP KEYSPACE ").append(keyspaceName);
String query = sb.toString();
session.execute(query);
}
并测试密钥空间是否已被删除:
@Test
public void whenDeletingAKeyspace_thenDoesNotExist() {
String keyspaceName = "library";
schemaRepository.deleteKeyspace(keyspaceName);
ResultSet result =
session.execute("SELECT * FROM system_schema.keyspaces;");
boolean isKeyspaceCreated = result.all().stream()
.anyMatch(r -> r.getString(0).equals(keyspaceName.toLowerCase()));
assertFalse(isKeyspaceCreated);
}