Contents

在Play框架中使用Akka实现WebSocket

1.概述

当我们希望我们的 Web 客户端与我们的服务器保持对话时,WebSockets 可能是一个有用的解决方案。WebSockets 保持持久的全双工连接。这使我们能够在服务器和客户端之间发送双向消息。

在本教程中,我们将学习如何在Play Framework 中将 WebSockets 与Akka 一起使用。

2. 设置

让我们设置一个简单的聊天应用程序。用户将向服务器发送消息,服务器将响应来自JSONPlaceholder 的消息。

2.1. 设置 Play 框架应用程序

我们将使用 Play 框架构建这个应用程序。

让我们按照Introduction to Play in Java  中的说明设置和运行一个简单的 Play Framework 应用程序。

2.2. 添加必要的 JavaScript 文件

此外,我们还需要使用 JavaScript 来编写客户端脚本。这将使我们能够接收从服务器推送的新消息。我们将为此使用jQuery 库。

让我们将 jQuery 添加到app/views/index.scala.html文件的底部:

<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>

2.3. 设置 Akka

最后,我们将使用 Akka 来处理服务器端的 WebSocket 连接。

让我们导航到build.sbt文件并添加依赖项。

我们需要添加 *as-actor *和 *as-testkit *依赖项:

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion

我们需要这些能够使用和测试 Akka 框架代码。

接下来,我们将使用 Akka 流。所以让我们添加*akka-stream *依赖项:

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion

最后,我们需要从 Akka actor 调用一个 rest 端点。为此,我们需要 *akka-http *依赖项。当我们这样做时,端点将返回我们必须反序列化的 JSON 数据,因此我们还需要添加akka-http-jackson 依赖项:

libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion

现在我们都准备好了。让我们看看如何让 WebSockets 工作!

3. 使用 Akka Actor 处理 WebSocket

**Play 的 WebSocket 处理机制是围绕 Akka 流构建的。**WebSocket 被建模为流。因此,传入的 WebSocket 消息被馈送到流中,流产生的消息被发送到客户端。

要使用 Actor 处理 WebSocket,我们需要将ActorRef转换为流的 Play 实用程序ActorFlow。这主要需要一些Java代码,稍加配置。

3.1. WebSocket 控制器方法

首先,我们需要一个Materializer实例。Materializer 是流执行引擎的工厂。

我们需要将ActorSystemMaterializer注入控制器app/controllers/HomeController.java

private ActorSystem actorSystem;
private Materializer materializer;
@Inject
public HomeController(
  ActorSystem actorSystem, Materializer materializer) {
    this.actorSystem = actorSystem;
    this.materializer = materializer;
}

现在让我们添加一个套接字控制器方法:

public WebSocket socket() {
    return WebSocket.Json
      .acceptOrResult(this::createActorFlow);
}

这里我们调用函数acceptOrResult,它接受请求头并返回一个future。返回的未来是处理 WebSocket 消息的流。

相反,我们可以拒绝请求并返回拒绝结果。

现在,让我们创建流程:

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      F.Either.Right(createFlowForActor()));
}

Play Framework 中的 F类定义了一组函数式编程风格的助手。在这种情况下,我们使用F.Either.Right来接受连接并返回流。

假设我们想在客户端未通过身份验证时拒绝连接。

为此,我们可以检查会话中是否设置了用户名。如果不是,我们拒绝与 HTTP 403 Forbidden 的连接:

private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>> 
  createActorFlow2(Http.RequestHeader request) {
    return CompletableFuture.completedFuture(
      request.session()
      .getOptional("username")
      .map(username -> 
        F.Either.<Result, Flow<JsonNode, JsonNode, ?>>Right(
          createFlowForActor()))
      .orElseGet(() -> F.Either.Left(forbidden())));
}

我们使用F.Either.Left 来拒绝连接,就像我们使用F.Either.Right提供流一样。

最后,我们将流程链接到将处理消息的参与者:

private Flow<JsonNode, JsonNode, ?> createFlowForActor() {
    return ActorFlow.actorRef(out -> Messenger.props(out), 
      actorSystem, materializer);
}

ActorFlow.actorRef创建了一个由 Messenger actor处理的流

3.2. route文件

现在,让我们在conf/routes中添加控制器方法的route定义:

