资讯专栏INFORMATION COLUMN

ForkJoin框架之ForkJoinTask

crossoverJie / 2662人阅读

摘要:前言在前面的文章和响应式编程中提到了和后者毫无疑问是一个线程池前者则是一个类似经典定义的概念官方有一个非常无语的解释就是运行在的一个任务抽象就是运行的线程池框架包含和若干的子类它的核心在于分治和工作窍取最大程度利用线程池中的工作线程避免忙的

前言

在前面的文章"CompletableFuture和响应式编程"中提到了ForkJoinTask和ForkJoinPool,后者毫无疑问是一个线程池,前者则是一个类似FutureTask经典定义的概念.

官方有一个非常无语的解释:ForkJoinTask就是运行在ForkJoinPool的一个任务抽象,ForkJoinPool就是运行ForkJoinTask的线程池.

ForkJoin框架包含ForkJoinTask,ForkJoinWorkerThread,ForkJoinPool和若干ForkJoinTask的子类,它的核心在于分治和工作窍取,最大程度利用线程池中的工作线程,避免忙的忙死,饿的饿死.

ForkJoinTask可以理解为类线程但比线程轻量的实体,在ForkJoinPool中运行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask和它的子任务.ForkJoinTask同时也是一个轻量的Future,使用时应避免较长阻塞和io.

ForkJoinTask在JAVA8中应用广泛,但它是一个抽象类,它的子类派生了各种用途,如后续计划多带带介绍的CountedCompleter,以及若干JAVA8中stream api定义的与并行流有关的各种操作(ops).

源码

首先看ForkJoinTask的签名.

public abstract class ForkJoinTask implements Future, Serializable 

从签名上看,ForkJoinTask实现了future,也可以序列化,但它不是一个Runnable或Callable.

ForkJoinTask虽然可以序列化,但它只对运行前和后敏感,对于执行过程中不敏感.

先来看task的运行字段:

//volatie修饰的任务状态值,由ForkJoinPool或工作线程修改.
volatile int status; 
static final int DONE_MASK   = 0xf0000000;//用于屏蔽完成状态位. 
static final int NORMAL      = 0xf0000000;//表示正常完成,是负值.
static final int CANCELLED   = 0xc0000000;//表示被取消,负值,且小于NORMAL
static final int EXCEPTIONAL = 0x80000000;//异常完成,负值,且小于CANCELLED
static final int SIGNAL      = 0x00010000;//用于signal,必须不小于1<<16,默认为1<<16.
static final int SMASK       = 0x0000ffff;//后十六位的task标签

很显然,DONE_MASK能够过滤掉所有非NORMAL,非CANCELLED,非EXCEPTIONAL的状态,字段的含义也很直白,后面的SIGNAL和SMASK还不明确,后面再看.

//标记当前task的completion状态,同时根据情况唤醒等待该task的线程.
private int setCompletion(int completion) {
    for (int s;;) {
        //开启一个循环,如果当前task的status已经是各种完成(小于0),则直接返回status,这个status可能是某一次循环前被其他线程完成.
        if ((s = status) < 0)
            return s;
        //尝试将原来的status设置为它与completion按位或的结果.
        if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
            if ((s >>> 16) != 0)
                //此处体现了SIGNAL的标记作用,很明显,只要task完成(包含取消或异常),或completion传入的值不小于1<<16,
                //就可以起到唤醒其他线程的作用.
                synchronized (this) { notifyAll(); }
            //cas成功,返回参数中的completion.
            return completion;
        }
    }
}

前面用注释解释了这个方法的逻辑,显然该方法是阻塞的,如果传入的参数不能将status设置为负值会如何?

显然,可能会有至多一次的成功cas,并且若满足唤醒的条件,会尝试去唤醒线程,甚至可能因为为了唤醒其他线程而被阻塞在synchonized代码块外;也可能没有一次成功的cas,直到其他线程成功将status置为完成.

//final修饰,运行ForkJoinTask的核心方法.
final int doExec() {
    int s; boolean completed;
    //仅未完成的任务会运行,其他情况会忽略.
    if ((s = status) >= 0) {
        try {
            //调用exec
            completed = exec();
        } catch (Throwable rex) {
            //发生异常,用setExceptionalCompletion设置结果
            return setExceptionalCompletion(rex);
        }
        if (completed)
            //正常完成,调用前面说过的setCompletion,参数为normal,并将返回值作为结果s.
            s = setCompletion(NORMAL);
    }
    //返回s
    return s;
}

