前端之家收集整理的这篇文章主要介绍了
golang实现worker,
前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
注意:以上代码摘抄至@harbor https://github.com/vmware/harbor
/*
Copyright (c) 2016 VMware,Inc. All Rights Reserved.
Licensed under the Apache License,Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,software
distributed under the License is distributed on an "AS IS" BASIS,WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package job
import (
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/jobservice/config"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/log"
)
//定义工人池
type workerPool struct {
workerChan chan *Worker
workerList []*Worker //和业务相关不用理会
}
// WorkerPool is a set of workers each worker is associate to a statemachine for handling jobs.
// it consists of a channel for free workers and a list to all workers
var WorkerPool *workerPool
// StopJobs accepts a list of jobs and will try to stop them if any of them is being executed by the worker.
func (wp *workerPool) StopJobs(jobs []int64) {
log.Debugf("Works working on jobs: %v will be stopped",jobs)
for _,id := range jobs {
for _,w := range wp.workerList {
if w.SM.JobID == id {
log.Debugf("found a worker whose job ID is %d,will try to stop it",id)
w.SM.Stop(id)
}
}
}
}
// Worker consists of a channel for job from which worker gets the next job to handle,and a pointer to a statemachine,// the actual work to handle the job is done via state machine.
//工人结构体
type Worker struct {
ID int
RepJobs chan int64
SM *SM
quit chan bool
}
// Start is a loop worker gets id from its channel and handle it.
func (w *Worker) Start() {
go func() {
for {
//注册功能到工人池
WorkerPool.workerChan <- w
select {
//获取任务
case jobID := <-w.RepJobs:
log.Debugf("worker: %d,will handle job: %d",w.ID,jobID)
w.handleRepJob(jobID)
case q := <-w.quit:
if q {
log.Debugf("worker: %d,will stop.",w.ID)
return
}
}
}
}()
}
// Stop ...
func (w *Worker) Stop() {
go func() {
//发送退出指令
w.quit <- true
}()
}
//接受到任务后并处理后续任务
func (w *Worker) handleRepJob(id int64) {
err := w.SM.Reset(id)
if err != nil {
log.Errorf("Worker %d,Failed to re-initialize statemachine for job: %d,error: %v",id,err)
err2 := dao.UpdateRepJobStatus(id,models.JobError)
if err2 != nil {
log.Errorf("Failed to update job status to ERROR,job: %d,error:%v",err2)
}
return
}
if w.SM.Parms.Enabled == 0 {
log.Debugf("The policy of job:%d is disabled,will cancel the job",id)
_ = dao.UpdateRepJobStatus(id,models.JobCanceled)
w.SM.Logger.Info("The job has been canceled")
} else {
w.SM.Start(models.JobRunning)
}
}
// NewWorker returns a pointer to new instance of worker
//创建工人
func NewWorker(id int) *Worker {
w := &Worker{
ID: id,//工人名称
RepJobs: make(chan int64),//任务队列
quit: make(chan bool),//退出goroutine
SM: &SM{},//和业务有关,不必理会
}
w.SM.Init() //和业务有关,不必理会
return w
}
// InitWorkerPool create workers according to configuration.
func InitWorkerPool() {
WorkerPool = &workerPool{
workerChan: make(chan *Worker,config.MaxJobWorkers()),//初始化工人池
workerList: make([]*Worker,//和业务有关,不必理会
}
for i := 0; i < config.MaxJobWorkers(); i++ {
worker := NewWorker(i)
WorkerPool.workerList = append(WorkerPool.workerList,worker) //和业务有关,不必理会
worker.Start() //启动
log.Debugf("worker %d started",worker.ID)
}
}
// Dispatch will listen to the jobQueue of job service and try to pick a free worker from the worker pool and assign the job to it.
func Dispatch() {
for {
select {
case job := <-jobQueue: //监听任务队列,如果存在就取出
go func(jobID int64) {
log.Debugf("Trying to dispatch job: %d",jobID)
worker := <-WorkerPool.workerChan //从工人池当中取出一个工人
worker.RepJobs <- jobID //把取到的工作交由工人去处理
}(job)
}
}
}