Java 9 反应式Streams
1. 概述
在本文中,我们将研究 Java 9 Reactive Streams。简而言之,我们将能够使用Flow类,它包含用于构建反应式流处理逻辑的主要构建块。 Reactive Streams是具有非阻塞背压的异步流处理标准。该规范在*Reactive Manifesto 中定义,并且有各种实现,例如RxJava或Akka-Streams*。
2. 反应式 API 概述
要构建Flow,我们可以使用三个主要抽象并将它们组合成异步处理逻辑。
每个Flow都需要处理由 Publisher 实例发布给它的事件;Publisher有一个方法——subscribe()。
如果任何订阅者想要接收它发布的事件,他们需要订阅给定的Publisher。
**消息的接收者需要实现Subscriber接口。**通常这是每个Flow处理的结束,因为它的实例不会进一步发送消息。
我们可以将Subscriber视为接收器。这有四个需要重写的方法*——onSubscribe()、onNext()、onError()和onComplete()。*我们将在下一节中介绍这些内容。
**如果我们想要转换传入的消息并将其进一步传递给下一个Subscriber,我们需要实现处理器接口。**这既充当订阅者,因为它接收消息,又充当发布者,因为它处理这些消息并将它们发送以供进一步处理。
3. 发布和消费消息
假设我们要创建一个简单的Flow,其中我们有一个发布消息的发布者,以及一个在消息到达时消费消息的简单Subscriber——一次一个。 让我们创建一个EndSubscriber类。我们需要实现Subscriber接口。接下来,我们将覆盖所需的方法。
在处理开始之前调用onSubscribe()方法。Subscription的实例作为参数传递。它是一个类,用于控制Subscriber和Publisher之间的消息流:
public class EndSubscriber<T> implements Subscriber<T> {
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
}
我们还初始化了一个空的消耗元素列表,将在测试中使用。 现在,我们需要从Subscriber接口实现剩余的方法。这里的主要方法是 onNext() - 每当Publisher发布新消息时都会调用它:
@Override
public void onNext(T item) {
System.out.println("Got : " + item);
consumedElements.add(item);
subscription.request(1);
}
请注意,当我们在onSubscribe()方法中启动订阅并处理消息时,我们需要调用Subscription上的**request()方法来表示当前Subscriber已准备好消费更多消息。 最后,我们需要实现onError() ——在处理过程中抛出一些异常时调用它,以及onComplete()——在**Publisher关闭时调用:
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
让我们为处理流程编写一个测试。我们将使用SubmissionPublisher类——来自java.util.concurrent的构造——它实现了Publisher接口。 我们将向Publisher提交N个元素——我们的EndSubscriber将收到:
@Test
public void whenSubscribeToIt_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>();
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(
() -> assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(items)
);
}
请注意,我们在EndSubscriber的实例上调用close()方法。它将在给定Publisher的每个Subscriber上调用*onComplete()*回调。 运行该程序将产生以下输出:
Got : 1
Got : x
Got : 2
Got : x
Got : 3
Got : x
Done
4. 消息的转换
假设我们想在Publisher和Subscriber之间构建类似的逻辑,但还要应用一些转换。 我们将创建实现Processor并扩展SubmissionPublisher的TransformProcessor类——因为它既是Publisher又是Subscriber。
我们将传入一个将输入转换为输出的函数:
public class TransformProcessor<T, R>
extends SubmissionPublisher<R>
implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public TransformProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}
现在让我们使用Publisher发布String元素的处理流程编写一个快速测试。
我们的TransformProcessor将把String解析为Integer——这意味着这里需要进行转换:
@Test
public void whenSubscribeAndTransformElements_thenShouldConsumeAll()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor
= new TransformProcessor<>(Integer::parseInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>();
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
// when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expectedResult)
);
}
请注意,调用基础Publisher上的close()方法将导致调用TransformProcessor上的*onComplete()*方法。 请记住,处理链中的所有发布者都需要以这种方式关闭。
5. 使用订阅控制消息需求
假设我们只想使用 Subscription 中的第一个元素,应用一些逻辑并完成处理。我们可以使用*request()*方法来实现这一点。
让我们修改我们的EndSubscriber以仅消费 N 条消息。我们将该数字作为howMuchMessagesConsume构造函数参数传递:
public class EndSubscriber<T> implements Subscriber<T> {
private AtomicInteger howMuchMessagesConsume;
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
public EndSubscriber(Integer howMuchMessagesConsume) {
this.howMuchMessagesConsume
= new AtomicInteger(howMuchMessagesConsume);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
howMuchMessagesConsume.decrementAndGet();
System.out.println("Got : " + item);
consumedElements.add(item);
if (howMuchMessagesConsume.get() > 0) {
subscription.request(1);
}
}
//...
}
我们可以根据需要请求元素。
让我们编写一个测试,其中我们只想使用给定订阅中的一个元素:
@Test
public void whenRequestForOnlyOneElement_thenShouldConsumeOne()
throws InterruptedException {
// given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
// when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
// then
await().atMost(1000, TimeUnit.MILLISECONDS)
.until(() ->
assertThat(subscriber.consumedElements)
.containsExactlyElementsOf(expected)
);
}
尽管publisher发布了六个元素,但我们的EndSubscriber将只使用一个元素,因为它表示只处理单个元素的需求。
通过在Subscription上使用*request()*方法,我们可以实现更复杂的背压机制来控制消息消费的速度。