Understanding Synchronization Primitives in Go Programming
Go provides a rich set of synchronization primitives in the sync
package for coordinating goroutines and managing concurrent access to shared resources. This guide covers everything you need to know about using synchronization primitives effectively.
WaitGroup
Basic WaitGroup Usage
func processItems(items []int) {
var wg sync.WaitGroup
for _, item := range items {
wg.Add(1)
go func(i int) {
defer wg.Done()
processItem(i)
}(item)
}
wg.Wait()
}
// With error handling
func processItemsWithErrors(items []int) error {
var wg sync.WaitGroup
errCh := make(chan error, len(items))
for _, item := range items {
wg.Add(1)
go func(i int) {
defer wg.Done()
if err := processItem(i); err != nil {
errCh <- err
}
}(item)
}
wg.Wait()
close(errCh)
// Collect errors
var errs []error
for err := range errCh {
errs = append(errs, err)
}
if len(errs) > 0 {
return fmt.Errorf("multiple errors: %v", errs)
}
return nil
}
Once
Basic Once Usage
type Singleton struct {
once sync.Once
instance *Instance
}
func (s *Singleton) getInstance() *Instance {
s.once.Do(func() {
s.instance = &Instance{}
})
return s.instance
}
// With error handling
type SafeSingleton struct {
once sync.Once
instance *Instance
err error
}
func (s *SafeSingleton) getInstance() (*Instance, error) {
s.once.Do(func() {
var err error
s.instance, err = createInstance()
if err != nil {
s.err = err
}
})
return s.instance, s.err
}
Pool
Basic Pool Usage
type Buffer struct {
pool sync.Pool
}
func NewBuffer() *Buffer {
return &Buffer{
pool: sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
},
}
}
func (b *Buffer) Get() *bytes.Buffer {
return b.pool.Get().(*bytes.Buffer)
}
func (b *Buffer) Put(buf *bytes.Buffer) {
buf.Reset()
b.pool.Put(buf)
}
// Usage
func processData(data []byte) error {
buf := bufferPool.Get()
defer bufferPool.Put(buf)
// Use buffer
buf.Write(data)
return process(buf)
}
Map
Basic Map Usage
type Cache struct {
sync.Map
}
func (c *Cache) Set(key string, value interface{}) {
c.Store(key, value)
}
func (c *Cache) Get(key string) (interface{}, bool) {
return c.Load(key)
}
func (c *Cache) Delete(key string) {
c.Delete(key)
}
// With type safety
type TypedCache struct {
sync.Map
}
func (c *TypedCache) Set(key string, value int) {
c.Store(key, value)
}
func (c *TypedCache) Get(key string) (int, bool) {
value, ok := c.Load(key)
if !ok {
return 0, false
}
return value.(int), true
}
Best Practices
1. WaitGroup Patterns
// Worker Pool with WaitGroup
type WorkerPool struct {
workers int
jobs chan Job
results chan Result
wg sync.WaitGroup
}
func (p *WorkerPool) Start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go func() {
defer p.wg.Done()
for job := range p.jobs {
result := job.Process()
p.results <- result
}
}()
}
}
func (p *WorkerPool) Stop() {
close(p.jobs)
p.wg.Wait()
close(p.results)
}
2. Once Patterns
type Service struct {
startOnce sync.Once
stopOnce sync.Once
running atomic.Bool
}
func (s *Service) Start() error {
var err error
s.startOnce.Do(func() {
err = s.doStart()
if err == nil {
s.running.Store(true)
}
})
return err
}
func (s *Service) Stop() error {
var err error
s.stopOnce.Do(func() {
err = s.doStop()
s.running.Store(false)
})
return err
}
3. Pool Patterns
type ResourcePool struct {
pool sync.Pool
maxSize int
}
func NewResourcePool(maxSize int) *ResourcePool {
return &ResourcePool{
pool: sync.Pool{
New: func() interface{} {
return &Resource{
buffer: make([]byte, 0, maxSize),
}
},
},
maxSize: maxSize,
}
}
func (p *ResourcePool) Get() *Resource {
r := p.pool.Get().(*Resource)
r.buffer = r.buffer[:0] // Reset but keep capacity
return r
}
func (p *ResourcePool) Put(r *Resource) {
if cap(r.buffer) <= p.maxSize {
p.pool.Put(r)
}
// Drop oversized resources
}
Common Patterns
1. Concurrent Map Operations
type ConcurrentCache struct {
sync.Map
stats atomic.Int64
}
func (c *ConcurrentCache) LoadOrCompute(key string, compute func() interface{}) interface{} {
// Try loading existing value
if value, ok := c.Load(key); ok {
return value
}
// Compute new value
value := compute()
// Store if not exists
actual, loaded := c.LoadOrStore(key, value)
if !loaded {
c.stats.Add(1)
return value
}
return actual
}
func (c *ConcurrentCache) Range(f func(key string, value interface{}) bool) {
c.Map.Range(func(key, value interface{}) bool {
return f(key.(string), value)
})
}
2. Resource Management
type ResourceManager struct {
resources sync.Map
wg sync.WaitGroup
once sync.Once
done chan struct{}
}
func (rm *ResourceManager) Start() {
rm.once.Do(func() {
rm.done = make(chan struct{})
rm.wg.Add(1)
go rm.monitor()
})
}
func (rm *ResourceManager) Stop() {
close(rm.done)
rm.wg.Wait()
}
func (rm *ResourceManager) monitor() {
defer rm.wg.Done()
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-rm.done:
return
case <-ticker.C:
rm.cleanup()
}
}
}
func (rm *ResourceManager) cleanup() {
rm.resources.Range(func(key, value interface{}) bool {
resource := value.(*Resource)
if resource.isExpired() {
rm.resources.Delete(key)
}
return true
})
}
3. Batch Processing
type Batch struct {
size int
timeout time.Duration
items []interface{}
mu sync.Mutex
notify chan struct{}
}
func (b *Batch) Add(item interface{}) {
b.mu.Lock()
b.items = append(b.items, item)
size := len(b.items)
b.mu.Unlock()
if size >= b.size {
b.notify <- struct{}{}
}
}
func (b *Batch) Process() {
timer := time.NewTimer(b.timeout)
defer timer.Stop()
for {
select {
case <-b.notify:
b.processBatch()
case <-timer.C:
b.processBatch()
timer.Reset(b.timeout)
}
}
}
func (b *Batch) processBatch() {
b.mu.Lock()
if len(b.items) == 0 {
b.mu.Unlock()
return
}
items := b.items
b.items = make([]interface{}, 0, b.size)
b.mu.Unlock()
// Process items
processBatchItems(items)
}
Performance Considerations
1. Pool Sizing
// Bad: Fixed size pool
var pool = sync.Pool{
New: func() interface{} {
return make([]byte, 4096)
},
}
// Good: Adaptive sizing
type AdaptivePool struct {
pools []*sync.Pool
sizes []int
}
func NewAdaptivePool(sizes []int) *AdaptivePool {
pools := make([]*sync.Pool, len(sizes))
for i, size := range sizes {
size := size // Capture for closure
pools[i] = &sync.Pool{
New: func() interface{} {
return make([]byte, size)
},
}
}
return &AdaptivePool{pools: pools, sizes: sizes}
}
func (p *AdaptivePool) Get(size int) []byte {
// Find smallest buffer that fits
for i, poolSize := range p.sizes {
if poolSize >= size {
return p.pools[i].Get().([]byte)
}
}
// Fallback to allocating
return make([]byte, size)
}
2. Map Access Patterns
// Bad: Frequent LoadOrStore
type BadCache struct {
sync.Map
}
func (c *BadCache) Increment(key string) {
for {
value, _ := c.LoadOrStore(key, 1)
if value == nil {
return
}
if c.CompareAndSwap(key, value, value.(int)+1) {
return
}
}
}
// Good: Use Load then Store
type GoodCache struct {
sync.Map
}
func (c *GoodCache) Increment(key string) {
if value, ok := c.Load(key); ok {
c.Store(key, value.(int)+1)
return
}
c.Store(key, 1)
}
Common Mistakes
1. WaitGroup Misuse
// Wrong: WaitGroup race condition
func wrong(items []int) {
var wg sync.WaitGroup
for _, item := range items {
go func() {
wg.Add(1) // Race condition
defer wg.Done()
process(item)
}()
}
wg.Wait()
}
// Right: Add before goroutine
func right(items []int) {
var wg sync.WaitGroup
for _, item := range items {
wg.Add(1)
go func(i int) {
defer wg.Done()
process(i)
}(item)
}
wg.Wait()
}
2. Pool Misuse
// Wrong: Storing pointers to pooled objects
type Wrong struct {
pool sync.Pool
refs []*[]byte
}
func (w *Wrong) GetRef() {
buf := w.pool.Get().([]byte)
w.refs = append(w.refs, &buf) // Dangerous!
w.pool.Put(buf)
}
// Right: Copy pooled objects
type Right struct {
pool sync.Pool
data [][]byte
}
func (r *Right) GetCopy() {
buf := r.pool.Get().([]byte)
copy := make([]byte, len(buf))
copy(copy, buf)
r.data = append(r.data, copy)
r.pool.Put(buf)
}
3. Once Misuse
// Wrong: Multiple initialization attempts
type Wrong struct {
once sync.Once
err error
}
func (w *Wrong) Init() error {
w.once.Do(func() {
w.err = initialize()
})
if w.err != nil {
w.once = sync.Once{} // Don't do this!
return w.err
}
return nil
}
// Right: Handle initialization error
type Right struct {
once sync.Once
err error
initialized atomic.Bool
}
func (r *Right) Init() error {
r.once.Do(func() {
r.err = initialize()
if r.err == nil {
r.initialized.Store(true)
}
})
if !r.initialized.Load() {
return r.err
}
return nil
}
Next Steps
- Learn about mutexes
- Explore channels
- Study atomic operations
- Practice with worker pools