【原创】packetbeat 之“协议数据包分析每次输出结果均不同”问题

前端之家收集整理的这篇文章主要介绍了【原创】packetbeat 之“协议数据包分析每次输出结果均不同”问题前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

问题描述

通过 packetbeat 可执行程序进行 redis 协议数据包分析的输出结果每次都有所不同;

测试命令为

  1. # ./packetbeat -c ./packetbeat.yml -e -I redis_xg-bjdev-rediscluster-2_prot-7101_20161222110723_20161222110733.pcap -E packetbeat.protocols.redis.ports=7101 -t

输出结果如下

  1. ... // 第一次
  2. 2017/01/12 04:18:56.634130 logp.go:245: INFO Total non-zero values: libbeat.publisher.published_events=13859 redis.unmatched_responses=23 tcp.dropped_because_of_gaps=4
  3. 2017/01/12 04:18:56.634143 logp.go:246: INFO Uptime: 1.210802009s
  4. ... // 第二次
  5. 2017/01/12 04:21:10.543460 logp.go:245: INFO Total non-zero values: redis.unmatched_responses=23 libbeat.publisher.published_events=14619 tcp.dropped_because_of_gaps=4
  6. 2017/01/12 04:21:10.543478 logp.go:246: INFO Uptime: 1.216717998s
  7. ... // 第三次
  8. 2017/01/12 04:22:49.583149 logp.go:245: INFO Total non-zero values: libbeat.publisher.published_events=15006 tcp.dropped_because_of_gaps=4 redis.unmatched_responses=23
  9. 2017/01/12 04:22:49.583160 logp.go:246: INFO Uptime: 1.628717709s

从上面的输出中可以看到:

  • 抓包文件中共有 72257 个数据包;
  • packetbeat 统计出的 "published_events" 三次分别为:138591461915006

能够确认的是:

  • published_events 的数值和 logs/ 目录下生成文件中的数据行数是一致的;
  • sniffer.go 计算得到的数据包总数和通过 capinfos 命令计算得到的数据包数量是一致的;

源码分析

针对如下每次发生变化的输出日志,进行代码反查:

  1. 2017/01/12 04:18:56.634130 logp.go:245: INFO Total non-zero values: libbeat.publisher.published_events=13859 redis.unmatched_responses=23 tcp.dropped_because_of_gaps=4
  2. 2017/01/12 04:18:56.634143 logp.go:246: INFO Uptime: 1.210802009s

logp.go

  1. func LogTotalExpvars(cfg *Logging) {
  2. if cfg.Metrics.Enabled != nil && *cfg.Metrics.Enabled == false {
  3. return
  4. }
  5. vals := map[string]int64{}
  6. prevVals := map[string]int64{}
  7. // 将注册到 expvar 中的全部 Int 类型内容保存到 vals 中
  8. snapshotExpvars(vals)
  9. // 构建“从开始运行到结束运行”的整个时间段内
  10. // 所有 Int 类型 expvar 变量的 delta 差值字符串
  11. metrics := buildMetricsOutput(prevVals,vals)
  12. // 输出“问题”打印
  13. Info("Total non-zero values: %s",metrics)
  14. // 输出“从开始运行到结束运行”的时间长度
  15. Info("Uptime: %s",time.Now().Sub(startTime))
  16. }

小结:输出结果正确的体现了“从开始到结束”的差值计算;

beat.go

  1. func (b *Beat) launch(bt Creator) error {
  2. ...
  3. // 标识 packetbeat 开始运行
  4. logp.Info("%s start running.",b.Name)
  5. // 标识 packetbeat 结束运行
  6. defer logp.Info("%s stopped.",b.Name)
  7. // 在结束运行之前,输出当前基于 expvar 记录
  8. // 的 metrics 变化值
  9. defer logp.LogTotalExpvars(&b.Config.Logging)
  10.  
  11. return beater.Run(b)
  12. }

小结:在运行结束前,将基于 expvar 维护的全局计数值进行计算输出

推演:如果输出过程没有问题,那么只能是计算过程出了问题;

