channel用于goroutine之间的通讯. 其内部实现了同步,确保并发安全,多个goroutine同时访问,不需要加锁.
对channel操作行为做如下总结:
1. ch <- : 写入channel
2. ch -> :读出channel
3. clouse : 关闭channel
golang 中大部分类型都是值类型,只有 slice / channel / map 是引用类型
一. channel的语法
1. channel的定义方法
var c chan int //定义了一个int类型的chan 此时c=nil
定义一个有初始值的channel.
c := make(chan int)
2. channel是goroutine之间的通讯. 所有有一个goroutine发数据,就要有一个goroutine收数据
package main import ( "fmt" time" ) func main() { c := make(chan int) go func() { for {
// 取出channel n := <- c fmt.Println(n) } }() // 放入channel c <- 1 c <- 2 // 让主进程停留1s,保证channel中的数据全部取出后,主协程再关闭 time.Sleep(time.Second) }
有一个goroutine发数据,必须有一个goroutine收数据,不然就不能放进去了,会报异常deadlock
fatal error: all goroutines are asleep - deadlock!
3. 函数是一等公民,channel也是,channel也能作为参数,也能作为返回值
a. channel作为参数传入
) chan也可以作为参数传入. 也可以作为返回值 func getChan(c chan int) { { n := <- c fmt.Println(n) } } func chanDemo () { c := make(chan ) go getChan(c)
c <- 1 c <- 2 关闭
time.Sleep(time.Second)
}
func main() {
chanDemo()
}
b. channel用作数组
chan也可以作为参数传入. 也可以作为返回值 func worker(i int,c chan ) { c fmt.Printf(number: %d,worker,%c \n,i,n) } } func chanDemo () { 开了10个协程,让10个协程分别取各自的数据 var channel [10]chan int for i := 0; i<10; i++ { channel[i] = make(chan ) go func(c chan ) { worker(00; i < { channel[i] <- 'a' + i } A关闭 time.Sleep(time.Second) } func main() { chanDemo() }
分配一个管道数组,里面有10个管道. 向每个管道里放两个数据. 打印.
c. channel作为返回值
chan也可以作为参数传入. 也可以作为返回值 func createWorker(i int) chan int { c := make(chan ) go func() { { n := <- c fmt.Printf(return c } func chanDemo () { var channel [10]chan int { channel[i] = createWorker(i) } time.Sleep(time.Second) } func main() { chanDemo() }
4 给channel定义方向.
channel可以收数据也可以发数据. 那么我们返回的channel到底是收数据的还是发数据的呢? 我们可以告诉调用者.
chan<- int) go func() { // goroutine里,channel是发数据的. 那么我们定义返回的channel只能是收数据的 return c } func chanDemo () { var channel [10]chan<- int { channel[i] = createWorker(i) } time.Sleep(time.Second) } func main() { chanDemo() }
5. bufferedChannel 带有缓冲区的channel
我们知道如果创建了一个channel,往里面放了数据,但是没有人接收,那么就会deadlock死锁. 也就是必须要有人立刻能够接收走. 这就是要求,要求立刻被收走. 如果没有,就报错. 我们可以给他一个缓冲,让他在几个范围内可以不被立刻收走,给收数据方一个缓冲的时间
func bufferedChannel() { c := make(chan int,3) c <- 23 c <- 4 } func main() { bufferedChannel()
上面的demo,定义了带有三个缓冲的channel. 里面3个以内数据不会报deadlock. 超过三个还没有被取走,才会包deadlock
) func worker(i int,c chan int) { for { n := <- c fmt.Printf("number: %d,%d \n",n) } } func bufferedChannel() { c := make(chan 3) go worker(0 time.Sleep(time.Second) } func main() { bufferedChannel() }
6. channel Close()
- channel 发完了,我没有数据可以再发了,是可以close的. channel 的close是有发数据方close.
- channel的发送方close了,但是接收方依然是可以接收到数据的. 接收数据的返回值跟channel的类型有关系
- 接收方可以通过ok判断是否接收到数据了
) func worker(i goroutine里,channel是发数据的. 那么我们定义返回的channel只能是收数据的 { n,ok := <- c if !ok { // 如果没有数据了,就返回 break } fmt.Printf( channel发数据方,数据发完了,是可以close的 func channelClose() { c := make(chan ) go worker('bcd' 调用close 告诉接收方,数据已经发完了. close(c) time.Sleep(time.Second) } func main() { bufferedChannel() channelClose() }
另一种判断是否有数据的方法,使用range来判断.
close(c) time.Sleep(time.Second) } func main() { channelClose() }
二. Channel的应用: 不要通过共享内存来通信,通过通信来共享内存
通常,比如java是通过共享内存来通信. 比如定义一个公共的flag,这个flag就是共享的一块内存空间. 他的值变了,通知调用者. 这就是通过共享内存来通信.
使用共享内存的话在多线程的场景下为了处理竞态,需要加锁,使用起来比较麻烦。另外使用过多的锁,容易使得程序的代码逻辑坚涩难懂,并且容易使程序死锁,死锁了以后排查问题相当困难,特别是很多锁同时存在的时候。
go语言的channel保证同一个时间只有一个goroutine能够访问里面的数据,为开发者提供了一种优雅简单的工具,所以go原生的做法就是使用channle来通信,而不是使用共享内存来通信。
下面来感受一下go的通过通信来共享内存.
三. 使用channel等待任务结束
我们在上面的demo中,都会有一句话
time.Sleep(time.Second)
让主程序休眠1秒钟. 否则,协程还没执行完,主程序就退出了
再来分析,等待的目的是什么呢? 那就是等协程都执行完了, 主程序再退出. 换个思路,我不用time.sleep,能不能协程执行完了,主动告诉主线程,我执行完了,退出吧呢?
可以的.
1. 使用通信来共享内存,用channel实现.
go的原则: 不要通过共享内存来通信,通信来共享内存.
go中通信用什么呢? 使用channel
定义一个bool类型的chan,用来和外部通信. 事情做完了,给外面回复一个done func work(i for n := range w.c { fmt.Printf( w.done <- true } } type worker struct { c chan int // 传递字母的管道 done chan bool // 标记字母传递完成的管道 } w = worker{ c : make(chan int),done : make(chan bool),} go work(i,w) w } func chanDemo () { var wo [10]worker
// 第一步,第二步 wo[i] createWorker(i,wo[i]) } // 第三步 for i := 0; i < 10; i++ { wo[i].c <- 'a' + i // 这里有一个通道,一直等着. 等着取数据. <- wo[i].done } { wo[i].c <- i <- wo[i].done } time.Sleep(time.Second) } func main() { chanDemo() }
分析:
1. 定义了一个worker类型,里面有两个管道. 第一个管道是用来传输字母的. 第二个管道用来标记传送行为是否完成
2. 接下来看chanDemo,chanDemo是一个主goroutine. 在这里:
第一步: 创建了10个工作者.
第二步: 开了10个goroutine 用来传输字母. 开goroutine调用createWorker,然后worker的工作是work
第三步: for循环为每一个goroutine添加字母. 然后等待.......直到对应的done channel完成,取出结果. 继续往下执行.
{ wo[i].c <- i 这里有一个通道,一直等着. 等着取数据. <- wo[i].done }
放数据: wo[i].c <- 'a' + i . 然后就等待......等待到什么时候呢? wo[i].done中有数据可以取出.
这就让主goroutine保证了10个goroutine都执行完以后,在继续往后执行.
这解释了为什么goroutine可以让主goroutine等待的原因
输出结果
number: 4567899,J
这样打印的结果是按照顺序执行的. 一个goroutine执行完了,才能往另一个goroutine中放数据,这样效率太低了. 我们换一种方式,让他不停的打印. 最后一起等待执行完成.
go func() { w.done <- true }() } } type worker struct { c chan int 传递字母的管道 done chan bool 标记字母传递完成的管道 } worker{ c : make(chan ),done : make(chan boolvar work []worker { work[i] = createWorker(i,work[i]) } for i,w := range work { w.c <- i } for _,w := range work { 放了两次,所以要等待两次都处理完,在执行后面的结果 <-w.done <-w.done } chanDemo() }
这里有两个变化,
1. 我在取结果done的时候,没有在线等执行完. 而是,你们去执行吧,我最后来收结果,收的顺序也是从1-9的顺序收的. 每个goroutine要有两个结果.
2. 在work具体执行完成的地方,要定义一个协程. 这样程序才能正常运行,否则会报deadline异常. 为什么会报异常呢?
因为,有两次放数据,第一次放完了,往管道done里写了一个数据,结果没有被收走,又放了一次..... 所以就发生死锁了.
新开一个goroutine,让他并行的发done. 就可以了
疑问: 为什么定义成goroutine,他就不会deadline了呢? goroutine还有什么其他的含义?
比如下面这段程序
func main() { c := make(chan ) c <- }
这么写汇报deadline,因为管道只有发送方,没有接收方. 要求必须既有发送方又有接收方
但是这么写就没问题:
func main() { c := make(chan ) go func() { c <- c <- }() }
为啥呢?
经过一番研究过终于明白了,看下面这段程序
func main() { bufferedChannel() channelClose() c := make(chan ) c <- 0 go func() { log.Info("11") c <- 1 log.Info("22") c <- 2 log.Info("33") c <- 3 log.Info("44") c <- 4 }() log.Info("1,",<- c) log.Info("2,<- c ) log.Info("3,1)">log.Info("4,<- c ) time.Sleep(time.Second) }
你运行执行一下,看看结果,只打印出了11
[2020/02/21 07:09:58] channelDemo.go:83 [Info] 11 Process finished with exit code 0
原来goroutine中的代码一直在等待,知道有人要收数据了,他才会发
c <- 0 go func() { log.Info(11) c <- log.Info(223344 }() log.Info("1,<- c) time.Sleep(time.Second) }
这时的打印结果是
[12:08] channelDemo.go: [85 [Info] 2293 [Info] 1,1)">1
看到了吧,有人收,goroutine才会发,收几个,发几个. 其他的保留待发.
2. 使用WaitGroup等待任务的结束
sync. WaitGroup()是系统自带的一个等待任务全部完成的工具
- Add: 添加的任务个数
- Wait: 等待全部goroutine完成
- Done: 某一个goroutine完成
我们第一步: 定义一个WaitGroup
开启了一个等待任务,我们通过waitGroup来等待任务的完成 var wg sync.WaitGroup
第二步: 添加20个任务,因为我们知道有20个任务就直接添加20就好了,如果不知道,可以在for循环里一个个添加
wg.Add(20)
第三步: 等待20个任务全部结束
wg.Wait()
第四步: 完成了一个任务,就标记他为done
代码如下:
sync w.wg.Done()
}()
}
}
type worker int 传递字母的管道
wg *sync.WaitGroup // 标使用waitgroup来标记同步完成
sync.WaitGroup) worker {
w :=]worker
var wg sync.WaitGroup
work[i] = createWorker(i,&wg)
}
wg.Add(20 i
}
wg.Wait()
chanDemo()
}
3. 在go里面函数是一等公民,其实等待任务结束的过程中,或者结束时要做很多事情. 我们要是定义成某一个参数,那就只能接收这个参数了,其他都参数不行.
如何能够达到扩展的目的呢?定义成函数
w.wg()
}
}
type worker wg func() func() {
fmt.Println("事情1")
wg.Done()
fmt.Println("事情2")
},
}
go work(i,我们通过waitGroup来等待任务的完成
var wg sync.WaitGroup
{
work[i] = createWorker(i,&wg)
}
wg.Add( i
}
wg.Wait()
chanDemo()
}
四. 使用channel进行树的遍历
这里使用channel进行树的遍历,是对channel的一个应用.
之前我们对树遍历后处理使用的是函数.我们也可以用channel
比如: 查找树中最大的值
第一步: 循环遍历获取树,然后将所有树节点放入到channel中. 返回一个管道
第二步: 从管道中取出树的节点,进行计算
第三步: 在第一步中,把所有节点都添加到管道中以后,一定要close
定一个管道,循环遍历,把遍历后的节点添加到管道中
func (n *TreeNode) TraveresForChannel() chan *TreeNode {
out := make(chan *TreeNode)
go func() {
n.TraveresFunc(func(node *TreeNode) {
out <- node
})
close(out)
}()
return
}
从管道中取出所有节点,取最大值
max := 0
for c := range root.TraveresForChannel() {
if c.Value > max {
max = c.Value
}
}
fmt.Println(max)
这个逻辑比较清晰.
五. 用select进行调度
1. 首先,我们来从官方文档看一下有关select的描述:
A select" statement chooses which of a set of possible send or receive operations will proceed.
It looks similar to a switch statement but with the cases all referring to communication operations.
一个select语句用来选择哪个case中的发送或接收操作可以被立即执行。它类似于switch语句,但是它的case涉及到channel有关的I/O操作。
或者换一种说法,select就是用来监听和channel有关的IO操作,当 IO 操作发生时,触发相应的动作。
比如: 现在有两个channel A 和B,我要从A 和 B中取值,谁先来,就取出谁. 怎么做呢?
package main
import
func main() {
var A,B chan int
select {
case n := <- A:
fmt.Println(select from A: case n := <- B:
fmt.Println(select from A:default:
fmt.Println(select from default)
}
}
这里定义了A和B两个channel,两个channel都是nil. 下面通过select来选择执行,会走一个默认的default.
输出结果:
select from default
这里A和B都不会有输出,所以,就走默认的default,这是非阻塞的方式输出内容. channel是阻塞的,如果实现非阻塞的呢? 那就是使用select.....default实现
如果没有default又不断的从A和B中取值,就会deadlock
B:
fmt.Println(
我们生成一个channelA和B, 让A和B不停的产生数据,然后看看select中是否能取出来
math/rand 生成A 和 B
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
// 随机的在1.5s以内休眠
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i ++
}
}()
return out
}
func main() {
generator(),generator()
不停的从A和B中收数据
{
{
A:
fmt.Println(B:
fmt.Println(select from B:随机生成的,数据是交叉执行的.
谁有数据,就取出谁
两个都有,随机取一个.
如果有default会一直执行default
生成的A和B,取出来的值,交给工作者,让工作者打印出来. 工作者是个管道,他会一直等着,有工作来了,就工作,没有的时候,等待
生成A 和 B
func generator() chan out := make(chan )
go func() {
i := 0
{
随机的在1.5s以内休眠
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
i
i ++
}
}()
}
// 工作者做的具体的工作内容--工作内容,从管道里不停的取数据
func doWork(i int,c chan int) {
for cc := range c {
fmt.Printf("取出的i:%d,取出的值时:%d\n",cc)
}
}
// 创建工作者
func createWorker(i int) chan int {
in := make(chan int)
// 开始工作
go doWork(i,in)
return inA:
w <- n
B:
w <- n
}
}
}
有一个生成数据的管道,然后从生成数据的管道里取出数据,将其交给工作者,让工作者开始工作
goroutine是非抢占式的,一个通道打开了会一直占有,直到主动释放. 这里有个问题: 那就是管道取出的一直是第一个的
下面我们在select中既可以向管道中存数据,又可以取数据
}
工作者做的具体的工作内容--工作内容,从管道里不停的取数据
func doWork(i for cc := range c {
fmt.Printf(取出的i:%d,取出的值时:%d\n 创建工作者
func createWorker(i int) chan in := make(chan 开始工作
go doWork(i,in
}
func main() {
)
n :=
hasValue := false
var activeWorker chan int
if hasValue {
activeWorker = w
}
select {
case n = <-A:
hasValue = true
case n = <-B:
hasValue = true
case activeWorker <- n:
hasValue = false
}
}
}
结果:
依然有问题: 这里生成数据的速度是在1.5秒以内随机的. 如果生成数据的速度快,消费数据的速度慢. 那么,就有可能有数据打印不出来. 被跳过了
我们让工作者取数据的速度慢一些
range c {
time.Sleep(time.Second * 5)
fmt.Printf(输出结果:
取出的i:0,取出的值时:
取出的i:122729364856
速度一慢下来,我们发现就丢数据了,有些数据被跳过了. 那怎么办呢? 我们用一个数组来接收
range c {
time.Sleep(time.Second * )
fmt.Printf()
// 不停的从A和B中收数据
var values []int
{
var activeWorker chan int
var activeValue int
if len(values) > 0 {
activeValue = values[0]
activeWorker = w
}
case n := <-A: 生成数据的channel
values = append(values,1)">case n := <-B: case activeWorker <- activeValue: 消费数据的channel
/*
* 取出saveData中的第一个数据,放入到channel中
* 将第一个数据删除
*/
values = values[1:]
}
}
}
这样就不会丢数据了,5秒打印一个数据. 那么我们想知道,现在管道里积压了多少数据,怎么处理呢?
做这件事之前,我们先来做一件事,这个程序是一直运行的,他不会停,我们设定一个10秒钟. 让程序10秒以后自动退出
使用time.After(10)
var values []int
// time.After返回的是一个管道. 也就是10秒以后,往管道里放一个数据
tm := time.After(10 * time.Second)
var activeValue if len(values) > {
activeValue = values[]
activeWorker = w
}
消费数据的channel
/*
* 取出saveData中的第一个数据,放入到channel中
* 将第一个数据删除
*/
values = values[:]
case <- tm: // 如果从管道中取出来值. 那么执行这个case
fmt.Println("bye")
return
}
}
}
10后自动结束了
接下来的一个需求,如果数据生成的速度太慢了,怎么办呢? 我们增加一个判断,如果数据生成的速度<800毫秒,打印一个timeout
time.After返回的是一个管道. 也就是10秒以后,往管道里放一个数据
tm := time.After(10 * time.Second)
:]
case <- time.After(800 * time.Millisecond): // 连续打印的两个数据之间时间间隔>800毫秒
fmt.Println("timeout")
case <- tm: 如果从管道中取出来值. 那么执行这个case
fmt.Println(bye)
return
/**
* 上面这两个case有什么区别呢?
* tm: 整体select运行的时间,超过10秒退出
* time.After(800 * time.Millisecond) : 这个是两个数据打印之间的时间间隔>800毫秒
*/
}
}
}
打印结果
2. 定时器的使用
我们最后来做这件事: 每秒钟打印出已经积压的数据,使用time.tick(1s) 这是一个定时器. 返回的也是一个channel,1秒钟往channel中放一个数据
time.Second)
// 定义一个定时器,每秒钟执行定时任务
tick := time.Tick(time.Second)
:]
case <- time.After(800 * time.Millisecond): 连续打印的两个数据之间时间间隔>800毫秒
fmt.Println(timeout)
return
*
* 上面这两个case有什么区别呢?
* tm: 整体select运行的时间,超过10秒退出
* time.After(800 * time.Millisecond) : 这个是两个数据打印之间的时间间隔>800毫秒
*/
case <- tick:
fmt.Println("积压数据个数:"输出结果:
感觉select不是很好理解,还需要在查询资料,学习一遍
完整代码
range c {
time.Sleep(time.Second )
fmt.Printf( 定义一个定时器,每秒钟执行定时任务
tick := time.Tick(time.Second)
*/
case <- tick:
fmt.Println(积压数据个数:go通过通信来共享内存,而不是通过共享内存来通信. go通过channel实现了这样的一个特性
看上面的demo. 我们定义了两个channel来存入数据,定义了一个channel来取出数据. 然后对数据进行处理. 在通信的过程中进行了存和取数据的过程. 这就是通过通信来共享内存.
六. 传统的同步机制
go建议通过通信来共享内存,但go本身也是支持传统的同步机制的,比如锁,
go使用csp模型来实现同步,建议少使用锁来同步,锁是通过共享内存来通讯. 但也是可以用的.
下面我们来看如何使用锁.
1. Mutex
我们定义两个同时存数据的场景,一个取数据的场景. 并发进行,先不加锁,看看有什么问题?
定义一个自定义的类型AtomicInt
type AtomicInt int
增加方法
func (a *AtomicInt) increase() {
*a ++ 取数据的方法
func (a *AtomicInt) get() {
int(*a)
}
func main() {
a AtomicInt
存数据
a.increase()
定义了一个单独的协程去存数据,这样就有两个协程同时存数据
go func() {
a.increase()
}()
time.Sleep(time.Second)
取数据---这时取的是1还是2 呢?
fmt.Println(a.get())
}
这个程序短期看运行不会有什么问题,但这里确实是有冲突的. 我们用-race来看一下
go run -race AutomicInt.go
可以看出发生冲突了,什么冲突呢? 在写数据之前,发现有个地方在读数据. 阿欧....这可不好. 有同步问题.
下面我们使用Mutex来加锁
定义一个自定义的类型AtomicInt
type AtomicInt struct {
value int
lock sync.Mutex
}
AtomicInt) increase() {
a.lock.Lock()
defer a.lock.Unlock()
a.value ++
}
{
lock.Lock()
defer a.lock.Unlock()
(a.value)
}
func main() {
())
}
其实这了展示了lock的用法还是挺简单的. 在结构体里,就是对结构体中的数据进行加锁. 加在哪里,就是对谁加的.
这一章学完了,感受是: 挺难的,尤其是select. 又要通过管道读数据,又要通过管道写数据,挺费事. 再学一遍. 眼睛透彻了
参考资料:
1. https://www.cnblogs.com/tobycnblogs/p/9935465.html
2.
猜你在找的Go相关文章