//记录异常并且在符合条件时传播异常行为
private int setExceptionalCompletion(Throwable ex) {
    //首先记录异常信息到结果
    int s = recordExceptionalCompletion(ex);
    if ((s & DONE_MASK) == EXCEPTIONAL)
        //status去除非完成态标志位(只保留前4位),等于EXCEPTIONAL.内部传播异常
        internalPropagateException(ex);
    return s;
}
//internalPropagateException方法是一个空方法,留给子类实现,可用于completer之间的异常传递
void internalPropagateException(Throwable ex) {}
//记录异常完成
final int recordExceptionalCompletion(Throwable ex) {
    int s;
    if ((s = status) >= 0) {
        //只能是异常态的status可以记录.
        //hash值禁止重写,不使用子类的hashcode函数.
        int h = System.identityHashCode(this);
        final ReentrantLock lock = exceptionTableLock;
        //异常锁,加锁
        lock.lock();
        try {
            //抹除脏异常,后面叙述
            expungeStaleExceptions();
            //异常表数组.ExceptionNode后面叙述.
            ExceptionNode[] t = exceptionTable;//exceptionTable是一个全局的静态常量,后面叙述
            //用hash值和数组长度进行与运算求一个初始的索引
            int i = h & (t.length - 1);
            for (ExceptionNode e = t[i]; ; e = e.next) {
                //找到空的索引位,就创建一个新的ExceptionNode,保存this,异常对象并退出循环
                if (e == null) {
                    t[i] = new ExceptionNode(this, ex, t[i]);//(1)
                    break;
                }
                if (e.get() == this) //已设置在相同的索引位置的链表中,退出循环.//2
                    break;
            //否则e指向t[i]的next,进入下个循环,直到发现判断包装this这个ForkJoinTask的ExceptionNode已经出现在t[i]这个链表并break(2),
            //或者直到e是null,意味着t[i]出发开始的链表并无包装this的ExceptionNode,则将构建一个新的ExceptionNode并置换t[i],
            //将原t[i]置为它的next(1).整个遍历判断和置换过程处在锁中进行.
            }
        } finally {
            lock.unlock();
        }
        //记录成功,将当前task设置为异常完成.
        s = setCompletion(EXCEPTIONAL);
    }
    return s;
}

//exceptionTable声明
private static final ExceptionNode[] exceptionTable;//全局异常node表
private static final ReentrantLock exceptionTableLock;//上面用到的锁,就是一个普通的可重入锁.
private static final ReferenceQueue exceptionTableRefQueue;//变量表引用队列,后面详述.
private static final int EXCEPTION_MAP_CAPACITY = 32;//异常表的固定容量,不大,只有32而且是全局的.

