第十章 Channel--第四天 完结

前端之家收集整理的这篇文章主要介绍了第十章 Channel--第四天 完结前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

channel用于goroutine之间的通讯. 其内部实现了同步,确保并发安全,多个goroutine同时访问,不需要加锁.

对channel操作行为做如下总结:

1. ch <- : 写入channel

2. ch -> :读出channel

3. clouse : 关闭channel

 

golang 中大部分类型都是值类型,只有 slice / channel / map 是引用类型

 

一. channel的语法

1. channel的定义方法

var c chan int //定义了一个int类型的chan  此时c=nil

定义一个有初始值的channel.

c := make(chan int)

  

2. channel是goroutine之间的通讯. 所有有一个goroutine发数据,就要有一个goroutine收数据

package main

import (
    "fmt"
    time"
)

func main() {

    c := make(chan int)

    go func() {
        for  {
       // 取出channel n :
= <- c fmt.Println(n) } }() // 放入channel c <- 1 c <- 2 // 让主进程停留1s,保证channel中的数据全部取出后,主协程再关闭 time.Sleep(time.Second) }

有一个goroutine发数据,必须有一个goroutine收数据,不然就不能放进去了,会报异常deadlock

fatal error: all goroutines are asleep - deadlock!

 

3. 函数是一等公民,channel也是,channel也能作为参数,也能作为返回值

a. channel作为参数传入


)

 chan也可以作为参数传入. 也可以作为返回值
func getChan(c chan int) {
      {
        n := <- c
        fmt.Println(n)
    }
}

func chanDemo () {
    c := make(chan )

    go getChan(c)
    c <- 1 c <- 2 关闭  
time.Sleep(time.Second)
}

func main() {
  chanDemo()
}

b. channel用作数组

 chan也可以作为参数传入. 也可以作为返回值
func worker(i int,c chan ) {
     c
        fmt.Printf(number: %d,worker,%c \n,i,n)
    }
}

