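What is the best way to implement a global counter for a highly concurrent application? In my case I may have 10K-20K goroutines performing "work", and I want to count the number and types of items that the goroutines are working on collectively...
The "classic" synchronous coding style would look like this: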
```go
import "sync/atomic"

var work_counter int32 // atomic.AddInt32 needs an *int32, not an *int

func GoWorkerRoutine() {
	for {
		// do work
		atomic.AddInt32(&work_counter, 1)
	}
}
```
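Because the worker loop never exits, its total is never observed. A bounded sketch, assuming a fixed number of workers and units of work (the function `countWithAtomic` and its parameters are hypothetical), waits for every goroutine with `sync.WaitGroup` and then reads the counter with `atomic.LoadInt64`:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWithAtomic (hypothetical) starts `workers` goroutines that each do
// `perWorker` units of simulated work and bump a shared counter atomically.
func countWithAtomic(workers, perWorker int) int64 {
	var counter int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				// do work ...
				atomic.AddInt64(&counter, 1)
			}
		}()
	}
	wg.Wait() // all increments have happened once Wait returns
	return atomic.LoadInt64(&counter)
}

func main() {
	fmt.Println(countWithAtomic(10000, 10)) // prints 100000
}
```

This only covers a single total; counting per type is what the next snippet in the question is about.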
Now it gets more complicated, because I want to track the "type" of work being done, so I really need something like this:
```go
import "sync"

var work_counter = make(map[string]int) // a nil map would panic on write
var work_mux sync.Mutex

func GoWorkerRoutine() {
	for {
		// do work
		work_mux.Lock()
		work_counter["type1"]++
		work_mux.Unlock()
	}
}
```
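One way to keep the lock and the map together is to wrap them in a type. This is a minimal sketch; `TypeCounter`, `NewTypeCounter`, `Inc` and `Snapshot` are hypothetical names. `Snapshot` returns a copy of the map so callers can iterate over the counts without holding the lock:

```go
package main

import (
	"fmt"
	"sync"
)

// TypeCounter (hypothetical) guards a per-type count map with a mutex.
// Construct it with NewTypeCounter; the zero value has a nil map.
type TypeCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

func NewTypeCounter() *TypeCounter {
	return &TypeCounter{counts: make(map[string]int)}
}

// Inc adds n to the count for kind while holding the lock.
func (c *TypeCounter) Inc(kind string, n int) {
	c.mu.Lock()
	c.counts[kind] += n
	c.mu.Unlock()
}

// Snapshot copies the counts under the lock and returns the copy.
func (c *TypeCounter) Snapshot() map[string]int {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make(map[string]int, len(c.counts))
	for k, v := range c.counts {
		out[k] = v
	}
	return out
}

func main() {
	c := NewTypeCounter()
	var wg sync.WaitGroup
	for w := 0; w < 1000; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			if w%2 == 0 {
				c.Inc("type1", 1)
			} else {
				c.Inc("type2", 1)
			}
		}(w)
	}
	wg.Wait()
	fmt.Println(c.Snapshot()) // prints map[type1:500 type2:500]
}
```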
It seems like there should be a more "Go"-optimized way using channels or something similar:
```go
var work_counter int
var work_chan chan int // make() called somewhere else (buffered)

// started somewhere else; the only goroutine that touches work_counter
func GoCounterRoutine() {
	for {
		select {
		case c := <-work_chan:
			work_counter += c
		}
	}
}

func GoWorkerRoutine() {
	for {
		// do work
		work_chan <- 1
	}
}
```
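The last example is still missing the map, but that's easy to add. Does this style give better performance than a simple atomic increment? I can't work out how much this matters when we're talking about concurrent access to a global value, as opposed to something that may block waiting on I/O to complete...
Thoughts are appreciated.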
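Adding the map to the channel version could look like the following minimal sketch: one goroutine owns the map and every worker sends it an increment message. The `increment` type, `runCounter` and `countWithChannel` are hypothetical names, and closing the channel once the workers finish is an assumption about shutdown, since the question's loops run forever:

```go
package main

import (
	"fmt"
	"sync"
)

// increment (hypothetical) says which kind of work was done, and how much.
type increment struct {
	kind string
	n    int
}

// runCounter owns the map; no other goroutine touches it. It sends the
// final counts on result once incs has been closed and drained.
func runCounter(incs <-chan increment, result chan<- map[string]int) {
	counts := make(map[string]int)
	for inc := range incs {
		counts[inc.kind] += inc.n
	}
	result <- counts
}

func countWithChannel(workers, perWorker, buffer int) map[string]int {
	incs := make(chan increment, buffer) // buffered, as in the question
	result := make(chan map[string]int)
	go runCounter(incs, result)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			kind := "type1"
			if w%2 == 1 {
				kind = "type2"
			}
			for i := 0; i < perWorker; i++ {
				// do work ...
				incs <- increment{kind, 1}
			}
		}(w)
	}
	wg.Wait()
	close(incs) // lets runCounter's range loop finish
	return <-result
}

func main() {
	fmt.Println(countWithChannel(1000, 10, 1024)) // prints map[type1:5000 type2:5000]
}
```

Every increment is still one channel send, so each unit of work pays for a hand-off to the counter goroutine.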
Update 5/28/2013:
I tested a few implementations and the results were not what I expected. Here is my counter source code:
```go
package helpers

type CounterIncrementStruct struct {
	bucket string
	value  int
}

type CounterQueryStruct struct {
	bucket  string
	channel chan int
}

var counter map[string]int
var counterIncrementChan chan CounterIncrementStruct
var counterQueryChan chan CounterQueryStruct
var counterListChan chan chan map[string]int

func CounterInitialize() {
	counter = make(map[string]int)
	counterIncrementChan = make(chan CounterIncrementStruct, 0)
	counterQueryChan = make(chan CounterQueryStruct, 100)
	counterListChan = make(chan chan map[string]int, 100)
	go goCounterWriter()
}

func goCounterWriter() {
	for {
		select {
		case ci := <-counterIncrementChan:
			if len(ci.bucket) == 0 {
				return
			}
			counter[ci.bucket] += ci.value
		case cq := <-counterQueryChan:
			val, found := counter[cq.bucket]
			if found {
				cq.channel <- val
			} else {
				cq.channel <- -1
			}
		case cl := <-counterListChan:
			nm := make(map[string]int)
			for k, v := range counter {
				nm[k] = v
			}
			cl <- nm
		}
	}
}

func CounterIncrement(bucket string, counter int) {
	if len(bucket) == 0 || counter == 0 {
		return
	}
	counterIncrementChan <- CounterIncrementStruct{bucket, counter}
}

func CounterQuery(bucket string) int {
	if len(bucket) == 0 {
		return -1
	}
	reply := make(chan int)
	counterQueryChan <- CounterQueryStruct{bucket, reply}
	return <-reply
}

func CounterList() map[string]int {
	reply := make(chan map[string]int)
	counterListChan <- reply
	return <-reply
}
```
It uses channels for both writes and reads, which seems logical.
Here are my test cases:
```go
package helpers

import (
	"sync"
	"testing"
)

func bcRoutine(b *testing.B, e chan bool) {
	for i := 0; i < b.N; i++ {
		CounterIncrement("abc123", 5)
		CounterIncrement("def456", 5)
		CounterIncrement("ghi789", 5)
		CounterIncrement("abc123", 5)
	}
	e <- true
}

func BenchmarkChannels(b *testing.B) {
	b.StopTimer()
	CounterInitialize()
	e := make(chan bool)
	b.StartTimer()
	for g := 0; g < 5; g++ { // 5 writer goroutines, each waited on once
		go bcRoutine(b, e)
	}
	for g := 0; g < 5; g++ {
		<-e
	}
	b.StopTimer()
	// an empty bucket makes goCounterWriter return, so each run leaves no writer behind
	counterIncrementChan <- CounterIncrementStruct{}
}

var mux sync.Mutex
var m map[string]int

func bmIncrement(bucket string, value int) {
	mux.Lock()
	m[bucket] += value
	mux.Unlock()
}

func bmRoutine(b *testing.B, e chan bool) {
	for i := 0; i < b.N; i++ {
		bmIncrement("abc123", 5)
		bmIncrement("def456", 5)
		bmIncrement("ghi789", 5)
		bmIncrement("abc123", 5)
	}
	e <- true
}

func BenchmarkMutex(b *testing.B) {
	b.StopTimer()
	m = make(map[string]int)
	e := make(chan bool)
	b.StartTimer()
	// serial increments, timed; BenchmarkChannels has no equivalent
	for i := 0; i < b.N; i++ {
		bmIncrement("abc123", 5)
	}
	for g := 0; g < 5; g++ {
		go bmRoutine(b, e)
	}
	for g := 0; g < 5; g++ {
		<-e
	}
}
```
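I also implemented a simple benchmark that just puts a mutex around a map (testing writes only), and benchmarked both with 5 goroutines running in parallel. Here are the results: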
```
$ go test --bench=. helpers
PASS
BenchmarkChannels	  100000	     15560 ns/op
BenchmarkMutex	 1000000	      2669 ns/op
ok  	helpers	4.452s
```
I would not have expected the mutex to be that much faster.
Further thoughts?
Don't use sync/atomic. From the linked page:
> Package atomic provides low-level atomic memory primitives useful for
> implementing synchronization algorithms.
>
> These functions require great care to be used correctly. Except for
> special, low-level applications, synchronization is better done with
> channels or the facilities of the sync package.
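Last time I had to do this, I benchmarked something that looked like your second example with a mutex against something that looked like your third example with a channel. The channel code won when things got really busy, but make sure you make the channel buffer big.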
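A variation on the channel approach, not something benchmarked in this thread, is to have each worker count locally and send a whole batch over the channel, so far fewer messages cross it. This is a minimal sketch under stated assumptions: `flushEvery`, `worker` and `countBatched` are hypothetical names, and the batch size of 256 and buffer size of 1024 are arbitrary choices:

```go
package main

import (
	"fmt"
	"sync"
)

// flushEvery (hypothetical, arbitrary) is how many local increments a
// worker accumulates before sending them as one batch.
const flushEvery = 256

// worker counts into a private map and only sends it every flushEvery items.
func worker(kind string, items int, batches chan<- map[string]int, wg *sync.WaitGroup) {
	defer wg.Done()
	local := make(map[string]int)
	pending := 0
	for i := 0; i < items; i++ {
		// do work ...
		local[kind]++
		pending++
		if pending == flushEvery {
			batches <- local
			local = make(map[string]int) // the sent map now belongs to the receiver
			pending = 0
		}
	}
	if pending > 0 {
		batches <- local // flush the remainder
	}
}

func countBatched(workers, items int) map[string]int {
	batches := make(chan map[string]int, 1024) // large buffer, per the advice above
	totals := make(map[string]int)
	done := make(chan struct{})
	go func() {
		// only this goroutine writes totals
		for b := range batches {
			for k, v := range b {
				totals[k] += v
			}
		}
		close(done)
	}()

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go worker(fmt.Sprintf("type%d", w%3), items, batches, &wg)
	}
	wg.Wait()
	close(batches)
	<-done // totals is safe to read after the merger goroutine finishes
	return totals
}

func main() {
	fmt.Println(countBatched(300, 1000)) // prints map[type0:100000 type1:100000 type2:100000]
}
```

The trade-off is that the totals lag behind the actual work by up to `flushEvery` items per worker until the final flush.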