client.go

  1. ...
  2. // Metrics that can retrieved through the expvar web interface.
  3. // 用于计算 publish_events 值的 expvar 变量
  4. var (
  5. publishedEvents = expvar.NewInt("libbeat.publisher.published_events")
  6. )
  7. ...
  8. func (c *client) PublishEvent(event common.MapStr,opts ...ClientOption) bool {
  9. // 向 event 中添加自定义字段内容
  10. c.annotateEvent(event)
  11.  
  12. // 基于配置的 Processors 进行定制化 event 过滤
  13. // 由于我没有配置这个,因为不会有 event 被过滤掉
  14. publishEvent := c.filterEvent(event)
  15. if publishEvent == nil {
  16. return false
  17. }
  18.  
  19. // 根据配置获取一种投递 event 的管道
  20. ctx,pipeline := c.getPipeline(opts)
  21. // 将 publish_events 统计变量 +1
  22. publishedEvents.Add(1)
  23. // 将 event 封装成 message 投递到管道中
  24. return pipeline.publish(message{
  25. client: c,context: ctx,datum: outputs.Data{Event: *publishEvent},})
  26. }
  27. ...
  28. func (c *client) PublishEvents(events []common.MapStr,opts ...ClientOption) bool {
  29. data := make([]outputs.Data,len(events))
  30. // 针对 N 个 event 的循环处理
  31. for _,event := range events {
  32. c.annotateEvent(event)
  33.  
  34. publishEvent := c.filterEvent(event)
  35. if publishEvent != nil {
  36. data = append(data,outputs.Data{Event: *publishEvent})
  37. }
  38. }
  39.  
  40. ctx,pipeline := c.getPipeline(opts)
  41. if len(data) == 0 {
  42. logp.Debug("filter","No events to publish")
  43. return true
  44. }
  45.  
  46. // 将 publish_events 变量 +N
  47. publishedEvents.Add(int64(len(data)))
  48. return pipeline.publish(message{client: c,data: data})
  49. }
  50. ...

小结:针对每个 event 都进行了 +1 操作;

那么谁调用PublishEventPublishEvents 呢?

publish.go

  1. ...
  2. func (p *PacketbeatPublisher) onTransaction(event common.MapStr) {
  3. // 确认 event 的有效性,即特定字段校验
  4. if err := validateEvent(event); err != nil {
  5. logp.Warn("Dropping invalid event: %v",err)
  6. return
  7. }
  8.  
  9. // 针对 event 中的地址信息进行统一化处理
  10. if !p.normalizeTransAddr(event) {
  11. return
  12. }
  13.  
  14. // 将 event 发布到管道中
  15. p.client.PublishEvent(event)
  16. }
  17.  
  18. func (p *PacketbeatPublisher) onFlow(events []common.MapStr) {
  19. pub := events[:0]
  20. // 循环处理 N 个 event
  21. for _,event := range events {
  22. if err := validateEvent(event); err != nil {
  23. logp.Warn("Dropping invalid event: %v",err)
  24. continue
  25. }
  26.  
  27. if !p.addGeoIPToFlow(event) {
  28. continue
  29. }
  30.  
  31. pub = append(pub,event)
  32. }
  33.  
  34. p.client.PublishEvents(pub)
  35. }
  36. ...

小结:上述代码没进行任何特别处理;

  1. func (p *PacketbeatPublisher) Start() {
  2. p.wg.Add(1)
  3. go func() {
  4. defer p.wg.Done()
  5. for {
  6. select {
  7. case <-p.done:
  8. return
  9. // 从名为 trans 的 channel 获取一个 event
  10. case event := <-p.trans:
  11. p.onTransaction(event)
  12. }
  13. }
  14. }()
  15.  
  16. p.wg.Add(1)
  17. go func() {
  18. defer p.wg.Done()
  19. for {
  20. select {
  21. case <-p.done:
  22. return
  23. // 从名为 flows 的 channel 获取 N 个 event
  24. case events := <-p.flows:
  25. p.onFlow(events)
  26. }
  27. }
  28. }()
  29. }

小结:这里似乎可能会出问题,因为只要是基于 channel 传递内容,就无法避免 buffer 长度的问题;

相应代码如下

  1. type PacketbeatPublisher struct {
  2. ....
  3. trans chan common.MapStr
  4. flows chan []common.MapStr
  5. }
  6. ...
  7. func NewPublisher(
  8. pub publisher.Publisher,hwm,bulkHWM int,ignoreOutgoing bool,) (*PacketbeatPublisher,error) {
  9. ...
  10. return &PacketbeatPublisher{
  11. pub: pub,topo: topo,geoLite: topo.GeoLite(),ignoreOutgoing: ignoreOutgoing,client: pub.Connect(),done: make(chan struct{}),// trans channel 的 buffer 长度为 hwm
  12. trans: make(chan common.MapStr,hwm),// flows channel 的 buffer 长度为 bulkHWM
  13. flows: make(chan []common.MapStr,bulkHWM),},nil
  14. }

packetbeat.go

  1. // init packetbeat components
  2. func (pb *packetbeat) init(b *beat.Beat) error {
  3. ...
  4. // This is required as init Beat is called before the beat publisher is initialised
  5. b.Config.Shipper.InitShipperConfig()
  6. // hwm 即 QueueSize 的值;
  7. // bulkHWM 即 BulkQueueSize 的值;
  8. pb.pub,err = publish.NewPublisher(b.Publisher,*b.Config.Shipper.QueueSize,*b.Config.Shipper.BulkQueueSize,pb.config.IgnoreOutgoing)
  9. if err != nil {
  10. return fmt.Errorf("Initializing publisher Failed: %v",err)
  11. }
  12. ...
  13. }

