golang http的按序号发送,按序号接收

前端之家收集整理的这篇文章主要介绍了golang http的按序号发送,按序号接收前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

应用场合:比如http请求,有先后次序,需要实现:先请求(request)先发送,并且读取(response)的时候也是遵循这个规则,这个读写构成一个pair(有请求并有返回)

过来,直接上代码吧:

func(cc*ClientConn)Do(req*http.Request)(resp*http.Response,errerror){
	err=cc.Write(req)//client向http服务器发送请求
	iferr!=nil{
		return
	}
	returncc.Read(req)//client读取http服务器返回回来数据
}

再仔细看看Write的时候干了什么:

func(cc*ClientConn)Write(req*http.Request)(errerror){

	//EnsureorderedexecutionofWrites
	//生成序号ID,保证ID=i比ID>i先执行
	id:=cc.pipe.Next()
	cc.pipe.StartRequest(id)//这个是重点
	deferfunc(){
		cc.pipe.EndRequest(id)//当前id执行完,去触发下一个id+1的请求执行
		iferr!=nil{
			cc.pipe.StartResponse(id)
			cc.pipe.EndResponse(id)
		}else{
			//Rememberthepipelineidofthisrequest
			cc.lk.Lock()
			cc.pipereq[req]=id//结束的时候保存这个,req的id,方便后面read时候继续按序
			cc.lk.Unlock()
		}
	}()

	cc.lk.Lock()//读写锁,防止执行冲突
	//判断read/write的错误信息,和net.Conn是否关闭,后面介绍cc的具体struct结构
	ifcc.re!=nil{//nopointsendingifread-sideclosedorbroken
		defercc.lk.Unlock()
		returncc.re
	}
	ifcc.we!=nil{
		defercc.lk.Unlock()
		returncc.we
	}
	ifcc.c==nil{//connectionclosedbyuserinthemeantime
		defercc.lk.Unlock()
		returnerrClosed
	}
	c:=cc.c
	ifreq.Close{
		//WewritetheEOFtothewrite-sideerror,becausethere
		//stillmightbesomepipelinedreads
		cc.we=ErrPersistEOF
	}
	cc.lk.Unlock()
//到这里才是具体执行写请求,所以前面都是保证按序请求的步骤
	err=cc.writeReq(req,c)
	cc.lk.Lock()
	defercc.lk.Unlock()
	iferr!=nil{
		cc.we=err
		returnerr
	}
	cc.nwritten++//次序++

	returnnil
}

再来看看cc的结构:

typeClientConnstruct{
	lksync.Mutex//读写锁
	cnet.Conn//golang连接interface
	r*bufio.Reader//bufReader
	re,weerror//read/writeerrors
	lastbodyio.ReadCloser//上一次ioReader
	nread,nwrittenint//读和写的个数
	pipereqmap[*http.Request]uint//保存pairrequest和id

	pipetextproto.Pipeline
	writeReqfunc(*http.Request,io.Writer)error//写数据匿名函数
}

typePipelinestruct{
	musync.Mutex
	iduint
	requestsequencer
	responsesequencer
}

typesequencerstruct{
	musync.Mutex
	iduint
	waitmap[uint]chanuint//就是用这个管道来阻塞没有到次序的操作
}

按序具体是怎么实现的:

//生成id的代码,批发次序
func(p*Pipeline)Next()uint{
	p.mu.Lock()
	id:=p.id
	p.id++
	p.mu.Unlock()
	returnid
}

//执行StartResponse,实际是执行sequencer的Start方法
func(p*Pipeline)StartResponse(iduint){
	p.response.Start(id)
}


func(s*sequencer)Start(iduint){
	s.mu.Lock()
	ifs.id==id{//到达当前id咯,可以执行咯,不需要阻塞
		s.mu.Unlock()
		return
	}
	c:=make(chanuint)
	ifs.wait==nil{
		s.wait=make(map[uint]chanuint)
	}
	s.wait[id]=c//在map里面记录chan
	s.mu.Unlock()
	<-c//读取阻塞,等待c的写入
}

//当然是前一个id执行结束的时候,后一个id触发阻塞解开的
func(s*sequencer)End(iduint){
	s.mu.Lock()
	ifs.id!=id{
		panic("outofsync")
	}
	id++//这里指向后一个id
	s.id=id
	ifs.wait==nil{
		s.wait=make(map[uint]chanuint)
	}
	c,ok:=s.wait[id]
	ifok{
		delete(s.wait,id)//删除这个map里面的chan
	}
	s.mu.Unlock()
	ifok{
		c<-1//往这个chan里面写数据,把阻塞解开
	}
}

同理,read的过程和write过程相似。可见,golang各种锁和管道保证并发环境下的顺序执行

猜你在找的Go相关文章