package gobconn

import (
"encoding/gob"
"errors"
"net"
"reflect"
"sync"
"unsafe"
)
// message is one value received from a connection, tagged with the name of
// the type it was decoded as.
type message struct {
	// Type is the type name taken from the header that preceded the payload.
	Type string
	// value holds the decoded payload; Interface exposes it and Recovery
	// hands it back for reuse.
	value reflect.Value
}
@H_301_2@func (self *message) Recovery() {
putPointer(self.value)
putMsg(self)
}
@H_301_2@func (self *message) Interface() @H_301_2@interface{} {
@H_301_2@return self.value.Interface()
}
// msgPool is a pool of message objects kept for reuse.
var msgPool sync.Pool
@H_301_2@func getMsg() *message {
@H_301_2@if msg,ok := msgPool.Get().(*message); ok {
@H_301_2@return msg
}
@H_301_2@return new(message)
}
@H_301_2@func putMsg(msg *message) {
msgPool.Put(msg)
}
// gobConnection wraps a net.Conn with a gob encoder/decoder pair.
type gobConnection struct {
	rwc   net.Conn     // underlying connection
	enc   *gob.Encoder // encoder writing to rwc
	dec   *gob.Decoder // decoder reading from rwc
	rlock sync.Mutex   // held for the duration of Read
	wlock sync.Mutex   // held for the duration of Write
}
@H_301_2@type GobConnection @H_301_2@interface {
Read() (msg *message,err error)
Write(msg @H_301_2@interface{}) (err error)
Close() error
LocalAddr() net.Addr
RemoteAddr() net.Addr
}
// gobPool is the pool NewGobConnection draws recycled gobConnection objects from.
var gobPool sync.Pool
@H_301_2@func NewGobConnection(conn net.Conn) GobConnection {
@H_301_2@if gcn,ok := gobPool.Get().(*gobConnection); ok {
gcn.rwc = conn
gcn.enc = gob.NewEncoder(conn)
gcn.dec = gob.NewDecoder(conn)
@H_301_2@return gcn
}
@H_301_2@return &gobConnection{rwc: conn,enc: gob.NewEncoder(conn),dec: gob.NewDecoder(conn)}
}
// msgStruct is the header sent ahead of every payload; it names the type of
// the value that follows.
type msgStruct struct {
	StructName string
}
@H_301_2@var (
rheadMsg = msgStruct{}
wheadMsg = msgStruct{}
)
@H_301_2@func (self *gobConnection) Read() (msg *message,err error) {
self.rlock.Lock()
@H_301_2@defer self.rlock.Unlock()
err = self.dec.Decode(&rheadMsg)
@H_301_2@if err != nil {
@H_301_2@return
}
@H_301_2@var typ reflect.Type
typ,err = GetMsgType(rheadMsg.StructName)
@H_301_2@if err != nil {
@H_301_2@return
}
msg = getMsg()
msg.Type = rheadMsg.StructName
@H_301_2@var value = getPointer(typ)
err = self.dec.DecodeValue(value)
@H_301_2@if err != nil {
msg.Recovery()
@H_301_2@return
}
msg.value = value
@H_301_2@return
}
@H_301_2@func (self *gobConnection) Write(msg @H_301_2@interface{}) (err error) {
self.wlock.Lock()
value := reflect.ValueOf(msg)
@H_301_2@if value.Kind() == reflect.Interface || value.Kind() == reflect.Ptr {
wheadMsg.StructName = value.Elem().Type().String()
} @H_301_2@else {
wheadMsg.StructName = value.Type().String()
}
err = self.enc.Encode(wheadMsg)
@H_301_2@if err != nil {
self.wlock.Unlock()
@H_301_2@return
}
err = self.enc.EncodeValue(value)
self.wlock.Unlock()
@H_301_2@return
}
@H_301_2@func (self *gobConnection) Close() error {
self.enc = nil
self.dec = nil
err := self.rwc.Close()
gobPool.Put(self)
@H_301_2@return err
}
@H_301_2@func (self *gobConnection) LocalAddr() net.Addr {
@H_301_2@return self.rwc.LocalAddr()
}
@H_301_2@func (self *gobConnection) RemoteAddr() net.Addr {
@H_301_2@return self.rwc.RemoteAddr()
}
// Per-type pools of pointers, used to obtain memory for a value of a given type.
var (
	// lock guards ptrMap.
	lock sync.Mutex
	// ptrMap maps a type name (reflect.Type.String) to the pool of pointers
	// to values of that type.
	ptrMap = make(map[string]*sync.Pool)
)
@H_301_2@func getPointer(typ reflect.Type) reflect.Value {
p,ok := ptrMap[typ.String()]
@H_301_2@if ok {
@H_301_2@if value,ok := p.Get().(reflect.Value); ok {
@H_301_2@return value
}
@H_301_2@return reflect.New(typ)
}
lock.Lock()
ptrMap[typ.String()] = new(sync.Pool)
lock.Unlock()
@H_301_2@return reflect.New(typ)
}
@H_301_2@func putPointer(value reflect.Value) {
elem := value.Elem().Type()
p,ok := ptrMap[elem.String()]
@H_301_2@if !ok {
lock.Lock()
p = new(sync.Pool)
ptrMap[elem.String()] = p
lock.Unlock()
}
ClearData(elem.Size(),unsafe.Pointer(value.Pointer()))
p.Put(value)
}
// Types must be registered before they are sent with this package; otherwise
// the receiving side cannot decode them.
var (
	// typeMap maps a type name (reflect.Type.String) to the registered type.
	typeMap = make(map[string]reflect.Type)
	// Errortype is returned when a type name has not been registered.
	Errortype = errors.New("type not register")
)
@H_301_2@func GetMsgType(name string) (reflect.Type,error) {
typ,ok := typeMap[name]
@H_301_2@if ok {
@H_301_2@return typ,nil
}
@H_301_2@return nil,Errortype
}
@H_301_2@func GetMsgAllType() []string {
list := make([]string, 0,len(typeMap))
@H_301_2@for name,_ := @H_301_2@range typeMap {
list = append(list,name)
}
@H_301_2@return list
}
@H_301_2@func RegisterType(typ reflect.Type) {
typeMap[typ.String()] = typ
}
@H_301_2@func DeleteType(name string) {
delete(typeMap,name)
}
// ClearData zeroes size bytes of memory starting at ptr. Use with care: a
// wrong address or length overwrites live data. A nil ptr is ignored.
//
// NOTE(review): the stores are untyped byte writes; using this on memory that
// contains Go pointers is presumably unsafe with respect to the garbage
// collector — confirm before use.
func ClearData(size uintptr, ptr unsafe.Pointer) {
	if ptr == nil {
		return
	}
	for off := uintptr(0); off < size; {
		// The pointer arithmetic and the conversion back to unsafe.Pointer
		// stay in one expression; a uintptr kept in a variable is not a
		// valid way to hold an address.
		p := unsafe.Pointer(uintptr(ptr) + off)
		// Clear the largest chunk that still fits in the remaining range.
		switch remaining := size - off; {
		case remaining >= 64:
			*(*[64]byte)(p) = [64]byte{}
			off += 64
		case remaining >= 32:
			*(*[32]byte)(p) = [32]byte{}
			off += 32
		case remaining >= 8:
			*(*[8]byte)(p) = [8]byte{}
			off += 8
		default:
			*(*byte)(p) = 0
			off++
		}
	}
}
// clearData zeroes exactly size bytes at ptr for the supported chunk sizes
// 1, 8, 32 and 64. Any other size is a no-op.
func clearData(size uintptr, ptr unsafe.Pointer) {
	switch size {
	case 1:
		*(*[1]byte)(ptr) = [1]byte{}
	case 8:
		*(*[8]byte)(ptr) = [8]byte{}
	case 32:
		*(*[32]byte)(ptr) = [32]byte{}
	case 64:
		*(*[64]byte)(ptr) = [64]byte{}
	}
}
// 下面是使用小例子 (a small usage example follows; it belongs in its own main package):
package main