func chanDemo () {
     开了10个协程,让10个协程分别取各自的数据
    var channel [10]chan int
    for i := 0; i<10; i++ {
        channel[i] = make(chan )

        go func(c chan  ) {
            worker(00; i <  {
        channel[i] <- 'a' + i
    }

    A关闭
    time.Sleep(time.Second)
}


func main() {
    chanDemo()
}

分配一个管道数组,里面有10个管道. 向每个管道里放两个数据. 打印. 

 c. channel作为返回值

 chan也可以作为参数传入. 也可以作为返回值
func createWorker(i int) chan int {
    c := make(chan )
    go func() {
          {
            n := <- c
            fmt.Printf(return c
}


func chanDemo () {
    var channel [10]chan int
     {
        channel[i] = createWorker(i)
    }

        time.Sleep(time.Second)
}


func main() {
    chanDemo()
}

 

4 给channel定义方向.

channel可以收数据也可以发数据. 那么我们返回的channel到底是收数据的还是发数据的呢? 我们可以告诉调用者. 

 chan<- int)
    go func() {
        // goroutine里,channel是发数据的. 那么我们定义返回的channel只能是收数据的
        return c
}


func chanDemo () {
    var channel [10]chan<- int
     {
        channel[i] = createWorker(i)
    }

        time.Sleep(time.Second)
}


func main() {
    chanDemo()
}

 

5. bufferedChannel 带有缓冲区的channel

我们知道如果创建了一个channel,往里面放了数据,但是没有人接收,那么就会deadlock死锁. 也就是必须要有人立刻能够接收走. 这就是要求,要求立刻被收走. 如果没有,就报错. 我们可以给他一个缓冲,让他在几个范围内可以不被立刻收走,给收数据方一个缓冲的时间

func bufferedChannel() {
    c := make(chan int,3)

    c <- 23

    c <- 4
}
func main() {
    bufferedChannel()

上面的demo,定义了带有三个缓冲的channel. 里面3个以内数据不会报deadlock. 超过三个还没有被取走,才会包deadlock


)

func worker(i int,c chan int) {
    for  {
        n := <- c
        fmt.Printf("number: %d,%d \n",n)
    }
}

func bufferedChannel() {
    c := make(chan 3)

    go worker(0

    time.Sleep(time.Second)
}

func main() {
    bufferedChannel()
}

6. channel Close()

  • channel 发完了,我没有数据可以再发了,是可以close的. channel 的close是有发数据方close.
  • channel的发送方close了,但是接收方依然是可以接收到数据的. 接收数据的返回值跟channel的类型有关系
  • 接收方可以通过ok判断是否接收到数据了

)

func worker(i  goroutine里,channel是发数据的. 那么我们定义返回的channel只能是收数据的
      {
        n,ok := <- c
        if !ok {
            // 如果没有数据了,就返回
            break
        }
        fmt.Printf( channel发数据方,数据发完了,是可以close的
func channelClose() {
    c := make(chan )

    go worker('bcd'
     调用close 告诉接收方,数据已经发完了.
    close(c)

    time.Sleep(time.Second)

}
func main() {
    bufferedChannel()
    channelClose()
}

另一种判断是否有数据的方法,使用range来判断.

    close(c)

    time.Sleep(time.Second)

}
func main() {
        channelClose()
}

 

 

二. Channel的应用: 不要通过共享内存来通信,通过通信来共享内存

通常,比如java是通过共享内存来通信. 比如定义一个公共的flag,这个flag就是共享的一块内存空间. 他的值变了,通知调用者. 这就是通过共享内存来通信. 

 

使用共享内存的话在多线程的场景下为了处理竞态,需要加锁,使用起来比较麻烦。另外使用过多的锁,容易使得程序的代码逻辑坚涩难懂,并且容易使程序死锁,死锁了以后排查问题相当困难,特别是很多锁同时存在的时候。

go语言的channel保证同一个时间只有一个goroutine能够访问里面的数据,为开发者提供了一种优雅简单的工具,所以go原生的做法就是使用channle来通信,而不是使用共享内存来通信。

 

下面来感受一下go的通过通信来共享内存. 

三.  使用channel等待任务结束

我们在上面的demo中,都会有一句话

time.Sleep(time.Second)

让主程序休眠1秒钟.  否则,协程还没执行完,主程序就退出

再来分析,等待的目的是什么呢? 那就是等协程都执行完了,  主程序再退出. 换个思路,我不用time.sleep,能不能协程执行完了,主动告诉主线程,我执行完了,退出吧呢?

可以的. 

1. 使用通信来共享内存,用channel实现.

go的原则: 不要通过共享内存来通信,通信来共享内存. 

 

go中通信用什么呢? 使用channel

 定义一个bool类型的chan,用来和外部通信. 事情做完了,给外面回复一个done
func work(i for n := range w.c {
        fmt.Printf( w.done <- true
    }
}

type worker struct {
    c chan int        // 传递字母的管道
    done chan bool        // 标记字母传递完成的管道
}

w = worker{
        c : make(chan int),done : make(chan bool),}
    go work(i,w)
     w
}


func chanDemo () {
    var wo [10]worker 
  
// 第一步,第二步
wo[i] createWorker(i,wo[i]) } // 第三步 for i := 0; i < 10; i++ { wo[i].c <- 'a' + i // 这里有一个通道,一直等着. 等着取数据. <- wo[i].done } { wo[i].c <- i <- wo[i].done } time.Sleep(time.Second) } func main() { chanDemo() }

分析:

1. 定义了一个worker类型,里面有两个管道. 第一个管道是用来传输字母的. 第二个管道用来标记传送行为是否完成

2. 接下来看chanDemo,chanDemo是一个主goroutine. 在这里:

  第一步: 创建了10个工作者. 

  第二步: 开了10个goroutine 用来传输字母. 开goroutine调用createWorker,然后worker的工作是work

  第三步: for循环为每一个goroutine添加字母. 然后等待.......直到对应的done channel完成,取出结果. 继续往下执行.

 {
   wo[i].c <-  i
    这里有一个通道,一直等着. 等着取数据.
   <- wo[i].done
}

  放数据: wo[i].c <- 'a' + i . 然后就等待......等待到什么时候呢? wo[i].done中有数据可以取出.

  这就让主goroutine保证了10个goroutine都执行完以后,在继续往后执行.

  这解释了为什么goroutine可以让主goroutine等待的原因

输出结果

number: 4567899,J 

这样打印的结果是按照顺序执行的. 一个goroutine执行完了,才能往另一个goroutine中放数据,这样效率太低了. 我们换一种方式,让他不停的打印. 最后一起等待执行完成.

go func() {
            w.done <- true
        }()
    }
}

type worker struct {
    c chan int         传递字母的管道
    done chan bool         标记字母传递完成的管道
}

 worker{
        c : make(chan ),done : make(chan boolvar work []worker

     {
        work[i] = createWorker(i,work[i])
    }

    for i,w := range work {
        w.c <-  i
    }

    for _,w := range work  {
       放了两次,所以要等待两次都处理完,在执行后面的结果
        <-w.done
      <-w.done
   }
       chanDemo()
}

这里有两个变化,

1. 我在取结果done的时候,没有在线等执行完. 而是,你们去执行吧,我最后来收结果,收的顺序也是从1-9的顺序收的. 每个goroutine要有两个结果.

2. 在work具体执行完成的地方,要定义一个协程. 这样程序才能正常运行,否则会报deadline异常. 为什么会报异常呢?

  因为,有两次放数据,第一次放完了,往管道done里写了一个数据,结果没有被收走,又放了一次..... 所以就发生死锁了.

 

新开一个goroutine,让他并行的发done. 就可以了

疑问: 为什么定义成goroutine,他就不会deadline了呢? goroutine还有什么其他的含义?

比如下面这段程序

func main() {
    c := make(chan )
    c <- 
}

这么写汇报deadline,因为管道只有发送方,没有接收方. 要求必须既有发送方又有接收方

但是这么写就没问题:

func main() {

    c := make(chan )

    go func() {
        c <- 
        c <- 
    }()
}

为啥呢?

经过一番研究过终于明白了,看下面这段程序

func main() {
    bufferedChannel()
    channelClose()

    c := make(chan )
    c <- 0

    go func() {
        log.Info("11")
        c <- 1
        log.Info("22")
        c <- 2
        log.Info("33")
        c <- 3
        log.Info("44")
        c <- 4
    }()

    log.Info("1,",<- c)
    log.Info("2,<- c )
    log.Info("3,1)">log.Info("4,<- c )

    time.Sleep(time.Second)
}

你运行执行一下,看看结果,只打印出了11

[2020/02/21 07:09:58] channelDemo.go:83 [Info] 11

Process finished with exit code 0

原来goroutine中的代码一直在等待,知道有人要收数据了,他才会发

c <- 0

    go func() {
        log.Info(11)
        c <- 
        log.Info(223344
    }()

    log.Info("1,<- c)
    
    time.Sleep(time.Second)
}

这时的打印结果是

[12:08] channelDemo.go:
[85 [Info] 2293 [Info] 1,1)">1

看到了吧,有人收,goroutine才会发,收几个,发几个. 其他的保留待发.

 

2. 使用WaitGroup等待任务的结束

sync. WaitGroup()是系统自带的一个等待任务全部完成的工具

  • Add: 添加的任务个数
  • Wait: 等待全部goroutine完成
  • Done: 某一个goroutine完成

我们第一步: 定义一个WaitGroup

 开启了一个等待任务,我们通过waitGroup来等待任务的完成
var wg sync.WaitGroup

第二步: 添加20个任务,因为我们知道有20个任务就直接添加20就好了,如果不知道,可以在for循环里一个个添加

wg.Add(20)

第三步: 等待20个任务全部结束

wg.Wait()

第四步: 完成了一个任务,就标记他为done

代码如下:

sync w.wg.Done()
        }()
    }
}

type worker int              传递字母的管道
    wg *sync.WaitGroup        // 标使用waitgroup来标记同步完成
sync.WaitGroup) worker {
    w :=]worker
    var wg sync.WaitGroup


     work[i] = createWorker(i,&wg)
    }

    wg.Add(20 i
    }

    wg.Wait()
        chanDemo()
}

 

3. 在go里面函数是一等公民,其实等待任务结束的过程中,或者结束时要做很多事情. 我们要是定义成某一个参数,那就只能接收这个参数了,其他都参数不行. 

如何能够达到扩展的目的呢?定义成函数

w.wg()
    }
}

type worker  wg func()         func() {
            fmt.Println("事情1")
            wg.Done()
            fmt.Println("事情2")
        },
    }
    go work(i,我们通过waitGroup来等待任务的完成
    var wg sync.WaitGroup


     {
        work[i] = createWorker(i,&wg)
    }

    wg.Add( i
    }

    wg.Wait()
        chanDemo()
}

四. 使用channel进行树的遍历

这里使用channel进行树的遍历,是对channel的一个应用.

之前我们对树遍历后处理使用的是函数.我们也可以用channel

比如: 查找树中最大的值

第一步: 循环遍历获取树,然后将所有树节点放入到channel中. 返回一个管道

第二步: 从管道中取出树的节点,进行计算

第三步: 在第一步中,把所有节点都添加到管道中以后,一定要close

 定一个管道,循环遍历,把遍历后的节点添加到管道中
func (n *TreeNode) TraveresForChannel() chan *TreeNode {
    out := make(chan *TreeNode)
    go func() {
        n.TraveresFunc(func(node *TreeNode) {
            out <- node
        })

        close(out)
    }()
    return 
}
 从管道中取出所有节点,取最大值
    max := 0
    for c := range root.TraveresForChannel() {
        if c.Value > max {
            max = c.Value
        }
    }

    fmt.Println(max)

这个逻辑比较清晰. 

五. 用select进行调度

 1. 首先,我们来从官方文档看一下有关select的描述:

A select" statement chooses which of a set of possible send or receive operations will proceed. 
It looks similar to a switch statement but with the cases all referring to communication operations. 一个select语句用来选择哪个case中的发送或接收操作可以被立即执行。它类似于switch语句,但是它的case涉及到channel有关的I/O操作。

或者换一种说法,select就是用来监听和channel有关的IO操作,当 IO 操作发生时,触发相应的动作。

比如: 现在有两个channel  A 和B,我要从A 和 B中取值,谁先来,就取出谁. 怎么做呢? 

package main

import 

func main() {
    var A,B chan int

    select {
    case n := <- A:
        fmt.Println(select from A: case n := <- B:
        fmt.Println(select from A:default:
        fmt.Println(select from default)
    }
}

这里定义了A和B两个channel,两个channel都是nil. 下面通过select来选择执行,会走一个默认的default.

输出结果:

select from default

这里A和B都不会有输出,所以,就走默认的default,这是非阻塞的方式输出内容. channel是阻塞的,如果实现非阻塞的呢? 那就是使用select.....default实现

如果没有default又不断的从A和B中取值,就会deadlock

B:
        fmt.Println(

 

 我们生成一个channelA和B, 让A和B不停的产生数据,然后看看select中是否能取出来

math/rand 生成A 和 B
func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for  {
            // 随机的在1.5s以内休眠
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i ++
        }
    }()
    return out
}

func main() {
     generator(),generator()

     不停的从A和B中收数据
      {
         {
        A:
            fmt.Println(B:
            fmt.Println(select from B:随机生成的,数据是交叉执行的.

谁有数据,就取出谁

两个都有,随机取一个.

如果有default会一直执行default

生成的A和B,取出来的值,交给工作者,让工作者打印出来. 工作者是个管道,他会一直等着,有工作来了,就工作,没有的时候,等待

 生成A 和 B
func generator() chan out := make(chan )
    go func() {
        i := 0
          {
             随机的在1.5s以内休眠
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
             i
            i ++
        }
    }()
    
}

// 工作者做的具体的工作内容--工作内容,从管道里不停的取数据
func doWork(i int,c chan int) {
    for cc := range c  {
        fmt.Printf("取出的i:%d,取出的值时:%d\n",cc)
    }
}

// 创建工作者
func createWorker(i int) chan int {
    in := make(chan int)
    // 开始工作
    go doWork(i,in)
    return inA:
            w <- n
        B:
            w <- n
        }
    }
}

有一个生成数据的管道,然后从生成数据的管道里取出数据,将其交给工作者,让工作者开始工作

 goroutine是非抢占式的,一个通道打开了会一直占有,直到主动释放. 这里有个问题: 那就是管道取出的一直是第一个的

下面我们在select中既可以向管道中存数据,又可以取数据

 


}

 工作者做的具体的工作内容--工作内容,从管道里不停的取数据
func doWork(i for cc := range c  {
        fmt.Printf(取出的i:%d,取出的值时:%d\n 创建工作者
func createWorker(i int) chan in := make(chan  开始工作
    go doWork(i,in
}

func main() {
    )
    n := 
    hasValue := false
    var activeWorker chan int
        if hasValue {
            activeWorker = w
        }
        select {
        case n = <-A:
            hasValue = true
        case n = <-B:
            hasValue = true
        case activeWorker <- n:
            hasValue = false
        }
    }
}

结果:

 

 依然有问题: 这里生成数据的速度是在1.5秒以内随机的. 如果生成数据的速度快,消费数据的速度慢. 那么,就有可能有数据打印不出来. 被跳过了 

我们让工作者取数据的速度慢一些

 range c  {
        time.Sleep(time.Second * 5)
        fmt.Printf(输出结果:

取出的i:0,取出的值时:
取出的i:122729364856

速度一慢下来,我们发现就丢数据了,有些数据被跳过了. 那怎么办呢? 我们用一个数组来接收

 range c  {
        time.Sleep(time.Second * )
        fmt.Printf()
    // 不停的从A和B中收数据
    var values []int
     {
        var activeWorker chan int
        var activeValue int
        if len(values)  > 0 {
            activeValue = values[0]
            activeWorker = w
        }
        case n := <-A:         生成数据的channel
            values = append(values,1)">case n := <-B:        case activeWorker <- activeValue:         消费数据的channel
            /*
             * 取出saveData中的第一个数据,放入到channel中
             * 将第一个数据删除
             */
            values = values[1:]
        }
    }
}

这样就不会丢数据了,5秒打印一个数据. 那么我们想知道,现在管道里积压了多少数据,怎么处理呢?

 做这件事之前,我们先来做一件事,这个程序是一直运行的,他不会停,我们设定一个10秒钟. 让程序10秒以后自动退出

使用time.After(10)

var values []int
    // time.After返回的是一个管道. 也就是10秒以后,往管道里放一个数据
    tm := time.After(10 * time.Second)

    var activeValue if len(values)  >  {
            activeValue = values[]
            activeWorker = w
        }
         消费数据的channel
            /*
             * 取出saveData中的第一个数据,放入到channel中
             * 将第一个数据删除
             */
            values = values[:]
            
        case <- tm: // 如果从管道中取出来值. 那么执行这个case
            fmt.Println("bye")
            return
        }
    }
}

10后自动结束了

 

 接下来的一个需求,如果数据生成的速度太慢了,怎么办呢? 我们增加一个判断,如果数据生成的速度<800毫秒,打印一个timeout

 time.After返回的是一个管道. 也就是10秒以后,往管道里放一个数据
    tm := time.After(10 * time.Second)

    :]
        case <- time.After(800 * time.Millisecond):  // 连续打印的两个数据之间时间间隔>800毫秒
                fmt.Println("timeout")
        case <- tm:  如果从管道中取出来值. 那么执行这个case
            fmt.Println(bye)
            return

            /**
             * 上面这两个case有什么区别呢?
             * tm: 整体select运行的时间,超过10秒退出
             * time.After(800 * time.Millisecond) : 这个是两个数据打印之间的时间间隔>800毫秒
             */
        }
    }
}

 打印结果

 

