第一章 RPCgolang源码分析
rpc服务器也就是在tcp服务器的基础上加上自定义的rpc协议而已。一个rpc协议里,主要有个3个非常重要的信息。
call参数,也就是发送给服务器的数据
1.1官方例子
首先看看官方给出的使用例子:看看rpc.Register到底做了什么事情!
typeArgsstruct{
A,Bint}
func(t*Arith)Multiply(args*Args,reply*int)error{*reply=args.A*args.Breturnnil
arith:=new(Arith)rpc.Register(arith)
找到regsiter主要函数如下所示:主要是把结构体加入到Server结构体中serverMap中,初始化工作,赋值方法等操作。
func(server*Server)register(rcvrinterface{},namestringuseNameboolerror{s:=new(service)。。。。。。。。。。。。。。。。server.serviceMap[s.name]=sreturnnil}
那么rpc服务器如何能够根据method去调用对应的方法呢?Go语言在这里其实采用反射的手段,虽然表面上是注册的是对象,实际却是通过反射取得了对象的所有方法,然后采用了map表保存了method到方法的映射
1.2结构体
typeServerstructmusync.RWMutex//protectstheserviceMapserviceMapmap[]*servicereqLocksync.Mutex//protectsfreeReqfreeReq*RequestrespLocksync.Mutex//protectsfreeRespfreeResp*Response}
servicenamestring//nameofservicercvrreflect.Value//receiverofmethodsfortheservicetypreflect.Type//typeofthereceivermethod[]*methodType//registeredmethods}
//AServerCodecimplementsreadingofRPCrequestsandwritingof//RPCresponsesfortheserversideofanRPCsession.//TheservercallsReadRequestHeaderandReadRequestBodyinpairs//toreadrequestsfromtheconnection,anditcallsWriteResponseto//writearesponseback.TheservercallsClosewhenfinishedwiththe//connection.ReadRequestBodymaybecalledwithanil//argumenttoforcethebodyoftherequesttobereadanddiscarded.
ServerCodecReadRequestHeader(*Request)ReadRequestBody({})//WriteResponsemustbesafeforconcurrentusebymultiplegoroutines.WriteResponse(*ResponseClose()}
ReadRequestHeader和ReadRequestBody一起使用从connection,nil读取body取出并丢弃.最终使用的都是这几个函数,读取和发送response
只需要实现ServerCodec这个接口,就可以自定义服务端的编码解码器,实现自定义的rpc协议了。Gorpc服务器端和客户端都是默认使用的Gob序列化协议数据。
1.3Server端处理请求
Accept(lisnet.Listener){forconnerr:=lis.iferr!=log.Print("rpc.Serve:accept:"err.Error())return}goserver.ServeConn(conn)Accept用来处理一个监听器,一直在监听客户端的连接,一旦监听器接收了一个连接,则还是交给ServeConn在另外一个goroutine中去处理:
//ServeConnrunstheserveronasingleconnection.//ServeConnblocks,servingtheconnectionuntiltheclienthangsup.//ThecallertypicallyinvokesServeConninagostatement.//ServeConnusesthegobwireformat(seepackagegob)onthe//connection.Touseanalternatecodec,useServeCodec.(connio.ReadWriteCloser){buf:=bufio.NewWritersrv:=&gobServerCodec{rwc:conndec:gob.NewDecoderenc:gob.NewEncoder(buf)encBuf:bufServeCodec(srv)}
//根据指定的codec进行协议解析
//ServeCodecislikeServeConnbutusesthespecifiedcodecto//decoderequestsandencoderesponses.(codecServerCodec){sending:=(sync.Mutex)servicemtypereqargvreplyvkeepReadingerr:=server.readRequest(codec)debugLog&&err!=io.EOF{Println"rpc:"err)!keepReading{break//sendaresponseifweactuallymanagedtoreadaheader.req!=sendResponse(sendinginvalidRequestcodecfreeRequest(req)continueservice.call(serversendingcodec)codec.解码request和编码response
funcreadRequestHeader(codecServerCodec)
(s*(server*Serversending*sync.Mutexmtype*methodTypereq*Requestreplyvreflect.ValuecodecServerCodec){mtype.Lockmtype.numCalls++Unlockfunction:=mtype.method.Func//Invokethemethod,providinganewvalueforthereply.returnValues:=function.Call([]reflect.Value{s.rcvrreplyv})//Thereturnvalueforthemethodisanerror.errInter:=returnValues0].Interfaceerrmsg:=""errInter!=errmsg=errInter.().replyv.errmsg)
returnValuesCall([]reflect.Value{s.rcvrargvreplyv})
原文链接:https://www.f2er.com/go/189310.html