资讯专栏INFORMATION COLUMN

如何处理有依赖的消息

Tony / 3458人阅读

摘要:生产者生产的消息要满足不了消费者才行。可以看到一个有依赖的消息我们在处理的过程,会多一次查询操作,性能多少会受点影响。如果没有的消息进来,孤儿院里是酱紫的。收到之后再处理,紧接着又找到的条消息,再出来,让去处理。

在项目中踏完一系列坑后总结出来,消息的处理有两个要务:

消费一定要快,我们喜欢供小于求的市场。生产者生产的消息要满足不了消费者才行。

任何消息都不能丢,因为这都是数据啊,即使处理不了也得找地方存着。最好每次的消息都存着,之后就变成了event sourcing(另一个大坑)。

要实现上述2点,其实要解决很多问题。一个字就不是那么做到的。业务系统收到消息有可能会触发一连串的,并且包裹着事务的逻辑。因为通常我们希望如果这一连串的处理失败的话,可以把ack退回给MQ。一旦业务逻辑过于复杂,work消费消息的速度也会变慢。这就需要开发人员去做权衡了,是不是有些非常heavy的操作可以先记一笔,等业务不繁忙的时候再做。具体实现不在这篇讨论。

问题

回到主题,有一类消息最让人头疼,就是消息之间有依赖,关系一般为单向的父子关系。举个栗子,Product和SKU的关系,一个Product包含多个SKU。比如我们的业务逻辑是要监听这两个消息组成一颗树放到索引中。可想而知,这棵树肯定是至少两层结构,第一级是Product,下面挂着一个或多个SKU。

一般来说,子结构如果是个多带带的消息肯定会有个字段说明自己的parent id是什么。那么很自然的,我们在某一刻只收到一个SKU的Create事件,会去通过parent id找到索引中对应的Product,然后上去。问题来了,要是索引中没有对应的Product怎么办,消息是没有顺序的,可能是Product的Create事件还没处理到,或者是生产者出了bug消息没发出来造成的。这时SKU的消息就成了孤儿消息

解决思路一

比较近粗暴的方式,就是利用SKU上的parent id虚拟出一个只有id的Product,由处理SKU事件的worker来帮忙创建这个Product。等下次Product的Create消息进来做一次更新就好了。(处理消息应该不要区分这是Create还是Update还是Delete,消费者就都当Update来做比较好,可以想想为什么)。

当然这个思路一看就有点bad smell,从单一职责的角度上来看,处理SKU的worker应该只关注SKU,不应该关注Product。如果Product也是个孤儿怎么办呢?这个worker可能会越写越复杂。

改进的话可以把创建虚拟Product的这个事情放到SKU这个对象中去做,实现以下setProduct这个方法。那么即使Product也有依赖,那Product自己也得有个setParent的方法,这样就可以递归下去了。(之后想了一下无法处理多级关系,因为Product的消息没来,我们不知道它的parent id,父节点根本建不出来。所以思路一只能处理一个层级的依赖。)

总结一下,思路一是一种不管三七二十一,谁也不能阻止我消费的路线,大不了自下而上的创建虚拟父节点。

解决思路二

相对思路一而言,这种思路还是比较优雅的,但是优雅不等于性能好。

既然SKU是个孤儿,那么我们先收下来放孤儿院好了。新建一张孤儿院表:

id parent_type parent_id
101010 Product 1010
101011 Product 1010

上面两条数据就是SKU的,然后为了提升一点性能我们得对对象分个类,一类是有依赖的,一类是无依赖的。没有依赖的直接消费就好,像Product,SKU这种有依赖的,都得打上标签(就是对象里写个isDependency)。例如一个SKU(101010)的消息进来,worker发现这是一个有依赖的消息,那么先拿parent id (1010)去找Product, 发现Product找不到就把这个SKU丢到孤儿院表里去。如果你是用OO的语言,这里其实可以抽象一下。一个BaseWorker,一个SKUWorker,BaseWork负责写个abstract的findParent(),SKUWorker去实现找Product的逻辑就好了。

public abstract class BaseWorker {
    
    public void handle(T t) {
        if (t.isDependency() && findParent(t) == null) {
            // 送到孤儿院
            takeToOrphanage(t);
            return;
        }
    }
    
    abstract Entity findParent(T t);
    
    protected void takeToOrphanage(T t) {
    }
    
}

消息记录下来以后,Worker的工作就终止,等待下一条消息进来。过了几分钟,Product(1010)的消息过来了,这时候我们需要给BaseWorker再添加一些代码。

public void handle(T t) {
        if (t.isDependency() && findParent(t) == null) {
            // 送到孤儿院
            takeToOrphanage(t);
            return;
        }
    
        // 正常业务...
    
        // 正常业务处理之后
        if (t.isDependency()) {
            List children = findChildren(t);
            if (children != null) {
                children.forEach(child -> {
                    sendAsMessage(child);
                });
            }
        }
    
    }
    
    abstract List findChildren(T t);

我们增加一个findChildren方法,让ProductWorker去实现具体逻辑。handle()中增加的代码含义是,当Product这个消息消费完了以后,去孤儿院转一圈看看是不是有等待认领的孩子,简单的利用parent_typeparent_id就能查到。查到以后别直接处理,仍然是以消息的形式发出,让SKUWorker自己去handle,然后可以delete/soft-delete孤儿院中的记录。

可以看到一个有依赖的消息我们在处理的过程,会多一次查询操作,性能多少会受点影响。之前的那次findParent查询其实思路一也有的,目的就是挂靠。

再多一个层级看看是不是罩得住,Category --> Product --> SKU 三层。

如果没有Category的消息进来,孤儿院里是酱紫的。

id parent_type parent_id
101010 Product 1010
101011 Product 1010
1010 Category 10

某一时刻Category的消息进来,CategoryWorker会先到表里查到一条1010的Product消息,把它send出来。ProductWorker收到之后再处理,紧接着又找到SKU的2条消息,再send出来,让SKUWorker去处理。可以看到,自带递归,多层级只要是单向依赖的肯定搞的定。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66305.html

相关文章

  • 何处理Docker错误消息:please add——insecure-registry

    摘要:本地安装时,遇到如下的错误消息解决方案点击的菜单点击标签页,在里维护记录将错误信息里提到的维护进点击按钮重新启动之后错误消息消失。 本地安装Kubernetes时,遇到如下的错误消息: pleade add --insecure-registry gcr.io to daemons arguments showImg(https://segmentfault.com/img/remot...

    jsdt 评论0 收藏0
  • 白话RabbitMQ(六): RPC

    摘要:因为消费消息是在另外一个进程中,我们需要阻塞我们的进程直到结果返回,使用阻塞队列是一种非常好的方式,这里我们使用了长度为的,的功能是检查消息的的是不是我们之前所发送的,如果是,将返回值返回到。 推广 RabbitMQ专题讲座 https://segmentfault.com/l/15... CoolMQ开源项目 我们利用消息队列实现了分布式事务的最终一致性解决方案,请大家围观。可以参考...

    KevinYan 评论0 收藏0
  • 何处理Docker错误消息request canceled:Docker代理问题

    摘要:在本地安装时,遇到错误消息这个原因是应用没有正确设置代理。在上设置代理非常方便选择即手动设置。设置完之后,点击按钮之后在里使用命令行可以成功把镜像下载到本地。 在本地安装Kubernetes时,遇到错误消息: request canceled while waiting for connection(Client.Timeout exceeded while awaiting head...

    qianfeng 评论0 收藏0
  • 为Java程序员金三银四精心挑选300余道Java面试题与答案

    摘要:为程序员金三银四精心挑选的余道面试题与答案,欢迎大家向我推荐你在面试过程中遇到的问题我会把大家推荐的问题添加到下面的常用面试题清单中供大家参考。 为Java程序员金三银四精心挑选的300余道Java面试题与答案,欢迎大家向我推荐你在面试过程中遇到的问题,我会把大家推荐的问题添加到下面的常用面试题清单中供大家参考。 前两天写的以下博客,大家比较认可,热度不错,希望可以帮到准备或者正在参加...

    tomorrowwu 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<