Contents

使用Cassandra,Astra,Rest&GraphQL构建仪表板 - 录制状态更新

1. 简介

在我们之前的文章 中,我们研究了使用DataStax Astra 构建一个仪表板来查看复仇者联盟的当前状态,这是一个由Apache Cassandra 提供支持的 DBaaS,它使用Stargate 提供额外的 API 来使用它。

/uploads/cassandra_astra_rest_dashboard_updates/1.png

**在本文中,我们将扩展它以存储离散事件而不是汇总的摘要。这将允许我们在 UI 中查看这些事件。**我们将允许用户点击一张卡片并获得一个表格,其中列出了导致我们达到这一点的事件。与摘要不同,这些事件将分别代表一个复仇者和一个离散的时间点。每次收到新事件时,它将与所有其他事件一起附加到表中。

为此,我们使用 Cassandra,因为它提供了一种非常有效的方式来存储时间序列数据 ,我们在其中写入的频率远高于读取的频率。这里的目标是一个可以频繁更新的系统——例如,每 30 秒一次——然后可以让用户轻松查看已记录的最新事件。

2. 构建数据库模式

与我们在上一篇文章中使用的 Document API 不同,它将使用RESTGraphQL API 构建。这些工作在 Cassandra 表之上,这些 API 可以完全相互配合以及 CQL API。

为了使用这些,我们需要已经为我们存储数据的表定义了一个模式。我们使用的表旨在使用特定的模式——按照事件发生时间的顺序查找给定 Avenger 的事件。

此架构将如下所示:

CREATE TABLE events (
    avenger text,
    timestamp timestamp,
    latitude decimal,
    longitude decimal,
    status decimal,
    PRIMARY KEY (avenger, timestamp)
) WITH CLUSTERING ORDER BY (timestamp DESC);

使用与此类似的数据:

avenger timestamp latitude longitude status
 鹘  2021-05-16 09:00:30.000000+0000  40.715255  -73.975353  0.999954
 鹰眼  2021-05-16 09:00:30.000000+0000  40.714602  -73.975238  0.99986
 鹰眼  2021-05-16 09:01:00.000000+0000  40.713572  -73.975289  0.999804

这将我们的表定义为具有多行分区 ,分区键为“avenger”,集群键为“timestamp”。Cassandra 使用分区键来确定数据存储在哪个节点上。聚类键用于确定数据在分区内的存储顺序。

通过指出“avenger”是我们的分区键,它将确保同一个 Avenger 的所有数据都保存在一起。通过指示“时间戳”是我们的集群键,它将以最有效的顺序存储该分区中的数据,以便我们检索。鉴于我们对该数据的核心查询是为单个 Avenger 选择每个事件——我们的分区键——按事件的时间戳排序——我们的集群键——Cassandra 可以让我们非常有效地访问它。

此外,应用程序的设计使用方式意味着我们在近乎连续的基础上写入事件数据。例如,我们可能每 30 秒从每个 Avenger 那里获得一个新事件。以这种方式构建我们的表可以非常高效地将新事件插入到正确分区中的正确位置。

3. 使用 Astra、REST 和 GraphQL API 构建客户端层

我们将使用 REST 和 GraphQL API 与 Astra 进行交互,用于不同的目的。REST API 将用于将新事件插入表中。GraphQL API 将用于再次检索它们。

为了最好地做到这一点,我们需要一个可以执行与 Astra 交互的客户端层。这些与我们在上一篇文章中为其他两个 API 构建的DocumentClient类等效。

3.1. REST 客户端

首先,我们的 REST 客户端。我们将使用它来插入新的、完整的记录,因此只需要一个方法来插入数据:

@Repository
public class RestClient {
  @Value("https://${ASTRA_DB_ID}-${ASTRA_DB_REGION}.apps.astra.datastax.com/api/rest/v2/keyspaces/${ASTRA_DB_KEYSPACE}")
  private String baseUrl;
  @Value("${ASTRA_DB_APPLICATION_TOKEN}")
  private String token;
  private RestTemplate restTemplate;
  public RestClient() {
    this.restTemplate = new RestTemplate();
    this.restTemplate.setRequestFactory(new HttpComponentsClientHttpRequestFactory());
  }
  public <T> void createRecord(String table, T record) {
    var uri = UriComponentsBuilder.fromHttpUrl(baseUrl)
      .pathSegment(table)
      .build()
      .toUri();
    var request = RequestEntity.post(uri)
      .header("X-Cassandra-Token", token)
      .body(record);
    restTemplate.exchange(request, Map.class);
  }
}

