Golang1.7闲来无事写了一个基于Gob的tcp通讯用的包

前端之家收集整理的这篇文章主要介绍了Golang1.7闲来无事写了一个基于Gob的tcp通讯用的包前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
@H_301_2@package gobconn

@H_301_2@import (
    "encoding/gob"
    "errors"
    "net"
    "reflect"
    "sync"
    "unsafe"
)

@H_301_2@type message @H_301_2@struct {
    Type  string
    value reflect.Value
}

@H_301_2@func (self *message) Recovery() {
    putPointer(self.value)
    putMsg(self)
}

@H_301_2@func (self *message) Interface() @H_301_2@interface{} {
    @H_301_2@return self.value.Interface()
}

/* 声明一个消息池用来重用对象 */

@H_301_2@var msgPool sync.Pool

@H_301_2@func getMsg() *message {
    @H_301_2@if msg,ok := msgPool.Get().(*message); ok {
        @H_301_2@return msg
    }
    @H_301_2@return new(message)
}

@H_301_2@func putMsg(msg *message) {
    msgPool.Put(msg)
}

@H_301_2@type gobConnection @H_301_2@struct {
    rwc   net.Conn
    enc   *gob.Encoder
    dec   *gob.Decoder
    rlock sync.Mutex
    wlock sync.Mutex
}

@H_301_2@type GobConnection @H_301_2@interface {
    Read() (msg *message,err error)
    Write(msg @H_301_2@interface{}) (err error)
    Close() error
    LocalAddr() net.Addr
    RemoteAddr() net.Addr
}

@H_301_2@var gobPool sync.Pool

@H_301_2@func NewGobConnection(conn net.Conn) GobConnection {
    @H_301_2@if gcn,ok := gobPool.Get().(*gobConnection); ok {
        gcn.rwc = conn
        gcn.enc = gob.NewEncoder(conn)
        gcn.dec = gob.NewDecoder(conn)
        @H_301_2@return gcn
    }
    @H_301_2@return &gobConnection{rwc: conn,enc: gob.NewEncoder(conn),dec: gob.NewDecoder(conn)}
}

@H_301_2@type msgStruct @H_301_2@struct {
    StructName string
}

@H_301_2@var (
    rheadMsg = msgStruct{}
    wheadMsg = msgStruct{}
)

@H_301_2@func (self *gobConnection) Read() (msg *message,err error) {
    self.rlock.Lock()
    @H_301_2@defer self.rlock.Unlock()

    err = self.dec.Decode(&rheadMsg)
    @H_301_2@if err != nil {
        @H_301_2@return
    }
    @H_301_2@var typ reflect.Type
    typ,err = GetMsgType(rheadMsg.StructName)
    @H_301_2@if err != nil {
        @H_301_2@return
    }
    msg = getMsg()
    msg.Type = rheadMsg.StructName
    @H_301_2@var value = getPointer(typ)
    err = self.dec.DecodeValue(value)
    @H_301_2@if err != nil {
        msg.Recovery()
        @H_301_2@return
    }
    msg.value = value
    @H_301_2@return
}

@H_301_2@func (self *gobConnection) Write(msg @H_301_2@interface{}) (err error) {
    self.wlock.Lock()
    value := reflect.ValueOf(msg)
    @H_301_2@if value.Kind() == reflect.Interface || value.Kind() == reflect.Ptr {
        wheadMsg.StructName = value.Elem().Type().String()
    } @H_301_2@else {
        wheadMsg.StructName = value.Type().String()
    }
    err = self.enc.Encode(wheadMsg)
    @H_301_2@if err != nil {
        self.wlock.Unlock()
        @H_301_2@return
    }
    err = self.enc.EncodeValue(value)
    self.wlock.Unlock()
    @H_301_2@return
}

@H_301_2@func (self *gobConnection) Close() error {
    self.enc = nil
    self.dec = nil
    err := self.rwc.Close()
    gobPool.Put(self)
    @H_301_2@return err
}

@H_301_2@func (self *gobConnection) LocalAddr() net.Addr {
    @H_301_2@return self.rwc.LocalAddr()
}

@H_301_2@func (self *gobConnection) RemoteAddr() net.Addr {
    @H_301_2@return self.rwc.RemoteAddr()
}

/* 通过指定类型申请一个定长的内存. */

@H_301_2@var (
    lock   sync.Mutex
    ptrMap = make(@H_301_2@map[string]*sync.Pool)
)

@H_301_2@func getPointer(typ reflect.Type) reflect.Value {
    p,ok := ptrMap[typ.String()]
    @H_301_2@if ok {
        @H_301_2@if value,ok := p.Get().(reflect.Value); ok {
            @H_301_2@return value
        }
        @H_301_2@return reflect.New(typ)
    }
    lock.Lock()
    ptrMap[typ.String()] = new(sync.Pool)
    lock.Unlock()
    @H_301_2@return reflect.New(typ)
}

@H_301_2@func putPointer(value reflect.Value) {
    elem := value.Elem().Type()
    p,ok := ptrMap[elem.String()]
    @H_301_2@if !ok {
        lock.Lock()
        p = new(sync.Pool)
        ptrMap[elem.String()] = p
        lock.Unlock()
    }
    ClearData(elem.Size(),unsafe.Pointer(value.Pointer()))
    p.Put(value)
}