2. 定时器的使用

 我们最后来做这件事: 每秒钟打印出已经积压的数据,使用time.tick(1s) 这是一个定时器. 返回的也是一个channel,1秒钟往channel中放一个数据

 time.Second)

    // 定义一个定时器,每秒钟执行定时任务
    tick := time.Tick(time.Second)

    :]
        case <- time.After(800 * time.Millisecond):   连续打印的两个数据之间时间间隔>800毫秒
                fmt.Println(timeout)
        return

            *
             * 上面这两个case有什么区别呢?
             * tm: 整体select运行的时间,超过10秒退出
             * time.After(800 * time.Millisecond) : 这个是两个数据打印之间的时间间隔>800毫秒
             */
         case <- tick:
            fmt.Println("积压数据个数:"输出结果:

 感觉select不是很好理解,还需要在查询资料,学习一遍

完整代码

 range c  {
        time.Sleep(time.Second )
        fmt.Printf( 定义一个定时器,每秒钟执行定时任务
    tick := time.Tick(time.Second)

    */
         case <- tick:
            fmt.Println(积压数据个数:go通过通信来共享内存,而不是通过共享内存来通信. go通过channel实现了这样的一个特性

看上面的demo. 我们定义了两个channel来存入数据,定义了一个channel来取出数据. 然后对数据进行处理. 在通信的过程中进行了存和取数据的过程. 这就是通过通信来共享内存.

 

六. 传统的同步机制

go建议通过通信来共享内存,但go本身也是支持传统的同步机制的,比如锁, 

go使用csp模型来实现同步,建议少使用锁来同步,锁是通过共享内存来通讯. 但也是可以用的.

下面我们来看如何使用锁. 

1. Mutex

 我们定义两个同时存数据的场景,一个取数据的场景. 并发进行,先不加锁,看看有什么问题?

 定义一个自定义的类型AtomicInt
type AtomicInt int

 增加方法
func (a *AtomicInt) increase() {
    *a ++ 取数据的方法
func (a *AtomicInt) get() {
    int(*a)
}

func main() {
     a AtomicInt

     存数据
    a.increase()

     定义了一个单独的协程去存数据,这样就有两个协程同时存数据
    go func() {
        a.increase()
    }()


    time.Sleep(time.Second)
     取数据---这时取的是1还是2 呢?
    fmt.Println(a.get())
}

这个程序短期看运行不会有什么问题,但这里确实是有冲突的. 我们用-race来看一下

go run -race AutomicInt.go

 

 可以看出发生冲突了,什么冲突呢? 在写数据之前,发现有个地方在读数据. 阿欧....这可不好. 有同步问题. 

下面我们使用Mutex来加锁

 定义一个自定义的类型AtomicInt
type AtomicInt struct {
    value int
    lock sync.Mutex
}

AtomicInt) increase() {
    a.lock.Lock()
    defer a.lock.Unlock()
    a.value ++

}

{
    lock.Lock()
    defer a.lock.Unlock()
    (a.value)

}

func main() {
    ())
}

其实这了展示了lock的用法还是挺简单的. 在结构体里,就是对结构体中的数据进行加锁. 加在哪里,就是对谁加的.

 

这一章学完了,感受是: 挺难的,尤其是select. 又要通过管道读数据,又要通过管道写数据,挺费事. 再学一遍. 眼睛透彻了

 


 

参考资料:

1. https://www.cnblogs.com/tobycnblogs/p/9935465.html

2. 

猜你在找的Go相关文章