资讯专栏INFORMATION COLUMN

世界上最简单的无等待算法(getAndIncrement)

everfly / 1861人阅读

摘要:本文基于指令完成一个无等待并发算法。并且导致它失败的那一方必定取得了进展。通过将包裹的,从更新为来更新状态的同时传递对应线程通过判定操作已完成。,代表这个已经被对应的线程预定了,剩余线程达成共识。

本文基于compareandswap指令完成一个无等待并发算法。
根据维基百科,它的定义如下:

An algorithm is wait-free if every operation has a bound on the number of steps the algorithm will take before the operation completes.

本文的方法参考了Wait-free queues with multiple enqueuers and dequeuers,A methodology for creating fast wait-free data structures,A practical wait-free simulation for lock-free data structures。
它的目标非常简单,就是把一个值通过原子指令CMPXCHG加N,然后返回原来的值。
lock-free版本如下:

    public final int getAndIncrement(int add) {
        for (;;) {
            int current = get();
            int next = current + add;
            if (compareAndSet(current, next))
                return current;
        }
    }

那么,这里的方式是,compareAndSet失败了重试。并且导致它失败的那一方必定取得了进展。
这里的问题在于,究竟需要重试多少次compareAndSet才能成功呢?
假如一直处于竞争环境下,这个重试次数是没有上限的。这里也就是饥饿的问题。
有没有办法给出一个上限,哪怕这个上限比较大
这种并发的性质被称为wait-freedom。
先给出一个我实现好了的代码,之后再来讲思路:

    public static int getAndIncrement(int index, int add) {
        //fast-path, 最多MAX次。
        int count = MAX;
        for(;;) {
            ValueObj valueObj_ = valueObj;
            if(valueObj_.threadObj == null) {
                ValueObj valueObjNext = new ValueObj(valueObj_.value + add, null);
                if(casValueObj(valueObj_, valueObjNext)) {
                    StateObj myState = states[index];
                    //前进一步,每assistStep,尝试一个帮助。
                    if(((++myState.steps) & myState.assistStep) == 0){
                        long helpThread = myState.index;
                        help(helpThread);
                        //下一个协助的对象。
                        ++myState.index;
                    }
                    return valueObj_.value;
                }
                Thread.yield();Thread.yield();Thread.yield();Thread.yield();
            } else {
                helpTransfer(valueObj_);
            }
            
            if(--count == 0)
                break;
        }
//        System.out.println("here " + inter.incrementAndGet());
        for(int j = 0; j < bigYields; ++j)
            Thread.yield();
        
        //slow-path,将自己列为被帮助对象。
        ThreadObj myselfObj = new ThreadObj(new ThreadObj.WrapperObj(null, false), add);
        setThreadObj(index, myselfObj);
        //开始帮助自己
        ValueObj result = help(index);
        setThreadObj(index, null);
        return result.value;
    }
    
    // valueObj->threadObj->wrapperObj->valueObj。
    // step 1-3,每一个步骤都不会阻塞其他步骤。
    // 严格遵守以下顺序: 
    // step 1: 通过将ValueObj指向ThreadObj:
    //         atomic: (value, null)->(value, ThreadObj)来锚定该值                      //确定该value归ThreadObj对应线程所有。
    // step 2: 通过将ThreadObj包裹的WrapperObj,
    //         atomic: 从(null, false)更新为(valueObj, true)来更新状态的同时传递value    //对应线程通过isFinish判定操作已完成。
    // step 3: 更新ValueObj,提升value,同时设置ThreadObj为null:
    //         atomic: (value, ThreadObj)->(value+1, null)完成收尾动作                 //此时value值回到了没有被线程锚定的状态,也可以看做step1之前的状态。
    private static ValueObj help(long helpIndex) {
        helpIndex = helpIndex % N;
        ThreadObj helpObj = getThreadObj(helpIndex);
        ThreadObj.WrapperObj wrapperObj;
        if(helpObj == null || helpObj.wrapperObj == null)
            return null;
        //判定句,是否该线程对应的操作未完成,(先取valueObj,再取isFinish,这很重要)。
        ValueObj valueObj_ = valueObj;
        while(!(wrapperObj = helpObj.wrapperObj).isFinish) {
            /*ValueObj valueObj_ = valueObj;*/
            if(valueObj_.threadObj == null) {
                ValueObj intermediateObj = new ValueObj(valueObj_.value, helpObj);
                //step1
                if(!casValueObj(valueObj_, intermediateObj)) {
                    valueObj_ = valueObj;
                    continue;
                }
                //step1: 锚定该ValueObj,接下来所有看到该valueObj的线程,都会一致地完成一系列操作.
                valueObj_ = intermediateObj;
            }
            //完成ValueObj、ThreadObj中的WrapperObj的状态迁移。
            helpTransfer(valueObj_);
            valueObj_ = valueObj;
        }
        valueObj_ = wrapperObj.value;
        helpValueTransfer(valueObj_);
        //返回锚定的valueObj。
        return valueObj_;
    }
    
    private static void helpTransfer(ValueObj valueObj_) {
        ThreadObj.WrapperObj wrapperObj = valueObj_.threadObj.wrapperObj;
        //step2: 先完成ThreadObj的状态迁移,WrapperObj(valueObj,true)分别表示(值,完成),原子地将这两个值喂给threadObj。
        if(!wrapperObj.isFinish) {
            ThreadObj.WrapperObj wrapValueFiniash = new ThreadObj.WrapperObj(valueObj_, true);
            valueObj_.threadObj.casWrapValue(wrapperObj, wrapValueFiniash);
        }
        //step3: 最后完成ValueObj上的状态迁移
        helpValueTransfer(valueObj_);
    }
    
    private static ValueObj helpValueTransfer(ValueObj valueObj_) {
        if(valueObj_ == valueObj) {
            ValueObj valueObjNext = new ValueObj(valueObj_.value + valueObj_.threadObj.add, null);
            casValueObj(valueObj_, valueObjNext);
        }
        return valueObj_;
    }