GET  /                    controllers.HomeController.index(request: Request)
GET  /chat                controllers.HomeController.socket
GET  /chat/with/streams   controllers.HomeController.akkaStreamsSocket
GET  /assets/*file        controllers.Assets.versioned(path="/public", file: Asset)

这些路由定义将传入的 HTTP 请求映射到控制器操作方法,如 Play 应用程序中的路由 中所述。

3.3. Actor 实现

Actor 类最重要的部分是 createReceive方法,它决定了 Actor 可以处理哪些消息:

@Override
public Receive createReceive() {
    return receiveBuilder()
      .match(JsonNode.class, this::onSendMessage)
      .matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
      .build();
}

Actor 会将与JsonNode类 匹配的所有消息转发到onSendMessage处理程序方法:

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    //..
    processMessage(requestDTO);
}

然后处理程序将使用processMessage方法响应每条消息:

private void processMessage(RequestDTO requestDTO) {
    CompletionStage<HttpResponse> responseFuture = getRandomMessage();
    responseFuture.thenCompose(this::consumeHttpResponse)
      .thenAccept(messageDTO ->
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}

3.4. 使用 Akka HTTP 使用 Rest API

我们将向JSONPlaceholder Posts 的虚拟消息生成器发送 HTTP 请求。当响应到达时,我们通过写出将响应发送给客户端。

让我们有一个使用随机帖子 ID 调用端点的方法:

private CompletionStage<HttpResponse> getRandomMessage() {
    int postId = ThreadLocalRandom.current().nextInt(0, 100);
    return Http.get(getContext().getSystem())
      .singleRequest(HttpRequest.create(
        "https://jsonplaceholder.typicode.com/posts/" + postId));
}

我们还在处理通过调用服务获得的HttpResponse以获得 JSON 响应:

private CompletionStage<MessageDTO> consumeHttpResponse(
  HttpResponse httpResponse) {
    Materializer materializer = 
      Materializer.matFromSystem(getContext().getSystem());
    return Jackson.unmarshaller(MessageDTO.class)
      .unmarshal(httpResponse.entity(), materializer)
      .thenApply(messageDTO -> {
          log.info("Received message: {}", messageDTO);
          discardEntity(httpResponse, materializer);
          return messageDTO;
      });
}

MessageConverter类是用于在 JsonNode和 DTO之间进行转换的 实用程序:

public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.convertValue(jsonNode, MessageDTO.class);
}

接下来,我们需要丢弃实体 。如果实体对我们没有任何用途,则discardEntityBytes便捷方法的目的是轻松丢弃实体。

让我们看看如何丢弃字节:

private void discardEntity(
  HttpResponse httpResponse, Materializer materializer) {
    HttpMessage.DiscardedEntity discarded = 
      httpResponse.discardEntityBytes(materializer);
    discarded.completionStage()
      .whenComplete((done, ex) -> 
        log.info("Entity discarded completely!"));
}

现在已经完成了 WebSocket 的处理,让我们看看如何使用 HTML5 WebSockets 设置客户端。

4. 设置 WebSocket 客户端

对于我们的客户,让我们构建一个简单的基于 Web 的聊天应用程序。

4.1. 控制器动作

我们需要定义一个呈现索引页面的控制器动作。我们将把它放在控制器类app.controllers.HomeController中:

public Result index(Http.Request request) {
    String url = routes.HomeController.socket()
      .webSocketURL(request);
    return ok(views.html.index.render(url));
}

4.2. 模板页面

现在,让我们转到app/views/ndex.scala.html页面并为接收到的消息添加一个容器和一个用于捕获新消息的表单:

<div id="messageContent"></div>F
<form>
    <textarea id="messageInput"></textarea>
    <button id="sendButton">Send</button>
</form>

我们还需要通过在app/views/index.scala.html页面顶部声明此参数来传递 WebSocket 控制器操作的 URL:

@(url: String)

4.3. JavaScript 中的 WebSocket 事件处理程序

现在,我们可以添加 JavaScript 来处理 WebSocket 事件。为简单起见,我们将在app/views/index.scala.html页面的底部添加 JavaScript 函数。

让我们声明事件处理程序:

var webSocket;
var messageInput;
function init() {
    initWebSocket();
}
function initWebSocket() {
    webSocket = new WebSocket("@url");
    webSocket.onopen = onOpen;
    webSocket.onclose = onClose;
    webSocket.onmessage = onMessage;
    webSocket.onerror = onError;
}

让我们自己添加处理程序:

function onOpen(evt) {
    writeToScreen("CONNECTED");
}
function onClose(evt) {
    writeToScreen("DISCONNECTED");
}
function onError(evt) {
    writeToScreen("ERROR: " + JSON.stringify(evt));
}
function onMessage(evt) {
    var receivedData = JSON.parse(evt.data);
    appendMessageToView("Server", receivedData.body);
}

然后,为了呈现输出,我们将使用函数appendMessageToViewwriteToScreen

function appendMessageToView(title, message) {
    $("#messageContent").append("<p>" + title + ": " + message + "</p>");
}
function writeToScreen(message) {
    console.log("New message: ", message);
}

4.4. 运行和测试应用程序

我们已准备好测试应用程序,让我们运行它:

cd websockets
sbt run

随着应用程序的运行,我们可以通过访问*http://localhost:9000 *与服务器聊天:

/uploads/akka_play_websockets/1.png

每次我们输入消息并点击send时,服务器都会立即响应来自 JSON 占位符服务的一些lorem ipsum

5. 使用 Akka Streams 直接处理 WebSockets

如果我们正在处理来自源的事件流并将其发送到客户端,那么我们可以围绕 Akka 流进行建模。

让我们看看如何在服务器每两秒发送一次消息的示例中使用 Akka 流。

我们将从HomeController中的 WebSocket 操作开始:

public WebSocket akkaStreamsSocket() {
    return WebSocket.Json.accept(request -> {
        Sink<JsonNode, ?> in = Sink.foreach(System.out::println);
        MessageDTO messageDTO = 
          new MessageDTO("1", "1", "Title", "Test Body");
        Source<JsonNode, ?> out = Source.tick(
          Duration.ofSeconds(2),
          Duration.ofSeconds(2),
          MessageConverter.messageToJsonNode(messageDTO)
        );
        return Flow.fromSinkAndSource(in, out);
    });
}

Source#tick方法采用三个参数。第一个是处理第一个滴答之前的初始延迟,第二个是连续滴答之间的间隔。我们在上面的代码片段中将这两个值都设置为两秒。第三个参数是应在每次报价时返回的对象。

要查看此操作,我们需要修改index操作中的 URL 并使其指向akkaStreamsSocket端点:

String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);

现在刷新页面,我们将每两秒看到一个新条目:

/uploads/akka_play_websockets/3.png

6. 终止 Actor

在某些时候,我们需要通过用户请求或超时来关闭聊天。

6.1. 处理 Actor 终止

我们如何检测 WebSocket 何时关闭?

当处理 WebSocket 的 actor 终止时,Play 将自动关闭 WebSocket。所以我们可以通过实现Actor#postStop方法来处理这种情况:

@Override
public void postStop() throws Exception {
    log.info("Messenger actor stopped at {}",
      OffsetDateTime.now()
      .format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}

6.2. 手动终止 Actor

此外,如果我们必须停止演员,我们可以向演员发送PoisonPill。在我们的示例应用程序中,我们应该能够处理“停止”请求。

让我们看看如何在 onSendMessage方法中做到这一点:

private void onSendMessage(JsonNode jsonNode) {
    RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
    String message = requestDTO.getMessage().toLowerCase();
    if("stop".equals(message)) {
        MessageDTO messageDTO = 
          createMessageDTO("1", "1", "Stop", "Stopping actor");
        out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
        self().tell(PoisonPill.getInstance(), getSelf());
    } else {
        log.info("Actor received. {}", requestDTO);
        processMessage(requestDTO);
    }
}

当我们收到一条消息时,我们会检查它是否是一个停止请求。如果是,我们发送 PoisonPill。否则,我们处理请求。

7. 配置选项

我们可以根据如何处理 WebSocket 来配置几个选项。让我们看几个。

7.1. WebSocket 帧长度

WebSocket 通信涉及数据帧的交换。

WebSocket 帧长度是可配置的。我们可以根据应用要求调整帧长度。

**配置较短的帧长度可能有助于减少使用长数据帧的拒绝服务攻击。**我们可以通过在 application.conf 中指定最大长度来更改应用程序的帧长度:

play.server.websocket.frame.maxLength = 64k

我们还可以通过将最大长度指定为命令行参数来设置此配置选项:

sbt -Dwebsocket.frame.maxLength=64k run

7.2. 连接空闲超时

默认情况下,我们用来处理 WebSocket 的actor在一分钟后终止。**这是因为运行我们的应用程序的 Play 服务器的默认空闲超时时间为 60 秒。**这意味着所有在 60 秒内未收到请求的连接都会自动关闭。

我们可以通过配置选项来改变它。让我们转到我们的application.conf并将服务器更改为没有空闲超时:

play.server.http.idleTimeout = "infinite"

或者我们可以将选项作为命令行参数传入:

sbt -Dhttp.idleTimeout=infinite run

我们也可以通过 在 build.sbt中指定devSettings来配置它。

build.sbt中指定的配置选项仅在开发中使用,它们将在生产中被忽略:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"

如果我们重新运行应用程序,actor 不会终止。

我们可以将值更改为秒:

PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"

我们可以在 Play Framework 文档 中找到有关可用配置选项的更多信息。