//初始化在一个静态代码块.
static {
    exceptionTableLock = new ReentrantLock();
    exceptionTableRefQueue = new ReferenceQueue();
    exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];//容量
    try {
        U = sun.misc.Unsafe.getUnsafe();
        Class k = ForkJoinTask.class;
        STATUS = U.objectFieldOffset
            (k.getDeclaredField("status"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

//先来看ExceptionNode内部类的实现
//签名,实现了一个ForkJoinTask的弱引用.
static final class ExceptionNode extends WeakReference> {
    final Throwable ex;
    ExceptionNode next;
    final long thrower;  // use id not ref to avoid weak cycles
    final int hashCode;  // store task hashCode before weak ref disappears
    ExceptionNode(ForkJoinTask task, Throwable ex, ExceptionNode next) {
        super(task, exceptionTableRefQueue);//指向弱引用的构造函数,保存引用为task,队列为全局的exceptionTableRefQueue.
        this.ex = ex;//抛出的异常的引用
        this.next = next;//数组中的ExceptionNode以链表形式存在,前面分析过,先入者为后入者的next
        this.thrower = Thread.currentThread().getId();//保存抛出异常的线程id(严格来说是创建了this的线程)
        this.hashCode = System.identityHashCode(task);//哈希码保存关联task的哈希值.
    }
}
//清除掉异常表中的脏数据,仅在持有全局锁时才可使用.前面看到在记录新的异常信息时要进行一次清除尝试
private static void expungeStaleExceptions() {
    //循环条件,全局exceptionTableRefQueue队列不为空,前面说过ExceptionNode是弱引用,当它被回收时会被放入此队列.
    for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
        //从队首依次取出元素.
        if (x instanceof ExceptionNode) {
            //计算在全局exceptionTable中的索引.
            int hashCode = ((ExceptionNode)x).hashCode;
            ExceptionNode[] t = exceptionTable;
            int i = hashCode & (t.length - 1);
            //取出node
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            //不停遍历,直到e是null为止.
            while (e != null) {
                //e的next
                ExceptionNode next = e.next;//2
                //x是队首出队的元素.它与e相等说明找到
                if (e == x) {
                    //e是一个链表的元素,pred表示它是否有前置元素
                    if (pred == null)
                        //无前置元素,说明e在链表首部,直接将首部元素指向next即可.
                        t[i] = next;
                    else
                        //有前置元素,说明循环过若干次,将当前e出链表
                        pred.next = next;
                    //在链表中发现x即break掉内循环,继续从exceptionTableRefQueue的队首弹出新的元素.
                    break;
                }
                //只要发现当前e不是x,准备下一次循环,pred指向e.e指向next,进行下一个元素的比较.
                pred = e;
                e = next;
            }
        }
    }
}

到此doExec(也是每个ForkJoinTask的执行核心过程)就此结束.

很明显,ForkJoinTask的doExec负责了核心的执行,它留下了exec方法给子类实现,而重点负责了后面出现异常情况的处理.处理的逻辑前面已论述,在产生异常时尝试将异常存放在全局的execptionTable中,存放的结构为数组+链表,按哈希值指定索引,每次存放新的异常时,顺便清理上一次已被gc回收的ExceptionNode.所有ForkJoinTask共享了一个exceptionTable,因此必然在有关的几个环节要进行及时的清理.除了刚刚论述的过程,还有如下的几处:

前面论述了recordExceptionalCompletion,一共有四处使用了expungeStaleException,将已回收的ExceptionNode从引用队列中清除.

clearExceptionalCompletion在对一个ForkJoinTask重新初始化时使用,我们在前面提到序列化时说过,ForkJoinTask的序列化结果只保留了两种情况:运行前,运行结束.重新初始化一个ForkJoinTask,就要去除任何中间状态,包含自身产出的已被回收的异常node,而expungeStaleExceptions显然也顺便帮助其他task清除.

getThrowableException是查询task运行结果时调用,如一些get/join方法,很明显,记录这个异常的作用就在于返回给get/join,在这一块顺便清理已被回收的node,尤其是将自己运行时生成的node清除.

helpExpungeStaleExceptions是提供给ForkJoinPool在卸载worker时使用,顺便帮助清理全局异常表.

使用它们的方法稍后再论述,先来继续看ForkJoinTask的源码.

//内部等待任务完成,直到完成或超时.
final void internalWait(long timeout) {
    int s;
    //status小于0代表已完成,直接忽略wait.
    //未完成,则试着加上SIGNAL的标记,令完成任务的线程唤醒这个等待.
    if ((s = status) >= 0 && 
        U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
        //加锁,只有一个线程可以进入.
        synchronized (this) {
            //再次判断未完成.等待timeout,且忽略扰动异常.
            if (status >= 0)
                try { wait(timeout); } catch (InterruptedException ie) { }
            else
                //已完成则响醒其他等待者.
                notifyAll();
        }
    }
}

internalWait方法逻辑很简单,首先判断是否未完成,满足未完成,则将标记位加上SIGNAL(可能已有别的线程做过),随后加锁double check status,还未完成则等待并释放锁,若发现已完成,或在后续被唤醒后发现已完成,则唤醒其他等待线程.通过notifyAll的方式避免了通知丢失.

同时,它的使用方法目前只有一个ForkJoinPool::awaitJoin,在该方法中使用循环的方式进行internalWait,满足了每次按截止时间或周期进行等待,同时也顺便解决了虚假唤醒.

继续看externalAwaitDone函数.它体现了ForkJoin框架的一个核心:外部帮助.

//外部线程等待一个common池中的任务完成.
private int externalAwaitDone() {
    int s = ((this instanceof CountedCompleter) ? 
    //当前task是一个CountedCompleter,尝试使用common ForkJoinPool去外部帮助完成,并将完成状态返回.
             ForkJoinPool.common.externalHelpComplete(
                 (CountedCompleter)this, 0) :
            //当前task不是CountedCompleter,则调用common pool尝试外部弹出该任务并进行执行,
            //status赋值doExec函数的结果,若弹出失败(其他线程先行弹出)赋0.
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
    if (s >= 0 && (s = status) >= 0) {
        //检查上一步的结果,即外部使用common池弹出并执行的结果(不是CountedCompleter的情况),或外部尝试帮助CountedCompleter完成的结果
        //status大于0表示尝试帮助完成失败.
        //扰动标识,初值false
        boolean interrupted = false;
        do {
            //循环尝试,先给status标记SIGNAL标识,便于后续唤醒操作.
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0) {
                        try {
                            //CAS成功,进同步块发现double check未完成,则等待.
                            wait(0L);
                        } catch (InterruptedException ie) {
                            //若在等待过程中发生了扰动,不停止等待,标记扰动.
                            interrupted = true;
                        }
                    }
                    else
                        //进同步块发现已完成,则唤醒所有等待线程.
                        notifyAll();
                }
            }
        } while ((s = status) >= 0);//循环条件,task未完成.
        if (interrupted)
            //循环结束,若循环中间曾有扰动,则中断当前线程.
            Thread.currentThread().interrupt();
    }
    //返回status
    return s;
}

externalAwaitDone的逻辑不复杂,在当前task为ForkJoinPool.common的情况下可以在外部进行等待和尝试帮助完成.方法会首先根据ForkJoinTask的类型进行尝试帮助,并返回当前的status,若发现未完成,则进入下面的等待唤醒逻辑.该方法的调用者为非worker线程.

相似的方法:externalInterruptibleAwaitDone

private int externalInterruptibleAwaitDone() throws InterruptedException {
    int s;
    //不同于externalAwaitDone,入口处发现当前线程已中断,则立即抛出中断异常.
    if (Thread.interrupted())
        throw new InterruptedException();
    if ((s = status) >= 0 &&
        (s = ((this instanceof CountedCompleter) ?
              ForkJoinPool.common.externalHelpComplete(
                  (CountedCompleter)this, 0) :
              ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
              0)) >= 0) {
        while ((s = status) >= 0) {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0)
                        //wait时也不catch中断异常,发生即抛出.
                        wait(0L);
                    else
                        notifyAll();
                }
            }
        }
    }
    return s;
}

externalInterruptibleAwaitDone的逻辑与externalAwaitDone相似,只是对中断异常的态度为抛,后者为catch.

它们的使用点,externalAwaitDone为doJoin或doInvoke方法调用,externalInterruptibleAwaitDone为get方法调用,很明显,join操作不可扰动,get则可以扰动.

下面来看看doJoin和doInvoke

//join的核心方法
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    //已完成,返回status,未完成再尝试后续
    return (s = status) < 0 ? s :
        //未完成,当前线程是ForkJoinWorkerThread,从该线程中取出workQueue,并尝试将
        //当前task出队然后执行,执行的结果是完成则返回状态,否则使用当线程池所在的ForkJoinPool的awaitJoin方法等待.
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        //当前线程不是ForkJoinWorkerThread,调用前面说的externalAwaitDone方法.
        externalAwaitDone();
}

//invoke的核心方法
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    //先尝试本线程执行,不成功才走后续流程
    return (s = doExec()) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        //与上一个方法基本相同,但在当前线程是ForkJoinWorkerThread时不尝试将该task移除栈并执行,而是等
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}

到此终于可以看一些公有对外方法了.有了前面的基础,再看get,join,invoke等方法非常简单.

//get方法还有get(long time)的变种.
public final V get() throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
        //当前线程是ForkJoinWorkerThread则调用前面提过的doJoin方法.
        //否则调用前述externalInterruptibleAwaitDone
        doJoin() : externalInterruptibleAwaitDone();
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        //异常处理,取消的任务,抛出CancellationException.
        throw new CancellationException();
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        //异常处理,调用getThrowableException获取异常,封进ExecutionException.
        throw new ExecutionException(ex);
    //无异常处理,返回原始结果.
    return getRawResult();
}
//getRawResult默认为一个抽象实现,在ForkJoinTask中,并未保存该结果的字段.
 public abstract V getRawResult();

//getThrowableException方法
private Throwable getThrowableException() {
    //不是异常标识,直接返回null,从方法名的字面意思看,要返回一个可抛出的异常.
    if ((status & DONE_MASK) != EXCEPTIONAL)
        return null;
    //系统哈希码来定位ExceptionNode
    int h = System.identityHashCode(this);
    ExceptionNode e;
    final ReentrantLock lock = exceptionTableLock;
    //加异常表全局锁
    lock.lock();
    try {
        //先清理已被回收的异常node,前面已述.
        expungeStaleExceptions();
        ExceptionNode[] t = exceptionTable;
        e = t[h & (t.length - 1)];
        //循环找出this匹配的异常node
        while (e != null && e.get() != this)
            e = e.next;
    } finally {
        lock.unlock();
    }
    Throwable ex;
    //前面找不出异常node或异常node中存放的异常为null,则返回null
    if (e == null || (ex = e.ex) == null)
        return null;
    if (e.thrower != Thread.currentThread().getId()) {
        //不是当前线程抛出的异常.
        Class ec = ex.getClass();
        try {
            Constructor noArgCtor = null;//该异常的无参构造器
            Constructor[] cs = ec.getConstructors();//该异常类公有构造器
            for (int i = 0; i < cs.length; ++i) {
                Constructor c = cs[i];
                Class[] ps = c.getParameterTypes();
                if (ps.length == 0)
                    //构建器参数列表长度0说明存在无参构造器,存放.
                    noArgCtor = c;
                else if (ps.length == 1 && ps[0] == Throwable.class) {
                    //发现有参构造器且参数长度1且第一个参数类型是Throwable,说明可以存放cause.
                    //反射将前面取出的ex作为参数,反射调用该构造器创建一个要抛出的Throwable.
                    Throwable wx = (Throwable)c.newInstance(ex);
                    //反射失败,异常会被catch,返回ex,否则返回wx.
                    return (wx == null) ? ex : wx;
                }
            }
            if (noArgCtor != null) {
                //在尝试了寻找有参无参构造器,并发现只存在无参构造器的情况,用无参构造器初始化异常.
                Throwable wx = (Throwable)(noArgCtor.newInstance());
                if (wx != null) {
                    //将ex设置为它的cause并返回它的实例.
                    wx.initCause(ex);
                    return wx;
                }
            }
        } catch (Exception ignore) {
            //此方法不可抛出异常,一定要成功返回.
        }
    }
    //有参无参均未成功,返回找到的异常.
    return ex;
}

//join公有方法
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        //调用doJoin方法阻塞等待的结果不是NORMAL,说明有异常或取消.报告异常.
        reportException(s);
    //等于NORMAL,正常执行完毕,返回原始结果.
    return getRawResult();
}
//报告异常,可在前一步判断执行status是否为异常态,然后获取并重抛异常.
private void reportException(int s) {
    //参数s必须用DONE_MASK处理掉前4位以后的位.
    if (s == CANCELLED)
        //传入的状态码等于取消,抛出取消异常.
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        //使用前面的getThrowableException方法获取异常并重新抛出.
        rethrow(getThrowableException());
}

//invoke公有方法.
public final V invoke() {
    int s;
    //先尝试执行
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        //doInvoke方法的结果status只保留完成态位表示非NORMAL,则报告异常.
        reportException(s);
    //正常完成,返回原始结果.
    return getRawResult();
}

终于,读到此处的读者将关键的方法线串了起来,前述的所有内部方法,常量和变量与公有接口的关系已经明了.

很显然,ForkJoinTask是个抽象类,且它并未保存任务的完成结果,也不负责这个结果的处理,但声明并约束了返回结果的抽象方法getRawResult供子类实现.

因此,ForkJoinTask的自身关注任务的完成/异常/未完成,子类关注这个结果的处理.

每当获取到任务的执行状态时,ForkJoinTask可根据status来判断是否是异常/正常完成,并进入相应的处理逻辑,最终使用子类实现的方法完成一个闭环.

如果理解为将ForkJoinTask和子类的有关代码合并起来,在结果/完成状态/异常信息这一块,相当于同时有三个part在合作.

第一个part:status字段,它同时表示了未完成/正常完成/取消/异常完成等状态,也同时告诉有关等待线程是否要唤醒其他线程(每个线程等待前会设置SIGNAL),同时留出了后面16位对付其他情况.

第二个part:result,在ForkJoinTask见不到它,也没有相应的字段,子类也未必需要提供这个result字段,前面提到的CountedCompleter就没有提供这个result,它的getRawResult会固定返回null.但是CountedCompleter可以继承子类并实现这个result的保存与返回(道格大神在注释中举出了若干典型代码例子),在JAVA8中,stream api中的并行流也会保存每一步的计算结果,并对结果进行合并.

第三个part:异常.在ForkJoinTask中已经完成了所有异常处理流程和执行流程的定义,重点在于异常的存放,它是由ForkJoinTask的类变量进行存放的,结构为数组+链表,且元素利用了弱引用,借gc帮助清除掉已经被回收的ExceptionNode,显然在gc之前必须得到使用.而异常随时可以发生并进行record入列,但相应的能消费掉这个异常的只有相应的外部的get,join,invoke等方法或者内部扩展了exec()等方式,得到其他线程执行的task异常结果的情况.巧妙的是,只有外部调用者调用(get,invoke,join)时,这个异常信息才足够重要,需要rethrow出去并保存关键的堆栈信息;而内部线程在访问一些非自身执行的任务时,往往只需要status判断是否异常即可,在exec()中fork新任务的,也往往必须立即join这些新的子任务,这就保证了能够及时得到子任务中的异常堆栈(即使拿不到堆栈也知道它失败了).

经过前面的论述,ForkJoinTask的执行和异常处理已经基本论结,但是,一个ForkJoinTask在创建之后是如何运行的?显然,它不是一个Runnable,也不是Callable,不能直接submit或execute到普通的线程池.

临时切换到ForkJoinPool的代码,前面提到过,ForkJoinTask的官方定义就是可以运行在ForkJoinPool中的task.

//ForkJoinPool代码,submit一个ForkJoinTask到ForkJoinPool,并将该task自身返回.
//拿到返回的task,我们就可以进行前述的get方法了.
public  ForkJoinTask submit(ForkJoinTask task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}
//execute,不返回.类似普通线程池提交一个runnable的行为.
public void execute(ForkJoinTask task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
}

显然,若要使用一个自建的ForkJoinPool,可以使用execute或submit函数提交入池,然后用前述的get方法和变种方法进行.这是一种运行task的方式.

前面论述过的invoke方法会先去先去尝试本地执行,然后才去等待,故我们自己new一个ForkJoinTask,一样可以通过invoke直接执行,这是第二种运行task的方式.

前面论述的join方法在某种情况下也是一种task的运行方式,在当前线程是ForkJoinWorkerThread时,会去尝试将task出队并doExec,也就是会先用本线程执行一次,不成功才干等,非ForkJoinWorkerThread则直接干等了.显然我们可以自己构建一个ForkJoinWorkerThread并去join,这时会将任务出队并执行(但存在一个问题:什么时候入队).且出队后若未执行成功,则awaitJoin(参考ForkJoinPool::awaitJoin),此时因任务已出队,不会被窃取或帮助(在awaitJoin中会有helpStealer,但其实任务是当前线程自己"偷走"了),似乎完全要靠自己了.但并不表示ForkJoinTask子类无法获取这个已出队的任务,比如CountedCompleter使用时,可以在compute中新生成的Completer时,将源CountedCompleter(ForkJoinTask的子类)作为新生成的CountedCountedCompleter的completer(该子类中的一个字段),这样,若有一个ForkJoinWorkerThread窃取了这个新生成的CountedCompleter,可以通过completer链表找到先前被出队的CountedCompleter(ForkJoinTask).关于CountedCompleter多带带文章详述.

除此之外呢?包含前面提到的使用join操作不是ForkJoinWorkerThread调用的情况,不使用ForkJoinPool的submit execute入池,如何能让一个ForkJoinTask在将来执行?我们来看后面的方法.

//fork方法,将当前任务入池.
public final ForkJoinTask fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        //如果当前线程是ForkJoinWorkerThread,将任务压入该线程的任务队列.
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        //否则调用common池的externalPush方法入队.
        ForkJoinPool.common.externalPush(this);
    return this;
}

显然,我们还可以通过对一个ForkJoinTask进行fork方法入池,入哪个池完全取决于当前线程的类型.这是第四种让任务能被运行的方式.

同样,我们也看到了第五种方式,ForkJoinPool.common其实就是一个常量保存的ForkJoinPool,它能够调用externalPush,我们自然也可以直接new一个ForkJoinPool,然后将当前task进行externalPush,字面意思外部压入.这种办法,非ForkJoinWorkerThread也能将任务提交到非common的ForkJoinPool.

