我正试图从SQS转移到RabbitMQ进行消息传递服务.我正在寻求建立稳定的高可用性排队服务.现在我要去集群了.
目前的实施,
我有三台带有RabbitMQ的EC2机器,在AMI中安装了管理插件,然后我明确地去了每台机器并添加
sudo rabbitmqctl join_cluster rabbit@
最佳答案
我2年前有类似的配置.
我决定使用amazon VPC,默认情况下我的设计有两个RabbitMQ实例始终在运行,并在集群中配置(称为主节点).
rabbitmq集群落后于internal amazon load balancer.
我创建了一个配置了RabbitMQ和管理插件的AMI(称为“master-AMI”),然后我配置了自动缩放规则.
如果引发自动调节警报,则启动新的主AMI.
此AMI在第一次执行时执行以下脚本:
#!/usr/bin/env python
import json
import urllib2,base64
if __name__ == '__main__':
prefix =''
from subprocess import call
call(["rabbitmqctl","stop_app"])
call(["rabbitmqctl","reset"])
try:
_url = 'http://internal-myloadbalamcer-xxx.com:15672/api/nodes'
print prefix + 'Get json info from ..' + _url
request = urllib2.Request(_url)
base64string = base64.encodestring('%s:%s' % ('guest','guest')).replace('\n','')
request.add_header("Authorization","Basic %s" % base64string)
data = json.load(urllib2.urlopen(request))
##if the script got an error here you can assume that it's the first machine and then
## exit without controll the error. Remember to add the new machine to the balancer
print prefix + 'request ok... finding for running node'
for r in data:
if r.get('running'):
print prefix + 'found running node to bind..'
print prefix + 'node name: '+ r.get('name') +'- running:' + str(r.get('running'))
from subprocess import call
call(["rabbitmqctl","join_cluster",r.get('name')])
break;
pass
except Exception,e:
print prefix + 'error during add node'
finally:
from subprocess import call
call(["rabbitmqctl","start_app"])
pass
脚本使用HTTP API“http://internal-myloadbalamcer-xxx.com:15672/api/nodes”查找节点,然后选择一个并将新AMI绑定到集群.
作为HA政策,我决定使用它:
rabbitmqctl set_policy ha-two "^two\." ^
"{""ha-mode"":""exactly"",""ha-params"":2,"ha-sync-mode":"automatic"}"
好吧,连接“非常”简单,问题在于何时可以从群集中删除节点.
您无法根据自动缩放规则删除节点,因为您可以向必须使用的队列发送消息.
我决定执行一个定期运行到两个主节点实例的脚本:
>通过API http://node:15672/api/queues检查消息计数
>如果所有队列的消息计数为零,我可以从负载均衡器中删除该实例,然后从rabbitmq集群中删除该实例.
这就是我所做的,希望它有所帮助.
[编辑]
我编辑了答案,因为有这个插件可以帮助: