资讯专栏INFORMATION COLUMN

以太坊事件框架

骞讳护 / 2960人阅读

摘要:和各有优劣,最优秀的共同特点是,他们只依赖于原始的包,完全与以太坊的其他模块隔离开来,也就是说,你完全可以把这两个事件框架用在自己的项目中。

过去在学Actor模型的时候,就认为异步消息是相当的重要,在华为的时候,也深扒了一下当时产品用的消息模型,简单实用,支撑起了很多模块和业务,但也有一个缺点是和其他的框架有耦合,最近看到以太坊的事件框架,同样简单简洁,理念很适合初步接触事件框架的同学,写文介绍一下。

以太坊的事件框架是一个多带带的基础模块,存在于目录go-ethereum/event中,它有2中独立的事件框架实现,老点的叫TypeMux,已经基本弃用,新的叫Feed,当前正在广泛使用。

TypeMuxFeed还只是简单的事件框架,与Kafka、RocketMQ等消息系统相比,是非常的传统和简单,但是TypeMuxFeed的简单简洁,已经很好的支撑以太坊的上层模块,这是当下最好的选择。

TypeMuxFeed各有优劣,最优秀的共同特点是,他们只依赖于Golang原始的包,完全与以太坊的其他模块隔离开来,也就是说,你完全可以把这两个事件框架用在自己的项目中。

TypeMux的特点是,你把所有的订阅塞给它就好,事件来了它自会通知你,但有可能会阻塞,通知你不是那么及时,甚至过了一段挺长的时间。

Feed的特点是,它通常不存在阻塞的情况,会及时的把事件通知给你,但需要你为每类事件都建立一个Feed,然后不同的事件去不同的Feed上订阅和发送,这其实挺烦人的,如果你用错了Feed,会导致panic。

接下来,介绍下这种简单事件框架的抽象模型,然后再回归到以太坊,介绍下TypeMuxFeed

!<--more-->

事件框架的抽象结构

如上图,轻量级的事件框架会把所有的被订阅的事件收集起来,然后把每个订阅者组合成一个列表,当事件框架收到某个事件的时候,就把订阅该事件的所有订阅者找出来,然后把这个事件发给他们。

它需要具有2个功能:

让订阅者订阅、取消订阅某类事件。

让发布者能够发布某个事件,并且把事件送到每个订阅者。

如果做成完善的消息系统,就还得考虑这些特性:可用性、吞吐量、传输延迟、有序消息、消息存储、过滤、重发,这和事件框架相比就复杂上去了,我们专注的介绍下以太坊的事件模型怎么完成上述3个功能的。

以太坊的事件模型

TypeMux是一个以太坊不太满意的事件框架,所以以太坊就搞了Feed出来,它解决了TypeMux效率低下,延迟交付的问题。接下来就先看下这个TypeMux

TypeMux:同步事件框架

TypeMux是一个同步事件框架。它的实现和上面讲的事件框架的抽象结构是完全一样的,它维护了一个订阅表,表里维护了每个事件的订阅者列表。它的特点:

采用多对多结构:多个事件对多个订阅者。

采用推模式,把事件/消息推送给订阅者,就像信件一样,会被送到你的信箱,你在信箱里取信就行了。

是一个同步事件框架。这也是它的缺点所在,举个例子就是:邮递员要给小红、小明送信,只有信箱里的信被小红取走后,邮递员才去给小明送信,如果小红旅游去了无法取信,邮递员就一直等在小红家,而小明一直收不到信,小明很无辜无辜啊!

看下它2个功能的实现:

订阅和取消订阅。订阅通过函数TypeMux.Subscribe(),入参为要订阅的事件类型,会返回TypeMuxSubscription给订阅者,订阅者可通过此控制订阅,通过TypeMuxSubscription.Unsubscribe() 可以取消订阅。

发布事件和传递事件。TypeMux.Post(),入参为事件类型,根据订阅表找出该事件的订阅者列表,遍历列表,依次向每个订阅者传递事件,如果前一个没有传递完成进入阻塞,会导致后边的订阅者不能及时收到事件。

TypeMux源码速递

TypeMux的精简组成:

// A TypeMux dispatches events to registered receivers. Receivers can be
// registered to handle events of certain type. Any operation
// called after mux is stopped will return ErrMuxClosed.
//
// The zero value is ready to use.
//
// Deprecated: use Feed
// 本质:哈希列表,每个事件的订阅者都存到对于的列表里
type TypeMux struct {
    mutex   sync.RWMutex // 锁
    subm    map[reflect.Type][]*TypeMuxSubscription // 订阅表:所有事件类型的所有订阅者
    stopped bool
}

订阅:

// Subscribe creates a subscription for events of the given types. The
// subscription"s channel is closed when it is unsubscribed
// or the mux is closed.
// 订阅者只传入订阅的事件类型,然后TypeMux会返回给它一个订阅对象
func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription {
    sub := newsub(mux)
    mux.mutex.Lock()
    defer mux.mutex.Unlock()
    if mux.stopped {
        // set the status to closed so that calling Unsubscribe after this
        // call will short circuit.
        sub.closed = true
        close(sub.postC)
    } else {
        if mux.subm == nil {
            mux.subm = make(map[reflect.Type][]*TypeMuxSubscription)
        }
        for _, t := range types {
            rtyp := reflect.TypeOf(t)
            // 在同一次订阅中,不要重复订阅同一个类型的事件
            oldsubs := mux.subm[rtyp]
            if find(oldsubs, sub) != -1 {
                panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp))
            }
            subs := make([]*TypeMuxSubscription, len(oldsubs)+1)
            copy(subs, oldsubs)
            subs[len(oldsubs)] = sub
            mux.subm[rtyp] = subs
        }
    }
    return sub
}

取消订阅:

func (s *TypeMuxSubscription) Unsubscribe() {
    s.mux.del(s)
    s.closewait()
}

发布事件和传递事件:

// Post sends an event to all receivers registered for the given type.
// It returns ErrMuxClosed if the mux has been stopped.
// 遍历map,找到所有订阅的人,向它们传递event,同一个event对象,非拷贝,运行在调用者goroutine
func (mux *TypeMux) Post(ev interface{}) error {
    event := &TypeMuxEvent{
        Time: time.Now(),
        Data: ev,
    }
    rtyp := reflect.TypeOf(ev)
    mux.mutex.RLock()
    if mux.stopped {
        mux.mutex.RUnlock()
        return ErrMuxClosed
    }
    subs := mux.subm[rtyp]
    mux.mutex.RUnlock()
    for _, sub := range subs {
        sub.deliver(event)
    }
    return nil
}

func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) {
    // Short circuit delivery if stale event
    // 不发送过早(老)的消息
    if s.created.After(event.Time) {
        return
    }
    // Otherwise deliver the event
    s.postMu.RLock()
    defer s.postMu.RUnlock()

    select {
    case s.postC <- event:
    case <-s.closing:
    }
}

我上面指出了发送事件可能阻塞,阻塞在哪?关键就在下面这里:创建TypeMuxSubscription时,通道使用的是无缓存通道,读写是同步的,这里注定了TypeMux是一个同步事件框架,这是以太坊改用Feed的最大原因

func newsub(mux *TypeMux) *TypeMuxSubscription {
    c := make(chan *TypeMuxEvent) // 无缓冲通道,同步读写
    return &TypeMuxSubscription{
        mux:     mux,
        created: time.Now(),
        readC:   c,
        postC:   c,
        closing: make(chan struct{}),
    }
}
Feed:流式框架

Feed是一个流式事件框架。上文强调了TypeMux是一个同步框架,也正是因为此以太坊丢弃了它,难道Feed就是一个异步框架?不一定是的,这取决于订阅者是否采用有缓存的通道,采用有缓存的通道,则Feed就是异步的,采用无缓存的通道,Feed就是同步的,把同步还是异步的选择交给使用者

本节强调Feed的流式特点。事件本质是一个数据,连续不断的事件就组成了一个数据流,这些数据流不停的流向它的订阅者那里,并且不会阻塞在任何一个订阅者那里。

举几个不是十分恰当的例子。

公司要放中秋节,HR给所有同事都发了一封邮件,有些同事读了,有些同事没读,要到国庆节了HR又给所有同事发了一封邮件,这些邮件又进入到每个人的邮箱,不会因为任何一个人没有读邮件,导致剩下的同事收不到邮件。

你在朋友圈给朋友旅行的照片点了个赞,每当你们共同朋友点赞或者评论的时候,你都会收到提醒,无论你看没看这些提醒,这些提醒都会不断的发过来。

你微博关注了谢娜,谢娜发了个搞笑的视频,你刷微博的时候就收到了,但也有很多人根本没刷微博,你不会因为别人没有刷,你就收不到谢娜的动态。

Feed和TypeMux相同的是,它们都是推模式,不同的是Feed是异步的,如果有些订阅者阻塞了,没关系,它会继续向后面的订阅者发送事件/消息。

Feed是一个一对多的事件流框架。每个类型的事件都需要一个与之对应的Feed,订阅者通过这个Feed进行订阅事件,发布者通过这个Feed发布事件。