从名字来看,ForkJoinTask似乎已经说明了一切,按照官方的注释也是如此.对一个task,先Fork压队,再Join等待执行结果,这是一个ForkJoinTask的执行周期闭环(但不要简单理解为生命周期,前面提到过,任务可以被重新初始化,而且重新初始化时还会清空ExceptionNode数组上的已回收成员).

到此为止,ForkJoinTask的核心函数和api已经基本了然,其它同类型的方法以及周边的方法均不难理解,如invokeAll的各种变种.下面来看一些"周边"类型的函数.有前述的基础,它们很好理解.

//取消一个任务的执行,直接将status设置成CANCELLED,设置后判断该status 是否为CANCELLED,是则true否则false.
public boolean cancel(boolean mayInterruptIfRunning) {
    return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}

//判断是否完成,status小于0代表正常完成/异常完成/取消,很好理解.
public final boolean isDone() {
    return status < 0;
}

//判断当前任务是否取消.
public final boolean isCancelled() {
    //status前4位
    return (status & DONE_MASK) == CANCELLED;
}
public final boolean isCompletedAbnormally() {
    //是否为异常完成,前面说过,CANCELLED和EXCEPTIONAL均小于NORMAL
    return status < NORMAL;
}
//是否正常完成.
public final boolean isCompletedNormally() {
    //完成态位等于NORMAL
    return (status & DONE_MASK) == NORMAL;
}
//获取异常.
 public final Throwable getException() {
    int s = status & DONE_MASK;
    //当为正常完成或未完成时,返回null.
    return ((s >= NORMAL)    ? null :
            //是取消时,新建一个取消异常.
            (s == CANCELLED) ? new CancellationException() :
            //不是取消,参考前面提到的getThrowableException.
            getThrowableException());
}
//使用异常完成任务.
 public void completeExceptionally(Throwable ex) {
    //参考前述的setExceptionalCompletion,
    //ex已经是运行时异常或者Error,直接使用ex完成,若是受检异常,包装成运行时异常.
    setExceptionalCompletion((ex instanceof RuntimeException) ||
                             (ex instanceof Error) ? ex :
                            new RuntimeException(ex));
   }
//使用value完成任务.
public void complete(V value) {
    try {
        //设置原始结果,它是一个空方法.前面说过ForkJoinTask没有维护result之类的结果字段,子类可自行发挥.
        setRawResult(value);
    } catch (Throwable rex) {
        //前述步骤出现异常,就用异常方式完成.
        setExceptionalCompletion(rex);
        return;
    }
    //前面的结果执行完,标记当前为完成.
    setCompletion(NORMAL);
}
//安静完成任务.直接用NORMAL setCompletion,没什么好说的.
public final void quietlyComplete() {
    setCompletion(NORMAL);
}

//安静join,它不会返回result也不会抛出异常.处理集合任务时,如果需要所有任务都被执行而不是一个执行出错(取消)其他也跟着出错的情况下,
//很明显适用,这不同于invokeAll,静态方法invokeAll或invoke(ForkJoinTask,ForkJoinTask)会在任何一个任务出现异常后取消执行并抛出.
public final void quietlyJoin() {
    doJoin();
}

//安静执行一次,不返回结果不抛出异常,没什么好说的.
public final void quietlyInvoke() {
    doInvoke();
}
//重新初台化当前task
public void reinitialize() {
    if ((status & DONE_MASK) == EXCEPTIONAL)
        //如果当前任务是异常完成的,清除异常.该方法参考前面的论述.
        clearExceptionalCompletion();
    else
        //否则重置status为0.
        status = 0;
}
//反fork.
public boolean tryUnfork() {
    Thread t;
    //当前线程是ForkJoinWorkerThread,从它的队列尝试移除.
    return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
            //当前线程不是ForkJoinWorkerThread,用common池外部移除.
            ForkJoinPool.common.tryExternalUnpush(this));
}

上面是一些简单的周边方法,大多并不需要再论述了,unfork方法很明显在某些场景下不会成功,显然,当一个任务刚刚入队并未进行后续操作时,很可能成功.按前面所述,当对一个任务进行join时,可能会成功的弹出当前任务并执行,此时不可能再次弹出;当一个任务被其他线程窃取或被它本身执行的也不会弹出.

再来看一些老朋友,在前面的文章"CompletableFuture和响应式编程"一文中,作者曾着重强调过它将每个要执行的动作进行压栈(未能立即执行的情况),而栈中的元素Completion即是ForkJoinTask的子类,而标记该Completion是否被claim的方法和周边方法如下:

//获取ForkJoinTask的标记,返回结果为short型
public final short getForkJoinTaskTag() {
    //status的后16位
    return (short)status;
}


//原子设置任务的标记位.
public final short setForkJoinTaskTag(short tag) {
    for (int s;;) {
        //不停循环地尝试将status的后16位设置为tag.
        if (U.compareAndSwapInt(this, STATUS, s = status,
                                //替换的结果,前16位为原status的前16位,后16位为tag.
                                (s & ~SMASK) | (tag & SMASK)))
            //返回被换掉的status的后16位.
            return (short)s;
    }
}