publisher/publish.go 中可以看到相应定义

  1. type ShipperConfig struct {
  2. ...
  3. // internal publisher queue sizes
  4. QueueSize *int `config:"queue_size"`
  5. BulkQueueSize *int `config:"bulk_queue_size"`
  6. ...
  7. }
  8. ...
  9. // 默认值
  10. const (
  11. DefaultQueueSize = 1000
  12. DefaultBulkQueueSize = 0
  13. )
  14. ...
  15. // 初始化函数
  16. func (config *ShipperConfig) InitShipperConfig() {
  17.  
  18. // TODO: replace by ucfg
  19. // QueueSize 的值在 packetbeat.yml 中定义
  20. if config.QueueSize == nil || *config.QueueSize <= 0 {
  21. queueSize := DefaultQueueSize
  22. config.QueueSize = &queueSize
  23. }
  24.  
  25. if config.BulkQueueSize == nil || *config.BulkQueueSize < 0 {
  26. bulkQueueSize := DefaultBulkQueueSize
  27. config.BulkQueueSize = &bulkQueueSize
  28. }
  29. }

在搞清楚了配置位置后,就剩下最后一个问题,trans 和 flows channel 中的消息来自哪里;

  1. func (p *PacketbeatPublisher) PublishTransaction(event common.MapStr) bool {
  2. select {
  3. case p.trans <- event:
  4. return true
  5. default:
  6. // drop event if queue is full
  7. // 这个注释说明很关键:如果 queue 满了,event 会被丢弃
  8. return false
  9. }
  10. }
  11.  
  12. func (p *PacketbeatPublisher) PublishFlows(event []common.MapStr) bool {
  13. select {
  14. case p.flows <- event:
  15. return true
  16. case <-p.done:
  17. // drop event,if worker has been stopped
  18. return false
  19. }
  20. }

redis.go

  1. // 将 request 和 response 进行关联
  2. func (redis *redisPlugin) correlate(conn *redisConnectionData) {
  3. // drop responses with missing requests
  4. if conn.requests.empty() {
  5. for !conn.responses.empty() {
  6. debugf("Response from unknown transaction. Ignoring")
  7. unmatchedResponses.Add(1)
  8. conn.responses.pop()
  9. }
  10. return
  11. }
  12.  
  13. // merge requests with responses into transactions
  14. for !conn.responses.empty() && !conn.requests.empty() {
  15. requ := conn.requests.pop()
  16. resp := conn.responses.pop()
  17.  
  18. if redis.results != nil {
  19. // 构建 transaction 消息内容(JSON 格式)
  20. event := redis.newTransaction(requ,resp)
  21. // 将 event 发送到 trans channel 中
  22. redis.results.PublishTransaction(event)
  23. }
  24. }
  25. }

小结:从上面的 PublishTransactionPublishFlows 函数实现中能够看到:导致输出结果每次不同的原因就是由于存在 event 被 drop 掉的问题;

至此,消息处理流程梳理完毕:

  • redis 模块协议解析完成后,进行 request-response 配对,构成 transaction 后(即 event)发布到 trans 或 flows channel 中;
  • event 从 channel 中获取出来,经过一系列判定和封装(构建为 message),再发送到 pipeline 中(试验发现使用的是 async pipeline);
  • 发送到 pipeline 中的 message 会以广播的方式发送给该 pipeline 下关联的每一个 worker ;在代码层面 messageWorker 实现了 worker 这个 interface ,因此 message 实际是被发送到了 messageWorker 名为 queue 和 bulkQueue 的 channel 中;
  • 作为 goroutine 运行的 messageWorker 不断从上述 channel 取走 message 并触发初始化时注册到 handler 上的 onMessage 回调函数;经确认,初始化过程中注册到 handler 上的为 outputWorker ;
  • outputWorker 在拿到 message 后,会根据在 packetbeat.yml 中配置的 output 进行处理;在实际配置中,我只配置了 file 这个 output ,因此最终内容会写入磁盘文件(可配置的 output 包括:console,file,Redis,Kafka,logstash 和 Elasticsearch);

问题原因

  • 考虑到性能原因,官方默认配置 queue_size1000 ;需要注意的是,该值对应了 packetbeat 内部多种 channel 的 buffer 长度;如果你要处理的 pcap 文件中包数量非常多,则需要根据实际情况调大该值;
  • 另外,还需要给 packetbeat 预留出足够长点包分析时间,否则可能出现尚未完成全部包的分析,就进入退出过程的情况;

解决办法

  • 将 packetbeat.yml 中的 queue_size 值按需要调大;
  • 使用 -waitstop=N 增加 packetbeat 等待文件分析结束的时间;

其他

  • 在官方论坛上的讨论
  • 给官方提的 issue

猜你在找的Go相关文章