/* 使用此包进行数据发送之前必须将类型注册.否则接收放无法解包 */

@H_301_2@var (
    typeMap   = make(@H_301_2@map[string]reflect.Type)
    Errortype = errors.New("type not register")
)

@H_301_2@func GetMsgType(name string) (reflect.Type,error) {
    typ,ok := typeMap[name]
    @H_301_2@if ok {
        @H_301_2@return typ,nil
    }
    @H_301_2@return nil,Errortype
}

@H_301_2@func GetMsgAllType() []string {
    list := make([]string, 0,len(typeMap))
    @H_301_2@for name,_ := @H_301_2@range typeMap {
        list = append(list,name)
    }
    @H_301_2@return list
}

@H_301_2@func RegisterType(typ reflect.Type) {
    typeMap[typ.String()] = typ
}

@H_301_2@func DeleteType(name string) {
    delete(typeMap,name)
}

/* 清除固定长度的内存数据,使用方法是:指定内存开始地址和长度. 请勿随便使用.使用不当可能会清除有效数据 */

@H_301_2@func ClearData(size uintptr,ptr unsafe.Pointer) {
    @H_301_2@var temptr uintptr = uintptr(ptr)
    @H_301_2@var step uintptr = 1
    @H_301_2@for {
        @H_301_2@if size <= 0 {
            @H_301_2@break
        }
        @H_301_2@switch {
        @H_301_2@case 1 <= size && size < 8:
            step = 1
        @H_301_2@case 8 <= size && size < 32:
            step = 8
        @H_301_2@case 32 <= size && size < 64:
            step = 32
        @H_301_2@case size >= 64:
            step = 64
        }
        clearData(step,unsafe.Pointer(temptr))
        temptr += step
        size -= step
    }
}

@H_301_2@func clearData(size uintptr,ptr unsafe.Pointer) {
    @H_301_2@switch size {
    @H_301_2@case 1:
        *(*[1]byte)(ptr) = [1]byte{}
    @H_301_2@case 8:
        *(*[8]byte)(ptr) = [8]byte{}
    @H_301_2@case 32:
        *(*[32]byte)(ptr) = [32]byte{}
    @H_301_2@case 64:
        *(*[64]byte)(ptr) = [64]byte{}
    }
}

下面是使用小例子:

@H_301_2@package main

@H_301_2@import (
    "fmt"
    "gobconn"
    "net"
    "reflect"
    "time"
)

@H_301_2@type Info @H_301_2@struct {
    Name string
    Age  int
    Job  string
    Hob  []string
}

@H_301_2@type Test @H_301_2@struct {
    Date    int
    Login   string
    Path    string
    Servers float64
    List    []string
    Dir     string
    Stream  bool
}
//初始化要发送的类型
@H_301_2@func init() {
    @H_301_2@go InitListen("tcp",":2789")
    time.Sleep(1e9)
    gobconn.RegisterType(reflect.TypeOf(Info{}))
    gobconn.RegisterType(reflect.TypeOf(Test{}))
}
@H_301_2@func main() {
    Test_rw()
    now := time.Now().Unix()
    Benchmark_rw()
    fmt.Println(time.Now().Unix())
    fmt.Println(now)
}
@H_301_2@func Test_rw() {
    Dail("tcp","127.0.0.1:2789", 1)
}

@H_301_2@func Benchmark_rw() {
    Dail("tcp", 10000)
}
//创建tcp监听的端口
@H_301_2@func InitListen(proto,addr string) {
    lis,err := net.Listen(proto,addr)
    @H_301_2@if err != nil {
        fmt.Println("listen error,",err.Error())
        @H_301_2@return
    }
    @H_301_2@defer lis.Close()
    @H_301_2@for {
        conn,err := lis.Accept()
        @H_301_2@if err != nil {
            fmt.Println("接入错误:",err)
            @H_301_2@continue
        }
        @H_301_2@go handle(conn)
    }
}
//链接处理逻辑
@H_301_2@func handle(conn net.Conn) {
    con := gobconn.NewGobConnection(conn)
    @H_301_2@defer con.Close()
    @H_301_2@for {
        msg,err := con.Read()
        @H_301_2@if err != nil {
            fmt.Println(con.RemoteAddr())
            fmt.Println("服务端ReadError:",err)
            @H_301_2@return
        }
        err = con.Write(msg.Interface())
        @H_301_2@if err != nil {
            fmt.Println("服务端WriteError:",err)
            msg.Recovery()
            @H_301_2@return
        }
        msg.Recovery()
    }
}
//创建连接.
@H_301_2@func Dail(proto,addr string,count int) {
    con,err := net.Dial(proto,addr)
    @H_301_2@if err != nil {
        fmt.Println("客户端连接错误:",err)
        @H_301_2@return
    }
    conn := gobconn.NewGobConnection(con)
    @H_301_2@defer conn.Close()

    @H_301_2@for i := 0; i < count; i++ {
        err = conn.Write(Info{"testing", 25,"IT",[]string{"backetball","football"}})
        @H_301_2@if err != nil {
            fmt.Println("客户端WriteError:",err)
            @H_301_2@return
        }
        msg,err := conn.Read()
        @H_301_2@if err != nil {
            fmt.Println("客户端ReadError:",err)
            @H_301_2@return
        }
        fmt.Println(msg,msg.Interface())
        msg.Recovery()
    }
}

猜你在找的Go相关文章