我正在尝试使用AMQP,Websockets和
Ruby构建一个简单的聊天应用程序.我理解这可能不是理解AMQP的最佳用例,但我想了解我哪里出错了.
以下是我的amqp服务器代码
require 'rubygems' require 'amqp' require 'mongo' require 'em-websocket' require 'json' class MessageParser # message format => "room:harry_potter,nickname:siddharth,room:members" def self.parse(message) parsed_message = JSON.parse(message) response = {} if parsed_message['status'] == 'status' response[:status] = 'STATUS' response[:username] = parsed_message['username'] response[:roomname] = parsed_message['roomname'] elsif parsed_message['status'] == 'message' response[:status] = 'MESSAGE' response[:message] = parsed_message['message'] response[:roomname] = parsed_message['roomname'].split().join('_') end response end end class MongoManager def self.establish_connection(database) @db ||= Mongo::Connection.new('localhost',27017).db(database) @db.collection('rooms') @db end end @sockets = [] EventMachine.run do connection = AMQP.connect(:host => '127.0.0.1') channel = AMQP::Channel.new(connection) puts "Connected to AMQP broker. #{AMQP::VERSION} " mongo = MongoManager.establish_connection("trackertalk_development") EventMachine::WebSocket.start(:host => '127.0.0.1',:port => 8080) do |ws| socket_detail = {:socket => ws} ws.onopen do @sockets << socket_detail end ws.onmessage do |message| status = MessageParser.parse(message) exchange = channel.fanout(status[:roomname].split().join('_')) if status[:status] == 'STATUS' queue = channel.queue(status[:username],:durable => true) unless queue.subscribed? puts "--------- SUBSCRIBED --------------" queue.bind(exchange).subscribe do |payload| puts "PAYLOAD : #{payload}" ws.send(payload) end else puts "----ALREADY SUBSCRIBED" end # only after 0.8.0rc14 #queue = channel.queue(status[:username],:durable => true) #AMQP::Consumer.new(channel,queue) elsif status[:status] == 'MESSAGE' puts "********************* Message- published ******************************" exchange.publish(status[:message) end end ws.onclose do @sockets.delete ws end end end
我使用状态来指示传入消息是用于正在进行的聊天的消息还是要求我处理诸如订阅队列之类的杂事的状态消息.
我面临的问题是当我发送消息时
socket.send(JSON.stringify({status:’message’,message:’test’,roomname:’Harry Potter’}))
调用exchange.publish’但它仍然不会通过thews.send`推送到浏览器.
我对EventMachine和AMQP的理解是否存在根本性的错误?
这是相同代码http://pastie.org/private/xosgb8tw1w5vuroa4w7a的贴图
当我删除耐用=>时,我的代码似乎按照需要工作true = queue = channel.queue(status [:username],:durable => true)
以下是我的Rails视图的片段,它标识用户的用户名和房间名,并通过Websockets将其作为消息的一部分发送.
虽然代码似乎工作,当我删除耐用=>是的,我无法理解为什么会影响正在传递的消息.请忽略mongo部分,因为它还没有播放任何部分.
<script> $(document).ready(function(){ var username = '<%= @user.email %>'; var roomname = 'Bazingaa'; socket = new WebSocket('ws://127.0.0.1:8080/'); socket.onopen = function(msg){ console.log('connected'); socket.send(JSON.stringify({status:'status',username:username,roomname:roomname})); } socket.onmessage = function(msg){ $('#chat-log').append(msg.data); } }); </script> <div class='block'> <div class='content'> <h2 class='title'><%= @room.name %></h2> <div class='inner'> <div id="chat-log"> </div> <div id="chat-console"> <textarea rows="5" cols="40"></textarea> </div> </div> </div> </div> <style> #chat-log{ color:#000; font-weight:bold; margin-top:1em; width:900px; overflow:auto; height:300px; } #chat-console{ bottom:10px; } textarea{ width:100%; height:60px; } </style>
解决方法
我认为您的问题可能是在ws.onmessage调用之间的代理上挂起的队列.当客户端重新连接队列并且绑定已经存在时,ws.send()不会被调用.
默认情况下,当您创建一个队列时,它及其拥有的任何绑定都会挂起,直到代理重新启动,或者您明确告诉代理删除它.
有两种方法可以改变这种情况:
>在创建队列时添加持久标志,即使代理重新启动,也会导致队列停留
>添加auto_delete标志,这将导致代理在没有附加消费者的短时间后自动删除实体
如果您可以控制使用rabbitmq代理的代理,那么可以通过一种简单的方式来反省代理上发生的事情,即安装management plugin,它为代理上的交换,绑定和队列提供Web界面.