看下Feed是如何实现2个功能的:

订阅和取消订阅:Feed.Subscribe(),入参是一个通道,通常是有缓冲的,就算是无缓存也不会造成Feed阻塞,Feed会校验这个通道的类型和本Feed管理的事件类型是否一致,然后把通道保存下来,返回给订阅者一个Subscription,可以通过它取消订阅和读取通道错误。

发布事件和传递事件。Feed.Send()入参是一个事件,加锁确保本类型事件只有一个发送协程正在进行,然后校验事件类型是否匹配,Feed会尝试给每个订阅者发送事件,如果订阅者阻塞,Feed就继续尝试给下一个订阅者发送,直到给每个订阅者发送事件,返回发送该事件的数量。

Feed源码速递

Feed定义:

// Feed implements one-to-many subscriptions where the carrier of events is a channel.
// Values sent to a Feed are delivered to all subscribed channels simultaneously.
//
// Feeds can only be used with a single type. The type is determined by the first Send or
// Subscribe operation. Subsequent calls to these methods panic if the type does not
// match.
//
// The zero value is ready to use.
// 一对多的事件订阅管理:每个feed对象,当别人调用send的时候,会发送给所有订阅者
// 每种事件类型都有一个自己的feed,一个feed内订阅的是同一种类型的事件,得用某个事件的feed才能订阅该事件
type Feed struct {
    once      sync.Once        // ensures that init only runs once
    sendLock  chan struct{}    // sendLock has a one-element buffer and is empty when held.It protects sendCases. 这个锁确保了只有一个协程在使用go routine
    removeSub chan interface{} // interrupts Send
    sendCases caseList         // the active set of select cases used by Send,订阅的channel列表,这些channel是活跃的

    // The inbox holds newly subscribed channels until they are added to sendCases.
    mu     sync.Mutex
    inbox  caseList // 不活跃的在这里
    etype  reflect.Type
    closed bool
}

订阅事件:

// Subscribe adds a channel to the feed. Future sends will be delivered on the channel
// until the subscription is canceled. All channels added must have the same element type.
//
// The channel should have ample buffer space to avoid blocking other subscribers.
// Slow subscribers are not dropped.
// 订阅者传入接收事件的通道,feed将通道保存为case,然后返回给订阅者订阅对象
func (f *Feed) Subscribe(channel interface{}) Subscription {
    f.once.Do(f.init)

    // 通道和通道类型检查
    chanval := reflect.ValueOf(channel)
    chantyp := chanval.Type()
    if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
        panic(errBadChannel)
    }
    sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}

    f.mu.Lock()
    defer f.mu.Unlock()
    if !f.typecheck(chantyp.Elem()) {
        panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
    }
    
    // 把通道保存到case
    // Add the select case to the inbox.
    // The next Send will add it to f.sendCases.
    cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
    f.inbox = append(f.inbox, cas)
    return sub
}

发送和传递事件:这个发送是比较绕一点的,要想真正掌握其中的运行,最好写个小程序练习下。

// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
// 同时向所有的订阅者发送事件,返回订阅者的数量
func (f *Feed) Send(value interface{}) (nsent int) {
    rvalue := reflect.ValueOf(value)

    f.once.Do(f.init)
    <-f.sendLock // 获取发送锁

    // Add new cases from the inbox after taking the send lock.
    // 从inbox加入到sendCases,不能订阅的时候直接加入到sendCases,因为可能其他协程在调用发送
    f.mu.Lock()
    f.sendCases = append(f.sendCases, f.inbox...)
    f.inbox = nil

    // 类型检查:如果该feed不是要发送的值的类型,释放锁,并且执行panic
    if !f.typecheck(rvalue.Type()) {
        f.sendLock <- struct{}{}
        panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
    }
    f.mu.Unlock()

    // Set the sent value on all channels.
    // 把发送的值关联到每个case/channel,每一个事件都有一个feed,所以这里全是同一个事件的
    for i := firstSubSendCase; i < len(f.sendCases); i++ {
        f.sendCases[i].Send = rvalue
    }

    // Send until all channels except removeSub have been chosen. "cases" tracks a prefix
    // of sendCases. When a send succeeds, the corresponding case moves to the end of
    // "cases" and it shrinks by one element.
    // 所有case仍然保留在sendCases,只是用过的会移动到最后面
    cases := f.sendCases
    for {
        // Fast path: try sending without blocking before adding to the select set.
        // This should usually succeed if subscribers are fast enough and have free
        // buffer space.
        // 使用非阻塞式发送,如果不能发送就及时返回
        for i := firstSubSendCase; i < len(cases); i++ {
            // 如果发送成功,把这个case移动到末尾,所以i这个位置就是没处理过的,然后大小减1
            if cases[i].Chan.TrySend(rvalue) {
                nsent++
                cases = cases.deactivate(i)
                i--
            }
        }

        // 如果这个地方成立,代表所有订阅者都不阻塞,都发送完了
        if len(cases) == firstSubSendCase {
            break
        }

        // Select on all the receivers, waiting for them to unblock.
        // 返回一个可用的,直到不阻塞。
        chosen, recv, _ := reflect.Select(cases)
        if chosen == 0 /* <-f.removeSub */ {
            // 这个接收方要删除了,删除并缩小sendCases
            index := f.sendCases.find(recv.Interface())
            f.sendCases = f.sendCases.delete(index)
            if index >= 0 && index < len(cases) {
                // Shrink "cases" too because the removed case was still active.
                cases = f.sendCases[:len(cases)-1]
            }
        } else {
            // reflect已经确保数据已经发送,无需再尝试发送
            cases = cases.deactivate(chosen)
            nsent++
        }
    }

    // 把sendCases中的send都标记为空
    // Forget about the sent value and hand off the send lock.
    for i := firstSubSendCase; i < len(f.sendCases); i++ {
        f.sendCases[i].Send = reflect.Value{}
    }
    f.sendLock <- struct{}{}
    return nsent
}

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/24394.html

