Akka 与 Spring 框架集成
1. 简介
在本文中,我们将专注于将 Akka 与 Spring 框架集成——以允许将基于 Spring 的服务注入 Akka actor。
在阅读本文之前,建议先了解 Akka 的基础知识。
使用 Akka Streams 库在 Java 中进行数据流转换的快速实用指南。阅读更多
2. As中的依赖注入
Akka 是一个基于 Actor 并发模型的强大应用框架。该框架是用 Scala 编写的,这当然也使其在基于 Java 的应用程序中完全可用。**因此,我们经常希望将 Akka 与现有的基于 Spring 的应用程序集成,**或者简单地使用 Spring 将 bean 连接到 actor。
Spring/Akka 集成的问题在于 Spring 中 bean 的管理和 Akka 中 Actor 的管理之间的差异:Actor 具有不同于典型 Spring bean 生命周期的特定生命周期。
此外,actor 被拆分为一个 actor 本身(这是一个内部实现细节,不能由 Spring 管理)和一个 actor 引用,它可以被客户端代码访问,以及在不同的 Akka 运行时之间可序列化和可移植。
幸运的是,Akka 提供了一种机制,即Akka 扩展 ,这使得使用外部依赖注入框架变得相当容易。
3. Maven依赖
为了在我们的 Spring 项目中演示 Akka 的使用,我们需要一个最基本的 Spring 依赖项——spring-context库和akka-actor库。库版本可以提取到pom的properties部分:
<properties>
<spring.version>4.3.1.RELEASE</spring.version>
<akka.version>2.4.8</akka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
确保检查 Maven Central 以获取最新版本的spring-context 和akka-actor 依赖项。
请注意,akka-actor依赖项的名称中有一个*_2.11*后缀,这表示此版本的 Akka 框架是针对 Scala 版本 2.11 构建的。相应版本的 Scala 库将传递地包含在您的构建中。
4. 将 Spring Beans 注入 Akka Actors
让我们创建一个简单的 Spring/Akka 应用程序,该应用程序由一个参与者组成,该参与者可以通过向这个人发出问候来回答这个人的名字。问候的逻辑将被提取到一个单独的服务中。我们希望将此服务自动连接到一个参与者实例。Spring 集成将帮助我们完成这项任务。
4.1. 定义Actor和服务
为了演示将服务注入到 actor 中,我们将创建一个简单的类GreetingActor,定义为无类型的 actor(扩展 Akka 的UntypedActor基类)。每个 Akka Actor 的主要方法是onReceive方法,它接收消息并根据某些指定的逻辑对其进行处理。
在我们的例子中,GreetingActor实现检查消息是否是预定义的Greet类型,然后从Greet实例中获取人名,然后使用GreetingService接收此人的问候语,并使用接收到的问候语字符串回答发件人。如果消息属于其他未知类型,则将其传递给参与者预定义的unhandled方法。
我们来看一下:
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class GreetingActor extends UntypedActor {
private GreetingService greetingService;
// constructor
@Override
public void onReceive(Object message) throws Throwable {
if (message instanceof Greet) {
String name = ((Greet) message).getName();
getSender().tell(greetingService.greet(name), getSelf());
} else {
unhandled(message);
}
}
public static class Greet {
private String name;
// standard constructors/getters
}
}
注意Greet消息类型被定义为这个actor内部的一个静态内部类,这被认为是一个很好的实践。接受的消息类型应尽可能靠近参与者定义,以避免混淆该参与者可以处理哪些消息类型。
还要注意 Spring 注释@Component和@Scope——**它们将类定义为具有*prototype *范围的 Spring 管理的 bean。
范围非常重要,因为每个 bean 检索请求都应该产生一个新创建的实例,因为这种行为与 Akka 的 actor 生命周期相匹配。如果您在其他范围内实现此 bean,则在 Akka 中重新启动 actor 的典型情况很可能无法正常运行。
最后,请注意我们不必显式地*@Autowire GreetingService实例——这是可能的,因为 Spring 4.3 的新特性称为隐式构造函数注入*。
GreeterService的实现非常简单,请注意我们通过向其添加*@Component注释将其定义为 Spring 管理的 bean(具有默认的singleton *范围):
@Component
public class GreetingService {
public String greet(String name) {
return "Hello, " + name;
}
}
4.2. 通过 Akka 扩展添加 Spring 支持
将 Spring 与 Akka 集成的最简单方法是通过 Akka 扩展。
**扩展是每个参与者系统创建的单例实例。**它由一个实现标记接口Extension的扩展类本身和一个通常继承AbstractExtensionId的扩展 id 类组成。
由于这两个类是紧密耦合的,因此实现嵌套在 ExtensionId 类中的Extension类是有意义的:
public class SpringExtension
extends AbstractExtensionId<SpringExtension.SpringExt> {
public static final SpringExtension SPRING_EXTENSION_PROVIDER
= new SpringExtension();
@Override
public SpringExt createExtension(ExtendedActorSystem system) {
return new SpringExt();
}
public static class SpringExt implements Extension {
private volatile ApplicationContext applicationContext;
public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
public Props props(String actorBeanName) {
return Props.create(
SpringActorProducer.class, applicationContext, actorBeanName);
}
}
}
首先——SpringExtension从AbstractExtensionId类中实现了一个createExtension方法——它负责创建扩展实例,即SpringExt对象。
SpringExtension类也有一个静态字段SPRING_EXTENSION_PROVIDER ,它包含对其唯一实例的引用。添加一个私有构造函数来明确声明SpringExtention应该是一个单例类通常是有意义的,但为了清楚起见,我们将省略它。
其次,静态内部类SpringExt就是扩展本身。由于Extension只是一个标记接口,我们可以按照我们认为合适的方式定义这个类的内容。
在我们的例子中,我们需要initialize方法来保存一个 Spring ApplicationContext实例——这个方法在每个扩展初始化时只会被调用一次。
我们还需要props方法来创建Props对象。Props实例是一个Actor的蓝图,在我们的例子中,Props.create方法接收一个SpringActorProducer类和该类的构造函数参数。这些是这个类的构造函数将被调用的参数。
每次我们需要 Spring 管理的 Actor 引用时,都会执行props方法。
第三个也是最后一个难题是SpringActorProducer类。它实现了 Akka 的IndirectActorProducer接口,该接口允许通过实现producer和actorClass方法来覆盖 actor 的实例化过程。
你可能已经猜到了,**它不会直接实例化,而是总是从 Spring 的ApplicationContext中检索一个actor实例。**由于我们已将 actor设为prototype作用域的 bean,因此每次调用 producer方法都将返回该 actor 的一个新实例:
public class SpringActorProducer implements IndirectActorProducer {
private ApplicationContext applicationContext;
private String beanActorName;
public SpringActorProducer(ApplicationContext applicationContext,
String beanActorName) {
this.applicationContext = applicationContext;
this.beanActorName = beanActorName;
}
@Override
public Actor produce() {
return (Actor) applicationContext.getBean(beanActorName);
}
@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext
.getType(beanActorName);
}
}
4.3. 把它们放在一起
剩下要做的就是创建一个 Spring 配置类(标有*@Configuration注释),它将告诉 Spring 扫描当前包以及所有嵌套包(这是由@ComponentScan*注释确保的)并创建一个 Spring 容器.
我们只需要添加一个额外的 bean—— ActorSystem实例——并在这个ActorSystem上初始化 Spring 扩展:
@Configuration
@ComponentScan
public class AppConfiguration {
@Autowired
private ApplicationContext applicationContext;
@Bean
public ActorSystem actorSystem() {
ActorSystem system = ActorSystem.create("akka-spring-demo");
SPRING_EXTENSION_PROVIDER.get(system)
.initialize(applicationContext);
return system;
}
}
4.4. 检索Spring-Wired Actor
为了测试一切正常,我们可以将ActorSystem实例注入我们的代码(一些 Spring 管理的应用程序代码或基于 Spring 的测试),使用我们的扩展为一个Actor创建一个Props对象,检索一个Actor的引用通过Props对象并尝试向某人打招呼:
ActorRef greeter = system.actorOf(SPRING_EXTENSION_PROVIDER.get(system)
.props("greetingActor"), "greeter");
FiniteDuration duration = FiniteDuration.create(1, TimeUnit.SECONDS);
Timeout timeout = Timeout.durationToTimeout(duration);
Future<Object> result = ask(greeter, new Greet("John"), timeout);
Assert.assertEquals("Hello, John", Await.result(result, duration));
这里我们使用典型的akka.pattern.Patterns.ask模式,它返回一个 Scala 的Future实例。一旦计算完成,Future将使用我们在GreetingActor.onMessasge方法中返回的值解析。
我们可以通过将 Scala 的Await.result方法应用于Future来等待结果,或者,更优选地,使用异步模式构建整个应用程序。