//循环尝试原子设置标记位为tag,前提是原来的标记位等于e,成功true失败false
public final boolean compareAndSetForkJoinTaskTag(short e, short tag) {
    for (int s;;) {
        if ((short)(s = status) != e)
            //如果某一次循环的原标记位不是e,则返回false.
            return false;
        //同上个方法
        if (U.compareAndSwapInt(this, STATUS, s,
                                (s & ~SMASK) | (tag & SMASK)))
            return true;
    }
}

还记得CompletableFuture在异步执行Completion时要先claim吗?claim方法中,会尝试设置这个标记位.这是截止jdk8中CompletableFuture使用到ForkJoinTask的功能.

目前来看,在CompletableFuture的内部实现Completion还没有使用到ForkJoinTask的其他属性,比如放入一个ForkJoinPool执行(没有任何前面总结的调用,比如用ForkJoinPool的push,execute,submit等,也没有fork到common池).但是很明显,道格大神令它继承自ForkJoinTask不可能纯粹只为了使用区区一个标记位,试想一下,在如此友好支持响应式编程的CompletableFuture中传入的每一个action都可以生成若干新的action,那么CompletableFuture负责将这些action封装成Completion放入ForkJoinPool执行,将最大化利用到ForkJoin框架的工作窃取和外部帮助的功效,强力结合分治思想,这将是多么优雅的设计.或者在jdk9-12中已经出现了相应的Completion实现(尽管作者写过JAVA9-12,遗憾的是也没有去翻它们的源码).

另外,尽管Completion的众多子类也没有result之类的表示结果的字段,但它的一些子类通过封装,实际上间接地将这个Completion所引用的dep的result作为了自己的"result",当然,getRawResult依旧是null,但是理念却是相通的.

以上是ForkJoinTask的部分核心源码,除了上述的源码外,还有一些同属于ForkJoinTask的核心源码部分,比如其他的public方法(参考join fork invoke 即可),一些利用ForkJoinPool的实现,要深入了解ForkJoinPool才能了解的方法,一些不太难的静态方法等,这些没有必要论述了.

除了核心源码外,ForkJoinTask也提供了对Runnable,Callable的适配器实现,这块很好理解,简单看一看.

//对Runnable的实现,如果在ForkJoinPool中提交一个runnable,会用它封装成ForkJoinTask
static final class AdaptedRunnable extends ForkJoinTask
    implements RunnableFuture {
    final Runnable runnable;
    T result;
    AdaptedRunnable(Runnable runnable, T result) {
        //不能没有runnable
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
        //对runnable做适配器时,可以提交将结果传入,并设置为当前ForkJoinTask子类的result.
        //前面说过,ForkJoinTask不以result作为完成标记,判断一个任务是否完成或异常,使用status足以,
        //返回的结果才使用result.
        this.result = result; 
    }
    public final T getRawResult() { return result; }
    public final void setRawResult(T v) { result = v; }
    //前面说过提交入池的ForkJoinTask最终会运行doExec,而它会调用exec,此处会调用run.
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;//序列化用
}

//无结果的runnable适配器
static final class AdaptedRunnableAction extends ForkJoinTask
    implements RunnableFuture {
    final Runnable runnable;
    AdaptedRunnableAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    //区别就是result固定为null,也不能set
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    public final void run() { invoke(); }
    private static final long serialVersionUID = 5232453952276885070L;
}


//对runnable的适配器,但强制池中的工作线程在执行任务发现异常时抛出
static final class RunnableExecuteAction extends ForkJoinTask {
    final Runnable runnable;
    RunnableExecuteAction(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        this.runnable = runnable;
    }
    //默认null结果,set也是空实现
    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) { }
    public final boolean exec() { runnable.run(); return true; }
    void internalPropagateException(Throwable ex) {
        //前面说过doExec会被执行,它会调exec并catch,在catch块中设置当前任务为异常完成态,
        //然后调用internalPropagateException方法,而在ForkJoinTask中默认为空实现.
        //此处将异常重新抛出,将造成worker线程抛出异常.
        rethrow(ex);
    }
    private static final long serialVersionUID = 5232453952276885070L;
}


//对callable的适配器,当将callable提交至ForkJoinPool时使用.
static final class AdaptedCallable extends ForkJoinTask
    implements RunnableFuture {
    final Callable callable;
    T result;
    AdaptedCallable(Callable callable) {
        if (callable == null) throw new NullPointerException();
        this.callable = callable;
    }
    //字段中有一个result,直接使用它返回.
    public final T getRawResult() { return result; }
    //result可外部直接设置.
    public final void setRawResult(T v) { result = v; }
    public final boolean exec() {
        try {
            //默认的result用call函数设置.
            result = callable.call();
            return true;
        
        } catch (Error err) {
            //catch住Error,抛出
            throw err;
        } catch (RuntimeException rex) {
            //catch住运行时异常,抛出
            throw rex;
        } catch (Exception ex) {
            //catch住受检异常,包装成运行时异常抛出.
            throw new RuntimeException(ex);
        }
    }
    //run方法一样只是调用invoke,进而调用doExec.
    public final void run() { invoke(); }
    private static final long serialVersionUID = 2838392045355241008L;
}

//runnable生成适配器的工具方法
public static ForkJoinTask adapt(Runnable runnable) {
    return new AdaptedRunnableAction(runnable);
}