import (
"fmt"
"gobconn"
"net"
"reflect"
"time"
)
@H_301_2@type Info @H_301_2@struct {
Name string
Age int
Job string
Hob []string
}
// Test is a second sample message type registered by the example.
type Test struct {
	Date    int
	Login   string
	Path    string
	Servers float64
	List    []string
	Dir     string
	Stream  bool
}
//初始化要发送的类型
@H_301_2@func init() {
@H_301_2@go InitListen("tcp",":2789")
time.Sleep(1e9)
gobconn.RegisterType(reflect.TypeOf(Info{}))
gobconn.RegisterType(reflect.TypeOf(Test{}))
}
@H_301_2@func main() {
Test_rw()
now := time.Now().Unix()
Benchmark_rw()
fmt.Println(time.Now().Unix())
fmt.Println(now)
}
@H_301_2@func Test_rw() {
Dail("tcp","127.0.0.1:2789", 1)
}
@H_301_2@func Benchmark_rw() {
Dail("tcp", 10000)
}
//创建tcp监听的端口
@H_301_2@func InitListen(proto,addr string) {
lis,err := net.Listen(proto,addr)
@H_301_2@if err != nil {
fmt.Println("listen error,",err.Error())
@H_301_2@return
}
@H_301_2@defer lis.Close()
@H_301_2@for {
conn,err := lis.Accept()
@H_301_2@if err != nil {
fmt.Println("接入错误:",err)
@H_301_2@continue
}
@H_301_2@go handle(conn)
}
}
//链接处理逻辑
@H_301_2@func handle(conn net.Conn) {
con := gobconn.NewGobConnection(conn)
@H_301_2@defer con.Close()
@H_301_2@for {
msg,err := con.Read()
@H_301_2@if err != nil {
fmt.Println(con.RemoteAddr())
fmt.Println("服务端ReadError:",err)
@H_301_2@return
}
err = con.Write(msg.Interface())
@H_301_2@if err != nil {
fmt.Println("服务端WriteError:",err)
msg.Recovery()
@H_301_2@return
}
msg.Recovery()
}
}
//创建连接.
@H_301_2@func Dail(proto,addr string,count int) {
con,err := net.Dial(proto,addr)
@H_301_2@if err != nil {
fmt.Println("客户端连接错误:",err)
@H_301_2@return
}
conn := gobconn.NewGobConnection(con)
@H_301_2@defer conn.Close()
@H_301_2@for i := 0; i < count; i++ {
err = conn.Write(Info{"testing", 25,"IT",[]string{"backetball","football"}})
@H_301_2@if err != nil {
fmt.Println("客户端WriteError:",err)
@H_301_2@return
}
msg,err := conn.Read()
@H_301_2@if err != nil {
fmt.Println("客户端ReadError:",err)
@H_301_2@return
}
fmt.Println(msg,msg.Interface())
msg.Recovery()
}
}