在Java中使用无限流的并行处理

前端之家收集整理的这篇文章主要介绍了在Java中使用无限流的并行处理前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
为什么下面的代码不打印任何输出,而如果我们删除并行,它打印0,1?
IntStream.iterate(0,i -> ( i + 1 ) % 2)
         .parallel()
         .distinct()
         .limit(10)
         .forEach(System.out::println);

虽然我知道理想的限制应该在不同之前放置,但是我的问题与添加并行处理造成的差异更为相关.

解决方法

真正的原因是有序的并行.distinct()是文档中的 described的完整屏障操作:

Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the operation act as a full barrier,with substantial buffering overhead),and stability is often not needed.

“完全屏障操作”是指所有上游操作必须在下游启动之前执行. Stream API中只有两个完整的屏障操作:.sorted()(每次)和.distinct()(在有序的并行情况下).由于您提供给.distinct()的非短路无限流,最终会产生无限循环.通过合约.distinct()不能以任何顺序向下游发送元素:它应该始终发出第一个重复元素.虽然在理论上可以更好地实现并行有序的.distinct(),但它将是更复杂的实现.

对于解决方案,@ user140547是正确的:在.distinct()之前添加.unordered()这个将distinct()算法转换为无序的()只使用共享ConcurrentHashMap来存储所有观察到的元素并将每个新元素发送到下游).请注意,在.distinct()之后添加.unordered()将不会有帮助.

猜你在找的Java相关文章