java – 可以使用自定义算法调度消息,而不是使用RabbitMQ进行循环?

前端之家收集整理的这篇文章主要介绍了java – 可以使用自定义算法调度消息,而不是使用RabbitMQ进行循环?前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在使用RabbitMQ的循环功能来在多个消费者之间发送消息,但是一次只能收到一个消息.

我的问题是我的消息代表任务,我想在我的消费者身上有本地会话(状态).我事先知道哪些消息属于哪个会话,但是我不知道使用我指定的算法使RabbitMQ向消费者发送什么是最好的方法(或者有什么方法?).

我不想写我自己的编排服务,因为它将成为一个瓶颈,我不希望我的制作人知道哪个消费者会收到他们的消息,因为我会失去使用兔子的解耦.

有没有办法使RabbitMQ基于预定义的算法/规则而不是循环方式向消费者发送消息?

澄清:我用不同语言编写的几个微服务器,每个服务都有自己的工作.我们之间使用protobuf消息进行通信.我给每个新消息一个UUID.如果消费者收到消息,它可以从中创建一个响应消息(这可能不是正确的术语,因为生产者和消费者被解耦,并且他们彼此不了解),并且该UUID被复制到新消息.这形成一个数据转换流水线,该“进程”由UUID(processId)标识.我的问题是,有可能我有多个工作的消费者,如果以前看到它,我需要一个工作者坚持一个UUID.我有这个需要,因为

>每个进程可能有本地状态
>进程完成后,我想清理本地状态
>微服务器可能会收到同一进程的多条消息,我需要区分哪个消息属于哪个进程

由于RabbitMQ在使用循环的工作人员之间分配任务,因此我无法强制我的进程坚持工作.我有几个注意事项:

生产者与消费者脱钩,所以直接信息不是一个选择
>工作人员的数量不是常数(有一个负载平衡器可能会启动一个工作的新实例)

如果有一个解决方案不涉及更改循环算法,并且不会破坏我的约束,那也是可以的!

解决方法

如果你不想去编排业务,你可以尝试这样的拓扑:

为了简单起见,我假设您的processId用作路由密钥(在现实世界中,您可能希望将其存储在标题中,并使用标题交换).

传入消息将被接收的Exchange(类型:直接)接受,其中alternative-exchange属性设置为指向无会话交换(扇出).

这是RabbitMQ文档在“替代交易”中所说的:

It is sometimes desirable to let clients handle messages that an exchange was unable to route (i.e. either because there were no bound queues our no matching bindings).

Typical examples of this are

  • detecting when clients accidentally or malicIoUsly publish messages that cannot be routed
  • “or else” routing semantics where some messages are handled specially and the rest by a generic handler

RabbitMQ’s Alternate Exchange (“AE”) feature addresses these use cases.

(我们对这里的用例特别感兴趣)

每个消费者将创建它自己的队列,并将其绑定到传入Exchange,使用到目前为止知道的会话的processId作为绑定的路由密钥.

这样一来,它只会收到有兴趣的会话的消息.

另外,所有的消费者都将绑定到共享的无会话队列.

如果有一个以前未知的processId的消息进来,那么它将不会对与Incoming Exchange进行注册的特定绑定,因此它将被重新路由到No Session Exchange =>没有会话队列,并以通常(循环)方式发送到其中一个消费者.

然后,消费者将使用传入交换(即启动新的“会话”)为其注册新的绑定,以便随后将使用此processId获取所有后续消息.

一旦“会话”结束,它将不得不删除相应的绑定(即关闭“会话”).

猜你在找的Java相关文章