// Command main consumes messages from Kafka partitions and periodically
// writes the ones received on the "flow" partition to a stats log file.
package main

import (
	"flag"
	"log"
	"net/http"
	"net/http/pprof"
	"strings"
	"time"

	"github.com/optiopay/kafka"

	"ooxx/config"
	"ooxx/lib"
	"ooxx/model"
)

// LOG_CHANNEL_COUNT is the capacity of the queue channel between the Kafka
// consumer and the batching goroutine.
const LOG_CHANNEL_COUNT = 200

// LOG_BUFFER_COUNT is the initial capacity reserved for the in-memory batch.
const LOG_BUFFER_COUNT = 100

// debug is registered as a command-line flag; it is not read anywhere in
// this listing.
var debug = flag.String("debug", "false", "debug mode")

// queue carries raw message payloads from consume_flow_message to push_message.
var queue = make(chan []byte, LOG_CHANNEL_COUNT)

// buffer holds the messages collected since the last flush. It starts empty
// with LOG_BUFFER_COUNT capacity; creating it with a non-zero length would
// leave that many blank entries in front of the first real message.
var buffer = make([]string, 0, LOG_BUFFER_COUNT)

// ticker triggers a flush of buffer every four seconds.
var ticker = time.NewTicker(4 * time.Second)

// save_message writes every non-empty entry of buffer, one per line, to the
// file config.Config.StatsLogDir + "stats_play_" + lib.TimeFormat() via
// lib.FilePutContents2, then empties buffer. Nothing is written (and buffer is
// left alone) when there is no non-empty entry.
//
// It is only called from push_message, which is also the only writer of
// buffer in this file.
func save_message() {
	if len(buffer) == 0 {
		return
	}
	tm := lib.TimeFormat()
	// Named "name" rather than "log" so the log package is not shadowed.
	name := "stats_play_" + tm
	file := config.Config.StatsLogDir + name

	// Lines are joined by repeated string concatenation, which re-copies the
	// accumulated text on every append.
	content := ""
	for _, v := range buffer {
		if v == "" {
			continue
		}
		content = content + v + "\n"
	}
	if content == "" {
		return
	}
	lib.FilePutContents2(file, content)
	buffer = buffer[:0]
}

// push_message runs forever: it appends each payload received on queue to
// buffer and flushes buffer through save_message on every ticker tick.
func push_message() {
	for {
		select {
		case c := <-queue:
			buffer = append(buffer, string(c))
		case <-ticker.C:
			save_message()
		}
	}
}

// consume_flow_message creates a consumer for topic/partition starting at the
// newest offset and processes messages until Consume returns an error.
//
// Every message is logged. Messages whose partition equals
// config.Config.KafkaPartitionFlow and whose value is non-empty are also sent
// to queue. The process exits via log.Fatalf if the consumer cannot be
// created. Any Consume error ends the loop; kafka.ErrNoData ends it without
// being logged.
func consume_flow_message(broker kafka.Client, topic string, partition int) {
	conf := kafka.NewConsumerConf(topic, int32(partition))
	conf.StartOffset = kafka.StartOffsetNewest
	consumer, err := broker.Consumer(conf)
	if err != nil {
		log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
	}

	for {
		msg, err := consumer.Consume()
		if err != nil {
			if err != kafka.ErrNoData {
				log.Printf("cannot consume %s:%d message: %s", topic, partition, err)
			}
			break
		}

		switch partition {
		case config.Config.KafkaPartitionFlay:
			log.Printf("%s:%d,%d: %s", topic, partition, msg.Offset, msg.Value)
		case config.Config.KafkaPartitionShow:
			log.Printf("%s:%d,%d: %s", topic, partition, msg.Offset, msg.Value)
		case config.Config.KafkaPartitionFlow:
			log.Printf("%s:%d,%d: %s", topic, partition, msg.Offset, msg.Value)
			if len(msg.Value) > 0 {
				queue <- msg.Value
			}
		}
	}
	// Printf, not Print: the message contains format verbs.
	log.Printf("consume_flow_message,consumer quit,%s:%d", topic, partition)
}

// main starts a pprof HTTP endpoint on :9527, connects to the Kafka brokers
// listed (comma-separated) in config.Config.KafkaBrokers, starts the batching
// goroutine and two background consumers, and runs a third consumer in the
// foreground. The program ends when that foreground consumer returns.
func main() {
	defer func() {
		if err := recover(); err != nil {
			lib.P("panic:", err, "\nstack:"+lib.Stack(false))
		}
	}()
	defer model.Db.Close()

	flag.Parse()

	go func() {
		profServeMux := http.NewServeMux()
		profServeMux.HandleFunc("/debug/pprof/", pprof.Index)
		profServeMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
		profServeMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
		profServeMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		err := http.ListenAndServe(":9527", profServeMux)
		if err != nil {
			// This panic happens on a separate goroutine, so the deferred
			// recover in main does not intercept it.
			panic(err)
		}
	}()

	var kafkaAddrs = strings.Split(config.Config.KafkaBrokers, ",")
	var conf = kafka.NewBrokerConf("xktv")
	conf.DialTimeout = 1 * time.Second
	conf.DialRetryLimit = 1
	broker, err := kafka.Dial(kafkaAddrs, conf)
	if err != nil {
		log.Fatalf("cannot connect to kafka cluster: %s", err)
	}
	defer broker.Close()

	go push_message()

	// NOTE(review): the topic argument was missing from the last two calls in
	// the published listing; KafkaTopicFlow is assumed for all three consumers
	// - confirm against the real configuration.
	go consume_flow_message(broker, config.Config.KafkaTopicFlow, config.Config.KafkaPartitionFlay)
	go consume_flow_message(broker, config.Config.KafkaTopicFlow, config.Config.KafkaPartitionShow)
	consume_flow_message(broker, config.Config.KafkaTopicFlow, config.Config.KafkaPartitionFlow)
}
优化:
使用bytes.Buffer,更高效。
func save_message() { if len(buffer) > 0 { tm := lib.TimeFormat() log := "stats_play_" + tm file := config.Config.StatsLogDir + log buf := bytes.Buffer{} for _,v := range buffer { if v == "" { continue } buf.WriteString(v) buf.WriteString("\n") } content := buf.String() if content != "" { lib.FilePutContents2(file,content) buffer = buffer[0:0] } } }原文链接:https://www.f2er.com/go/189718.html