Contents

Akka 框架简介

1. 简介

**Akka 是一个开源库,**可通过利用 Actor 模型帮助使用 Java 或 Scala 轻松开发并发和分布式应用程序。

在本教程中,我们将介绍基本功能,例如定义参与者、他们如何通信以及我们如何杀死他们。在最后的笔记中,我们还将记录使用 Akka 时的一些最佳实践。

2. Actor 模型

Actor 模型对计算机科学界来说并不陌生。它由 Carl Eddie Hewitt 于 1973 年首次引入,作为处理并发计算的理论模型。

当软件行业开始意识到实现并发和分布式应用程序的陷阱时,它开始显示出它的实际适用性。

**一个actor代表一个独立的计算单元。**一些重要的特征是:

  • Actor 封装了它的状态和部分应用程序逻辑
  • 参与者仅通过异步消息进行交互,从不通过直接方法调用
  • 每个参与者都有一个唯一的地址和一个邮箱,其他参与者可以在其中传递消息
  • Actor 将按顺序处理邮箱中的所有消息(邮箱的默认实现是 FIFO 队列)
  • Actor 系统以树状层次结构组织
  • Actor 可以创建其他Actor ,可以向任何其他Actor 发送消息并停止自己或已创建任何Actor

2.1. 优点

开发并发应用程序很困难,因为我们需要处理同步、锁和共享内存。通过使用 Akka Actor,我们可以轻松编写异步代码,而无需锁和同步。

使用消息而不是方法调用的优点之一是发送者线程在向另一个参与者发送消息时不会阻塞以等待返回值。接收参与者将通过向发送者发送回复消息来响应结果。

使用消息的另一大好处是我们不必担心多线程环境中的同步问题。这是因为所有消息都是按顺序处理的

Akka actor 模型的另一个优点是错误处理。通过在层次结构中组织参与者,每个参与者都可以将失败通知其父级,因此它可以采取相应的行动。父actor可以决定停止或重新启动子actor。

3. 设置

为了利用 Akka actor,我们需要从Maven Central 添加以下依赖项:

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.8.4</version>
</dependency>

4. 创建一个Actor

如前所述,参与者是在层次系统中定义的。共享一个公共配置的所有参与者都将由一个ActorSystem 定义。

现在,我们将简单地定义一个具有默认配置和自定义名称的ActorSystem

ActorSystem system = ActorSystem.create("test-system");

即使我们还没有创建任何Actor ,系统已经包含 3 个主要Actor :

  • 具有地址“/”的根监护actor,其名称表示actor系统层次结构的根
  • 具有地址“/user”的用户监护人Actor 。这将是我们定义的所有actor的父级
  • 具有地址“/system”的系统守护者actor。这将是 Akka 系统内部定义的所有参与者的父级

任何 Akka Actor 都将扩展AbstractActor抽象类并实现*createReceive()*方法来处理来自其他 Actor 的传入消息:

public class MyActor extends AbstractActor {
    public Receive createReceive() {
        return receiveBuilder().build();
    }
}

**这是我们可以创建的最基本的actor。**它可以接收来自其他参与者的消息并将丢弃它们,因为在ReceiveBuilder 中没有定义匹配的消息模式。我们将在本文后面讨论消息模式匹配。

现在我们已经创建了我们的第一个actor,我们应该将它包含在ActorSystem中:

ActorRef readingActorRef 
  = system.actorOf(Props.create(MyActor.class), "my-actor");

4.1. Actor 配置

**Props类包含Actor 配置。**我们可以配置诸如调度程序、邮箱或部署配置之类的东西。此类是不可变的,因此是线程安全的,因此可以在创建新参与者时共享它。

强烈推荐并认为最佳实践是在 actor 对象中定义工厂方法来处理Props对象的创建。

举例来说,让我们定义一个将进行一些文本处理的Actor 。Actor 将收到一个String对象,它将在该对象上进行处理:

public class ReadingActor extends AbstractActor {
    private String text;
    public static Props props(String text) {
        return Props.create(ReadingActor.class, text);
    }
    // ...
}

现在,要创建这种类型的actor的实例,我们只需使用props()工厂方法将String参数传递给构造函数:

