Contents

Flux 创建序列

1. 概述

在本教程中,我们将使用Project Reactor 基础知识 来学习一些创建Flux 的技术。

2.Maven依赖

让我们从几个依赖项开始。我们需要*reactor-core *和 reactor-test

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.2.6.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <version>3.2.6.RELEASE</version>
    <scope>test</scope>
</dependency>

3. 同步创建

创建Flux的最简单方法是Flux#generate此方法依赖于生成器函数来生成一系列项目。

但首先,让我们定义一个类来保存我们说明*generate *方法的方法:

public class SequenceGenerator {
    // methods that will follow
}

3.1. 具有新状态的生成器

让我们看看如何使用 Reactor生成斐波那契数列:

public Flux<Integer> generateFibonacciWithTuples() {
    return Flux.generate(
            () -> Tuples.of(0, 1),
            (state, sink) -> {
                sink.next(state.getT1());
                return Tuples.of(state.getT2(), state.getT1() + state.getT2());
            }
    );
}

不难看出这个generate 方法接受两个函数作为它的参数——一个Callable和一个BiFunction

  • Callable函数为生成器设置初始状态——在这种情况下,它是一个包含元素 0 和1的*Tuples *
  • BiFuntion函数是一个生成器,消耗一个SynchronousSink ,然后在每一轮中使用 sink 的next方法和当前状态发出一个项目

顾名思义,  SynchronousSink对象同步工作。但是,请注意,我们不能 在每个生成器调用中多次调用此对象的next方法。

让我们使用StepVerifier 验证生成的序列:

@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);
    StepVerifier.create(fibonacciFlux)
      .expectNext(0, 1, 1, 2, 3)
      .expectComplete()
      .verify();
}

在此示例中,订阅者仅请求五个项目,因此生成的序列以数字3结尾。

正如我们所看到的,生成器返回一个新的状态对象以在下一次传递中使用。**不过,没有必要这样做。**我们可以为生成器的所有调用重用一个状态实例。

3.2. 具有可变状态的生成器

假设我们要生成具有回收状态的斐波那契数列。为了演示这个用例,让我们首先定义一个类:

public class FibonacciState {
    private int former;
    private int latter;
    // constructor, getters and setters
}

我们将使用此类的一个实例来保存生成器的状态。这个实例的两个属性,former latter,在序列中存储两个连续的数字。

如果我们修改我们的初始示例,我们现在将使用可变状态与generate

public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
    return Flux.generate(
      () -> new FibonacciState(0, 1),
      (state, sink) -> {
        sink.next(state.getFormer());
        if (state.getLatter() > limit) {
            sink.complete();
        }
        int temp = state.getFormer();
        state.setFormer(state.getLatter());
        state.setLatter(temp + state.getLatter());
        return state;
    });
}

与前面的示例类似,此生成变体 具有状态供应商和生成器参数。

Callable类型的状态提供者 只需创建一个初始属性为0 和1FibonacciState对象。此状态对象将在生成器的整个生命周期中重复使用。

就像Fibonacci-with-Tuples示例中的SynchronousSink一样,这里的 sink 一个一个地生成项目。但是,与该示例不同的是,生成器每次调用时都返回相同的状态对象。

还要注意这次,**为了避免无限序列,**我们指示接收器在产生的值达到限制时完成。

而且,让我们再次进行快速测试以确认它是否有效:

@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
      .expectNext(0, 1, 1, 2, 3, 5, 8)
      .expectComplete()
      .verify();
}

3.3. 无状态变体

generate方法有另一个变体 ,只有一个Consumer<SynchronousSink>类型的参数。该变体仅适用于产生预定序列,因此没有那么强大。那我们就不详细介绍了。

4. 异步创建

同步创建并不是以编程方式创建Flux的唯一解决方案。

相反,我们可以使用create 和push运算符以异步方式在一轮发射中生成多个项目。

4.1. create方法

**使用create 方法,我们可以从多个线程中生成项目。**在此示例中,我们将来自两个不同来源的元素收集到一个序列中。

首先,让我们看看creategenerate有何不同:

public class SequenceCreator {
    public Consumer<List<Integer>> consumer;
    public Flux<Integer> createNumberSequence() {
        return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
    }
}

generate运算符不同,create方法不维护状态。传递给此方法的发射器不是自己生成项目,而是从外部源接收元素。

此外,我们可以看到create运算符要求我们使用FluxSink  而不是SynchronousSink。使用FluxSink我们可以 根据需要多次调用next()

在我们的例子中,我们将为items列表中的每个项目调用next(),逐个发出。稍后我们将看到如何填充items

在这种情况下,我们的外部源是一个虚构的*consumer *领域,尽管这可能是一些可观察的 API。

让我们将create运算符付诸行动,从两个数字序列开始:

@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
    List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();
    // other statements described below
}

这些序列,sequence1sequence2,将作为生成序列的项目源。

接下来是两个Thread对象,它们会将元素注入发布者:

SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
  () -> sequenceCreator.consumer.accept(sequence2)
);

当调用*accept *操作符时,元素开始流入序列源。

然后,我们可以收听或subscribe我们新的合并序列:

List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);

通过订阅我们的序列,我们指示序列发出的每个项目应该发生什么。在这里,它将来自不同来源的每个项目添加到合并列表中。

现在,我们触发整个过程,看到项目在两个不同的线程上移动:

producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();

像往常一样,最后一步是验证操作的结果:

assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);

接收到的序列中的前三个数字来自sequence1,而后四个来自sequence2。由于异步操作的性质,不能保证这些序列中元素的顺序。

create方法有另一个变体 ,采用*OverflowStrategy *类型的参数。顾名思义,当下游跟不上发布者时,此参数管理背压。默认情况下,发布者会在这种情况下缓冲所有元素。

4.2. push方法

除了create运算符之外,Flux类还有另一个静态方法来异步发出序列,即push 。此方法的工作原理与create类似,不同之处在于它一次只允许一个生产线程发出信号。

我们可以用push替换刚才例子中 的create方法,代码仍然可以编译

然而,有时我们会看到一个断言错误,因为push 操作符阻止FluxSink#next在不同线程上被同时调用。因此,只有在我们不打算使用多线程时才应该使用 push 。

5. 处理序列

到目前为止,我们看到的所有方法都是静态的,并且允许从给定的源创建序列。Flux API 还提供了一个名为handle 的实例方法,用于处理发布者生成的序列。

这个handle运算符接受一个序列,进行一些处理并可能删除一些元素。在这方面,我们可以说handle运算符的工作方式就像一个map 和一个filter

我们来看一个handle方法的简单说明:

public class SequenceHandler {
    public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
        return sequence.handle((number, sink) -> {
            if (number % 2 == 0) {
                sink.next(number / 2);
            }
        });
    }
}

在此示例中,handle运算符采用一系列数字,如果是偶数,则将该值除以2 。如果该值为奇数,则运算符不执行任何操作,这意味着忽略这样的数字。

需要注意的另一件事是,与generate方法一样,handle使用SynchronousSink并且仅启用逐个发射。**

最后,我们需要进行测试。让我们最后一次使用StepVerifier来确认我们的处理程序有效:

@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
    SequenceHandler sequenceHandler = new SequenceHandler();
    SequenceGenerator sequenceGenerator = new SequenceGenerator();
    Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);
    StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
      .expectNext(0, 1, 4, 17)
      .expectComplete()
      .verify();
}

斐波那契数列的前 10 个项目中有四个偶数:0、2、834,因此我们将参数传递给expectNext方法。