为什么下面的代码不打印任何输出,而如果我们删除并行,它打印0,1?
IntStream.iterate(0,i -> ( i + 1 ) % 2) .parallel() .distinct() .limit(10) .forEach(System.out::println);
虽然我知道理想的限制应该在不同之前放置,但是我的问题与添加并行处理造成的差异更为相关.
解决方法
真正的原因是有序的并行.distinct()是文档中的
described的完整屏障操作:
Preserving stability for
distinct()
in parallel pipelines is relatively expensive (requires that the operation act as a full barrier,with substantial buffering overhead),and stability is often not needed.
“完全屏障操作”是指所有上游操作必须在下游启动之前执行. Stream API中只有两个完整的屏障操作:.sorted()(每次)和.distinct()(在有序的并行情况下).由于您提供给.distinct()的非短路无限流,最终会产生无限循环.通过合约.distinct()不能以任何顺序向下游发送元素:它应该始终发出第一个重复元素.虽然在理论上可以更好地实现并行有序的.distinct(),但它将是更复杂的实现.
对于解决方案,@ user140547是正确的:在.distinct()之前添加.unordered()这个将distinct()算法转换为无序的()只使用共享ConcurrentHashMap来存储所有观察到的元素并将每个新元素发送到下游).请注意,在.distinct()之后添加.unordered()将不会有帮助.