3.2. GraphQL 客户端

然后,我们的 GraphQL 客户端。这次我们进行了一个完整的 GraphQL 查询并返回它获取的数据

@Repository
public class GraphqlClient {
  @Value("https://${ASTRA_DB_ID}-${ASTRA_DB_REGION}.apps.astra.datastax.com/api/graphql/${ASTRA_DB_KEYSPACE}")
  private String baseUrl;
  @Value("${ASTRA_DB_APPLICATION_TOKEN}")
  private String token;
  private RestTemplate restTemplate;
  public GraphqlClient() {
    this.restTemplate = new RestTemplate();
    this.restTemplate.setRequestFactory(new HttpComponentsClientHttpRequestFactory());
  }
  public <T> T query(String query, Class<T> cls) {
    var request = RequestEntity.post(baseUrl)
      .header("X-Cassandra-Token", token)
      .body(Map.of("query", query));
    var response = restTemplate.exchange(request, cls);
  
    return response.getBody();
  }
}

和以前一样,我们的baseUrltoken字段是从我们定义如何与 Astra 对话的属性中配置的。这些客户端类每个都知道如何构建与数据库交互所需的完整 URL。我们可以使用它们发出正确的 HTTP 请求以执行所需的操作。

这就是与 Astra 交互所需的全部内容,因为这些 API 通过简单地通过 HTTP 交换 JSON 文档来工作。

4. 记录个别事件

为了显示事件,我们需要能够记录它们。这将建立在我们之前更新*statuses 表的功能之上,并将另外插入新记录到events *表中。

4.1. 插入事件

我们需要的第一件事是表示该表中的数据。这将表示为 Java 记录:

public record Event(String avenger, 
  String timestamp,
  Double latitude,
  Double longitude,
  Double status) {}

**这与我们之前定义的模式直接相关。**当我们实际进行 API 调用时,Jackson 会将其转换为 REST API 的正确 JSON。

接下来,我们需要我们的服务层来实际记录这些。这将从外部获取适当的细节,用时间戳增加它们并调用我们的 REST 客户端来创建新记录:

@Service
public class EventsService {
  @Autowired
  private RestClient restClient;
  public void createEvent(String avenger, Double latitude, Double longitude, Double status) {
    var event = new Event(avenger, Instant.now().toString(), latitude, longitude, status);
    restClient.createRecord("events", event);
  }
}

4.2. 更新 API

最后,我们需要一个控制器来接收事件。这是对我们在上一篇文章中编写的UpdateController的扩展,以连接新的EventsService,然后从我们的update方法中调用它。

@RestController
public class UpdateController {
  ......
  @Autowired
  private EventsService eventsService;
  @PostMapping("/update/{avenger}")
  public void update(@PathVariable String avenger, @RequestBody UpdateBody body) throws Exception {
    eventsService.createEvent(avenger, body.lat(), body.lng(), body.status());
    statusesService.updateStatus(avenger, lookupLocation(body.lat(), body.lng()), getStatus(body.status()));
  }
  ......
}

此时,调用我们的 API 来记录 Avenger 的状态将更新状态文档并将新记录插入事件表中。这将允许我们记录发生的每个更新事件。

**这意味着每次我们收到更新 Avenger 状态的调用时,我们都会在此表中添加一条新记录。**实际上,我们需要通过修剪或添加额外的分区来支持存储的数据规模,但这超出了本文的范围。

5. 通过 GraphQL API 向用户提供事件

一旦我们的表中有事件,下一步就是将它们提供给用户。我们将使用 GraphQL API 实现这一点,一次为给定的 Avenger 检索一页事件,始终按顺序排列,以便最近的事件排在第一位

使用 GraphQL,我们还可以只检索我们真正感兴趣的字段子集,而不是全部。如果我们要获取大量记录,那么这可以帮助减小有效负载大小,从而提高性能。

5.1. 检索事件

