简介:
协议说明:
通常网游的网络协议都是报文的形式,即使底层是使用TCP,也会用一些方法把数据拆分成一个个的报文(本文中称为协议包)。因此,本文也基于这一假设,但是对于具体的协议包格式,本文没有特别限制,只是要求协议包中能够容纳一个32字节的ID。
协议包的处理大概可以分为以下两种类型。其他更复杂的会话可以由以下两种类型组合而成。
框架说明:
Go语言是一种支持高并发的编程语言,它支持高并发的方式是大量轻量级的goroutine并发执行。在每个goroutine中的操作基本上都是同步阻塞的,这样可以极大地简化程序逻辑,使得代码清晰易读,容易维护。基于这点,本文实现的框架的调用接口也是使用同步方式的。
- 如果一个协议包需要等待回应,就在调用函数上阻塞等待。这个调用的签名为:
func (p *Connection) Query(data []byte) ([]byte,error)
注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。 - 如果发送一个协议包是对于接收到的某个协议包的回应,则调用:
func (p *Connection) Reply(query,answer []byte) error
注意:answer的控制权会转交给框架,因此函数调用后不能修改answer的内容。 - 如果一个协议包不需要回应,就直接调用发送函数:
func (p *Connection) Write(data []注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。 - 调用者需要实现的接口:
导出类型、函数和接口:
type Connection func NewConnection(conn Socket,maxcount int,dh DataHandler,ih IdentityHandler,eh ErrorHandler) *Connection func (p *Connection) Start() func (p *Connection) Close() func (p *Connection) Query(data []byte) (res []byte,err error) func (p *Connection) Reply(query,answer []byte) error func (p *Connection) Write(data []byte) error type Socket interface { Read() ([]byte,error) Write([]byte) error Close() } type DataHandler interface { Process([]byte) } type ErrorHandler interface { OnError(error) } type IdentityHandler interface { GetIdentity([]byte) uint32 SetIdentity([]byte,uint32) }完整的代码实现:
package multiplexer import ( "errors" "sync" "sync/atomic" ) var ( ERR_EXIT = errors.New("exit") ) type Socket interface { Read() ([]byte,error) Write([]byte) error Close() } type DataHandler interface { Process([]byte) } type ErrorHandler interface { OnError(error) } type IdentityHandler interface { GetIdentity([]byte) uint32 SetIdentity([]byte,uint32) } type Connection struct { conn Socket wg sync.WaitGroup mutex sync.Mutex applicants map[uint32]chan []byte chexit chan bool chsend chan []byte chch chan chan []byte dh DataHandler ih IdentityHandler eh ErrorHandler identity uint32 } func NewConnection(conn Socket,eh ErrorHandler) *Connection { count := maxcount if count < 1024 { count = 1024 } chch := make(chan chan []byte,count) for i := 0; i < count; i++ { chch <- make(chan []byte,1) } return &Connection{ conn: conn,applicants: make(map[uint32]chan []byte,count),chsend: make(chan []byte,chexit: make(chan bool),chch: chch,dh: dh,ih: ih,eh: eh,} } func (p *Connection) Start() { p.wg.Add(2) go func() { defer p.wg.Done() p.recv() }() go func() { defer p.wg.Done() p.send() }() } func (p *Connection) Close() { close(p.chexit) p.conn.Close() p.wg.Wait() } func (p *Connection) Query(data []byte) (res []byte,err error) { var ch chan []byte select { case <-p.chexit: return nil,ERR_EXIT case ch = <-p.chch: defer func() { p.chch <- ch }() } id := p.newIdentity() p.ih.SetIdentity(data,id) p.addApplicant(id,ch) defer func() { if err != nil { p.popApplicant(id) } }() if err := p.Write(data); err != nil { return nil,err } select { case <-p.chexit: return nil,ERR_EXIT case res = <-ch: break } return res,nil } func (p *Connection) Reply(query,answer []byte) error { // put back the identity attached to the query id := p.ih.GetIdentity(query) p.ih.SetIdentity(answer,id) return p.Write(answer) } func (p *Connection) Write(data []byte) error { select { case <-p.chexit: return ERR_EXIT case p.chsend <- data: break } return nil } func (p *Connection) send() { for { select { case <-p.chexit: return case data := <-p.chsend: if p.conn.Write(data) != nil { return } } } } func (p *Connection) recv() (err error) { defer func() { if err != nil { select { case <-p.chexit: err = nil default: p.eh.OnError(err) } } }() for { select { case <-p.chexit: return nil default: break } data,err := p.conn.Read() if err != nil { return err } if id := p.ih.GetIdentity(data); id > 0 { ch,ok := p.popApplicant(id) if ok { ch <- data continue } } p.dh.Process(data) } return nil } func (p *Connection) newIdentity() uint32 { return atomic.AddUint32(&p.identity,1) } func (p *Connection) addApplicant(identity uint32,ch chan []byte) { p.mutex.Lock() defer p.mutex.Unlock() p.applicants[identity] = ch } func (p *Connection) popApplicant(identity uint32) (chan []byte,bool) { p.mutex.Lock() defer p.mutex.Unlock() ch,ok := p.applicants[identity] if !ok { return nil,false } delete(p.applicants,identity) return ch,true }