Flux 创建序列
1. 概述
在本教程中,我们将使用Project Reactor 基础知识 来学习一些创建Flux 的技术。
2.Maven依赖
让我们从几个依赖项开始。我们需要*reactor-core *和 reactor-test :
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.2.6.RELEASE</version>
<scope>test</scope>
</dependency>
3. 同步创建
创建Flux的最简单方法是Flux#generate。此方法依赖于生成器函数来生成一系列项目。
但首先,让我们定义一个类来保存我们说明*generate *方法的方法:
public class SequenceGenerator {
// methods that will follow
}
3.1. 具有新状态的生成器
让我们看看如何使用 Reactor生成斐波那契数列:
public Flux<Integer> generateFibonacciWithTuples() {
return Flux.generate(
() -> Tuples.of(0, 1),
(state, sink) -> {
sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
}
);
}
不难看出这个generate 方法接受两个函数作为它的参数——一个Callable和一个BiFunction:
- Callable函数为生成器设置初始状态——在这种情况下,它是一个包含元素 0 和1的*Tuples *
- BiFuntion函数是一个生成器,消耗一个SynchronousSink ,然后在每一轮中使用 sink 的next方法和当前状态发出一个项目
顾名思义, SynchronousSink对象同步工作。但是,请注意,我们不能 在每个生成器调用中多次调用此对象的next方法。
让我们使用StepVerifier 验证生成的序列:
@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);
StepVerifier.create(fibonacciFlux)
.expectNext(0, 1, 1, 2, 3)
.expectComplete()
.verify();
}
在此示例中,订阅者仅请求五个项目,因此生成的序列以数字3结尾。
正如我们所看到的,生成器返回一个新的状态对象以在下一次传递中使用。**不过,没有必要这样做。**我们可以为生成器的所有调用重用一个状态实例。
3.2. 具有可变状态的生成器
假设我们要生成具有回收状态的斐波那契数列。为了演示这个用例,让我们首先定义一个类:
public class FibonacciState {
private int former;
private int latter;
// constructor, getters and setters
}
我们将使用此类的一个实例来保存生成器的状态。这个实例的两个属性,former 和latter,在序列中存储两个连续的数字。
如果我们修改我们的初始示例,我们现在将使用可变状态与generate:
public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
return Flux.generate(
() -> new FibonacciState(0, 1),
(state, sink) -> {
sink.next(state.getFormer());
if (state.getLatter() > limit) {
sink.complete();
}
int temp = state.getFormer();
state.setFormer(state.getLatter());
state.setLatter(temp + state.getLatter());
return state;
});
}
与前面的示例类似,此生成变体 具有状态供应商和生成器参数。
Callable类型的状态提供者 只需创建一个初始属性为0 和1 的FibonacciState对象。此状态对象将在生成器的整个生命周期中重复使用。
就像Fibonacci-with-Tuples示例中的SynchronousSink一样,这里的 sink 一个一个地生成项目。但是,与该示例不同的是,生成器每次调用时都返回相同的状态对象。
还要注意这次,**为了避免无限序列,**我们指示接收器在产生的值达到限制时完成。
而且,让我们再次进行快速测试以确认它是否有效:
@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
.expectNext(0, 1, 1, 2, 3, 5, 8)
.expectComplete()
.verify();
}
3.3. 无状态变体
generate方法有另一个变体 ,只有一个Consumer<SynchronousSink>类型的参数。该变体仅适用于产生预定序列,因此没有那么强大。那我们就不详细介绍了。
4. 异步创建
同步创建并不是以编程方式创建Flux的唯一解决方案。
相反,我们可以使用create 和push运算符以异步方式在一轮发射中生成多个项目。
4.1. create方法
**使用create 方法,我们可以从多个线程中生成项目。**在此示例中,我们将来自两个不同来源的元素收集到一个序列中。
首先,让我们看看create与generate有何不同:
public class SequenceCreator {
public Consumer<List<Integer>> consumer;
public Flux<Integer> createNumberSequence() {
return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
}
}
与generate运算符不同,create方法不维护状态。传递给此方法的发射器不是自己生成项目,而是从外部源接收元素。
此外,我们可以看到create运算符要求我们使用FluxSink 而不是SynchronousSink。使用FluxSink,我们可以 根据需要多次调用next() 。
在我们的例子中,我们将为items列表中的每个项目调用next(),逐个发出。稍后我们将看到如何填充items。
在这种情况下,我们的外部源是一个虚构的*consumer *领域,尽管这可能是一些可观察的 API。
让我们将create运算符付诸行动,从两个数字序列开始:
@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();
// other statements described below
}
这些序列,sequence1和sequence2,将作为生成序列的项目源。
接下来是两个Thread对象,它们会将元素注入发布者:
SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
() -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
() -> sequenceCreator.consumer.accept(sequence2)
);
当调用*accept *操作符时,元素开始流入序列源。
然后,我们可以收听或subscribe我们新的合并序列:
List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);
通过订阅我们的序列,我们指示序列发出的每个项目应该发生什么。在这里,它将来自不同来源的每个项目添加到合并列表中。
现在,我们触发整个过程,看到项目在两个不同的线程上移动:
producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();
像往常一样,最后一步是验证操作的结果:
assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);
接收到的序列中的前三个数字来自sequence1,而后四个来自sequence2。由于异步操作的性质,不能保证这些序列中元素的顺序。
create方法有另一个变体 ,采用*OverflowStrategy *类型的参数。顾名思义,当下游跟不上发布者时,此参数管理背压。默认情况下,发布者会在这种情况下缓冲所有元素。
4.2. push方法
除了create运算符之外,Flux类还有另一个静态方法来异步发出序列,即push 。此方法的工作原理与create类似,不同之处在于它一次只允许一个生产线程发出信号。
我们可以用push替换刚才例子中 的create方法,代码仍然可以编译
然而,有时我们会看到一个断言错误,因为push 操作符阻止FluxSink#next在不同线程上被同时调用。因此,只有在我们不打算使用多线程时才应该使用 push 。
5. 处理序列
到目前为止,我们看到的所有方法都是静态的,并且允许从给定的源创建序列。Flux API 还提供了一个名为handle 的实例方法,用于处理发布者生成的序列。
这个handle运算符接受一个序列,进行一些处理并可能删除一些元素。在这方面,我们可以说handle运算符的工作方式就像一个map 和一个filter。
我们来看一个handle方法的简单说明:
public class SequenceHandler {
public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
return sequence.handle((number, sink) -> {
if (number % 2 == 0) {
sink.next(number / 2);
}
});
}
}
在此示例中,handle运算符采用一系列数字,如果是偶数,则将该值除以2 。如果该值为奇数,则运算符不执行任何操作,这意味着忽略这样的数字。
需要注意的另一件事是,与generate方法一样,handle使用SynchronousSink并且仅启用逐个发射。**
最后,我们需要进行测试。让我们最后一次使用StepVerifier来确认我们的处理程序有效:
@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
SequenceHandler sequenceHandler = new SequenceHandler();
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);
StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
.expectNext(0, 1, 4, 17)
.expectComplete()
.verify();
}
斐波那契数列的前 10 个项目中有四个偶数:0、2、8和 34,因此我们将参数传递给expectNext方法。