NSQ系列之nsqlookupd代码分析一(初探nsqlookup)
nsqlookupd
是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd
来发现指定话题(topic
)的生产者,并且提供 nsqd
节点广播话题(topic
)和通道(channel
)信息。
nsqlookupd
有两个接口:TCP
接口,nsqd
用它来广播。HTTP
接口,客户端用它来发现和管理。
本系列的代码分析均是基于nsq v0.3.5的代码进行的分析,如有不对之处欢迎大家指正指导。
nsqlookup struct分析
代码文件路径为nsq/nsqlookupd/nsqlookupd.go
type Nsqlookupd struct { sync.RWMutex //读写锁 opts *Options //nsqlookupd 配置信息 定义文件路径为nsq/nsqlookupd/options.go tcpListener net.Listener httpListener net.Listener waitGroup util.WaitGroupWrapper //WaitGroup 典型应用 用于开启两个goroutine,一个监听HTTP 一个监听TCP DB *RegistrationDB //product 注册数据库 具体分析后面章节再讲 } //初始化Nsqlookupd实例 func New(opts *Options) *Nsqlookupd { n := &Nsqlookupd{ opts: opts,DB: NewRegistrationDB(),//初始化DB实例 } n.logf(version.String("nsqlookupd")) return n } func (l *Nsqlookupd) Main() { ctx := &Context{l} //初始化Context实例将Nsqlookupd指针放入Context实例中 Context结构请参考文件nsq/nsqlookupd/context.go Context用于nsqlookupd中的tcpServer 和 httpServer中 tcpListener,err := net.Listen("tcp",l.opts.TCPAddress) //开启TCP监听 if err != nil { l.logf("FATAL: listen (%s) Failed - %s",l.opts.TCPAddress,err) os.Exit(1) } l.Lock() l.tcpListener = tcpListener l.Unlock() tcpServer := &tcpServer{ctx: ctx} //创建一个tcpServer tcpServer 实现了nsq/internal/protocol包中的TCPHandler接口 l.waitGroup.Wrap(func() { //protocol.TCPServer方法的过程就是tcpListener accept tcp的连接 //然后通过tcpServer中的Handle分析报文,然后处理相关的协议 protocol.TCPServer(tcpListener,tcpServer,l.opts.Logger) }) //把tcpServer加入到waitGroup httpListener,l.opts.HTTPAddress) //开启HTTP监听 if err != nil { l.logf("FATAL: listen (%s) Failed - %s",l.opts.HTTPAddress,err) os.Exit(1) } l.Lock() l.httpListener = httpListener l.Unlock() httpServer := newHTTPServer(ctx) //创建一个httpServer l.waitGroup.Wrap(func() { http_api.Serve(httpListener,httpServer,"HTTP",l.opts.Logger) }) //把httpServer加入到waitGroup } //Nsqlookupd退出 func (l *Nsqlookupd) Exit() { if l.tcpListener != nil { l.tcpListener.Close() //关闭tcpListener } if l.httpListener != nil { l.httpListener.Close() //关闭httpListener } l.waitGroup.Wait() }