golang 是一门很优秀的语言,语法简单,功能强大 ,支持的 channal、goroutine 等都是非常优秀的特性。由于之前用golang 重构一个项目,对golang不是太了解,栽了不少坑,其中主要问题为:
1. go 直接协程运行函数、方法,大并发的时候不太可控会导致协程数量急剧增加。
2.协程池方式运行有不想每一个结构体都启动一个协程池
所以就萌生出搞一个通用协程池的想法,主要思想为,启动多个协程 从 channal 队列读取数据,由业务将需要执行的方法与参数放入channal,协程池仅仅负责维护协程,自动扩容,缩减,启用备用队列等策略,至于返回结果之类的都有业务方实现。
关键实现:
1. 定义一个task 的结构体 标示具体要执行的任务格式
type Job func([]interface{}) type taskWork struct { Run Job startBool bool params []interface{} }
2.定义一个worker 池,控制协程相关信息
type WorkPool struct { taskPool chan taskWork workNum int maxNum int stopTopic bool //考虑后期 作为冗余队列使用 taskQue chan taskWork }
3.实现协程池相关启动,停止,扩容策略,缩减策略,备用队列启用等 逻辑
//得到一个线程池并返回 句柄 func (p *WorkPool) InitPool() { *p = WorkPool{workNum: workerNumDefault,maxNum: workerNumMax,stopTopic: false,taskPool: make(chan taskWork,workerNumDefault*2),taskQue: nil} (p).start() go (p).workerRemoveConf() } //开始work func (p *WorkPool) start() { for i := 0; i < workerNumDefault; i++ { p.workInit(i) fmt.Println("start pool task:",i) } } //初始化 work池 后期应该考虑如何 自动 增减协程数,以达到最优 func (p *WorkPool) workInit(id int) { go func(idNum int) { //var i int = 0 for { select { case task := <-p.taskPool: if task.startBool == true && task.Run != nil { //fmt.Print("this is pool ",idNum,"---") task.Run(task.params) } //单个结束任务 if task.startBool == false { //fmt.Print("this is pool -- ","---") return } //防止从channal 中读取数据超时 case <-time.After(time.Millisecond * 1000): //fmt.Println("time out init") if p.stopTopic == true && len(p.taskPool) == 0 { fmt.Println("topic=",p.stopTopic) //work数递减 p.workNum-- return } //从备用队列读取数据 case queTask := <-p.taskQue: if queTask.startBool == true && queTask.Run != nil { //fmt.Print("this is que ","---") queTask.Run(queTask.params) } } } }(id) } //停止一个workPool func (p *WorkPool) Stop() { p.stopTopic = true } //普通运行实例,非自动扩充 func (p *WorkPool) Run(funcJob Job,params ...interface{}) { p.taskPool <- taskWork{funcJob,true,params} } //用select 去做 实现 自动扩充 协程个数 启用备用队列等特性 func (p *WorkPool) RunAuto(funcJob Job,params ...interface{}) { task := taskWork{funcJob,params} select { //正常写入 case p.taskPool <- task: //写入超时 说明队列满了 写入备用队列 case <-time.After(time.Millisecond * 1000): p.taskQueInit() p.workerAddConf() //task 入备用队列 p.taskQue <- task } } //自动初始化备用队列 func (p *WorkPool) taskQueInit() { //扩充队列 if p.taskQue == nil { p.taskQue = make(chan taskWork,p.maxNum*2) } } //自动扩充协程 简单的自动扩充策略 func (p *WorkPool) workerAddConf() { //说明需要扩充进程 协程数量小于 1000 协程数量成倍增长 if p.workNum < 1000 { p.workerAdd(p.workNum) } else if p.workNum < p.maxNum { tmpNum := p.maxNum - p.workNum tmpNum = tmpNum / 10 if tmpNum == 0 { tmpNum = 1 } p.workerAdd(1) } } //自动缩减协程 实现比较粗糙,可以考虑后续精细实现一些策略 func (p *WorkPool) workerRemoveConf() { for { select { case <-time.After(time.Millisecond * 1000 * 600): if p.workNum > workerNumDefault && len(p.taskPool) == 0 && len(p.taskQue) == 0 { rmNum := (p.workNum - workerNumDefault) / 5 if rmNum == 0 { rmNum = 1 } p.workerRemove(rmNum) } } } } func (p *WorkPool) workerAdd(num int) { for i := 0; i < num; i++ { p.workNum++ p.workInit(p.workNum) } } func (p *WorkPool) workerRemove(num int) { for i := 0; i < num; i++ { task := taskWork{startBool: false} p.taskPool <- task p.workNum-- } }
4.我们来看一下使用的demo,可以很方便的把一个已有业务作为task 交给协程池执行了
package main import ( "fmt" "github.com/wangyaofenghist/go-Call/call" "github.com/wangyaofenghist/go-Call/test" "github.com/wangyaofenghist/go-worker-base/worker" "runtime" "time" ) //声明一号池子 var poolOne worker.WorkPool //声明回调变量 var funcs call.CallMap //以结构体方式调用 type runWorker struct{} //初始化协程池 和回调参数 func init() { poolOne.InitPool() funcs = call.CreateCall() } //通用回调 func (f *runWorker) Run(param []interface{}) { name := param[0].(string) //调用回调并拿回结果 result,err := funcs.Call(name,param[1:]...) fmt.Println(result,err) } func runtimeNum() { for { fmt.Println("runtime num:",runtime.NumGoroutine()) time.Sleep(time.Millisecond * 1000) } } //主函数 func main() { tmp := make(chan int) go runtimeNum() var runFunc runWorker = runWorker{} funcs.AddCall("test4",test.Test4) var startTime = time.Now().UnixNano() for i := 0; i < 10000; i++ { poolOne.RunAuto(runFunc.Run,"test4"," ee "," ff") } <-tmp return }
以上通用协程池实现略微粗糙,并没有考虑太精细化的自动扩充协程策略或缩减策略,demo中可以将需要回调的函数加上sleep 会看到协程自动扩充与销毁的过程,中间涉及到一个通用回调是通过golang 的反射机制实现的一段通用代码,可以从gitHub 中拉取。
通用协程池 go-worker-base :https://github.com/wangyaofenghist/go-worker-base
通用回调函数 go-Call : https://github.com/wangyaofenghist/go-Call