资讯专栏INFORMATION COLUMN

FutureTask源码分析

luqiuwen / 1225人阅读

摘要:从而可以启动和取消异步计算任务查询异步计算任务是否完成和获取异步计算任务的返回结果。原理分析在分析中我们没有看它的父类,其中有一个方法,返回一个,说明该方法可以获取异步任务的返回结果。

FutureTask介绍

FutureTask是一种可取消的异步计算任务。它实现了Future接口,代表了异步任务的返回结果。从而FutureTask可以启动和取消异步计算任务、查询异步计算任务是否完成和获取异步计算任务的返回结果。只有到异步计算任务结束时才能获取返回结果,当异步计算任务还未结束时调用get方法会使线程阻塞。一旦异步计算任务完成,计算任务不能重新启动或者取消,除非调用了runAndReset。

FutureTask实现了RunnableFuture,RunnableFuture结合了Future和Runnable。

FutureTask原理分析

在ThreadPoolExecutor分析中我们没有看它的父类AbstractExecutorService,其中有一个方法submit,返回一个Future,说明该方法可以获取异步任务的返回结果。该方法有三个重载,可以接收Runnable和Callable,Callable是可以返回结果的一个Runnable,而Callable就是FutureTask的一个重要的变量。

@FunctionalInterface
public interface Callable {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
FutureTask的一些变量和状态
/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** The underlying callable; nulled out after running */
//一个可以返回结果的任务
private Callable callable;
/** The result to return or exception to throw from get() */
//包装返回结果或者异常,没有被volatile修饰,状态保护读写安全
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
//运行线程
private volatile Thread runner;
/** Treiber stack of waiting threads */
//单链表,是一个线程的栈的结构
private volatile WaitNode waiters;

FutureTask有7中状态,介绍一下状态之间的转换:
NEW -> COMPLETING -> NORMAL:任务正常执行;
NEW -> COMPLETING -> EXCEPTIONAL:任务发生异常;
NEW -> CANCELLED:任务被取消;
NEW -> INTERRUPTING -> INTERRUPTED:任务被中断;

run方法
public void run() {
    //如果state不为NEW,说明任务已经在执行或者取消
    //如果设置运行线程失败,说明任务已经有运行线程抢在前面
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable c = callable;
        //NEW状态才可以执行
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //执行任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                //设置异常信息
                setException(ex);
            }
            if (ran)
                //设置任务运行结果
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        //将运行线程清空,在state被更改之前要保证runner非空,这样能包装run方法不被多次执行
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        //中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

set、setException和handlePossibleCancellationInterrupt
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

当执行时发生异常,调用setException,首先将state设置为COMPLETING,设置成功后将outcome设置为异常,然后将state设置为EXCEPTIONAL。

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

当callable执行成功并返回,调用set,首先将state设置为COMPLETING,设置成功后将结果设置为outcome,然后设置state为NORMAL。

finally中如果state为中断,调用handlePossibleCancellationInterrupt:

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let"s spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

如果状态一直是INTERRUPTING,稍稍等待。

finishCompletion和get

在上面set和setException中最后都调用了finishCompletion方法:

private void finishCompletion() {
    // assert state > COMPLETING;
    //该方法必须在state > COMPLETING时调用
    //从头到尾唤醒WaitNode中阻塞的线程
    for (WaitNode q; (q = waiters) != null;) {
        //设置栈顶为空
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                //唤醒线程
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                //如果next为空,break
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

在调用get方法时,如果任务还在执行,线程会阻塞,FutureTask会将阻塞的线程放入waiters单链表。等待任务结束时被唤醒,我们继续看get方法:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //如果任务还在执行,阻塞当前线程,放入waiters单链表
        s = awaitDone(false, 0L);
    return report(s);
}
awaitDone
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        //如果线程被中断,移除当前node,抛出异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        //如果任务完成或者被取消,直接返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        //如果任务正在执行,线程等待一下
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //如果q为空,新建一个node
        else if (q == null)
            q = new WaitNode();
        //如果还未入列,尝试将新建的node放入链表
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //如果设置了超时且超时了
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //超时,移除node
                removeWaiter(q);
                return state;
            }
            //阻塞线程
            LockSupport.parkNanos(this, nanos);
        }
        //阻塞当前线程
        else
            LockSupport.park(this);
    }
}
removeWaiter
private void removeWaiter(WaitNode node) {
    if (node != null) {
        //设置节点的线程为空,做删除标记
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                //thread不为空,continue
                if (q.thread != null)
                    pred = q;
                //thread为空且pred不为空
                else if (pred != null) {
                    //删除q
                    pred.next = s;
                    //检查一下pred的thread,如果被其他线程修改,retry outer loop
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                //thread为空且pred为空说明q为栈顶,将q.next设置为栈顶,失败则retry
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

report方法

get方法最后调用了report方法:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    //NORMAL表示任务执行正常,返回结果
    if (s == NORMAL)
        return (V)x;
    //任务被取消,抛出异常
    if (s >= CANCELLED)
        throw new CancellationException();
    //其他情况只有可能发生异常,抛出该异常
    throw new ExecutionException((Throwable)x);
}

cancel方法

最后看一下cancel方法:

public boolean cancel(boolean mayInterruptIfRunning) {
    //当state不为NEW说明任务已经开始,不能被取消,返回false
    //当设置state失败时,返回false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                //中断线程
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //设置任务为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70968.html

相关文章

  • FutureTask源码解析(2)——深入理解FutureTask

    摘要:本文的源码基于。人如其名,包含了和两部分。而将一个任务的状态设置成终止态只有三种方法我们将在下文的源码解析中分析这三个方法。将栈中所有挂起的线程都唤醒后,下面就是执行方法这个方法是一个空方 前言 系列文章目录 有了上一篇对预备知识的了解之后,分析源码就容易多了,本篇我们就直接来看看FutureTask的源码。 本文的源码基于JDK1.8。 Future和Task 在深入分析源码之前,我...

    Harpsichord1207 评论0 收藏0
  • FutureTask源码分析笔记

    摘要:主要的实现实际上运行还是一个,它对做了一个封装,让开发人员可以从其中获取返回值是有状态的共种状态,四种状态变换的可能和的区别通过方法调用有返回值可以抛异常结果的实现原理判断状态非状态则直接进入返回结果处于状态,则进入等待流程获 主要的实现FutureTask # FutureTask实际上运行还是一个runnable,它对callable做了一个封装,让开发人员可以从其中获取返回值; ...

    PascalXie 评论0 收藏0
  • 系列文章目录

    摘要:为了避免一篇文章的篇幅过长,于是一些比较大的主题就都分成几篇来讲了,这篇文章是笔者所有文章的目录,将会持续更新,以给大家一个查看系列文章的入口。 前言 大家好,笔者是今年才开始写博客的,写作的初衷主要是想记录和分享自己的学习经历。因为写作的时候发现,为了弄懂一个知识,不得不先去了解另外一些知识,这样以来,为了说明一个问题,就要把一系列知识都了解一遍,写出来的文章就特别长。 为了避免一篇...

    lijy91 评论0 收藏0
  • 系列文章目录

    摘要:为了避免一篇文章的篇幅过长,于是一些比较大的主题就都分成几篇来讲了,这篇文章是笔者所有文章的目录,将会持续更新,以给大家一个查看系列文章的入口。 前言 大家好,笔者是今年才开始写博客的,写作的初衷主要是想记录和分享自己的学习经历。因为写作的时候发现,为了弄懂一个知识,不得不先去了解另外一些知识,这样以来,为了说明一个问题,就要把一系列知识都了解一遍,写出来的文章就特别长。 为了避免一篇...

    Yumenokanata 评论0 收藏0
  • FutureTask源码解析(1)——预备知识

    摘要:在分析它的源码之前我们需要先了解一些预备知识。因为接口没有返回值所以为了与兼容我们额外传入了一个参数使得返回的对象的方法直接执行的方法然后返回传入的参数。 前言 系列文章目录 FutureTask 是一个同步工具类,它实现了Future语义,表示了一种抽象的可生成结果的计算。在包括线程池在内的许多工具类中都会用到,弄懂它的实现将有利于我们更加深入地理解Java异步操作实现。 在分析...

    mmy123456 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<