代码地址:https://github.com/beyondskyw...
标签(空格分隔): falcon go
监控数据
设计原理
自发现采集值
不同类型数据采集分不同goroutine
进程和端口通过用户配置进行监控
配置文件
hostname和ip默认留空,agent自动探测
hbs和transfer都是配置其rpc地址
collector网卡采集前缀
ignore为true时取消上报
组织结构
cron:间隔执行的代码,即定时任务
funcs:信息采集
g:全局数据结构
http:简单的dashboard的server,获取单机监控指标数据
plugins:插件处理机制
public:静态资源文件
心跳机制
与HBS、Transfer交互
调用关系
代码解读
main入口
// Fragment of the agent's main entry point; the surrounding function is not
// shown in this snippet.
go cron.InitDataHistory()

// Report this host's agent status.
cron.ReportAgentStatus()
// Sync plugins.
cron.SyncMinePlugins()
// Sync the monitored ports, paths, processes and URLs.
cron.SyncBuiltinMetrics()
// Sync the list of IPs that are allowed to run shell commands on the agent
// (debugging backdoor).
cron.SyncTrustableIps()
// Start data collection.
cron.Collect()

// Start the dashboard server.
go http.Start()
ReportAgentStatus:汇报agent本身状态
// 判断hbs配置是否正常,正常则上报agent状态 if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" { // 根据配置的interval间隔上报信息 go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second) } func reportAgentStatus(interval time.Duration) { for { // 获取hostname,出错则错误赋值给hostname hostname,err := g.Hostname() if err != nil { hostname = fmt.Sprintf("error:%s",err.Error()) } // 请求发送信息 req := model.AgentReportRequest{ Hostname: hostname,IP: g.IP(),AgentVersion: g.VERSION,// 通过shell指令获取plugin版本,能否go实现 PluginVersion: g.GetCurrPluginVersion(),} var resp model.SimpleRpcResponse // 调用rpc接口 err = g.HbsClient.Call("Agent.ReportStatus",req,&resp) if err != nil || resp.Code != 0 { log.Println("call Agent.ReportStatus fail:",err,"Request:","Response:",resp) } time.Sleep(interval) } }
SyncMinePlugins:同步插件
func syncMinePlugins() { var ( timestamp int64 = -1 pluginDirs []string ) duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second for { time.Sleep(duration) hostname,err := g.Hostname() if err != nil { continue } req := model.AgentHeartbeatRequest{ Hostname: hostname,} var resp model.AgentPluginsResponse // 调用rpc接口,返回plugin err = g.HbsClient.Call("Agent.MinePlugins",&resp) if err != nil { log.Println("ERROR:",err) continue } // 保证时间顺序正确 if resp.Timestamp <= timestamp { continue } pluginDirs = resp.Plugins // 存放时间保证最新 timestamp = resp.Timestamp if g.Config().Debug { log.Println(&resp) } // 无插件则清空plugin if len(pluginDirs) == 0 { plugins.ClearAllPlugins() } desiredAll := make(map[string]*plugins.Plugin) // 读取所有plugin for _,p := range pluginDirs { underOneDir := plugins.ListPlugins(strings.Trim(p,"/")) for k,v := range underOneDir { desiredAll[k] = v } } // 停止不需要的插件,启动增加的插件 plugins.DelNoUsePlugins(desiredAll) plugins.AddNewPlugins(desiredAll) } }
SyncBuiltinMetrics:同步内置metric,包括端口、目录和进程信息
func syncBuiltinMetrics() { var timestamp int64 = -1 var checksum string = "nil" duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second for { time.Sleep(duration) // 监控端口、目录大小、进程 var ports = []int64{} var paths = []string{} var procs = make(map[string]map[int]string) var urls = make(map[string]string) hostname,Checksum: checksum,} var resp model.BuiltinMetricResponse err = g.HbsClient.Call("Agent.BuiltinMetrics",err) continue } if resp.Timestamp <= timestamp { continue } if resp.Checksum == checksum { continue } timestamp = resp.Timestamp checksum = resp.Checksum for _,metric := range resp.Metrics { if metric.Metric == g.URL_CHECK_HEALTH { arr := strings.Split(metric.Tags,",") if len(arr) != 2 { continue } url := strings.Split(arr[0],"=") if len(url) != 2 { continue } stime := strings.Split(arr[1],"=") if len(stime) != 2 { continue } if _,err := strconv.ParseInt(stime[1],10,64); err == nil { urls[url[1]] = stime[1] } else { log.Println("metric ParseInt timeout Failed:",err) } } // {metric: net.port.listen,tags: port=22} if metric.Metric == g.NET_PORT_LISTEN { arr := strings.Split(metric.Tags,"=") if len(arr) != 2 { continue } if port,err := strconv.ParseInt(arr[1],64); err == nil { ports = append(ports,port) } else { log.Println("metrics ParseInt Failed:",err) } continue } // metric: du.bs tags: path=/home/works/logs // du -bs /home/works/logs if metric.Metric == g.DU_BS { arr := strings.Split(metric.Tags,"=") if len(arr) != 2 { continue } paths = append(paths,strings.TrimSpace(arr[1])) continue } //mereic: proc.num tags: name=crond //或者metric: proc.num tags: cmdline=cfg.json if metric.Metric == g.PROC_NUM { arr := strings.Split(metric.Tags,") tmpMap := make(map[int]string) for i := 0; i < len(arr); i++ { if strings.HasPrefix(arr[i],"name=") { tmpMap[1] = strings.TrimSpace(arr[i][5:]) } else if strings.HasPrefix(arr[i],"cmdline=") { tmpMap[2] = strings.TrimSpace(arr[i][8:]) } } procs[metric.Tags] = tmpMap } } g.SetReportUrls(urls) 
g.SetReportPorts(ports) g.SetReportProcs(procs) g.SetDuPaths(paths) } }
func syncTrustableIps() { duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second for { time.Sleep(duration) var ips string err := g.HbsClient.Call("Agent.TrustableIps",model.NullRpcRequest{},&ips) if err != nil { log.Println("ERROR: call Agent.TrustableIps fail",err) continue } // 设置到本地可信IP列表 g.SetTrustableIps(ips) } }
FuncsAndInterval:拆分不同的采集函数集,方便通过不同goroutine运行
// 间隔internal时间执行fs中的函数 type FuncsAndInterval struct { Fs []func() []*model.MetricValue Interval int } var Mappers []FuncsAndInterval // 根据调用指令类型和是否容易被挂起而分类(通过不同的goroutine去执行,避免相互之间的影响) func BuildMappers() { interval := g.Config().Transfer.Interval Mappers = []FuncsAndInterval{ FuncsAndInterval{ Fs: []func() []*model.MetricValue{ AgentMetrics,cpuMetrics,NetMetrics,KernelMetrics,LoadAvgMetrics,MemMetrics,DiskIOMetrics,IOStatsMetrics,NetstatMetrics,ProcMetrics,UdpMetrics,},Interval: interval,// 容易出问题 FuncsAndInterval{ Fs: []func() []*model.MetricValue{ DeviceMetrics,// 调用相同指令 FuncsAndInterval{ Fs: []func() []*model.MetricValue{ PortMetrics,SocketStatSummaryMetrics,FuncsAndInterval{ Fs: []func() []*model.MetricValue{ DuMetrics,FuncsAndInterval{ Fs: []func() []*model.MetricValue{ UrlMetrics,} }
Collect:配置信息读取,读取Mappers中的FuncsAndInterval,根据func调用采集函数,采集所有信息(并非先过滤采集项),从所有采集到的数据中过滤ignore的项,并上报到transfer。
func Collect() { // 配置信息判断 if !g.Config().Transfer.Enabled { return } if len(g.Config().Transfer.Addrs) == 0 { return } // 读取mapper中的FuncsAndInterval集,并通过不同的goroutine运行 for _,v := range funcs.Mappers { go collect(int64(v.Interval),v.Fs) } } // 间隔采集信息 func collect(sec int64,fns []func() []*model.MetricValue) { // 启动断续器,间隔执行 t := time.NewTicker(time.Second * time.Duration(sec)).C for { <-t hostname,err := g.Hostname() if err != nil { continue } mvs := []*model.MetricValue{} // 读取忽略metric名单 ignoreMetrics := g.Config().IgnoreMetrics // 从funcs的list中取出每个采集函数 for _,fn := range fns { // 执行采集函数 items := fn() if items == nil { continue } if len(items) == 0 { continue } // 读取采集数据,根据忽略的metric忽略部分采集数据 for _,mv := range items { if b,ok := ignoreMetrics[mv.Metric]; ok && b { continue } else { mvs = append(mvs,mv) } } } // 获取上报时间 now := time.Now().Unix() // 设置上报采集项的间隔、agent主机、上报时间 for j := 0; j < len(mvs); j++ { mvs[j].Step = sec mvs[j].Endpoint = hostname mvs[j].Timestamp = now } // 调用transfer发送采集数据 g.SendToTransfer(mvs) } }
采集信息结构
// MetricValue is a single collected data point as reported to the transfer.
type MetricValue struct {
	Endpoint  string      // host name
	Metric    string      // metric identifier, e.g. cpu.idle, mem.memtotal
	Value     interface{} // collected value
	Step      int64       // report interval of this item
	Type      string      // GAUGE or COUNTER
	Tags      string      // tags, used when configuring alarm strategies
	Timestamp int64       // time of this report
}
采集信息组成metricValue结构
func NewMetricValue(metric string,val interface{},dataType string,tags ...string) *model.MetricValue { mv := model.MetricValue{ Metric: metric,Value: val,Type: dataType,} size := len(tags) if size > 0 { mv.Tags = strings.Join(tags,") } return &mv } // 原值类型 func GaugeValue(metric string,tags ...string) *model.MetricValue { return NewMetricValue(metric,val,"GAUGE",tags...) } // 计数器类型 func CounterValue(metric string,"COUNTER",tags...) }
rpc组件
// 简单封装rpc.Cilent type SingleConnRpcClient struct { sync.Mutex rpcClient *rpc.Client RpcServer string Timeout time.Duration } // 关闭rpc func (this *SingleConnRpcClient) close() { if this.rpcClient != nil { this.rpcClient.Close() this.rpcClient = nil } } // 保证rpc存在,为空则重新创建,如果server宕机,死循环???? func (this *SingleConnRpcClient) insureConn() { if this.rpcClient != nil { return } var err error var retry int = 1 for { if this.rpcClient != nil { return } // 根据timeout和server地址去连接rpc的server this.rpcClient,err = net.JsonRpcClient("tcp",this.RpcServer,this.Timeout) if err == nil { return } log.Printf("dial %s fail: %v",err) if retry > 6 { retry = 1 } time.Sleep(time.Duration(math.Pow(2.0,float64(retry))) * time.Second) retry++ } } // rpc client调用hbs函数 func (this *SingleConnRpcClient) Call(method string,args interface{},reply interface{}) error { // 加锁保证一个agent只与server有一个连接,保证性能 this.Lock() defer this.Unlock() // 保证rpc连接可用 this.insureConn() timeout := time.Duration(50 * time.Second) done := make(chan error) go func() { err := this.rpcClient.Call(method,args,reply) done <- err }() // 超时控制 select { case <-time.After(timeout): log.Printf("[WARN] rpc call timeout %v => %v",this.rpcClient,this.RpcServer) this.close() case err := <-done: if err != nil { this.close() return err } } return nil }
Transfer部件
// 定义transfer的rpcClient对应Map,transferClients读写锁 var ( TransferClientsLock *sync.RWMutex = new(sync.RWMutex) TransferClients map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{} ) // 发送数据到随机的transfer func SendMetrics(metrics []*model.MetricValue,resp *model.TransferResponse) { rand.Seed(time.Now().UnixNano()) // 随机transferClient发送数据,直到发送成功 for _,i := range rand.Perm(len(Config().Transfer.Addrs)) { addr := Config().Transfer.Addrs[i] if _,ok := TransferClients[addr]; !ok { initTransferClient(addr) } if updateMetrics(addr,metrics,resp) { break } } } // 初始化addr对应的transferClient func initTransferClient(addr string) { TransferClientsLock.Lock() defer TransferClientsLock.Unlock() TransferClients[addr] = &SingleConnRpcClient{ RpcServer: addr,Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond,} } // 调用rpc接口发送metric func updateMetrics(addr string,metrics []*model.MetricValue,resp *model.TransferResponse) bool { TransferClientsLock.RLock() defer TransferClientsLock.RUnlock() err := TransferClients[addr].Call("Transfer.Update",resp) if err != nil { log.Println("call Transfer.Update fail",addr,err) return false } return true }
采集插件同步
// 插件信息: 路径、修改时间、运行周期(来自plugin插件) type Plugin struct { FilePath string MTime int64 Cycle int } // 插件map和调度器map var ( Plugins = make(map[string]*Plugin) PluginsWithScheduler = make(map[string]*PluginScheduler) ) // 删除不需要的plugin func DelNoUsePlugins(newPlugins map[string]*Plugin) { for currKey,currPlugin := range Plugins { newPlugin,ok := newPlugins[currKey] if !ok || currPlugin.MTime != newPlugin.MTime { deletePlugin(currKey) } } } // 添加同步时增加的plugin func AddNewPlugins(newPlugins map[string]*Plugin) { for fpath,newPlugin := range newPlugins { // 去除重复插件 if _,ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime { continue } // 为新添加的插件新建调度器 Plugins[fpath] = newPlugin sch := NewPluginScheduler(newPlugin) PluginsWithScheduler[fpath] = sch // 启动plugin调度 sch.Schedule() } } func ClearAllPlugins() { for k := range Plugins { deletePlugin(k) } } func deletePlugin(key string) { v,ok := PluginsWithScheduler[key] if ok { // 暂停调度plugin v.Stop() delete(PluginsWithScheduler,key) } delete(Plugins,key) }
插件调度策略
// 持续间隔执行plugin type PluginScheduler struct { Ticker *time.Ticker Plugin *Plugin Quit chan struct{} } // 根据plugin创建新的schedule func NewPluginScheduler(p *Plugin) *PluginScheduler { scheduler := PluginScheduler{Plugin: p} scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second) scheduler.Quit = make(chan struct{}) return &scheduler } // plugin调度,间隔执行PluginRun,除非收到quit消息 func (this *PluginScheduler) Schedule() { go func() { for { select { case <-this.Ticker.C: PluginRun(this.Plugin) case <-this.Quit: this.Ticker.Stop() return } } }() } // 停止plugin调度 func (this *PluginScheduler) Stop() { close(this.Quit) } // 执行插件,读取插件运行返回数据并上报transfer func PluginRun(plugin *Plugin) { timeout := plugin.Cycle*1000 - 500 fpath := filepath.Join(g.Config().Plugin.Dir,plugin.FilePath) if !file.IsExist(fpath) { log.Println("no such plugin:",fpath) return } debug := g.Config().Debug if debug { log.Println(fpath,"running...") } cmd := exec.Command(fpath) var stdout bytes.Buffer cmd.Stdout = &stdout var stderr bytes.Buffer cmd.Stderr = &stderr cmd.Start() err,isTimeout := sys.CmdRunWithTimeout(cmd,time.Duration(timeout)*time.Millisecond) errStr := stderr.String() if errStr != "" { logFile := filepath.Join(g.Config().Plugin.LogDir,plugin.FilePath+".stderr.log") if _,err = file.WriteString(logFile,errStr); err != nil { log.Printf("[ERROR] write log to %s fail,error: %s\n",logFile,err) } } if isTimeout { // has be killed if err == nil && debug { log.Println("[INFO] timeout and kill process",fpath,"successfully") } if err != nil { log.Println("[ERROR] kill process","occur error:",err) } return } if err != nil { log.Println("[ERROR] exec plugin","fail. error:",err) return } // exec successfully data := stdout.Bytes() if len(data) == 0 { if debug { log.Println("[DEBUG] stdout of","is blank") } return } var metrics []*model.MetricValue err = json.Unmarshal(data,&metrics) if err != nil { log.Printf("[ERROR] json.Unmarshal stdout of %s fail. 
error:%s stdout: \n%s\n",stdout.String()) return } g.SendToTransfer(metrics) }