我的工作目标
1 创建一个指定大小的工作数据 2 实现run接口 3 添加任务到池子里面 4 开始运行池子
首先定义任务的worker和runner接口
package workpool
type Runner interface {
Run(interface{}) (interface{},error)
}
type Work struct {
Runner Runner
Args interface{}
}
func NewWork(runner Runner,args interface{}) *Work {
return &Work{runner,args}
}
我使用了两张方式定义了pool
1 使用go的list
2 使用chan
1 使用go的list,基本思想启动几个go协程,是不停的从list里面读取数据,当然这里面用到了锁
package workpool
import (
"container/list"
"log"
"sync"
"time"
)
//工作池
type Kworkpool struct {
rmutex sync.RWMutex
mutex sync.Mutex //写锁
runnerList *list.List //任务列表
poolSize int //启动的pool的个数
flag bool //是否关闭
}
//新建一个池子
func NewKworkpool(size int) *Kworkpool {
l := list.New()
return &Kworkpool{
poolSize: size,flag: true,runnerList: l,}
}
//添加数据
func (kl *Kworkpool) AddRunner(w *Work) {
if kl.flag {
kl.runnerList.PushFront(w)
}
}
func (kl *Kworkpool) Start() {
if !kl.flag {
kl.Close()
return
}
kl.run()
}
//启动多个goroutine
func (kl *Kworkpool) run() {
for i := 0; i < kl.poolSize; i++ {
go kl.work()
}
}
//实际的工作脚本运行pool
func (kl *Kworkpool) work() {
for {
//检测数据的长度
kl.rmutex.RLock()
listLen := kl.runnerList.Len()
kl.rmutex.RUnlock()
if listLen == 0 { //休眠100毫秒
//已经关闭就结束程序,需要判断是否已经没有任务
if !kl.flag {
break
}
time.Sleep(time.Millisecond * 100)
continue
}
kl.mutex.Lock()
elem := kl.runnerList.Back()
if elem == nil {
kl.mutex.Unlock()
continue
}
worker := kl.runnerList.Remove(elem).(*Work)
kl.mutex.Unlock()
log.Println(" start running ...")
worker.Runner.Run(worker.Args)
}
}
func (kl *Kworkpool) Close() {
kl.flag = false
}
func init() {
log.SetFlags(log.LstdFlags)
}
使用chan实现,本质上是使用chan代替list,使用WaitGroup等待所有协程的结束
package workpool
import (
"log"
"sync"
)
//工作池
type KworkpoolChan struct {
agroup sync.WaitGroup
works chan *Work //一个chan
processSize int //启动的work pool的个数
flag bool //是否关闭
}
//新建一个池子
func NewKworkpoolChan(poolSize,processSize int) *KworkpoolChan {
pool := &KworkpoolChan{
works: make(chan *Work,poolSize),processSize: processSize,flag: true,}
pool.Start()
return pool
}
//添加数据
func (kl *KworkpoolChan) AddRunner(w *Work) {
//发送数据到chan
if kl.flag {
kl.works <- w
}
log.Println("works_len =",len(kl.works))
}
func (kl *KworkpoolChan) Start() {
if !kl.flag {
kl.Close()
return
}
kl.agroup.Add(kl.processSize)
for i := 0; i < kl.processSize; i++ {
go func() {
for runWorker := range kl.works {
runWorker.Runner.Run(runWorker.Args)
}
kl.agroup.Done()
}()
}
}
func (kl *KworkpoolChan) Close() {
if kl.flag != false {
kl.flag = false
}
close(kl.works)
//等待所有线程完成任务
kl.agroup.Wait()
}
测试代码
package main
import (
"fmt"
"log"
"os"
"strconv"
"time"
"workpool"
)
//测试runner
type Arunner struct {
}
func (a *Arunner) Run(avar interface{}) (interface{},error) {
i := avar.(int)
filename := "./log/" + strconv.Itoa(time.Now().Nanosecond())
fp,err := os.OpenFile(filename,os.O_WRONLY|os.O_CREATE,os.ModePerm)
if err != nil {
log.Println("go",i)
return nil,err
}
fp.WriteString("go" + strconv.Itoa(i) + "\n")
fp.Close()
return nil,nil
}
func main() {
tpool2()
}
func tpool2() {
//test pool 2
var run1,run2 workpool.Runner
run1 = &Arunner{}
run2 = &Arunner{}
pool2 := workpool.NewKworkpoolChan(8,20)
w1 := workpool.NewWork(run1,1)
w2 := workpool.NewWork(run2,2)
pool2.AddRunner(w1)
pool2.AddRunner(w2)
fmt.Println(pool2)
time.Sleep(time.Second * 5)
}
func tpool1() {
//test pool 1
pool := workpool.NewKworkpool(20)
pool.Start()
var run1,run2 workpool.Runner
run1 = &Arunner{}
run2 = &Arunner{}
w1 := workpool.NewWork(run1,2)
pool.AddRunner(w1)
pool.AddRunner(w2)
fmt.Print(pool)
}
代码的github路径
https://github.com/beckbikang/go_pool