应用场合:比如http请求,有先后次序,需要实现:先请求(request)先发送,并且读取(response)的时候也是遵循这个规则,这个读写构成一个pair(有请求并有返回)
过来,直接上代码吧:
func(cc*ClientConn)Do(req*http.Request)(resp*http.Response,errerror){ err=cc.Write(req)//client向http服务器发送请求 iferr!=nil{ return } returncc.Read(req)//client读取http服务器返回回来数据 }
再仔细看看Write的时候干了什么:
func(cc*ClientConn)Write(req*http.Request)(errerror){ //EnsureorderedexecutionofWrites //生成序号ID,保证ID=i比ID>i先执行 id:=cc.pipe.Next() cc.pipe.StartRequest(id)//这个是重点 deferfunc(){ cc.pipe.EndRequest(id)//当前id执行完,去触发下一个id+1的请求执行 iferr!=nil{ cc.pipe.StartResponse(id) cc.pipe.EndResponse(id) }else{ //Rememberthepipelineidofthisrequest cc.lk.Lock() cc.pipereq[req]=id//结束的时候保存这个,req的id,方便后面read时候继续按序 cc.lk.Unlock() } }() cc.lk.Lock()//读写锁,防止执行冲突 //判断read/write的错误信息,和net.Conn是否关闭,后面介绍cc的具体struct结构 ifcc.re!=nil{//nopointsendingifread-sideclosedorbroken defercc.lk.Unlock() returncc.re } ifcc.we!=nil{ defercc.lk.Unlock() returncc.we } ifcc.c==nil{//connectionclosedbyuserinthemeantime defercc.lk.Unlock() returnerrClosed } c:=cc.c ifreq.Close{ //WewritetheEOFtothewrite-sideerror,becausethere //stillmightbesomepipelinedreads cc.we=ErrPersistEOF } cc.lk.Unlock() //到这里才是具体执行写请求,所以前面都是保证按序请求的步骤 err=cc.writeReq(req,c) cc.lk.Lock() defercc.lk.Unlock() iferr!=nil{ cc.we=err returnerr } cc.nwritten++//次序++ returnnil }
再来看看cc的结构:
typeClientConnstruct{ lksync.Mutex//读写锁 cnet.Conn//golang连接interface r*bufio.Reader//bufReader re,weerror//read/writeerrors lastbodyio.ReadCloser//上一次ioReader nread,nwrittenint//读和写的个数 pipereqmap[*http.Request]uint//保存pairrequest和id pipetextproto.Pipeline writeReqfunc(*http.Request,io.Writer)error//写数据匿名函数 } typePipelinestruct{ musync.Mutex iduint requestsequencer responsesequencer } typesequencerstruct{ musync.Mutex iduint waitmap[uint]chanuint//就是用这个管道来阻塞没有到次序的操作 }
按序具体是怎么实现的:
//生成id的代码,批发次序 func(p*Pipeline)Next()uint{ p.mu.Lock() id:=p.id p.id++ p.mu.Unlock() returnid } //执行StartResponse,实际是执行sequencer的Start方法 func(p*Pipeline)StartResponse(iduint){ p.response.Start(id) } func(s*sequencer)Start(iduint){ s.mu.Lock() ifs.id==id{//到达当前id咯,可以执行咯,不需要阻塞 s.mu.Unlock() return } c:=make(chanuint) ifs.wait==nil{ s.wait=make(map[uint]chanuint) } s.wait[id]=c//在map里面记录chan s.mu.Unlock() <-c//读取阻塞,等待c的写入 } //当然是前一个id执行结束的时候,后一个id触发阻塞解开的 func(s*sequencer)End(iduint){ s.mu.Lock() ifs.id!=id{ panic("outofsync") } id++//这里指向后一个id s.id=id ifs.wait==nil{ s.wait=make(map[uint]chanuint) } c,ok:=s.wait[id] ifok{ delete(s.wait,id)//删除这个map里面的chan } s.mu.Unlock() ifok{ c<-1//往这个chan里面写数据,把阻塞解开 } }
同理,read的过程和write过程相似。可见,golang各种锁和管道保证并发环境下的顺序执行