Contents

Akka 与 Spring 框架集成

1. 简介

在本文中,我们将专注于将 Akka 与 Spring 框架集成——以允许将基于 Spring 的服务注入 Akka actor。

在阅读本文之前,建议先了解 Akka 的基础知识。

使用 Akka Streams 库在 Java 中进行数据流转换的快速实用指南。阅读更多

2. As中的依赖注入

Akka 是一个基于 Actor 并发模型的强大应用框架。该框架是用 Scala 编写的,这当然也使其在基于 Java 的应用程序中完全可用。**因此,我们经常希望将 Akka 与现有的基于 Spring 的应用程序集成,**或者简单地使用 Spring 将 bean 连接到 actor。

Spring/Akka 集成的问题在于 Spring 中 bean 的管理和 Akka 中 Actor 的管理之间的差异:Actor 具有不同于典型 Spring bean 生命周期的特定生命周期

此外,actor 被拆分为一个 actor 本身(这是一个内部实现细节,不能由 Spring 管理)和一个 actor 引用,它可以被客户端代码访问,以及在不同的 Akka 运行时之间可序列化和可移植。

幸运的是,Akka 提供了一种机制,即Akka 扩展 ,这使得使用外部依赖注入框架变得相当容易。

3. Maven依赖

为了在我们的 Spring 项目中演示 Akka 的使用,我们需要一个最基本的 Spring 依赖项——spring-context库和akka-actor库。库版本可以提取到pomproperties部分:

<properties>
    <spring.version>4.3.1.RELEASE</spring.version>
    <akka.version>2.4.8</akka.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>${akka.version}</version>
    </dependency>
</dependencies>

确保检查 Maven Central 以获取最新版本的spring-contextakka-actor 依赖项。

请注意,akka-actor依赖项的名称中有一个*_2.11*后缀,这表示此版本的 Akka 框架是针对 Scala 版本 2.11 构建的。相应版本的 Scala 库将传递地包含在您的构建中。

4. 将 Spring Beans 注入 Akka Actors

让我们创建一个简单的 Spring/Akka 应用程序,该应用程序由一个参与者组成,该参与者可以通过向这个人发出问候来回答这个人的名字。问候的逻辑将被提取到一个单独的服务中。我们希望将此服务自动连接到一个参与者实例。Spring 集成将帮助我们完成这项任务。

4.1. 定义Actor和服务

为了演示将服务注入到 actor 中,我们将创建一个简单的类GreetingActor,定义为无类型的 actor(扩展 Akka 的UntypedActor基类)。每个 Akka Actor 的主要方法是onReceive方法,它接收消息并根据某些指定的逻辑对其进行处理。

在我们的例子中,GreetingActor实现检查消息是否是预定义的Greet类型,然后从Greet实例中获取人名,然后使用GreetingService接收此人的问候语,并使用接收到的问候语字符串回答发件人。如果消息属于其他未知类型,则将其传递给参与者预定义的unhandled方法。

我们来看一下:

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class GreetingActor extends UntypedActor {
    private GreetingService greetingService;
    // constructor
    @Override
    public void onReceive(Object message) throws Throwable {
        if (message instanceof Greet) {
            String name = ((Greet) message).getName();
            getSender().tell(greetingService.greet(name), getSelf());
        } else {
            unhandled(message);
        }
    }
    public static class Greet {
        private String name;
        // standard constructors/getters
    }
}

注意Greet消息类型被定义为这个actor内部的一个静态内部类,这被认为是一个很好的实践。接受的消息类型应尽可能靠近参与者定义,以避免混淆该参与者可以处理哪些消息类型。

还要注意 Spring 注释@Component@Scope——**它们将类定义为具有*prototype *范围的 Spring 管理的 bean。

范围非常重要,因为每个 bean 检索请求都应该产生一个新创建的实例,因为这种行为与 Akka 的 actor 生命周期相匹配。如果您在其他范围内实现此 bean,则在 Akka 中重新启动 actor 的典型情况很可能无法正常运行。

最后,请注意我们不必显式地*@Autowire GreetingService实例——这是可能的,因为 Spring 4.3 的新特性称为隐式构造函数注入*。

GreeterService的实现非常简单,请注意我们通过向其添加*@Component注释将其定义为 Spring 管理的 bean(具有默认的singleton *范围):

@Component
public class GreetingService {
    public String greet(String name) {
        return "Hello, " + name;
    }
}

4.2. 通过 Akka 扩展添加 Spring 支持

将 Spring 与 Akka 集成的最简单方法是通过 Akka 扩展。

