在Play框架中使用Akka实现WebSocket
1.概述
当我们希望我们的 Web 客户端与我们的服务器保持对话时,WebSockets 可能是一个有用的解决方案。WebSockets 保持持久的全双工连接。这使我们能够在服务器和客户端之间发送双向消息。
在本教程中,我们将学习如何在Play Framework 中将 WebSockets 与Akka 一起使用。
2. 设置
让我们设置一个简单的聊天应用程序。用户将向服务器发送消息,服务器将响应来自JSONPlaceholder 的消息。
2.1. 设置 Play 框架应用程序
我们将使用 Play 框架构建这个应用程序。
让我们按照Introduction to Play in Java 中的说明设置和运行一个简单的 Play Framework 应用程序。
2.2. 添加必要的 JavaScript 文件
此外,我们还需要使用 JavaScript 来编写客户端脚本。这将使我们能够接收从服务器推送的新消息。我们将为此使用jQuery 库。
让我们将 jQuery 添加到app/views/index.scala.html文件的底部:
<script src="https://code.jquery.com/jquery-3.4.1.min.js"></script>
2.3. 设置 Akka
最后,我们将使用 Akka 来处理服务器端的 WebSocket 连接。
让我们导航到build.sbt文件并添加依赖项。
我们需要添加 *as-actor *和 *as-testkit *依赖项:
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % akkaVersion
我们需要这些能够使用和测试 Akka 框架代码。
接下来,我们将使用 Akka 流。所以让我们添加*akka-stream *依赖项:
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % akkaVersion
最后,我们需要从 Akka actor 调用一个 rest 端点。为此,我们需要 *akka-http *依赖项。当我们这样做时,端点将返回我们必须反序列化的 JSON 数据,因此我们还需要添加akka-http-jackson 依赖项:
libraryDependencies += "com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion
libraryDependencies += "com.typesafe.akka" %% "akka-http" % akkaHttpVersion
现在我们都准备好了。让我们看看如何让 WebSockets 工作!
3. 使用 Akka Actor 处理 WebSocket
**Play 的 WebSocket 处理机制是围绕 Akka 流构建的。**WebSocket 被建模为流。因此,传入的 WebSocket 消息被馈送到流中,流产生的消息被发送到客户端。
要使用 Actor 处理 WebSocket,我们需要将ActorRef转换为流的 Play 实用程序ActorFlow。这主要需要一些Java代码,稍加配置。
3.1. WebSocket 控制器方法
首先,我们需要一个Materializer实例。Materializer 是流执行引擎的工厂。
我们需要将ActorSystem和Materializer注入控制器app/controllers/HomeController.java:
private ActorSystem actorSystem;
private Materializer materializer;
@Inject
public HomeController(
ActorSystem actorSystem, Materializer materializer) {
this.actorSystem = actorSystem;
this.materializer = materializer;
}
现在让我们添加一个套接字控制器方法:
public WebSocket socket() {
return WebSocket.Json
.acceptOrResult(this::createActorFlow);
}
这里我们调用函数acceptOrResult,它接受请求头并返回一个future。返回的未来是处理 WebSocket 消息的流。
相反,我们可以拒绝请求并返回拒绝结果。
现在,让我们创建流程:
private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>>
createActorFlow(Http.RequestHeader request) {
return CompletableFuture.completedFuture(
F.Either.Right(createFlowForActor()));
}
Play Framework 中的 F类定义了一组函数式编程风格的助手。在这种情况下,我们使用F.Either.Right来接受连接并返回流。
假设我们想在客户端未通过身份验证时拒绝连接。
为此,我们可以检查会话中是否设置了用户名。如果不是,我们拒绝与 HTTP 403 Forbidden 的连接:
private CompletionStage<F.Either<Result, Flow<JsonNode, JsonNode, ?>>>
createActorFlow2(Http.RequestHeader request) {
return CompletableFuture.completedFuture(
request.session()
.getOptional("username")
.map(username ->
F.Either.<Result, Flow<JsonNode, JsonNode, ?>>Right(
createFlowForActor()))
.orElseGet(() -> F.Either.Left(forbidden())));
}
我们使用F.Either.Left 来拒绝连接,就像我们使用F.Either.Right提供流一样。
最后,我们将流程链接到将处理消息的参与者:
private Flow<JsonNode, JsonNode, ?> createFlowForActor() {
return ActorFlow.actorRef(out -> Messenger.props(out),
actorSystem, materializer);
}
ActorFlow.actorRef创建了一个由 Messenger actor处理的流。
3.2. route文件
现在,让我们在conf/routes中添加控制器方法的route定义:
GET / controllers.HomeController.index(request: Request)
GET /chat controllers.HomeController.socket
GET /chat/with/streams controllers.HomeController.akkaStreamsSocket
GET /assets/*file controllers.Assets.versioned(path="/public", file: Asset)
这些路由定义将传入的 HTTP 请求映射到控制器操作方法,如 Play 应用程序中的路由 中所述。
3.3. Actor 实现
Actor 类最重要的部分是 createReceive方法,它决定了 Actor 可以处理哪些消息:
@Override
public Receive createReceive() {
return receiveBuilder()
.match(JsonNode.class, this::onSendMessage)
.matchAny(o -> log.error("Received unknown message: {}", o.getClass()))
.build();
}
Actor 会将与JsonNode类 匹配的所有消息转发到onSendMessage处理程序方法:
private void onSendMessage(JsonNode jsonNode) {
RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
String message = requestDTO.getMessage().toLowerCase();
//..
processMessage(requestDTO);
}
然后处理程序将使用processMessage方法响应每条消息:
private void processMessage(RequestDTO requestDTO) {
CompletionStage<HttpResponse> responseFuture = getRandomMessage();
responseFuture.thenCompose(this::consumeHttpResponse)
.thenAccept(messageDTO ->
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf()));
}
3.4. 使用 Akka HTTP 使用 Rest API
我们将向JSONPlaceholder Posts 的虚拟消息生成器发送 HTTP 请求。当响应到达时,我们通过写出将响应发送给客户端。
让我们有一个使用随机帖子 ID 调用端点的方法:
private CompletionStage<HttpResponse> getRandomMessage() {
int postId = ThreadLocalRandom.current().nextInt(0, 100);
return Http.get(getContext().getSystem())
.singleRequest(HttpRequest.create(
"https://jsonplaceholder.typicode.com/posts/" + postId));
}
我们还在处理通过调用服务获得的HttpResponse以获得 JSON 响应:
private CompletionStage<MessageDTO> consumeHttpResponse(
HttpResponse httpResponse) {
Materializer materializer =
Materializer.matFromSystem(getContext().getSystem());
return Jackson.unmarshaller(MessageDTO.class)
.unmarshal(httpResponse.entity(), materializer)
.thenApply(messageDTO -> {
log.info("Received message: {}", messageDTO);
discardEntity(httpResponse, materializer);
return messageDTO;
});
}
MessageConverter类是用于在 JsonNode和 DTO之间进行转换的 实用程序:
public static MessageDTO jsonNodeToMessage(JsonNode jsonNode) {
ObjectMapper mapper = new ObjectMapper();
return mapper.convertValue(jsonNode, MessageDTO.class);
}
接下来,我们需要丢弃实体 。如果实体对我们没有任何用途,则discardEntityBytes便捷方法的目的是轻松丢弃实体。
让我们看看如何丢弃字节:
private void discardEntity(
HttpResponse httpResponse, Materializer materializer) {
HttpMessage.DiscardedEntity discarded =
httpResponse.discardEntityBytes(materializer);
discarded.completionStage()
.whenComplete((done, ex) ->
log.info("Entity discarded completely!"));
}
现在已经完成了 WebSocket 的处理,让我们看看如何使用 HTML5 WebSockets 设置客户端。
4. 设置 WebSocket 客户端
对于我们的客户,让我们构建一个简单的基于 Web 的聊天应用程序。
4.1. 控制器动作
我们需要定义一个呈现索引页面的控制器动作。我们将把它放在控制器类app.controllers.HomeController中:
public Result index(Http.Request request) {
String url = routes.HomeController.socket()
.webSocketURL(request);
return ok(views.html.index.render(url));
}
4.2. 模板页面
现在,让我们转到app/views/ndex.scala.html页面并为接收到的消息添加一个容器和一个用于捕获新消息的表单:
<div id="messageContent"></div>F
<form>
<textarea id="messageInput"></textarea>
<button id="sendButton">Send</button>
</form>
我们还需要通过在app/views/index.scala.html页面顶部声明此参数来传递 WebSocket 控制器操作的 URL:
@(url: String)
4.3. JavaScript 中的 WebSocket 事件处理程序
现在,我们可以添加 JavaScript 来处理 WebSocket 事件。为简单起见,我们将在app/views/index.scala.html页面的底部添加 JavaScript 函数。
让我们声明事件处理程序:
var webSocket;
var messageInput;
function init() {
initWebSocket();
}
function initWebSocket() {
webSocket = new WebSocket("@url");
webSocket.onopen = onOpen;
webSocket.onclose = onClose;
webSocket.onmessage = onMessage;
webSocket.onerror = onError;
}
让我们自己添加处理程序:
function onOpen(evt) {
writeToScreen("CONNECTED");
}
function onClose(evt) {
writeToScreen("DISCONNECTED");
}
function onError(evt) {
writeToScreen("ERROR: " + JSON.stringify(evt));
}
function onMessage(evt) {
var receivedData = JSON.parse(evt.data);
appendMessageToView("Server", receivedData.body);
}
然后,为了呈现输出,我们将使用函数appendMessageToView和writeToScreen:
function appendMessageToView(title, message) {
$("#messageContent").append("<p>" + title + ": " + message + "</p>");
}
function writeToScreen(message) {
console.log("New message: ", message);
}
4.4. 运行和测试应用程序
我们已准备好测试应用程序,让我们运行它:
cd websockets
sbt run
随着应用程序的运行,我们可以通过访问*http://localhost:9000 *与服务器聊天:
每次我们输入消息并点击send时,服务器都会立即响应来自 JSON 占位符服务的一些lorem ipsum 。
5. 使用 Akka Streams 直接处理 WebSockets
如果我们正在处理来自源的事件流并将其发送到客户端,那么我们可以围绕 Akka 流进行建模。
让我们看看如何在服务器每两秒发送一次消息的示例中使用 Akka 流。
我们将从HomeController中的 WebSocket 操作开始:
public WebSocket akkaStreamsSocket() {
return WebSocket.Json.accept(request -> {
Sink<JsonNode, ?> in = Sink.foreach(System.out::println);
MessageDTO messageDTO =
new MessageDTO("1", "1", "Title", "Test Body");
Source<JsonNode, ?> out = Source.tick(
Duration.ofSeconds(2),
Duration.ofSeconds(2),
MessageConverter.messageToJsonNode(messageDTO)
);
return Flow.fromSinkAndSource(in, out);
});
}
Source#tick方法采用三个参数。第一个是处理第一个滴答之前的初始延迟,第二个是连续滴答之间的间隔。我们在上面的代码片段中将这两个值都设置为两秒。第三个参数是应在每次报价时返回的对象。
要查看此操作,我们需要修改index操作中的 URL 并使其指向akkaStreamsSocket端点:
String url = routes.HomeController.akkaStreamsSocket().webSocketURL(request);
现在刷新页面,我们将每两秒看到一个新条目:
6. 终止 Actor
在某些时候,我们需要通过用户请求或超时来关闭聊天。
6.1. 处理 Actor 终止
我们如何检测 WebSocket 何时关闭?
当处理 WebSocket 的 actor 终止时,Play 将自动关闭 WebSocket。所以我们可以通过实现Actor#postStop方法来处理这种情况:
@Override
public void postStop() throws Exception {
log.info("Messenger actor stopped at {}",
OffsetDateTime.now()
.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
}
6.2. 手动终止 Actor
此外,如果我们必须停止演员,我们可以向演员发送PoisonPill。在我们的示例应用程序中,我们应该能够处理“停止”请求。
让我们看看如何在 onSendMessage方法中做到这一点:
private void onSendMessage(JsonNode jsonNode) {
RequestDTO requestDTO = MessageConverter.jsonNodeToRequest(jsonNode);
String message = requestDTO.getMessage().toLowerCase();
if("stop".equals(message)) {
MessageDTO messageDTO =
createMessageDTO("1", "1", "Stop", "Stopping actor");
out.tell(MessageConverter.messageToJsonNode(messageDTO), getSelf());
self().tell(PoisonPill.getInstance(), getSelf());
} else {
log.info("Actor received. {}", requestDTO);
processMessage(requestDTO);
}
}
当我们收到一条消息时,我们会检查它是否是一个停止请求。如果是,我们发送 PoisonPill。否则,我们处理请求。
7. 配置选项
我们可以根据如何处理 WebSocket 来配置几个选项。让我们看几个。
7.1. WebSocket 帧长度
WebSocket 通信涉及数据帧的交换。
WebSocket 帧长度是可配置的。我们可以根据应用要求调整帧长度。
**配置较短的帧长度可能有助于减少使用长数据帧的拒绝服务攻击。**我们可以通过在 application.conf 中指定最大长度来更改应用程序的帧长度:
play.server.websocket.frame.maxLength = 64k
我们还可以通过将最大长度指定为命令行参数来设置此配置选项:
sbt -Dwebsocket.frame.maxLength=64k run
7.2. 连接空闲超时
默认情况下,我们用来处理 WebSocket 的actor在一分钟后终止。**这是因为运行我们的应用程序的 Play 服务器的默认空闲超时时间为 60 秒。**这意味着所有在 60 秒内未收到请求的连接都会自动关闭。
我们可以通过配置选项来改变它。让我们转到我们的application.conf并将服务器更改为没有空闲超时:
play.server.http.idleTimeout = "infinite"
或者我们可以将选项作为命令行参数传入:
sbt -Dhttp.idleTimeout=infinite run
我们也可以通过 在 build.sbt中指定devSettings来配置它。
build.sbt中指定的配置选项仅在开发中使用,它们将在生产中被忽略:
PlayKeys.devSettings += "play.server.http.idleTimeout" -> "infinite"
如果我们重新运行应用程序,actor 不会终止。
我们可以将值更改为秒:
PlayKeys.devSettings += "play.server.http.idleTimeout" -> "120 s"
我们可以在 Play Framework 文档 中找到有关可用配置选项的更多信息。