Contents

Guava 中Future简介

1. 简介

Guava 为我们提供了ListenableFuture,它在默认的 Java Future 上提供了丰富的 API。让我们看看如何利用它来发挥我们的优势。

2. FutureListenableFutureFutures

让我们简要了解一下这些不同的类是什么以及它们是如何相互关联的。

2.1. Future

从Java 5 开始,我们可以使用*java.util.concurrent.Future *来表示异步任务。

Future允许我们访问已经完成或将来可能完成的任务的结果,并支持取消它们。

2.2. ListenableFuture

使用java.util.concurrent.Future时缺少的一个功能是添加侦听器以在完成时运行的能力,这是大多数流行的异步框架提供的常见功能。

Guava 通过允许我们将侦听器附加到它的*com.google.common.util.concurrent.ListenableFuture *来解决这个问题。

2.3. Futures

Guava 为我们提供了方便的类com.google.common.util.concurrent.Futures ,让我们可以更轻松地使用它们的ListenableFuture

该类提供了与ListenableFuture 交互的多种方式,其中支持添加成功/失败回调,并允许我们通过聚合或转换来协调多个未来。

3. 简单使用

现在让我们看看如何以最简单的方式使用ListenableFuture;创建和添加回调。

3.1. 创建ListenableFuture

获得ListenableFuture的最简单方法是向ListeningExecutorService提交任务(很像我们使用普通ExecutorService获得普通Future的方式):

ExecutorService execService = Executors.newSingleThreadExecutor();
ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService);
ListenableFuture<Integer> asyncTask = lExecService.submit(() -> {
    TimeUnit.MILLISECONDS.sleep(500); // long running task
    return 5;
});

请注意我们如何使用MoreExecutors类将我们的ExecutorService装饰为ListeningExecutorService。我们可以参考 Java中的线程池简介 了解更多关于MoreExecutors的信息。

如果我们已经有一个返回Future的 API 并且我们需要将其转换为ListenableFuture,这很容易通过初始化其具体实现ListenableFutureTask 来完成:

// old api
public FutureTask<String> fetchConfigTask(String configKey) {
    return new FutureTask<>(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}
// new api
public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) {
    return ListenableFutureTask.create(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

我们需要注意,除非我们将它们提交给Executor,否则这些任务不会运行。直接与ListenableFutureTask交互并不常见,仅在极少数情况下(例如:实现我们自己的ExecutorService)。实际使用请参考 Guava 的 AbstractListeningExecutorService

如果我们的异步任务无法使用ListeningExecutorService或提供的Futures实用程序方法,我们也可以使用com.google.common.util.concurrent.SettableFuture,我们需要手动设置未来值。对于更复杂的用法,我们还可以考虑com.google.common.util.concurrent.AbstractFuture

3.2. 添加监听器/回调

我们可以*将监听器添加到ListenableFuture的一种方法是使用*Futures.addCallback()注册回调,当成功或失败发生时,我们可以访问结果或异常:

Executor listeningExecutor = Executors.newSingleThreadExecutor();
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
Futures.addCallback(asyncTask, new FutureCallback<Integer>() {
    @Override
    public void onSuccess(Integer result) {
        // do on success
    }
    @Override
    public void onFailure(Throwable t) {
        // do on failure
    }
}, listeningExecutor);

我们还可以**通过直接将监听器添加到ListenableFuture 来添加监听器。**请注意,当未来成功或异常完成时,此侦听器将运行。另外,请注意,我们无权访问异步任务的结果:

Executor listeningExecutor = Executors.newSingleThreadExecutor();
int nextTask = 1;
Set<Integer> runningTasks = ConcurrentHashMap.newKeySet();
runningTasks.add(nextTask);
ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask()
asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);

4. 复杂的用法

现在让我们看看如何在更复杂的场景中使用这些future 。

4.1. fan-in

我们有时可能需要调用多个异步任务并收集它们的结果,通常称为fan-in操作。

Guava 为我们提供了两种方法。但是,我们应该根据我们的要求谨慎选择正确的方法。假设我们需要协调以下异步任务:

ListenableFuture<String> task1 = service.fetchConfig("config.0");
ListenableFuture<String> task2 = service.fetchConfig("config.1");
ListenableFuture<String> task3 = service.fetchConfig("config.2");

**fan-in多个future 的一种方法是使用*Futures.allAsList()*方法。这允许我们按照提供的future 的顺序收集所有future 的结果,如果它们都成功的话。**如果这些future 中的任何一个失败,那么整个结果就是失败的未来:

ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // do on all futures success
    }
    @Override
    public void onFailure(Throwable t) {
        // handle on at least one failure
    }
}, someExecutor);