**扩展是每个参与者系统创建的单例实例。**它由一个实现标记接口Extension的扩展类本身和一个通常继承AbstractExtensionId的扩展 id 类组成。

由于这两个类是紧密耦合的,因此实现嵌套在 ExtensionId 类中的Extension类是有意义的:

public class SpringExtension 
  extends AbstractExtensionId<SpringExtension.SpringExt> {
    public static final SpringExtension SPRING_EXTENSION_PROVIDER 
      = new SpringExtension();
    @Override
    public SpringExt createExtension(ExtendedActorSystem system) {
        return new SpringExt();
    }
    public static class SpringExt implements Extension {
        private volatile ApplicationContext applicationContext;
        public void initialize(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }
        public Props props(String actorBeanName) {
            return Props.create(
              SpringActorProducer.class, applicationContext, actorBeanName);
        }
    }
}

首先——SpringExtensionAbstractExtensionId类中实现了一个createExtension方法——它负责创建扩展实例,即SpringExt对象。

SpringExtension类也有一个静态字段SPRING_EXTENSION_PROVIDER ,它包含对其唯一实例的引用。添加一个私有构造函数来明确声明SpringExtention应该是一个单例类通常是有意义的,但为了清楚起见,我们将省略它。

其次,静态内部类SpringExt就是扩展本身。由于Extension只是一个标记接口,我们可以按照我们认为合适的方式定义这个类的内容。

在我们的例子中,我们需要initialize方法来保存一个 Spring ApplicationContext实例——这个方法在每个扩展初始化时只会被调用一次。

我们还需要props方法来创建Props对象。Props实例是一个Actor的蓝图,在我们的例子中,Props.create方法接收一个SpringActorProducer类和该类的构造函数参数。这些是这个类的构造函数将被调用的参数。

每次我们需要 Spring 管理的 Actor 引用时,都会执行props方法。

第三个也是最后一个难题是SpringActorProducer类。它实现了 Akka 的IndirectActorProducer接口,该接口允许通过实现produceractorClass方法来覆盖 actor 的实例化过程。

你可能已经猜到了,**它不会直接实例化,而是总是从 Spring 的ApplicationContext中检索一个actor实例。**由于我们已将 actor设为prototype作用域的 bean,因此每次调用 producer方法都将返回该 actor 的一个新实例:

public class SpringActorProducer implements IndirectActorProducer {
    private ApplicationContext applicationContext;
    private String beanActorName;
    public SpringActorProducer(ApplicationContext applicationContext, 
      String beanActorName) {
        this.applicationContext = applicationContext;
        this.beanActorName = beanActorName;
    }
    @Override
    public Actor produce() {
        return (Actor) applicationContext.getBean(beanActorName);
    }
    @Override
    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext
          .getType(beanActorName);
    }
}

4.3. 把它们放在一起

剩下要做的就是创建一个 Spring 配置类(标有*@Configuration注释),它将告诉 Spring 扫描当前包以及所有嵌套包(这是由@ComponentScan*注释确保的)并创建一个 Spring 容器.

我们只需要添加一个额外的 bean—— ActorSystem实例——并在这个ActorSystem上初始化 Spring 扩展:

@Configuration
@ComponentScan
public class AppConfiguration {
    @Autowired
    private ApplicationContext applicationContext;
    @Bean
    public ActorSystem actorSystem() {
        ActorSystem system = ActorSystem.create("akka-spring-demo");
        SPRING_EXTENSION_PROVIDER.get(system)
          .initialize(applicationContext);
        return system;
    }
}

4.4. 检索Spring-Wired Actor

为了测试一切正常,我们可以将ActorSystem实例注入我们的代码(一些 Spring 管理的应用程序代码或基于 Spring 的测试),使用我们的扩展为一个Actor创建一个Props对象,检索一个Actor的引用通过Props对象并尝试向某人打招呼:

ActorRef greeter = system.actorOf(SPRING_EXTENSION_PROVIDER.get(system)
  .props("greetingActor"), "greeter");
FiniteDuration duration = FiniteDuration.create(1, TimeUnit.SECONDS);
Timeout timeout = Timeout.durationToTimeout(duration);
Future<Object> result = ask(greeter, new Greet("John"), timeout);
Assert.assertEquals("Hello, John", Await.result(result, duration));

这里我们使用典型的akka.pattern.Patterns.ask模式,它返回一个 Scala 的Future实例。一旦计算完成,Future将使用我们在GreetingActor.onMessasge方法中返回的值解析。

我们可以通过将 Scala 的Await.result方法应用于Future来等待结果,或者,更优选地,使用异步模式构建整个应用程序。