k8s与监控--从telegraf改造谈golang多协程精确控制

前端之家收集整理的这篇文章主要介绍了k8s与监控--从telegraf改造谈golang多协程精确控制前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

从telegraf改造谈golang多协程精确控制


前言

telegraf是infuxdb公司开源出来的一个基于插件机制的收集metrics的项目。整个架构和elastic公司的日志收集系统极其类似,具备良好的扩展性。与现在流行的各种exporter+promethues监控方案相比:

  1. 大致具备良好的可扩展性。很容易增加自己的处理逻辑,在input,output,process,filter等环境定制自己专属的插件
  2. 统一了各种exporter,减少了部署各种exporter的工作量和维护成本。

目前telegraf改造工作基本上是两大部分:

  1. 增加了一些telegraf不支持插件,比如虚拟化(kvm,vmware等),数据库(oracle),k8s和openstack等input插件
  2. telegraf是基于配置文件的,所以会有两个问题,很难做分布式和无停机动态调度input任务。所以我们的工作就是将获取配置接口化,所有的配置文件来源于统一配置中心。然后就是改造无停机动态调度input。

在改造改造无停机动态调度input就涉及到golang多协程精确控制的问题。

一些golang常用并发手段

sync包下WaitGroup

具体事例:

  1. var wg sync.WaitGroup
  2.  
  3. wg.Add(len(a.Config.Outputs))
  4. for _,o := range a.Config.Outputs {
  5. go func(output *models.RunningOutput) {
  6. defer wg.Done()
  7. err := output.Write()
  8. if err != nil {
  9. log.Printf("E! Error writing to output [%s]: %s\n",output.Name,err.Error())
  10. }
  11. }(o)
  12. }
  13.  
  14. wg.Wait()

WaitGroup内部维护了一个counter,当counter数值为0时,表明添加的任务都已经完成。
总共有三个方法

  1. func (wg *WaitGroup) Add(delta int)

添加任务,delta参数表示添加任务的数量

  1. func (wg *WaitGroup) Done()

任务执行完成,调用Done方法,一般使用姿势都是defer wg.Done(),此时counter中会减一。

  1. func (wg *WaitGroup) Wait()

通过使用sync.WaitGroup,可以阻塞主线程,直到相应数量的子线程结束。

chan struct{},控制协程退出

启动协程的时候,传递一个shutdown chan struct{},需要关闭该协程的时候,直接close(shutdown)。struct{}在golang中是一个消耗接近0的对象。
具体事例:

  1. // gatherer runs the inputs that have been configured with their own
  2. // reporting interval.
  3. func (a *Agent) gatherer(
  4. shutdown chan struct{},kill chan struct{},input *models.RunningInput,interval time.Duration,metricC chan telegraf.Metric,) {
  5. defer panicRecover(input)
  6.  
  7. GatherTime := selfstat.RegisterTiming("gather","gather_time_ns",map[string]string{"input": input.Config.Name},)
  8.  
  9. acc := NewAccumulator(input,metricC)
  10. acc.SetPrecision(a.Config.Agent.Precision.Duration,a.Config.Agent.Interval.Duration)
  11.  
  12. ticker := time.NewTicker(interval)
  13. defer ticker.Stop()
  14.  
  15. for {
  16. internal.RandomSleep(a.Config.Agent.CollectionJitter.Duration,shutdown)
  17.  
  18. start := time.Now()
  19. gatherWithTimeout(shutdown,kill,input,acc,interval)
  20. elapsed := time.Since(start)
  21.  
  22. GatherTime.Incr(elapsed.Nanoseconds())
  23.  
  24. select {
  25. case <-shutdown:
  26. return
  27. case <-kill:
  28. return
  29. case <-ticker.C:
  30. continue
  31. }
  32. }
  33. }

借助chan 实现指定数量的协程或动态调整协程数量

当然这里必须是每个协程是幂等,也就是所有协程做的是同样的工作。
首先创建 一个 pool:= make(chan chan struct{},maxWorkers),maxWorkers为目标协程数量
然后启动协程:

  1. for i := 0; i < s.workers; i++ {
  2. go func() {
  3. wQuit := make(chan struct{})
  4. s.pool <- wQuit
  5. s.sFlowWorker(wQuit)
  6. }()
  7. }

关闭协程:

  1. func (s *SFlow) sFlowWorker(wQuit chan struct{}) {
  2. LOOP:
  3. for {
  4.  
  5. select {
  6. case <-wQuit:
  7. break LOOP
  8. case msg,ok = <-sFlowUDPCh:
  9. if !ok {
  10. break LOOP
  11. }
  12. }
  13.  
  14. // 此处执行任务操作
  15. }

动态调整:

  1. for n = 0; n < 10; n++ {
  2. if len(s.pool) > s.workers {
  3. wQuit := <-s.pool
  4. close(wQuit)
  5. }
  6. }

多协程精确控制

在改造telegraf过程中,要想动态调整input,每个input都是唯一的,分属不同类型插件。就必须实现精准控制指定的协程的启停。
这个时候实现思路就是:实现一个kills map[string]chan struct{},k为每个任务的唯一ID。添加任务时候,传递一个chan struct{},这个时候关闭指定ID的chan struct{},就能控制指定的协程。

  1. // DelInput add input
  2. func (a *Agent) DelInput(inputs []*models.RunningInput) error {
  3. a.storeMutex.Lock()
  4. defer a.storeMutex.Unlock()
  5.  
  6. for _,v := range inputs {
  7. if _,ok := a.kills[v.Config.ID]; !ok {
  8. return fmt.Errorf("input: %s,未找到,无法删除",v.Config.ID)
  9. }
  10. }
  11.  
  12. for _,input := range inputs {
  13. if kill,ok := a.kills[input.Config.ID]; ok {
  14. delete(a.kills,input.Config.ID)
  15. close(kill)
  16. }
  17. }
  18. return nil
  19. }

添加任务:

  1. // AddInput add input
  2. func (a *Agent) AddInput(shutdown chan struct{},inputs []*models.RunningInput) error {
  3. a.storeMutex.Lock()
  4. defer a.storeMutex.Unlock()
  5. for _,ok := a.kills[v.Config.ID]; ok {
  6. return fmt.Errorf("input: %s,已经存在无法新增",input := range inputs {
  7. interval := a.Config.Agent.Interval.Duration
  8. // overwrite global interval if this plugin has it's own.
  9. if input.Config.Interval != 0 {
  10. interval = input.Config.Interval
  11. }
  12. if input.Config.ID == "" {
  13. continue
  14. }
  15. a.wg.Add(1)
  16.  
  17. kill := make(chan struct{})
  18. a.kills[input.Config.ID] = kill
  19.  
  20. go func(in *models.RunningInput,interv time.Duration) {
  21. defer a.wg.Done()
  22. a.gatherer(shutdown,in,interv,a.metricC)
  23. }(input,interval)
  24. }
  25.  
  26. return nil
  27. }

总结

简单介绍了一下telegraf项目。后续的优化和改造工作还在继续。主要是分布式telegraf的调度算法。毕竟集中化所有exporter以后,telegraf的负载能力受单机能力限制,而且也不符合高可用的使用目标。

猜你在找的Go相关文章