如何用golang写一个thread pool

前端之家收集整理的这篇文章主要介绍了如何用golang写一个thread pool前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

我的工作目标

1 创建一个指定大小的工作数据
2 实现run接口
3 添加任务到池子里面
4 开始运行池子

首先定义任务的worker和runner接口

package workpool


type Runner interface {
    Run(interface{}) (interface{},error)
}

type Work struct {
    Runner Runner
    Args   interface{}
}

func NewWork(runner Runner,args interface{}) *Work {
    return &Work{runner,args}
}

我使用了两张方式定义了pool
1 使用go的list
2 使用chan

1 使用go的list,基本思想启动几个go协程,是不停的从list里面读取数据,当然这里面用到了锁

package workpool

import (
    "container/list"
    "log"
    "sync"
    "time"
)

//工作池
type Kworkpool struct {
    rmutex     sync.RWMutex
    mutex      sync.Mutex //写锁
    runnerList *list.List //任务列表
    poolSize   int        //启动的pool的个数
    flag       bool       //是否关闭
}

//新建一个池子
func NewKworkpool(size int) *Kworkpool {
    l := list.New()
    return &Kworkpool{
        poolSize:   size,flag:       true,runnerList: l,}
}

//添加数据
func (kl *Kworkpool) AddRunner(w *Work) {
    if kl.flag {
        kl.runnerList.PushFront(w)
    }
}

func (kl *Kworkpool) Start() {
    if !kl.flag {
        kl.Close()
        return
    }
    kl.run()
}

//启动多个goroutine
func (kl *Kworkpool) run() {
    for i := 0; i < kl.poolSize; i++ {
        go kl.work()
    }
}

//实际的工作脚本运行pool
func (kl *Kworkpool) work() {
    for {
        //检测数据的长度
        kl.rmutex.RLock()
        listLen := kl.runnerList.Len()
        kl.rmutex.RUnlock()
        if listLen == 0 { //休眠100毫秒
            //已经关闭就结束程序,需要判断是否已经没有任务
            if !kl.flag {
                break
            }
            time.Sleep(time.Millisecond * 100)
            continue
        }
        kl.mutex.Lock()
        elem := kl.runnerList.Back()
        if elem == nil {
            kl.mutex.Unlock()
            continue
        }
        worker := kl.runnerList.Remove(elem).(*Work)
        kl.mutex.Unlock()
        log.Println(" start running ...")
        worker.Runner.Run(worker.Args)
    }
}

func (kl *Kworkpool) Close() {
    kl.flag = false
}

func init() {
    log.SetFlags(log.LstdFlags)
}

使用chan实现,本质上是使用chan代替list,使用WaitGroup等待所有协程的结束

package workpool

import (
    "log"
    "sync"
)

//工作池
type KworkpoolChan struct {
    agroup      sync.WaitGroup
    works       chan *Work //一个chan
    processSize int        //启动的work pool的个数
    flag        bool       //是否关闭
}

//新建一个池子
func NewKworkpoolChan(poolSize,processSize int) *KworkpoolChan {
    pool := &KworkpoolChan{
        works:       make(chan *Work,poolSize),processSize: processSize,flag:        true,}

    pool.Start()
    return pool
}

//添加数据
func (kl *KworkpoolChan) AddRunner(w *Work) {
    //发送数据到chan
    if kl.flag {
        kl.works <- w
    }
    log.Println("works_len =",len(kl.works))
}

func (kl *KworkpoolChan) Start() {
    if !kl.flag {
        kl.Close()
        return
    }
    kl.agroup.Add(kl.processSize)

    for i := 0; i < kl.processSize; i++ {
        go func() {
            for runWorker := range kl.works {
                runWorker.Runner.Run(runWorker.Args)
            }
            kl.agroup.Done()
        }()
    }

}

func (kl *KworkpoolChan) Close() {
    if kl.flag != false {
        kl.flag = false
    }
    close(kl.works)
    //等待所有线程完成任务
    kl.agroup.Wait()
}

测试代码

package main

import (
    "fmt"
    "log"
    "os"
    "strconv"
    "time"
    "workpool"
)

//测试runner
type Arunner struct {
}

func (a *Arunner) Run(avar interface{}) (interface{},error) {
    i := avar.(int)
    filename := "./log/" + strconv.Itoa(time.Now().Nanosecond())
    fp,err := os.OpenFile(filename,os.O_WRONLY|os.O_CREATE,os.ModePerm)
    if err != nil {
        log.Println("go",i)
        return nil,err
    }
    fp.WriteString("go" + strconv.Itoa(i) + "\n")
    fp.Close()
    return nil,nil
}

func main() {
    tpool2()
}

func tpool2() {
    //test pool 2
    var run1,run2 workpool.Runner
    run1 = &Arunner{}
    run2 = &Arunner{}

    pool2 := workpool.NewKworkpoolChan(8,20)

    w1 := workpool.NewWork(run1,1)
    w2 := workpool.NewWork(run2,2)
    pool2.AddRunner(w1)
    pool2.AddRunner(w2)
    fmt.Println(pool2)

    time.Sleep(time.Second * 5)
}

func tpool1() {
    //test pool 1
    pool := workpool.NewKworkpool(20)
    pool.Start()
    var run1,run2 workpool.Runner
    run1 = &Arunner{}
    run2 = &Arunner{}

    w1 := workpool.NewWork(run1,2)
    pool.AddRunner(w1)
    pool.AddRunner(w2)
    fmt.Print(pool)
}

代码的github路径
https://github.com/beckbikang/go_pool

原文链接:https://www.f2er.com/go/187759.html

猜你在找的Go相关文章