强制Java流尽早执行管道的一部分,以将阻塞任务提交给线程池

前端之家收集整理的这篇文章主要介绍了强制Java流尽早执行管道的一部分,以将阻塞任务提交给线程池前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

我有一个我想要处理的对象列表,Java8流API看起来是最干净和可读的方式.

但是我需要对这些对象执行的一些操作包括阻塞IO(比如读取数据库) – 所以我想将这些操作提交给有几十个线程的线程池.

起初我想过做一些事情:

@H_403_9@myObjectList .stream() .filter(wrapPredicate(obj -> threadPoolExecutor.submit( () -> longQuery(obj) // returns boolean ).get()) // wait for future & unwrap boolean .map(filtered -> threadPoolExecutor.submit( () -> anotherQuery(filtered) // returns Optional )) .map(wrapFunction(Future::get)) .filter(Optional::isPresent) .map(Optional::get) .collect(toList());

wrapPredicate和wrapFunction仅用于检查异常重新抛出.

但是,很明显,对Future.get()的调用将阻塞流的线程,直到查询完成给定对象,并且流将在此之前不进行.因此,一次只处理一个对象,并且线程池没有意义.

我可以使用并行流,但是我需要希望默认的ForkJoinPool足够用于此.或者只是增加“java.util.concurrent.ForkJoinPool.common.parallelism”,但我不想为了那个流而改变整个应用程序的设置.我可以在自定义ForkJoinPool中创建流,但我看到it does not guarantee that level of parallelism.

所以我最终得到了类似的东西,只是为了保证在等待期货完成之前将所有需要的任务提交给threadPool:

@H_403_9@myObjectList .stream() .map(obj -> Pair.of(obj,threadPoolExecutor.submit( () -> longQuery(obj) // returns boolean )) ) .collect(toList()).stream() // terminate stream to actually submit tasks to the pool .filter(wrapPredicate(p -> p.getRight().get())) // wait & unwrap future after all tasks are submitted .map(Pair::getLeft) .map(filtered -> threadPoolExecutor.submit( () -> anotherQuery(filtered) // returns Optional )) .collect(toList()).stream() // terminate stream to actually submit tasks to the pool .map(wrapFunction(Future::get)) // wait & unwrap futures after all submitted .filter(Optional::isPresent) .map(Optional::get) .collect(toList());

有没有明显更好的方法来实现这一目标?

一种更优雅的方式告诉流“直到现在为流中的每个对象执行流水线步骤”,然后继续处理.collect(toList()).stream()以外的处理和更好的方法来过滤效果一个未来而不是将它打包到Apache Commons Pair中以便稍后过滤Pair :: getRight?或者对问题采取完全不同的方法

最佳答案
您可以通过使用来大大简化代码

@H_403_9@myObjectList.stream() .map(obj -> threadPoolExecutor.submit( () -> longQuery(obj)? anotherQuery(obj).orElse(null): null)) .collect(toList()).stream() .map(wrapFunction(Future::get)) .filter(Objects::nonNull) .collect(toList());

有一点是,如果您稍后再向同一个执行程序提交另一个查询,那么并发性将没有任何改进.因此,您可以在longQuery返回true后直接执行它.此时,obj仍然在范围内,因此您可以将它用于anotherQuery.

通过提取Optional的结果,使用null作为失败的表示,我们可以获得缺少结果的相同表示,因为longQuery返回false或anotherQuery返回空Optional.因此,在提取Future的结果之后我们所要做的就是.filter(Objects :: nonNull).

在获得实际结果之前,您必须首先提交作业,收集期货的逻辑不会改变.无论如何都无法绕过它.所有其他便利方法或框架都可以提供,隐藏这些对象的实际临时存储.

猜你在找的Java相关文章