ruby – 订阅队列,接收1条消息,然后取消订阅

前端之家收集整理的这篇文章主要介绍了ruby – 订阅队列,接收1条消息,然后取消订阅前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我有一个场景,我需要非常快速地分配和处理作业.我将在队列中快速填充大约45个作业,我可以同时处理大约20个(5台机器,每个4个核心).每个作业需要花费不同的时间,并且使事情变得复杂,垃圾收集是一个问题所以我需要能够让消费者离线进行垃圾收集.

目前,我有一切与pop一起工作(每个消费者每5ms弹出一次).这似乎是不合需要的,因为它转换为每秒600次popm请求到rabbitmq.

我很乐意,如果有一个pop命令会像订阅一样,但只有一条消息. (进程会阻塞,等待来自rabbitMQ连接的输入,通过类似于Kernel.select的东西)

我试图欺骗AMQP gem来做这样的事情,但它不起作用:在队列为空并且不再向消费者发送消息之前,我似乎无法取消订阅.其他取消订阅方法我担心会失去信息.

consume_1.rb:

  1. require "amqp"
  2.  
  3. EventMachine.run do
  4. puts "connecting..."
  5. connection = AMQP.connect(:host => "localhost",:user => "guest",:pass => "guest",:vhost => "/")
  6. puts "Connected to AMQP broker"
  7.  
  8. channel = AMQP::Channel.new(connection)
  9. queue = channel.queue("tasks",:auto_delete => true)
  10. exchange = AMQP::Exchange.default(channel)
  11.  
  12. queue.subscribe do |payload|
  13. puts "Received a message: #{payload}."
  14. queue.unsubscribe { puts "unbound" }
  15. sleep 3
  16. end
  17. end

consumer_many.rb:

  1. require "amqp"
  2.  
  3. # Imagine the command is something cpu - intensive like image processing.
  4. command = "sleep 0.1"
  5.  
  6. EventMachine.run do
  7. puts "connecting..."
  8. connection = AMQP.connect(:host => "localhost",:auto_delete => true)
  9. exchange = AMQP::Exchange.default(channel)
  10.  
  11. queue.subscribe do |payload|
  12. puts "Received a message: #{payload}."
  13. end
  14. end

producer.rb:

  1. require "amqp"
  2.  
  3. i = 0
  4. EventMachine.run do
  5. connection = AMQP.connect(:host => "localhost",:auto_delete => true)
  6. exchange = AMQP::Exchange.default(channel)
  7.  
  8. EM.add_periodic_timer(1) do
  9. msg = "Message #{i}"
  10. i+=1
  11. puts "~ publishing #{msg}"
  12. end
  13. end

我将启动consume_many.rb和producer.rb.消息将按预期流动.

当我启动consume_1.rb时,它会获取所有其他消息(如预期的那样).但它永远不会取消订阅,因为它永远不会完成处理它的所有消息……所以就这样了.

我如何让consume_1.rb订阅队列,获取单个消息,然后将自己从负载均衡器环中取出,这样它就可以完成它的工作,而不会丢失可能在队列中的任何其他待处理作业否则会被安排发送到这个过程?

蒂姆

解决方法

这是AMQP gem的一个简单但很难记录的功能,你需要的是:

在您的消费者中:

  1. channel = AMQP::Channel.new(connection,:prefetch => 1)

然后使用您的订阅块,执行:

  1. queue.subscribe(:ack => true) do |queue_header,payload|
  2. puts "Received a message: #{payload}."
  3. # Do long running work here
  4.  
  5. # Acknowledge message
  6. queue_header.ack
  7. end

这是什么告诉RabbitMQ一次只发送你的消费者1消息,而不是发送另一条消息,直到你在长时间运行一个长时间运行的任务后调用队列头上的ack.

我可能需要对此进行纠正,但我相信直接交换会更适合这项任务.

猜你在找的Ruby相关文章