条件变量的作用并不是保证在同一时刻仅有一个线程访问某一个共享数据,而是在对应的共享数据的状态发生变化时,通知其他因此而被阻塞的线程。
三种操作方法
声明
func NewCond(l Locker) *Cond
示例
改造上一节的锁使用代码
Golang同步:锁的使用案例详解
传送门:http://www.jb51.cc/article/p-wtwupvia-ps.html
Read()方法改造如下:
func (df *myDataFile) Read() (rsn int64,d Data,err error){
// 读取并更新读偏移量
var offset int64
// 读互斥锁定
df.rmutex.Lock()
offset = df.roffset
// 更改偏移量,当前偏移量+数据块长度
df.roffset += int64(df.dataLen)
// 读互斥解锁
df.rmutex.Unlock()
//读取一个数据块,最后读取的数据块序列号
rsn = offset / int64(df.dataLen)
bytes := make([]byte,df.dataLen)
//读写锁:读锁定
df.fmutex.RLock()
defer df.fmutex.RUnlock()
for {
_,err = df.f.ReadAt(bytes,offset)
if err != nil {
if err == io.EOF {
//暂时放弃fmutex的 读锁,并等待通知的到来
df.rcond.Wait()
continue
}
}
return
}
d = bytes
return
}
Write()方法改造如下:
func (df *myDataFile) Write(d Data) (wsn int64,err error){
//读取并更新写的偏移量
var offset int64
df.wmutex.Lock()
offset = df.woffset
df.woffset += int64(df.dataLen)
df.wmutex.Unlock()
//写入一个数据块,最后写入数据块的序号
wsn = offset / int64(df.dataLen)
var bytes []byte
if len(d) > int(df.dataLen){
bytes = d[0:df.dataLen]
}else{
bytes = d
}
df.fmutex.Lock()
defer df.fmutex.Unlock()
_,err = df.f.Write(bytes)
//发送通知
df.rcond.Signal()
return
}
因为一个数据块只能有一个读操作读取,因此我们使用条件变量Signal方法通知某一个为此等待的Wait方法,唤醒一个相关的Goroutine。
还有一件事不能忘记,初始化rcond字段
func NewDataFile(path string,dataLen uint32) (DataFile,error){
//f,err := os.OpenFile(path,os.O_APPEND|os.O_RDWR|os.O_CREATE,0666)
f,err := os.Create(path)
if err != nil {
fmt.Println("Fail to find",f,"cServer start Failed")
return nil,err
}
if dataLen == 0 {
return nil,errors.New("Invalid data length!")
}
df := &myDataFile{
f : f,dataLen:dataLen,}
//创建一个可用的条件变量(初始化),返回一个*sync.Cond类型的结果值,我们就可以调用该值拥有的三个方法Wait,Signal,Broadcast
df.rcond = sync.NewCond(df.fmutex.RLocker())
return df,nil
}