本文主要是介绍JAVA多线程之FutureTask源码解读,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
使用过Java线程池的应该都知道,在java.util.concurrent有个ExecutorService的线程池接口,通过这个接口先线程池提交任务,获取任务结果,关闭线程池等等操作。而关于任务执行结果的获取,就不得不提FutureTask这个类。本文从源码的角度分析,线程池是如何通过FutureTask执行多线程任务,又是如何获取多线程执行结果的。
使用多线程,我们多数是使用ExecutorService的submit方法,执行多线程任务的,所以笔者这里
ExecutorService接口有个submit()的方法,执行完该方法后会返回一个FutureTask对象,通过调用FutureTask对象的get方法()可以获取线程任务执行结果。submit()方法可以传Runnable或者Callable对象作为参数,那这两者又有什么区别?
我们先看看submit(Runnable task)方法的源码:
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;
}
在submit方法中通过newTaskFor(task,null)方法创建了RunnableFuture对象,在调用execute(ftask)执行线程任务之后,讲ftask方法返回。下面通过newTaskFor方法一层一层往下看源码:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
}
//FutureTask方法
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable
}
//callable方法
public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);
}//RunnableAdapter类
static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;
RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;
}
public T call() {task.run();return result;}
}
从源码可以知道,newTaskFor(Runnable runnable, T value)就是创建一个FutureTask对象,通过入参runnable对象和value创建一个实现Callable接口类RunnableAdapter的对象,并赋值给callable成员变量。需要注意的是,RunnableAdapter持有两个个成员变量task,result,task是线程任务,result就是线程任务执行结果。FutureTask就是通过callable成员变量执行线程任务和获取任务执行结果的。但是submit(Runnable task)方法中的newTaskFor(task, null)第二个参数参入null,则返回结果置为null,通过submit(Runnable task)是无法获取返回结果的。
再看看submit(Callable<T> task)方法的源码:
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable
}
由此可知,submit(Callable<T> task)和submit(Runnable task)方法本质没区别,都是创建一个FutureTask方法,并讲FutureTask的成员变量callable和state状态初始化。
在上文中的源码可知,FutureTask的构造方法中,会初始化一个state的成员变量,state变量代笔了一个任务运行的状态。FutureTask类中定义了几种任务状态,如下:
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
状态值有小至大分别是新建,执行中,正常(可以理解成任务完成状态),异常,已取消状态,中断中,已中断。
任务状态变更过程的几种可能:
新建-->执行中-->已完成
新建-->执行中-->异常
新建-->已取消
新建-->中断中-->已中断
再看下其它成员变量,下面这些变量都会在后文中出现:
/** The underlying callable; nulled out after running */
//线程任务
private Callable<V> callable;
/** The result to return or exception to throw from get() */
//任务执行后的结果
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
//当前线程
private volatile Thread runner;
/** Treiber stack of waiting threads */
//等待结点
private volatile WaitNode waiters;
在了解了FutureTask的构造方法和成员变量之后。接下来需要看看FutureTask的两个重要的方法。执行线程任务的run()方法和获取线程任务执行结果的get()方法。
首先,来看FutureTask的run()方法:
public void run() {//compareAndSwapObject()方法作用是若当前对象的偏移量runnerOffset的变量(即成员变量runner),若runner//为null,赋值为Thread.currentThread(),赋值成功返回true,否则返回false//若任务状态不是为新建状态或compareAndSwapObject()方法返回为false,则返回if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//执行任务result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)//设置任务执行结果set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}
源码中的UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))可能比较难理解,可以看下这篇文章:https://www.cnblogs.com/thomas12112406/p/6510787.html
run()主要的代码就是调用callable的call()方法执行任务,并获取返回结果赋值给result,并调用set(result)方法设置返回值。
protected void set(V v) {//compareAndSwapInt()方法同上,若成员变量state的值为NEW,设置为COMPLETING,设置成功,返回true,否则返回falseif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//将任务执行结果赋值给outcomeoutcome = v;//将任务状态state赋值为已完成状态UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//该方法主要将waiters结点一一置为nullfinishCompletion();}
}
通过set(V v)源码可以知道,FutureTask是在线程任务代码执行完成后,再讲代表任务状态的state变量从NEW置COMPLETING
。设置成功后,将任务执行结果赋值给成员变量outcome。赋值之后,再将state任务状态从COMPLETING置为NORMAL,也就是任务完成状态。
接下来,再看FutureTask另外一个重要方法,get()和get(long timeout, TimeUnit unit)方法。二者区别在于前者无限等待线程执行完任务,后者设定超时时间,超时则返回并抛出超时异常。
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;//若任务状态为NEW或者COMPLETING,则执行awaitDone方法,即等待任务执行完成if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)throw new TimeoutException();return report(s);
}
由源码可知,这两个get方法都调用了一个方法awaitDone()。下面再看看这个awaitDone()方法。
private int awaitDone(boolean timed, long nanos)throws InterruptedException {//设置超时时间final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;//循环等待任务完成for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;//如果状态大于COMPLETING,则返回此刻任务状态sif (s > COMPLETING) {if (q != null)q.thread = null;return s;}//如果任务执行中,让出cpu占用权else if (s == COMPLETING) // cannot time out yetThread.yield();//如果q为null,创建等待结点else if (q == null)q = new WaitNode();//将等待结点添加至waiters链表中else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);//若设置超时时间,判断是否超时,若超时,返回此时任务状态else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}
}
也就是,如果任务已经完成,返回。如果任务执行中,让出cpu占用权力,继续等待。如果任务状态不等于执行中,也就是NEW状态,判断等待结点是否为空,若为空创建等待结点,并在下次循环中添加至waiters链表中。如果设置了超时时间,判断是否超时,如果超时,将等待结点从等待链表waiters中移除,并返回此刻的任务状态。如果不设置超时时间,则一直循环至任务状态大于COMPLETING。
总结:FutureTask在构造方法中初始化了callable和state两个变量,callable用于执行线程任务和获取任务结果,state用于标识任务执行状态,任务状态的枚举型有NEW,COMPLETING,NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTING,INTERRUPTED。FutureTask通过执行run()方法调用callable的call()方法执行线程任务,并通过set(result)方法设置线程返回结果。而通过get()方法获取线程返回结果,get方法中调用awaitDone()循环等待线程任务执行完毕。如果设置超时时间,即便任务未执行完成也会跳出循环并返回。未设置超时时间,则一直循环等待线程任务执行完毕。
这篇关于JAVA多线程之FutureTask源码解读的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!