Go 1.3 的sync包中加入一个新特性:Pool。官方文档可以看这里http://golang.org/pkg/sync/#Pool
这个类设计的目的是用来保存和复用临时对象,以减少内存分配,降低CG压力。
type Pool func (p *Pool) Get() interface{} func (p *Pool) Put(x interface{}) New func() interface{}
Get返回Pool中的任意一个对象。如果Pool为空,则调用New返回一个新创建的对象。如果没有设置New,则返回nil。
还有一个重要的特性是,放进Pool中的对象,会在说不准什么时候被回收掉。所以如果事先Put进去100个对象,下次Get的时候发现Pool是空也是有可能的。不过这个特性的一个好处就在于不用担心Pool会一直增长,因为Go已经帮你在Pool中做了回收机制。之前我用Channel实现过一个类似接口的Pool,看到这个官方版本之后果断就抛弃了。
下面说说Pool的实现:
1.定时清理
文档上说,保存在Pool中的对象会在没有任何通知的情况下被自动移除掉。实际上,这个清理过程是在每次垃圾回收之前做的。垃圾回收是固定两分钟触发一次。而且每次清理会将Pool中的所有对象都清理掉!(我在看源码之前还以为会按照使用频率清理一部分…)所以如果Pool中的对象数量很多也会拖慢垃圾回收的时间。
var ( allPoolsMu Mutex allPools []*Pool ) func poolCleanup() { // This function is called with the world stopped,at the beginning of a garbage collection. // It must not allocate and probably should not call any runtime functions. // Defensively zero out everything,2 reasons: // 1. To prevent false retention of whole Pools. // 2. If GC happens while a goroutine works with l.shared in Put/Get,// it will retain whole Pool. So next cycle memory consumption would be doubled. for i,p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local,i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.a = nil } } allPools = []*Pool{} } func init() { runtime_registerPoolCleanup(poolCleanup) }
有一个全局变量allPools保存了所有被创建出来的Pool对象,并注册了一个poolCleanup函数回调给runtime,这个函数将会在每次垃圾回收之前调用。
2.如何管理数据
先看看两个数据结构
type Pool struct { local unsafe.Pointer // local fixed-size per-P pool,actual type is [P]poolLocal localSize uintptr // size of the local array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. New func() interface{} } // Local per-P Pool appendix. type poolLocal struct { private interface{} // Can be used only by the respective P. shared []interface{} // Can be used by any P. Mutex // Protects shared. pad [128]byte // Prevents false sharing. }Pool是提供给外部使用的对象。其中的local成员的真实类型是一个poolLocal数组,localSize是数组长度。poolLocal是真正保存数据的地方。priveate保存了一个临时对象,shared是保存临时对象的数组。
为什么Pool中需要这么多poolLocal对象呢?实际上,Pool是给每个线程分配了一个poolLocal对象。也就是说local数组的长度,就是工作线程的数量(size := runtime.GOMAXPROCS(0))。当多线程在并发读写的时候,通常情况下都是在自己线程的poolLocal中存取数据。当自己线程的poolLocal中没有数据时,才会尝试加锁去其他线程的poolLocal中“偷”数据。
func (p *Pool) Get() interface{} { if raceenabled { if p.New != nil { return p.New() } return nil } l := p.pin() // 获取当前线程的poolLocal对象,也就是p.local[pid]。 x := l.private l.private = nil runtime_procUnpin() if x != nil { return x } l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] } l.Unlock() if x != nil { return x } return p.getSlow() }Pool.Get的时候,首先会在local数组中获取当前线程对应的poolLocal对象。如果private中有数据,则取出来直接返回。如果没有则先锁住shared,有数据则直接返回。
为什么这里要锁住。答案在getSlow中。因为当shared中没有数据的时候,会尝试去其他的poolLocal的shared中偷数据。
func (p *Pool) getSlow() (x interface{}) { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire local := p.local // load-consume // Try to steal one element from other procs. pid := runtime_procPin() runtime_procUnpin() for i := 0; i < int(size); i++ { l := indexLocal(local,(pid+i+1)%int(size)) l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] l.Unlock() break } l.Unlock() } if x == nil && p.New != nil { x = p.New() } return x }Go语言的goroutine虽然可以创建很多,但是真正能物理上并发运行的goroutine数量是有限的,是由runtime.GOMAXPROCS(0)设置的。所以这个Pool高效的设计的地方就在于将数据分散在了各个真正并发的线程中,每个线程优先从自己的poolLocal中获取数据,很大程度上降低了锁竞争。