首先全局作用域有个值ValueObj,一种自然的想法是ValueObj应该值包含一个int值value,图[1]:

如图所示,无锁并发的版本无非就是从一个值原子地变化为另一个+1的值。
但是我们要的是wait-free,势必涉及到线程之间的互助。如何在线程之间传递信息从而实现协助呢?
我们给ValueObj增加一个字段threadObj,那么上图的版本现在可以视为,图[2]:

那么现在增加了一个数据ThreadObj(后面会解释如何使用),ValueObj会有两种状态:

(Value, null)

(Value, threadObj)

那么好,现在是两种情况:

(value, null),代表这个value我可以去竞争。

(value, threadObj),代表这个value已经被threadObj对应的线程预定了,剩余线程达成共识。

所以情况2我们能做什么呢?
这里就涉及到无等待算法的核心技术,help,通俗地说,就是多个线程间的相互协助。
我们会先将value,原子地喂给threadObj,再将(value, threadObj)原子地迁移到(value+1, null)。
这里的顺序很重要,因为假如线程A先迁移了(value, threadObj)->(value+1, nulll),那么剩下的其他所有线程都无法再看到threadObj,也就是说尽管value已经增加了,但是threadObj对应线程被锁住了。只能等待线程A将value喂给threadObj。
ThreadObj中包裹着WrapperObj,WrapperObj中初始化为(null, false),将会原子地转变为(ValueObj, ture),从而ThreadObj拿到了ValueObj。
所以这里一共是三个steps:
step 1: (value, null) atomic-> (value, threadObj)
step 2: threadObj.WrapperObj(null, false) atomic-> WrapperObj(valueObj, true)
stpe 3: (value, trheadObj) atomic-> (value + 1, null)
如图所示[3]:

其中灰框黑字标识了关键的3个step.
图[3]所示是一种通用的无等待构造技术。
对于其中的ThreadObj以及每个线程协助其他线程的策略states,可以通过数组来构造,具体可在代码里看到。
最后我们的程序结构,如图[4]:

最后给出完整的可执行代码 WaitFreeGetAndIncrement:

https://github.com/Pslydhh/Ps...

or WaitFreeAtomic:

https://github.com/Pslydhh/Wa...

,每个loop,100个线程每个操作100000次。
https://github.com/Pslydhh/Ps...
参考文献:
0:Wait-free synchronization
1:Wait-free queues with multiple enqueuers and dequeuers
2:A methodology for creating fast wait-free data structures
3:A practical wait-free simulation for lock-free data structures

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70627.html

相关文章

  • ICML 2015压轴讨论总结:6大神畅谈深度学习的未来

    摘要:年的深度学习研讨会,压轴大戏是关于深度学习未来的讨论。他认为,有潜力成为深度学习的下一个重点。认为这样的人工智能恐惧和奇点的讨论是一个巨大的牵引。 2015年ICML的深度学习研讨会,压轴大戏是关于深度学习未来的讨论。基于平衡考虑,组织方分别邀请了来自工业界和学术界的六位专家开展这次圆桌讨论。组织者之一Kyunghyun Cho(Bengio的博士后)在飞机上凭记忆写下本文总结了讨论的内容,...

    netScorpion 评论0 收藏0
  • 为什么我需要深度学习?

    摘要:为什么我又要重新开始写机器学习相关的文章了最主要的原因是现在的机器学习和五年前十年前区别很大。深度学习带来了什么深度学习最重要的东西就是自带了特征学习,有时候也被翻译为表征学习,简单来说就是,不需要进行特别的特征抽取。 1.为什么我开始写这个系列博客说五年前我还在某A云公司的时候,身在一个机器学习算法组,对机器学习怀有浓厚的兴趣。花了好多的时间来试图搞清楚各种流行的机器学习算法,经常周末也跟...

    lordharrd 评论0 收藏0
  • 线程安全

    摘要:不可变在中,不可变的对象一定是线程安全的。在里标注自己是线程安全的类,大多都不是绝对线程安全,比如某些情况下类在调用端也需要额外的同步措施。无同步方案要保证线程安全,不一定就得需要数据的同步,两者没有因果关系。 在之前学习编程的时候,有一个概念根深蒂固,即程序=算法+数据结构。数据代表问题空间中的客体,代码就用来处理这些数据,这种思维是站在计算机的角度去抽象问题和解决问题,称之为面向过...

    fuyi501 评论0 收藏0
  • Google GAN之父 ICCV2017演讲:解读生成对抗网络的原理与应用

    摘要:但年在机器学习的较高级大会上,苹果团队的负责人宣布,公司已经允许自己的研发人员对外公布论文成果。苹果第一篇论文一经投放,便在年月日,斩获较佳论文。这项技术由的和开发,使用了生成对抗网络的机器学习方法。 GANs「对抗生成网络之父」Ian Goodfellow 在 ICCV 2017 上的 tutorial 演讲是聊他的代表作生成对抗网络(GAN/Generative Adversarial ...

    plokmju88 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<