//指定结果设置runnable的适配器工具方法
public static  ForkJoinTask adapt(Runnable runnable, T result) {
    return new AdaptedRunnable(runnable, result);
}

//对callable生成适配器的方法.
public static  ForkJoinTask adapt(Callable callable) {
    return new AdaptedCallable(callable);
}

以上的代码都不复杂,只要熟悉了ForkJoinTask的本身代码结构,对于这一块了解非常容易,这也间接说明了ForkJoinPool中是如何处理Runnable和Callable的(因为ForkJoinPool本身也是一种线程池,可以接受提交Callable和Runnable).

将runnable提交到pool时,可以指定result,也可以不指定,也可以用submit或execute方法区分异常处理行为,ForkJoinPool会自行选择相应的适配器.

将callable 提交到pool时,pool会选择对callable的适配器,它的结果将为task的结果,它的异常将为task的异常.

到此为止,ForkJoinTask的源码分析完成.

后语

本文详细分析了ForkJoinTask的源码,并解释了前文CompletableFuture中Completion与它的关联,以及分析了Completion继承自ForkJoinTask目前已带来的功能利用(tag)和将来可能增加的功用(一个Completion产生若干多个Completion并在ForkJoinPool中运行,还支持工作窃取).

同时本文也对ForkJoinPool和ForkJoinWorkerThread,以及CountedCompleter和Stream api中的并行流进行了略微的描述.

在文章的最后,或许有一些新手读者会好奇,我们究竟什么时候会使用ForkJoinTask?

首先,如果你在项目中大肆使用了流式计算,并使用了并行流,那么你已经在使用了.

前面提过,官方解释ForkJoinTask可以视作比线程轻量许多的实体,也是轻量的Future.结合在源码中时不时出来秀存在感的ForkJoinWorkerThread,显然它就是据说比普通线程轻量一些的线程,在前面的源码中可以看出,它维护了一组任务的队列,每个线程负责完成队列中的任务,也可以偷其他线程的任务,甚至池外的线程都可以时不时地来个join,顺便帮助出队执行任务.

显然,对于重计算,轻io,轻阻塞的任务,适合使用ForkJoinPool,也就使用了ForkJoinTask,你不会认为它可以提交runnable和callable,就可以不用ForkJoinTask了吧?前面的适配器ForkJoinPool在这种情况下必用的,可以去翻相应的源码.

本章没有去详述CountedCompleter,但前面论述时说过,你可以在exec()中将一个计算复杂的任务拆解为小的子任务,然后将子任务入池执行,父任务合并子任务的结果.这种分治的算法此前基本是在单线程模式下运行,使用ForkJoinTask,则可以将这种计算交给一个ForkJoinPool中的所有线程并行执行.

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77869.html

相关文章

  • ForkJoin框架ForkJoinPool

    摘要:前言在前面的三篇文章中先后介绍了框架的任务组件体系体系源码并简单介绍了目前的并行流应用场景框架本质上是对的扩展它依旧支持经典的使用方式即任务池的配合向池中提交任务并异步地等待结果毫无疑问前面的文章已经解释了框架的新颖性初步了解了工作窃取 前言 在前面的三篇文章中先后介绍了ForkJoin框架的任务组件(ForkJoinTask体系,CountedCompleter体系)源码,并简单介绍...

    mayaohua 评论0 收藏0
  • ForkJoin框架CountedCompleter,工作线程及并行流

    摘要:前言在前面的文章框架之中梳理了框架的简要运行格架和异常处理流程显然要理解框架的调度包含工作窃取等思想需要去中了解而对于的拓展和使用则需要了解它的一些子类前文中偶尔会提到的一个子类直译为计数的完成器前文也说过的并行流其实就是基于了框架实现因此 前言 在前面的文章ForkJoin框架之ForkJoinTask中梳理了ForkJoin框架的简要运行格架和异常处理流程,显然要理解ForkJoi...

    msup 评论0 收藏0
  • Java7任务并行执行神器:Fork&Join框架

    摘要:对于任务的分割,要求各个子任务之间相互独立,能够并行独立地执行任务,互相之间不影响。是叉子分叉的意思,即将大任务分解成并行的小任务,是连接结合的意思,即将所有并行的小任务的执行结果汇总起来。使用方法会阻塞并等待子任务执行完并得到其结果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继续分解,然后每个小...

    Luosunce 评论0 收藏0
  • Fork/Join框架简介

    摘要:第二步执行任务并合并结果。使用两个类来完成以上两件事情我们要使用框架,必须首先创建一个任务。用于有返回结果的任务。如果任务顺利执行完成了,则设置任务状态为,如果出现异常,则纪录异常,并将任务状态设置为。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的...

    W_BinaryTree 评论0 收藏0
  • ForkJoin框架CompletableFuture

    摘要:内部类,用于对和异常进行包装,从而保证对进行只有一次成功。是取消异常,转换后抛出。判断是否使用的线程池,在中持有该线程池的引用。 前言 近期作者对响应式编程越发感兴趣,在内部分享JAVA9-12新特性过程中,有两处特性让作者深感兴趣:1.JAVA9中的JEP266对并发编程工具的更新,包含发布订阅框架Flow和CompletableFuture加强,其中发布订阅框架以java.base...

    lindroid 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<