相关文章

  • 2018以太智能合约编程语言solidity的最佳IDEs

    摘要:使用基于以太坊的智能合约的集成开发环境。以太坊教程,主要介绍智能合约与应用开发,适合入门。以太坊,主要是介绍使用进行智能合约开发交互,进行账号创建交易转账代币开发以及过滤器和事件等内容。 Solidity是一种以智能合约为导向的编程语言。这是一种只有四年的年轻语言,旨在帮助开发基于以太坊数字货币的智能合约。 理解它官方文档应该是学习Solidity的最佳来源:solidity.read...

    darkerXi 评论0 收藏0
  • 以太智能合约开发第二篇:理解以太相关概念

    摘要:原文发表于以太坊智能合约开发第二篇理解以太坊相关概念很多人都说比特币是区块链,以太坊是区块链。它是以太坊智能合约的运行环境。是由以太坊节点提供。以太坊社区把基于智能合约的应用称为去中心化的应用。 原文发表于:以太坊智能合约开发第二篇:理解以太坊相关概念 很多人都说比特币是区块链1.0,以太坊是区块链2.0。在以太坊平台上,可以开发各种各样的去中心化应用,这些应用构成了以太坊的整个生态...

    yibinnn 评论0 收藏0
  • 以太连载(五):以太社区、基金会、贡献者介绍

    摘要:以太坊论坛大名鼎鼎的以太坊论坛将不再维护,可能很快就会停用。以太坊基金会以太坊基金会是在瑞士注册的非营利性机构,旨在管理以太币销售中筹措的基金,以更好地为以太坊和去中心化技术生态系统服务。 社区发起讨论和问问题,请明智选择论坛,并协助我们维护论坛环境整洁。 Reddit以太坊reddit分论坛是最全面的以太坊论坛,这里是大部分社区讨论发生的地方和核心开发者最活跃的地方。如果你想对新闻、...

    KoreyLee 评论0 收藏0
  • 智能合约实施指南

    摘要:在协议结束时,智能合约被视为已履行并仍存储在区块链网络中。这组条件和事件代表了最基本的一次性智能合约。智能合约用例智能合约越来越受欢迎,并已在各种区块链项目中实施。 与区块链技术一样,智能合约在商业领域也非常有价值。 为了让我们的读者彻底了解智能合约是什么以及它们如何影响现代商业的交易方式,我们准备了本指南。 集中商业模式正在给去中心化的模式让路 传统的商业关系模型都是集中式的,始终存...

    meteor199 评论0 收藏0
  • 智能合约实施指南

    摘要:在协议结束时,智能合约被视为已履行并仍存储在区块链网络中。这组条件和事件代表了最基本的一次性智能合约。智能合约用例智能合约越来越受欢迎,并已在各种区块链项目中实施。 与区块链技术一样,智能合约在商业领域也非常有价值。 为了让我们的读者彻底了解智能合约是什么以及它们如何影响现代商业的交易方式,我们准备了本指南。 集中商业模式正在给去中心化的模式让路 传统的商业关系模型都是集中式的,始终存...

    PumpkinDylan 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<