**我们需要的第一件事是我们正在检索的数据的表示。**这是存储在表中的实际数据的子集。因此,我们需要一个不同的类来表示它:

public record EventSummary(String timestamp,
  Double latitude,
  Double longitude,
  Double status) {}

我们还需要一个类来表示这些列表的 GraphQL 响应。这将包括事件摘要列表和用于将光标移至下一页的页面状态:

public record Events(List<EventSummary> values, String pageState) {}

我们现在可以在事件服务中创建一个新方法来实际执行搜索。

public class EventsService {
  ......
  @Autowired
  private GraphqlClient graphqlClient;
  public Events getEvents(String avenger, String offset) {
    var query = "query {" + 
      "  events(filter:{avenger:{eq:\"%s\"}}, orderBy:[timestamp_DESC], options:{pageSize:5, pageState:%s}) {" +
      "    pageState " +
      "    values {" +
      "     timestamp " +
      "     latitude " +
      "     longitude " +
      "     status" +
      "   }" +
      "  }" +
      "}";
    var fullQuery = String.format(query, avenger, offset == null ? "null" : "\"" + offset + "\"");
    return graphqlClient.query(fullQuery, EventsGraphqlResponse.class).data().events();
  }
  private static record EventsResponse(Events events) {}
  private static record EventsGraphqlResponse(EventsResponse data) {}
}

在这里,我们有几个内部类,它们的存在纯粹是为了表示 GraphQL API 返回的 JSON 结构,一直到我们感兴趣的部分——这些完全是 GraphQL API 的产物。

然后,我们有一个方法可以为我们想要的详细信息构造一个 GraphQL 查询,按 avenger 字段过滤并按 timestamp 字段按降序排序。我们将实际的 Avenger ID 和要使用的页面状态代入其中,然后将其传递给我们的 GraphQL 客户端以获取实际数据。

5.2. 在 UI 中显示事件

现在我们可以从数据库中获取事件,然后我们可以将其连接到我们的 UI。

首先,我们将更新我们在上一篇文章中编写的StatusesController以支持 UI 端点获取事件:

public class StatusesController {
  ......
  @Autowired
  private EventsService eventsService;
  @GetMapping("/avenger/{avenger}")
  public Object getAvengerStatus(@PathVariable String avenger, @RequestParam(required = false) String page) {
    var result = new ModelAndView("dashboard");
    result.addObject("avenger", avenger);
    result.addObject("statuses", statusesService.getStatuses());
    result.addObject("events", eventsService.getEvents(avenger, page));
    return result;
  }
}

然后我们需要更新我们的模板来呈现事件表。我们将在dashboard.html文件中添加一个新表,该表仅在从控制器接收到的模型中存在events对象时才会呈现:

......
    <div th:if="${events}">
      <div class="row">
        <table class="table">
          <thead>
            <tr>
              <th scope="col">Timestamp</th>
              <th scope="col">Latitude</th>
              <th scope="col">Longitude</th>
              <th scope="col">Status</th>
            </tr>
          </thead>
          <tbody>
            <tr th:each="data, iterstat: ${events.values}">
              <th scope="row" th:text="${data.timestamp}">
                </td>
              <td th:text="${data.latitude}"></td>
              <td th:text="${data.longitude}"></td>
              <td th:text="${(data.status * 100) + '%'}"></td>
            </tr>
          </tbody>
        </table>
      </div>
      <div class="row" th:if="${events.pageState}">
        <div class="col position-relative">
          <a th:href="@{/avenger/{id}(id = ${avenger}, page = ${events.pageState})}"
            class="position-absolute top-50 start-50 translate-middle">Next
            Page</a>
        </div>
      </div>
    </div>
  </div>
......

这包括底部的链接以转到下一页,该链接通过我们的事件数据和我们正在查看的复仇者的 ID 传递页面状态。

最后,我们需要更新状态卡以允许我们链接到该条目的事件表。这只是每张卡片标题周围的超链接,在status.html中呈现:

......
  <a th:href="@{/avenger/{id}(id = ${data.avenger})}">
    <h5 class="card-title" th:text="${data.name}"></h5>
  </a>
......

我们现在可以启动应用程序,并从卡片中单击以查看导致此状态的最新事件:

/uploads/cassandra_astra_rest_dashboard_updates/3.png