Java 8 中的自定义并行流线程池
1. 概述
Java 8 引入了流的概念,作为对数据执行批量操作的有效方式。并且可以在支持并发的环境中获取并行Streams。 这些流可以带来更高的性能 - 以多线程开销为代价。 在本快速教程中,我们将了解Stream API的最大限制之一,**并了解如何使并行流与自定义ThreadPool实例一起工作,或者 -有一个库 可以处理这个。
2. 并行Stream
让我们从一个简单的例子开始——在任何Collection类型上调用parallelStream方法——这将返回一个可能的并行Stream:
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
在此类Stream中发生的默认处理使用ForkJoinPool.commonPool(),这是一个由整个应用程序共享的线程池。
3. 自定义线程池
我们实际上可以在处理流时传递一个自定义的ThreadPool。
以下示例让并行Stream使用自定义ThreadPool来计算从 1 到 1,000,000 的 long 值的总和,包括:
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 1_000_000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
我们使用了并行度为 4 的ForkJoinPool构造函数。需要进行一些实验来确定不同环境的最佳值,但一个好的经验法则是简单地根据 CPU 的内核数来选择数量。 接下来,我们处理了并行Stream的内容,在reduce调用中将它们汇总。 这个简单的例子可能无法展示使用自定义线程池的全部用途,但在我们不希望将公共线程池与长时间运行的任务(例如处理来自网络源的数据)捆绑在一起的情况下,好处变得显而易见– 或者应用程序中的其他组件正在使用公共线程池。
如果我们运行上面的测试方法,它就会通过。到目前为止,一切都很好。 但是,如果我们像在测试方法中一样在普通方法中实例化ForkJoinPool类,则可能会导致OutOfMemoryError。 接下来,让我们仔细看看内存泄漏的原因。
4. 当心内存泄漏
正如我们之前谈到的,公共线程池默认情况下被整个应用程序使用。公共线程池是一个静态的ThreadPool实例。 因此,如果我们使用默认线程池,则不会发生内存泄漏。
现在,让我们回顾一下我们的测试方法。在测试方法中,我们创建了一个ForkJoinPool对象。当测试方法完成时,customThreadPool对象不会被取消引用和垃圾回收——相反,它将等待新任务被分配。 也就是说,我们每次调用测试方法时,都会创建一个新的customThreadPool对象,并且不会被释放。 问题的修复非常简单:在我们执行完方法后关闭customThreadPool对象:
try {
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
} finally {
customThreadPool.shutdown();
}