NSQ是什么
(本文作者 changjixiong,以下是正文)
NSQ是一个实时消息平台,引用一段InfoQ上的介绍:
“NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub。NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。”
如何开始使用
这里有一个例子用来说明如何安装、启动以及发送与接收消息:
An Example of Using NSQ From Go(地址:http://tleyden.github.io/blog/2014/11/12/an-example-of-using-nsq-from-go/)
构建消息的响应函数
如果单是用一个匿名函数来处理收到的消息显然是不够的,下面用代码来演示一下如果根据收到的消息来使用相应的处理函数。
生产者
首先我们来创建生产者
config := nsq.NewConfig()
w,_ := nsq.NewProducer("127.0.0.1:4150",config)
jsonData := []string{}
jsonData = append(jsonData,` { "func_name":"BarFuncAdd","params":[0.5,0.51] }`)
jsonData = append(jsonData,` { "func_name":"FooFuncSwap","params":["a","b"] }`)
for _,j := range jsonData {
w.Publish("Topic_json",[]byte(j))
}
上面的代码向NSQ发送了2个json格式的消息,从字面上不难看出其目的是调用2个函数,分别是BarFuncAdd和FooFuncSwap。
消费者
现在我们来创建消费者
config := nsq.NewConfig()
config.DefaultRequeueDelay = 0
config.MaxBackoffDuration = 20 * time.Millisecond
config.LookupdPollInterval = 1000 * time.Millisecond
config.RDYRedistributeInterval = 1000 * time.Millisecond
config.MaxInFlight = 2500
MakeConsumer("Topic_json","ch",config,HandleJsonMessage)
MakeConsumer的定义如下:
func MakeConsumer(topic,channel string,config *nsq.Config,handle func(message *nsq.Message) error) {
consumer,_ := nsq.NewConsumer(topic,channel,config)
consumer.AddHandler(nsq.HandlerFunc(handle))
err := consumer.ConnectToNSQD("127.0.0.1:4150")
if err != nil {
log.Panic("Could not connect")
}
}
处理器函数
NSQ消息的处理器函数定义如下:
func HandleJsonMessage(message *nsq.Message) error {
resultJson := reflectinvoke.InvokeByJson([]byte(message.Body))
result := reflectinvoke.Response{}
err := json.Unmarshal(resultJson,&result)
if err != nil {
return err
}
info := "HandleJsonMessage get a result\n"
info += "raw:\n" + string(resultJson) + "\n"
info += "function: " + result.FuncName + " \n"
info += fmt.Sprintf("result: %v\n",result.Data)
info += fmt.Sprintf("error: %d,%s\n\n",result.ErrorCode,reflectinvoke.ErrorMsg(result.ErrorCode))
fmt.Println(info)
return nil
}
功能函数
处理器函数根据收到的json数据通过反射最终调用了Foo的FooFuncSwap方法及Bar的BarFuncAdd方法。
type Foo struct {
}
type Bar struct {
}
func (b *Bar) BarFuncAdd(argOne,argTwo float64) float64 {
return argOne + argTwo
}
func (f *Foo) FooFuncSwap(argOne,argTwo string) (string,string) {
return argTwo,argOne
}
怎么调用的
reflectinvoke.InvokeByJson是如何根据形如:
{
"func_name":"BarFuncAdd","params":[0.5,0.51] }
的 json数据调用Bar.BarFuncAdd的?
请参考《golang通过反射使用json字符串调用struct的指定方法及返回json结果》(如果前面这段没有连接地址,那肯定是文章被爬虫干掉了连接,请找本文的原文阅读)
文中代码的完整内容在https://github.com/changjixiong/goNotes/tree/master/nsqNotes以及https://github.com/changjixiong/goNotes/tree/master/reflectinvoke中。
注意事项
同一个消息channel如果有多个消费者则消费者收到的消息是不确定的。例如,如果将文中的生产者运行一个实例,将消费者运行两个实例(命名为A,B),则会出现A收到2个消息或者B收到2个消息或者AB各收到一个消息。