如何处理有依赖的消息

前端之家收集整理的这篇文章主要介绍了如何处理有依赖的消息前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

在项目中踏完一系列坑后总结出来,消息的处理有两个要务:

  1. 消费一定要快,我们喜欢供小于求的市场。生产者生产的消息要满足不了消费者才行。

  2. 任何消息都不能丢,因为这都是数据啊,即使处理不了也得找地方存着。最好每次的消息都存着,之后就变成了event sourcing(另一个大坑)。

    要实现上述2点,其实要解决很多问题。一个字就不是那么做到的。业务系统收到消息有可能会触发一连串的,并且包裹着事务的逻辑。因为通常我们希望如果这一连串的处理失败的话,可以把ack退回给MQ。一旦业务逻辑过于复杂,work消费消息的速度也会变慢。这就需要开发人员去做权衡了,是不是有些非常heavy的操作可以先记一笔,等业务不繁忙的时候再做。具体实现不在这篇讨论。

问题

回到主题,有一类消息最让人头疼,就是消息之间有依赖,关系一般为单向的父子关系。举个栗子,Product和SKU的关系,一个Product包含多个SKU。比如我们的业务逻辑是要监听这两个消息组成一颗树放到索引中。可想而知,这棵树肯定是至少两层结构,第一级是Product,下面挂着一个或多个SKU。

一般来说,子结构如果是个单独的消息肯定会有个字段说明自己的parent id是什么。那么很自然的,我们在某一刻只收到一个SKU的Create事件,会去通过parent id找到索引中对应的Product,然后上去。问题来了,要是索引中没有对应的Product怎么办,消息是没有顺序的,可能是Product的Create事件还没处理到,或者是生产者出了bug消息没发出来造成的。这时SKU的消息就成了孤儿消息

解决思路一

比较近粗暴的方式,就是利用SKU上的parent id虚拟出一个只有id的Product,由处理SKU事件的worker来帮忙创建这个Product。等下次Product的Create消息进来做一次更新就好了。(处理消息应该不要区分这是Create还是Update还是Delete,消费者就都当Update来做比较好,可以想想为什么)。

当然这个思路一看就有点bad smell,从单一职责的角度上来看,处理SKU的worker应该只关注SKU,不应该关注Product。如果Product也是个孤儿怎么办呢?这个worker可能会越写越复杂。

改进的话可以把创建虚拟Product的这个事情放到SKU这个对象中去做,实现以下setProduct这个方法。那么即使Product也有依赖,那Product自己也得有个setParent的方法,这样就可以递归下去了。(之后想了一下无法处理多级关系,因为Product的消息没来,我们不知道它的parent id,父节点根本建不出来。所以思路一只能处理一个层级的依赖。)

总结一下,思路一是一种不管三七二十一,谁也不能阻止我消费的路线,大不了自下而上的创建虚拟父节点。

解决思路二

相对思路一而言,这种思路还是比较优雅的,但是优雅不等于性能好。

既然SKU是个孤儿,那么我们先收下来放孤儿院好了。新建一张孤儿院表:

id parent_type parent_id
101010 Product 1010
101011 Product 1010

上面两条数据就是SKU的,然后为了提升一点性能我们得对对象分个类,一类是有依赖的,一类是无依赖的。没有依赖的直接消费就好,像Product,SKU这种有依赖的,都得打上标签(就是对象里写个isDependency)。例如一个SKU(101010)的消息进来,worker发现这是一个有依赖的消息,那么先拿parent id (1010)去找Product,发现Product找不到就把这个SKU丢到孤儿院表里去。如果你是用OO的语言,这里其实可以抽象一下。一个BaseWorker,一个SKUWorker,BaseWork负责写个abstract的findParent(),SKUWorker去实现找Product的逻辑就好了。

public abstract class BaseWorker<T> {
    
    public void handle(T t) {
        if (t.isDependency() && findParent(t) == null) {
            // 送到孤儿院
            takeToOrphanage(t);
            return;
        }
    }
    
    abstract Entity findParent(T t);
    
    protected void takeToOrphanage(T t) {
    }
    
}

消息记录下来以后,Worker的工作就终止,等待下一条消息进来。过了几分钟,Product(1010)的消息过来了,这时候我们需要给BaseWorker再添加一些代码

public void handle(T t) {
        if (t.isDependency() && findParent(t) == null) {
            // 送到孤儿院
            takeToOrphanage(t);
            return;
        }
    
        // 正常业务...
    
        // 正常业务处理之后
        if (t.isDependency()) {
            List<Entity> children = findChildren(t);
            if (children != null) {
                children.forEach(child -> {
                    sendAsMessage(child);
                });
            }
        }
    
    }
    
    abstract List<Entity> findChildren(T t);

我们增加一个findChildren方法,让ProductWorker去实现具体逻辑。handle()中增加代码含义是,当Product这个消息消费完了以后,去孤儿院转一圈看看是不是有等待认领的孩子,简单的利用parent_typeparent_id就能查到。查到以后别直接处理,仍然是以消息的形式发出,让SKUWorker自己去handle,然后可以delete/soft-delete孤儿院中的记录。

可以看到一个有依赖的消息我们在处理的过程,会多一次查询操作,性能多少会受点影响。之前的那次findParent查询其实思路一也有的,目的就是挂靠。

再多一个层级看看是不是罩得住,Category --> Product --> SKU 三层。

如果没有Category的消息进来,孤儿院里是酱紫的。

id parent_type parent_id
101010 Product 1010
101011 Product 1010
1010 Category 10

某一时刻Category的消息进来,CategoryWorker会先到表里查到一条1010的Product消息,把它send出来。ProductWorker收到之后再处理,紧接着又找到SKU的2条消息,再send出来,让SKUWorker去处理。可以看到,自带递归,多层级只要是单向依赖的肯定搞的定。

猜你在找的设计模式相关文章