Contents

Java 9 反应式Streams

1. 概述

在本文中,我们将研究 Java 9 Reactive Streams。简而言之,我们将能够使用Flow类,它包含用于构建反应式流处理逻辑的主要构建块。 Reactive Streams是具有非阻塞背压的异步流处理标准。该规范在*Reactive Manifesto 中定义,并且有各种实现,例如RxJavaAkka-Streams*。

2. 反应式 API 概述

要构建Flow,我们可以使用三个主要抽象并将它们组合成异步处理逻辑。

每个Flow都需要处理由 Publisher 实例发布给它的事件Publisher有一个方法——subscribe()

如果任何订阅者想要接收它发布的事件,他们需要订阅给定的Publisher

**消息的接收者需要实现Subscriber接口。**通常这是每个Flow处理的结束,因为它的实例不会进一步发送消息。

我们可以将Subscriber视为接收器。这有四个需要重写的方法*——onSubscribe()、onNext()、onError()onComplete()。*我们将在下一节中介绍这些内容。

**如果我们想要转换传入的消息并将其进一步传递给下一个Subscriber,我们需要实现处理器接口。**这既充当订阅者,因为它接收消息,又充当发布者,因为它处理这些消息并将它们发送以供进一步处理。

3. 发布和消费消息

假设我们要创建一个简单的Flow,其中我们有一个发布消息的发布者,以及一个在消息到达时消费消息的简单Subscriber——一次一个。 让我们创建一个EndSubscriber类。我们需要实现Subscriber接口。接下来,我们将覆盖所需的方法。

在处理开始之前调用onSubscribe()方法。Subscription的实例作为参数传递。它是一个类,用于控制SubscriberPublisher之间的消息流:

public class EndSubscriber<T> implements Subscriber<T> {
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
}

我们还初始化了一个空的消耗元素列表,将在测试中使用。 现在,我们需要从Subscriber接口实现剩余的方法。这里的主要方法是 onNext() - 每当Publisher发布新消息时都会调用它:

@Override
public void onNext(T item) {
    System.out.println("Got : " + item);
    consumedElements.add(item);
    subscription.request(1);
}

请注意,当我们在onSubscribe()方法中启动订阅并处理消息时,我们需要调用Subscription上的**request()方法来表示当前Subscriber已准备好消费更多消息。 最后,我们需要实现onError() ——在处理过程中抛出一些异常时调用它,以及onComplete()——在**Publisher关闭时调用:

@Override
public void onError(Throwable t) {
    t.printStackTrace();
}
@Override
public void onComplete() {
    System.out.println("Done");
}

让我们为处理流程编写一个测试。我们将使用SubmissionPublisher类——来自java.util.concurrent的构造——它实现了Publisher接口。 我们将向Publisher提交N个元素——我们的EndSubscriber将收到:

@Test
public void whenSubscribeToIt_thenShouldConsumeAll() 
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>();
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");
    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();
    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(
         () -> assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(items)
     );
}

请注意,我们在EndSubscriber的实例上调用close()方法。它将在给定Publisher的每个Subscriber上调用*onComplete()*回调。 运行该程序将产生以下输出:

Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done

4. 消息的转换

假设我们想在PublisherSubscriber之间构建类似的逻辑,但还要应用一些转换。 我们将创建实现Processor并扩展SubmissionPublisherTransformProcessor类——因为它既是Publisher又是Subscriber

我们将传入一个将输入转换为输出的函数:

public class TransformProcessor<T, R> 
  extends SubmissionPublisher<R> 
  implements Flow.Processor<T, R> {
    private Function<T, R> function;
    private Flow.Subscription subscription;
    public TransformProcessor(Function<T, R> function) {
        super();
        this.function = function;
    }
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    @Override
    public void onNext(T item) {
        submit(function.apply(item));
        subscription.request(1);
    }
    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }
    @Override
    public void onComplete() {
        close();
    }
}

现在让我们使用Publisher发布String元素的处理流程编写一个快速测试

我们的TransformProcessor将把String解析为Integer——这意味着这里需要进行转换:

@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    TransformProcessor<String, Integer> transformProcessor 
      = new TransformProcessor<>(Integer::parseInt);
    EndSubscriber<Integer> subscriber = new EndSubscriber<>();
    List<String> items = List.of("1", "2", "3");
    List<Integer> expectedResult = List.of(1, 2, 3);
    // when
    publisher.subscribe(transformProcessor);
    transformProcessor.subscribe(subscriber);
    items.forEach(publisher::submit);
    publisher.close();
    // then
     await().atMost(1000, TimeUnit.MILLISECONDS)
       .until(() -> 
         assertThat(subscriber.consumedElements)
         .containsExactlyElementsOf(expectedResult)
     );
}

请注意,调用基础Publisher上的close()方法将导致调用TransformProcessor上的*onComplete()*方法。 请记住,处理链中的所有发布者都需要以这种方式关闭。

5. 使用订阅控制消息需求

假设我们只想使用 Subscription 中的第一个元素,应用一些逻辑并完成处理。我们可以使用*request()*方法来实现这一点。

让我们修改我们的EndSubscriber以仅消费 N 条消息。我们将该数字作为howMuchMessagesConsume构造函数参数传递:

public class EndSubscriber<T> implements Subscriber<T> {
 
    private AtomicInteger howMuchMessagesConsume;
    private Subscription subscription;
    public List<T> consumedElements = new LinkedList<>();
    public EndSubscriber(Integer howMuchMessagesConsume) {
        this.howMuchMessagesConsume 
          = new AtomicInteger(howMuchMessagesConsume);
    }
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    @Override
    public void onNext(T item) {
        howMuchMessagesConsume.decrementAndGet();
        System.out.println("Got : " + item);
        consumedElements.add(item);
        if (howMuchMessagesConsume.get() > 0) {
            subscription.request(1);
        }
    }
    //...
    
}

我们可以根据需要请求元素。

让我们编写一个测试,其中我们只想使用给定订阅中的一个元素:

@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
  throws InterruptedException {
 
    // given
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    EndSubscriber<String> subscriber = new EndSubscriber<>(1);
    publisher.subscribe(subscriber);
    List<String> items = List.of("1", "x", "2", "x", "3", "x");
    List<String> expected = List.of("1");
    // when
    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
    items.forEach(publisher::submit);
    publisher.close();
    // then
    await().atMost(1000, TimeUnit.MILLISECONDS)
      .until(() -> 
        assertThat(subscriber.consumedElements)
       .containsExactlyElementsOf(expected)
    );
}

尽管publisher发布了六个元素,但我们的EndSubscriber将只使用一个元素,因为它表示只处理单个元素的需求。

通过在Subscription上使用*request()*方法,我们可以实现更复杂的背压机制来控制消息消费的速度。