public <A,B> Iterable<ListenableFuture<B>> splitAndRun( final ListenableFuture<Iterable<A>> elements,final Function<A,B> func,final ListeningExecutorService executor );
很明显,如果我返回ListenableFuture< Iterable< ListenableFuture< B>>>,我可以这样做,但我觉得我应该能够拆分并运行它并保持其异步性.
这是我到目前为止的代码,但你会注意到最后讨厌的.get(),它会破坏事物.如果我的事情过于复杂,请原谅.
public class CallableFunction<I,O> implements Callable<O>{ private final I input; private final Function<I,O> func; public CallableFunction(I input,Function<I,O> func) { this.input = input; this.func = func; } @Override public O call() throws Exception { return func.apply(input); } } public <A,final ListeningExecutorService executor ) throws InterruptedException,ExecutionException { return Futures.transform(elements,new Function<Iterable<A>,Iterable<ListenableFuture<B>>>() { @Override public Iterable<ListenableFuture<B>> apply(Iterable<A> input) { return Iterables.transform(input,new Function<A,ListenableFuture<B>>() { @Override public ListenableFuture<B> apply(A a) { return executor.submit(new CallableFunction<A,B>(a,func)); } }); } },executor).get(); }
解决方法
但是,如果转型是缓慢的,或者某些输入可能会失败但是对其他输入成功会怎么样?在这种情况下,我们希望单独转换每个输出.另外,我们希望确保转换只发生一次.我们的集合转换方法不能保证这一点.因此,在您的示例代码中,输出上的每次迭代都会向执行程序提交新任务,即使先前提交的任务可能已经完成.因此,只有在转换是轻量级的时候,我们才推荐Iterables.transform和朋友. (通常,如果你正在做重量级的事情,你的转换函数将抛出一个已检查的异常,函数不允许.考虑这个提示:)当然,你的例子不会触发提示.)
这在代码中意味着什么?基本上,我们将颠倒我在其他答案中给出的操作顺序.我们将转换为Future< Iterable< A>>到Iterable< Future< A>>第一.然后我们将为每个A提交一个单独的任务,将其转换为B.对于后一步,我们将提供Executor so that the transformation doesn’t block some innocent thread.(我们现在只需要Futures.transform,所以我静态导入它. )
List<ListenableFuture<A>> individuals = newArrayList(); for (int i = 0; i < knownSize; i++) { final int index = i; individuals.add(transform(input,new Function<List<A>,A>() { @Override public A apply(List<A> values) { return values.get(index); } })); } List<ListenableFuture<B>> result = newArrayList(); for (ListenableFuture<A> original : individuals) { result.add(transform(original,function,executor)); } return result;
无论如何,那就是这个想法.但我的实施是愚蠢的.我们可以轻松地同时执行这两个步骤:
List<ListenableFuture<B>> result = newArrayList(); for (int i = 0; i < knownSize; i++) { final int index = i; result.add(transform(input,B>() { @Override public B apply(List<A> values) { return function.apply(values.get(index)); } },executor)); } return result;
因为这使得n Futures.transform调用而不是1并且因为它使用单独的Executor,所以如果转换是重量级的,那么它比我的其他解决方案更好,如果它是轻量级的则更糟.另一个警告仍然是:只有当你知道你将拥有多少输出时,这才有效.