我正在做一些关于
Java 8并发性的实验
在ScheduledThreadPoolExecutor API中
我可以看到以下两个签名:
schedule(Callable<V> callable,long delay,TimeUnit unit) schedule(Runnable command,TimeUnit unit)
一个用于Callable,一个用于Runnable
我也可以在API中看到以下两个:
scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) scheduleWithFixedDelay(Runnable command,TimeUnit unit)
我的问题是,为什么不存在Callable的两个等价物
scheduleAtFixedRate(Callable<V> callable,TimeUnit unit) scheduleWithFixedDelay(Callable<V> callable,TimeUnit unit)
我需要为操作检索一个布尔结果.
谢谢.
解决方法
您期望scheduleAtFixedRate(Callable< V>)的返回类型是什么?计划的返回类型(可调用< V>)是Future< V>,表示在将来的某个时刻,可调用返回的类型V的值将是可用的.您可以通过调用Future上的get()来等待此值可用.
scheduleAtFixedRate(Callable< V>)的返回类型不能类似Future< List< V>>,因为这意味着在将来的某个时刻,重复调用callable返回的所有值都可用.但是,将会有更多的可调用调度执行,因此该列表将永远不存在.
对于这样的事情你需要的是一个异步结果流的概念,你可以按照这样的方式订阅它,以便在它到达时处理每个结果.据我所知,这在标准库中不存在.我知道的一个第三方库包含Netflix的RxJava.例如,使用该库中的ReplaySubject
,您可以创建结果流并在返回时处理每个结果:
import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import rx.subjects.ReplaySubject; public class Callables { public static void main(String[] args) { // Example Callable that returns a boolean result Random random = new Random(); Callable<Boolean> callable = () -> random.nextBoolean(); // Turn the Callable into a Runnable that adds the last result to the stream of results ReplaySubject<Boolean> results = ReplaySubject.create(); Runnable runnable = () -> { try { boolean result = callable.call(); results.onNext(result); } catch (Exception e) { // Needed since Callable can throw an exception,but Runnable cannot } }; // Periodically run the Runnable ScheduledExecutorService executor = Executors.newScheduledThreadPool(5); executor.scheduleAtFixedRate(runnable,1,TimeUnit.SECONDS); // Handling the results as they arrive results.forEach(result -> System.out.println("Result: " + result)); System.out.println("Waiting for results..."); } }
如果你决定使用RxJava,可能值得使用更多的API而不是直接使用Executor.您可以使用Observable.interval
生成定期发出数字的流,然后将其映射以调用您的可调用对象.这样,您可以更简洁的方式获得相同的结果流:
import java.io.IOException; import java.util.Random; import java.util.concurrent.TimeUnit; import rx.Observable; public class MoreCallables { public static void main(String[] args) throws IOException { Observable<Long> periodic = Observable.interval(1,TimeUnit.SECONDS); Random random = new Random(); Observable<Boolean> results = periodic.map(i -> random.nextBoolean()); results.forEach(result -> System.out.println("Result: " + result)); System.out.println("Waiting for results..."); System.in.read(); } }