Java中Future、FutureTask原理以及与线程池的搭配使用

前端之家收集整理的这篇文章主要介绍了Java中Future、FutureTask原理以及与线程池的搭配使用前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

Java中的Future和Future通常和线程池搭配使用,用来获取线程池返回执行后的返回值。我们假设通过Executors工厂方法构建一个线程池es ,es要执行某个任务有两种方式,一种是执行 es.execute(runnable),这种情况是没有返回值的; 另外一种情况是执行 es.submit(runnale)或者 es.submit(callable),这种情况会返回一个Future的对象,然后调用Future的get()来获取返回值。

Future

public interface Future<V> {
  boolean cancel(boolean mayInterruptIfRunning);

  boolean isCancelled();

  boolean isDone();

  V get() throws InterruptedException,ExecutionException;

  V get(long timeout,TimeUnit unit)
    throws InterruptedException,ExecutionException,TimeoutException;
}

Future是一个接口,他提供给了我们方法来检测当前的任务是否已经结束,还可以等待任务结束并且拿到一个结果,通过调用Future的get()方法可以当任务结束后返回一个结果值,如果工作没有结束,则会阻塞当前线程,直到任务执行完毕,我们可以通过调用cancel()方法来停止一个任务,如果任务已经停止,则cancel()方法会返回true;如果任务已经完成或者已经停止了或者这个任务无法停止,则cancel()会返回一个false。当一个任务被成功停止后,他无法再次执行。isDone()和isCancel()方法可以判断当前工作是否完成和是否取消。

线程池中有以下几个方法

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task,null);
    execute(ftask);
    return ftask;
  }
public <T> Future<T> submit(Runnable task,T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task,result);
    execute(ftask);
    return ftask;
  }
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
  }

都返回一个Future对象,仔细查看发现,所有的方法最终都将runnable或者callable转变成一个RunnableFuture的对象,这个RunnableFutre的对象是一个同时继承了Runnable和Future的接口

public interface RunnableFuture<V> extends Runnable,Future<V> {
  void run();
}

然后调用executor(runnable)方法,关于executor(runnable)的具体实现我们后面再讲。最后返回一个RunnableFuture对象。RunnableFuture这个接口直有一个具体的实现类,那就时我们接下来要讲的FutureTask。

FutureTask

public class FutureTask<V> implements RunnableFuture<V>

FutureTask实现了RunnableFuture的接口,既然我们知道最终返回的是一个FutureTask对象ftask,而且我们可以通过ftask.get()可以的来得到execute(task)的返回值,这个过程具体事怎么实现的了??这个也是本篇文章的所要讲的。

我们可以先来猜测一下它的实现过程,首先Runnable的run()是没有返回值的,所以当es.submit()的参数只有一个Runnable对象的时候,通过ftask.get()得到的也是一个null值,当参数还有一个result的时候,就返回这个result;如果参数是一个Callable的对象的时候,Callable的call()是有返回值的,同时这个call()方法会在转换的Runable对象ftask的run()方法中被调用,然后将它的返回值赋值给一个全局变量,最后在ftask的get()方法中得到这个值。猜想对不对了? 我们直接看源码。

将Runnable对象转为RunnableFuture的方法

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable,T value) {
    return new FutureTask<T>(runnable,value);
  }
public FutureTask(Runnable runnable,V result) {
    this.callable = Executors.callable(runnable,result);
    this.state = NEW;    // ensure visibility of callable
  }

Executors::callable()

public static <T> Callable<T> callable(Runnable task,T result) {
    if (task == null)
      throw new NullPointerException();
    return new RunnableAdapter<T>(task,result);
  }

Executors的内部类RunnableAdapter

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task,T result) {
      this.task = task;
      this.result = result;
    }
    public T call() {
      task.run();
      return result;
    }
  }

将Callable对象转为RunnableFuture的方法

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
  }
public FutureTask(Callable<V> callable) {
    if (callable == null)
      throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;    // ensure visibility of callable
  }

接下来我们来看execute(runnbale)的执行过程:

execute(runnable)最终的实现是在ThreadPoolExecutor,基本上所有的线程池都是通过ThreadPoolExecutor的构造方法传入不同的参数来构造的。

ThreadPoolExecutor::executor(runnable) :

public void execute(Runnable command) {
    if (command == null)
      throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command,true))
        return;
      c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
      if (! isRunning(recheck) && remove(command))
        reject(command);
      else if (workerCountOf(recheck) == 0)
        addWorker(null,false);
    }
    else if (!addWorker(command,false))
      reject(command);
  }

如上所示,这个过程分为三步:

Step 1:

如果当前线程池的的线程小于核心线程的数量的时候,就会调用addWorker检查运行状态和正在运行的线程数量,通过返回false来防止错误添加线程,然后执行当前任务。

Step 2:

否则当前线程池的的线程大于核心线程的数量的时候,我们仍然需要先判断是否需要添加一个新的线程来执行这个任务,因为可能已经存在的线程此刻任务执行完毕处于空闲状态,这个时候可以直接复用。否则创建一个新的线程来执行此任务。

Step 3:

如果不能再添加新的任务,就拒绝。

执行execute(runnable)最终会回调runnable的run()方法,也就是FutureTask的对象ftask的run()方法,源码如下:

public void run() {
    if (state != NEW ||
      !UNSAFE.compareAndSwapObject(this,runnerOffset,null,Thread.currentThread()))
      return;
    try {
      Callable<V> c = callable;
      if (c != null && state == NEW) {
        V result;
        boolean ran;
        try {
          result = c.call();
          ran = true;
        } catch (Throwable ex) {
          result = null;
          ran = false;
          setException(ex);
        }
        if (ran)
          set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
    }
  }

通过执行result = c.call()拿到返回值,然后set(result) ,因此get()方法获得的值正是这个result。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

猜你在找的Java相关文章