看来
Java Streams 并行化的核心是 ForEachTask。理解其逻辑，对于建立一个能预测针对 Streams API 编写的客户端代码并发行为的心理模型似乎至关重要。然而，我发现我的预期与实际行为相矛盾。
作为参考，这里是关键的 compute() 方法（java/util/stream/ForEachOps.java:253）：
public void compute() { Spliterator<S> rightSplit = spliterator,leftSplit; long sizeEstimate = rightSplit.estimateSize(),sizeThreshold; if ((sizeThreshold = targetSize) == 0L) targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate); boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags()); boolean forkRight = false; Sink<S> taskSink = sink; ForEachTask<S,T> task = this; while (!isShortCircuit || !taskSink.cancellationRequested()) { if (sizeEstimate <= sizeThreshold || (leftSplit = rightSplit.trySplit()) == null) { task.helper.copyInto(taskSink,rightSplit); break; } ForEachTask<S,T> leftTask = new ForEachTask<>(task,leftSplit); task.addToPendingCount(1); ForEachTask<S,T> taskToFork; if (forkRight) { forkRight = false; rightSplit = leftSplit; taskToFork = task; task = leftTask; } else { forkRight = true; taskToFork = leftTask; } taskToFork.fork(); sizeEstimate = rightSplit.estimateSize(); } task.spliterator = null; task.propagateCompletion(); }
从高层次看，主循环不断地分解 spliterator，交替地把块的处理 fork 出去和内联处理，直到 spliterator 拒绝进一步分割，或者剩余大小低于计算出的阈值。
现在考虑上述算法应用于大小未知的流的情况：此时流不会被分割成大致相等的两半，而是从流的头部反复取出预定大小的块。在这种情况下，相对于块的大小而言，"建议的目标大小"异常之大，这实际上意味着这些块永远不会被再分割成更小的块。
因此，该算法看起来是交替地 fork 一个块、再内联处理一个块。如果每个块的处理耗时相同，那么最多应该只有两个核心被利用。然而实际行为是，我机器上的全部四个核心都被占用了。显然，我对该算法的理解还缺少重要的一环。
我遗漏了什么？
附录:测试代码
package test; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static test.FixedBatchSpliteratorWrapper.withFixedSplits; import java.io.IOException; import java.io.PrintWriter; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; public class Parallelization { static final AtomicLong totalTime = new AtomicLong(); static final ExecutorService pool = Executors.newFixedThreadPool(4); public static void main(String[] args) throws IOException { final long start = System.nanoTime(); final Path inputPath = createInput(); System.out.println("Start processing"); try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(Paths.get("output.txt")))) { withFixedSplits(Files.newBufferedReader(inputPath).lines(),200).map(Parallelization::processLine) .forEach(w::println); } final double cpuTime = totalTime.get(),realTime = System.nanoTime() - start; final int cores = Runtime.getRuntime().availableProcessors(); System.out.println(" Cores: " + cores); System.out.format(" cpu time: %.2f s\n",cpuTime / SECONDS.toNanos(1)); System.out.format(" Real time: %.2f s\n",realTime / SECONDS.toNanos(1)); System.out.format("cpu utilization: %.2f%%",100.0 * cpuTime / realTime / cores); } private static String processLine(String line) { final long localStart = System.nanoTime(); double ret = 0; for (int i = 0; i < line.length(); i++) for (int j = 0; j < line.length(); j++) ret += Math.pow(line.charAt(i),line.charAt(j) / 32.0); final long took = System.nanoTime() - localStart; totalTime.getAndAdd(took); return NANOSECONDS.toMillis(took) + " " + ret; } private static Path createInput() throws IOException { final Path inputPath = Paths.get("input.txt"); try (PrintWriter w = new PrintWriter(Files.newBufferedWriter(inputPath))) { for (int i = 0; i < 6_000; i++) { final 
String text = String.valueOf(System.nanoTime()); for (int j = 0; j < 20; j++) w.print(text); w.println(); } } return inputPath; } }
package test; import static java.util.Spliterators.spliterator; import static java.util.stream.StreamSupport.stream; import java.util.Comparator; import java.util.Spliterator; import java.util.function.Consumer; import java.util.stream.Stream; public class FixedBatchSpliteratorWrapper<T> implements Spliterator<T> { private final Spliterator<T> spliterator; private final int batchSize; private final int characteristics; private long est; public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap,long est,int batchSize) { final int c = toWrap.characteristics(); this.characteristics = (c & SIZED) != 0 ? c | SUBSIZED : c; this.spliterator = toWrap; this.batchSize = batchSize; this.est = est; } public FixedBatchSpliteratorWrapper(Spliterator<T> toWrap,int batchSize) { this(toWrap,toWrap.estimateSize(),batchSize); } public static <T> Stream<T> withFixedSplits(Stream<T> in,int batchSize) { return stream(new FixedBatchSpliteratorWrapper<>(in.spliterator(),batchSize),true); } @Override public Spliterator<T> trySplit() { final HoldingConsumer<T> holder = new HoldingConsumer<>(); if (!spliterator.tryAdvance(holder)) return null; final Object[] a = new Object[batchSize]; int j = 0; do a[j] = holder.value; while (++j < batchSize && tryAdvance(holder)); if (est != Long.MAX_VALUE) est -= j; return spliterator(a,j,characteristics()); } @Override public boolean tryAdvance(Consumer<? super T> action) { return spliterator.tryAdvance(action); } @Override public void forEachRemaining(Consumer<? super T> action) { spliterator.forEachRemaining(action); } @Override public Comparator<? super T> getComparator() { if (hasCharacteristics(SORTED)) return null; throw new IllegalStateException(); } @Override public long estimateSize() { return est; } @Override public int characteristics() { return characteristics; } static final class HoldingConsumer<T> implements Consumer<T> { Object value; @Override public void accept(T value) { this.value = value; } } }
解决方法
具有讽刺意味的是，问题描述中几乎已经包含了答案：由于"左"、"右"任务轮流被 fork（另一个则内联处理），所以有一半的时间被 fork 的是右任务——也就是代表完整剩余流的那个任务——它被偷走后会继续分割。这意味着按块派发的节奏只是被稍微拖慢了一点（每隔一次迭代才 fork 一次右任务），但分割显然确实在发生。