ActorRef readingActorRef = system.actorOf(
  ReadingActor.props(TEXT), "readingActor");

现在我们知道了如何定义一个actor,让我们看看它们是如何在actor系统中进行通信的。

5. Actor 消息

为了相互交互,参与者可以发送和接收来自系统中任何其他参与者的消息。这些消息可以是任何类型的对象,条件是它是不可变的

**在 Actor 类中定义消息是最佳实践。**这有助于编写易于理解的代码,并且知道参与者可以处理哪些消息。

5.1. 发送消息

在 Akka Actor 系统内部,消息使用以下方法发送:

  • tell()
  • ask()
  • forward()

**当我们想要发送消息但不期望响应时,我们可以使用*tell()*方法。**从性能的角度来看,这是最有效的方法:

readingActorRef.tell(new ReadingActor.ReadLines(), ActorRef.noSender());

第一个参数代表我们发送给actor地址readingActorRef的消息。

第二个参数指定发件人是谁。当接收消息的参与者需要向发送者以外的参与者(例如发送参与者的父节点)发送响应时,这很有用。

通常,我们可以将第二个参数设置为nullActorRef.noSender(),因为我们不期望回复。*当我们需要一个Actor 的回应时,我们可以使用*ask()方法:

CompletableFuture<Object> future = ask(wordCounterActorRef, 
  new WordCounterActor.CountWords(line), 1000).toCompletableFuture();

当请求参与者响应时,会返回一个CompletionStage对象,因此处理保持非阻塞状态。

我们必须注意的一个非常重要的事实是在将响应的参与者内部进行错误处理。要返回一个包含异常的Future对象,我们必须向发送者 actor 发送一条Status.Failure消息。

当参与者在处理消息时抛出异常并且*ask()*调用将超时并且在日志中不会看到对异常的引用时,这不会自动完成:

@Override
public Receive createReceive() {
    return receiveBuilder()
      .match(CountWords.class, r -> {
          try {
              int numberOfWords = countWordsFromLine(r.line);
              getSender().tell(numberOfWords, getSelf());
          } catch (Exception ex) {
              getSender().tell(
               new akka.actor.Status.Failure(ex), getSelf());
               throw ex;
          }
    }).build();
}

我们还有类似于*tell()forward()*方法。不同之处在于发送消息时保留了消息的原始发送者,因此转发消息的actor仅充当中间actor:

printerActorRef.forward(
  new PrinterActor.PrintFinalResult(totalNumberOfWords), getContext());

5.2. 接收消息

*每个参与者都将实现*createReceive()方法,该方法处理所有传入消息。*receiveBuilder()*就像一个 switch 语句,试图将接收到的消息与定义的消息类型匹配:

public Receive createReceive() {
    return receiveBuilder().matchEquals("printit", p -> {
        System.out.println("The address of this actor is: " + getSelf());
    }).build();
}

收到消息后,会将消息放入 FIFO 队列,因此消息按顺序处理

6. 杀死Actor

当我们使用完一个 actor 后,*我们可以通过从ActorRefFactory接口调用*stop()方法来停止它:

system.stop(myActorRef);

我们可以使用这个方法来终止任何子actor或actor本身。重要的是要注意停止是异步完成的,并且当前消息处理将在actor终止之前完成。Actor 邮箱将不再接受传入的消息

通过停止父actor,我们还将向所有由它生成的子actor发送终止信号。

当我们不再需要actor系统时,我们可以终止它以释放所有资源并防止任何内存泄漏:

Future<Terminated> terminateResponse = system.terminate();

这将停止系统监护参与者,因此此 Akka 系统中定义的所有参与者。

我们还可以向我们想要杀死的任何参与者发送PoisonPill消息:

myActorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());

PoisonPill消息将像任何其他消息一样被Actor 接收并放入队列中。Actor 将处理所有消息,直到到达PoisonPill消息。只有这样,参与者才会开始终止过程。

另一个用于杀死Actor 的特殊消息是Kill消息。与PoisonPill 不同, actor在处理此消息时会抛出ActorKilledException

myActorRef.tell(Kill.getInstance(), ActorRef.noSender());