Redis和NSQ都有完善的订阅和发布实现,但参考它们的源码实现,做个实际例子,对两边的异同和这种机制会更有印象。
练习省掉复杂的,就实现简单的 订阅/取消订阅/发布信息 功能,足够了。
Server.go
Server结构中的Dict用map保存了Channel的相关信息,而Channel结构中则用一个map保存了订阅这个Channel的Client.
这个例子与Redis中不太一样:Redis中的Dict用Channel的名字作为map的key,value则是其对应的Client列表,而Client中则保存了其所有订阅的Channel信息。
package pubsub import ( "errors" "sync" ) type Client struct { Id int Ip string } type Server struct { Dict map[string]*Channel //map[Channel.Name]*Channel sync.RWMutex } func NewServer() *Server { s := &Server{} s.Dict = make(map[string]*Channel) //所有channel return s } //订阅 func (srv *Server) Subscribe(client *Client,channelName string) { // 客户是否在Channel的客户列表中 srv.RLock() ch,found := srv.Dict[channelName] srv.RUnlock() if !found { ch = NewChannel(channelName) ch.AddClient(client) srv.Lock() srv.Dict[channelName] = ch srv.Unlock() } else { ch.AddClient(client) } } //取消订阅 func (srv *Server) Unsubscribe(client *Client,channelName string) { srv.RLock() ch,found := srv.Dict[channelName] srv.RUnlock() if found { if ch.DeleteClient(client) == 0 { ch.Exit() srv.Lock() delete(srv.Dict,channelName) srv.Unlock() } } } //发布消息 func (srv *Server) PublishMessage(channelName,message string) (bool,error) { srv.RLock() ch,found := srv.Dict[channelName] if !found { srv.RUnlock() return false,errors.New("channelName不存在!") } srv.RUnlock() ch.Notify(message) ch.Wait() return true,nil }
Channel.go
每个Channel通过waitGroup.Wrap把信息发送到Client或队列(例子中只是打印一条信息)。当clients为空时,Server会调用该Channel的Exit()。
import ( "fmt" "sync" "sync/atomic" ) type Channel struct { Name string clients map[int]*Client // exitChan chan int sync.RWMutex waitGroup WaitGroupWrapper messageCount uint64 exitFlag int32 } func NewChannel(channelName string) *Channel { return &Channel{ Name: channelName,// exitChan: make(chan int),clients: make(map[int]*Client),} } func (ch *Channel) AddClient(client *Client) bool { ch.RLock() _,found := ch.clients[client.Id] ch.RUnlock() ch.Lock() if !found { ch.clients[client.Id] = client } ch.Unlock() return found } func (ch *Channel) DeleteClient(client *Client) int { var ret int ch.ReplyMsg( fmt.Sprintf("从channel:%s 中删除client:%d ",ch.Name,client.Id)) ch.Lock() delete(ch.clients,client.Id) ch.Unlock() ch.RLock() ret = len(ch.clients) ch.RUnlock() return ret } func (ch *Channel) Notify(message string) bool { ch.RLock() defer ch.RUnlock() for cid,_ := range ch.clients { ch.ReplyMsg( fmt.Sprintf("channel:%s client:%d message:%s",cid,message)) } return true } func (ch *Channel) ReplyMsg(message string) { ch.waitGroup.Wrap(func() { fmt.Println(message) }) } func (ch *Channel) Wait() { ch.waitGroup.Wait() } func (ch *Channel) Exiting() bool { return atomic.LoadInt32(&ch.exitFlag) == 1 } func (ch *Channel) Exit() { if !atomic.CompareAndSwapInt32(&ch.exitFlag,1) { return } //close(ch.exitChan) ch.Wait() } func (ch *Channel) PutMessage(clientID int,message string) { ch.RLock() defer ch.RUnlock() if ch.Exiting() { return } //select { // case <-t.exitChan: // return //} fmt.Println(ch.Name,":",message) atomic.AddUint64(&ch.messageCount,1) return }
主程序:
//订阅/发布 练习 //author: Xiong Chuan Liang //date: 2015-3-17 package main import ( . "pubsub" ) func main(){ c1 := &Client{Id:100,Ip:"172.18.1.1"} c3:= &Client{Id:300,Ip:"172.18.1.3"} srv := NewServer() srv.Subscribe(c1,"Topic") srv.Subscribe(c3,"Topic") srv.PublishMessage("Topic","测试信息1") srv.Unsubscribe(c3,"Topic") srv.PublishMessage("Topic","测试信息2222") srv.Subscribe(c1,"Topic2") srv.Subscribe(c3,"Topic2") srv.PublishMessage("Topic2"," Topic2的测试信息") } /* 运行结果: channel:Topic client:100 message:测试信息1 channel:Topic client:300 message:测试信息1 从channel:Topic 中删除client:300 channel:Topic client:100 message:测试信息2222 channel:Topic2 client:100 message: Topic2的测试信息 channel:Topic2 client:300 message: Topic2的测试信息 */
没做太复杂的测试,粗略看好像没有问题。
MAIL: xcl_168@aliyun.com
BLOG: http://blog.csdn.net/xcl168