如果我们需要收集所有异步任务的结果,无论它们是否失败,我们都可以使用Futures.successfulAsList()。这将返回一个列表,其结果将与传递给参数的任务具有相同的顺序,并且失败的任务将具有null分配给它们在列表中的相应位置:

ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3);
Futures.addCallback(configsTask, new FutureCallback<List<String>>() {
    @Override
    public void onSuccess(@Nullable List<String> configResults) {
        // handle results. If task2 failed, then configResults.get(1) == null
    }
    @Override
    public void onFailure(Throwable t) {
        // handle failure
    }
}, listeningExecutor);

在上面的用法中我们应该小心,如果未来的任务通常在成功时返回null,它将与失败的任务无法区分(也将结果设置为null)。

4.2. 带组合器的fan-in

如果我们需要协调多个返回不同结果的future ,上述解决方案可能还不够。在这种情况下,我们可以使用fan-in操作的组合器变体来协调这种future 组合。

与简单的fan-in操作类似,Guava 为我们提供了两种变体;一种在所有任务都成功完成时成功,另一种即使某些任务失败也分别使用Futures.whenAllSucceed() 和Futures.whenAllComplete() 方法成功。

让我们看看我们如何使用*Futures.whenAllSucceed()*来组合来自多个future 的不同结果类型:

ListenableFuture<Integer> cartIdTask = service.getCartId();
ListenableFuture<String> customerNameTask = service.getCustomerName();
ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask)
    .call(() -> {
        int cartId = Futures.getDone(cartIdTask);
        String customerName = Futures.getDone(customerNameTask);
        List<String> cartItems = Futures.getDone(cartItemsTask);
        return new CartInfo(cartId, customerName, cartItems);
    }, someExecutor);
Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() {
    @Override
    public void onSuccess(@Nullable CartInfo result) {
        //handle on all success and combination success
    }
    @Override
    public void onFailure(Throwable t) {
        //handle on either task fail or combination failed
    }
}, listeningExecService);

如果我们需要让某些任务失败,我们可以使用Futures.whenAllComplete()。虽然语义大多与上述相似,但我们应该知道,失败的future 在调用 Futures.getDone()时会抛出ExecutionException

4.3. 转型

有时我们需要转换未来成功后的结果。Guava 为我们提供了两种使用Futures.transform() 和*Futures.lazyTransform()*的方法。

让我们看看如何*使用*Futures.transform()来转换未来的结果。只要变换计算量不大,就可以使用它:

ListenableFuture<List<String>> cartItemsTask = service.getCartItems();
Function<List<String>, Integer> itemCountFunc = cartItems -> {
    assertNotNull(cartItems);
    return cartItems.size();
};
ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);

我们还可以使用Futures.lazyTransform() 将转换函数应用于java.util.concurrent.Future。我们需要记住,此选项不会返回ListenableFuture  ,而是返回普通的java.util.concurrent.Future,并且每次在生成的 future 上调用*get()*时都会应用转换函数。

4.4. 链接future

我们可能会遇到我们的future 需要调用其他future 的情况。在这种情况下,Guava 为我们提供了*async()*变体,以安全地链接这些future 以一个接一个地执行。

让我们看看如何使用Futures.submitAsync()从提交的Callable内部调用未来:

AsyncCallable<String> asyncConfigTask = () -> {
    ListenableFuture<String> configTask = service.fetchConfig("config.a");
    TimeUnit.MILLISECONDS.sleep(500); //some long running task
    return configTask;
};
ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);

如果我们想要真正的链接,其中一个未来的结果被输入到另一个未来的计算中,我们可以使用Futures.transformAsync()

ListenableFuture<String> usernameTask = service.generateUsername("john");
AsyncFunction<String, String> passwordFunc = username -> {
    ListenableFuture<String> generatePasswordTask = service.generatePassword(username);
    TimeUnit.MILLISECONDS.sleep(500); // some long running task
    return generatePasswordTask;
};
ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);

Guava 还为我们提供了Futures.scheduleAsync() 和*Futures. catchAsync()来分别提交计划任务和提供错误恢复的回退任务。虽然它们适用于不同的场景,但我们不会讨论它们,因为它们与其他async()*调用相似。

5. 使用注意事项

现在让我们研究一下我们在使用future 时可能遇到的一些常见陷阱以及如何避免它们。

5.1. 工作与倾听执行者

在使用 Guava future 时,了解工作执行器和监听执行器之间的区别很重要。例如,假设我们有一个异步任务来获取配置:

public ListenableFuture<String> fetchConfig(String configKey) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE));
    });
}

还假设我们要为上述未来附加一个侦听器:

ListenableFuture<String> configsTask = service.fetchConfig("config.0");
Futures.addCallback(configsTask, someListener, listeningExecutor);

请注意,这里的lExecService是运行我们的异步任务的执行器,而 listenerExecutor 是调用我们的侦听器的执行器。

如上所示,**我们应该始终考虑将这两个 executor 分开,以避免我们的 listeners 和 worker 竞争相同的线程池资源的情况。**共享同一个执行器可能会导致我们的繁重任务使侦听器执行饥饿。或者一个写得不好的重量级监听器最终阻止了我们重要的繁重任务。

5.2. 小心使用directExecutor()

虽然我们可以在单元测试中使用*MoreExecutors.directExecutor()MoreExecutors.newDirectExecutorService()*来更容易处理异步执行,但我们应该在生产代码中小心使用它们。

当我们从上述方法中获取到执行器时,我们提交给它的任何任务,无论是重量级的还是监听器,都将在当前线程上执行。如果当前的执行上下文需要高吞吐量,这可能很危险。

例如,在 UI 线程中使用directExecutor并向其提交重量级任务会自动阻塞我们的 UI 线程。

我们还可能面临这样一种情况,即我们的侦听器最终会减慢所有其他侦听器的速度(即使是那些不涉及directExecutor的侦听器)。这是因为 Guava在其各自的Executor 中执行while循环中的所有侦听器,但directExecutor 会导致侦听器与while循环在同一线程中运行。

5.3. 嵌套future 不好

在使用链式future 时,我们应该注意不要以创建嵌套future 的方式从另一个future 内部调用一个future :

public ListenableFuture<String> generatePassword(String username) {
    return lExecService.submit(() -> {
        TimeUnit.MILLISECONDS.sleep(500);
        return username + "123";
    });
}
String firstName = "john";
ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> {
    final String username = firstName.replaceAll("[^a-zA-Z]+", "")
        .concat("@service.com");
    return generatePassword(username);
});

*如果我们曾经看到有*ListenableFuture<ListenableFuture<V» 的代码,那么我们应该知道这是一个写得不好的未来,因为外部未来的取消和完成可能会竞争,并且取消可能不会传播到内心的未来。

如果我们看到上述场景,我们应该始终使用*Futures.async()*变体以连接的方式安全地解开这些链接的future 。

5.4. 小心使用JdkFutureAdapters.listenInPoolThread()

Guava 建议我们利用其ListenableFuture的最佳方式是将我们所有使用Future的代码转换为ListenableFuture

如果这种转换在某些情况下不可行,**Guava 为我们提供了使用  JdkFutureAdapters.listenInPoolThread()覆盖的适配器来执行此操作。虽然这看起来很有帮助,但Guava 警告我们这些是重量级适配器,应尽可能避免使用。