go / concurrency / sync

Understanding Synchronization Primitives in Go Programming

Go provides a rich set of synchronization primitives in the sync package for coordinating goroutines and managing concurrent access to shared resources. This guide covers everything you need to know about using synchronization primitives effectively.

WaitGroup

Basic WaitGroup Usage

// processItems fans each item out to its own goroutine and blocks
// until every worker has finished. The full goroutine count is
// registered before any worker starts, so Wait can never observe a
// transiently-zero counter.
func processItems(items []int) {
    var wg sync.WaitGroup
    wg.Add(len(items))
    
    for _, item := range items {
        go func(i int) {
            defer wg.Done()
            processItem(i)
        }(item)
    }
    
    wg.Wait()
}

// With error handling
// processItemsWithErrors processes every item concurrently and reports
// all failures. The error channel is buffered to len(items) so no
// worker can ever block on the send, even before the collector drains.
func processItemsWithErrors(items []int) error {
    var wg sync.WaitGroup
    errCh := make(chan error, len(items))
    
    for _, item := range items {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            if err := processItem(i); err != nil {
                errCh <- err
            }
        }(item)
    }
    
    // All senders have exited, so closing and draining is safe.
    wg.Wait()
    close(errCh)
    
    // Collect errors
    var errs []error
    for err := range errCh {
        errs = append(errs, err)
    }
    
    // errors.Join (Go 1.20+) returns nil for an empty list and keeps
    // every individual error inspectable via errors.Is/errors.As —
    // unlike the original fmt.Errorf("multiple errors: %v", errs),
    // which flattened them into plain text.
    return errors.Join(errs...)
}

Once

Basic Once Usage

// Singleton lazily creates one shared *Instance.
type Singleton struct {
    once     sync.Once
    instance *Instance
}

// getInstance builds the shared instance on first use; every later
// call returns the same pointer.
func (s *Singleton) getInstance() *Instance {
    s.once.Do(s.initialize)
    return s.instance
}

// initialize is the one-time constructor invoked through sync.Once.
func (s *Singleton) initialize() {
    s.instance = &Instance{}
}

// With error handling
// SafeSingleton remembers both the instance and the construction
// error from the single createInstance attempt.
type SafeSingleton struct {
    once     sync.Once
    instance *Instance
    err      error
}

// getInstance builds the instance at most once and replays the same
// (instance, error) pair to every subsequent caller.
func (s *SafeSingleton) getInstance() (*Instance, error) {
    s.once.Do(func() {
        // Assign both results directly: on success s.err receives nil,
        // which is exactly what its zero value already was.
        s.instance, s.err = createInstance()
    })
    return s.instance, s.err
}

Pool

Basic Pool Usage

// Buffer is a reusable pool of *bytes.Buffer values.
type Buffer struct {
    pool sync.Pool
}

// NewBuffer returns a pool whose misses are satisfied with fresh,
// empty bytes.Buffers.
func NewBuffer() *Buffer {
    b := &Buffer{}
    b.pool.New = func() interface{} {
        return &bytes.Buffer{}
    }
    return b
}

// Get hands out a pooled buffer, or a brand-new one on a pool miss.
func (b *Buffer) Get() *bytes.Buffer {
    return b.pool.Get().(*bytes.Buffer)
}

// Put resets buf so the next Get always sees an empty buffer, then
// returns it to the pool for reuse.
func (b *Buffer) Put(buf *bytes.Buffer) {
    buf.Reset()
    b.pool.Put(buf)
}

// Usage
func processData(data []byte) error {
    buf := bufferPool.Get()
    defer bufferPool.Put(buf)
    
    // Use buffer
    buf.Write(data)
    return process(buf)
}

Map

Basic Map Usage

// Cache is a thin convenience wrapper around sync.Map.
type Cache struct {
    sync.Map
}

// Set stores value under key.
func (c *Cache) Set(key string, value interface{}) {
    c.Store(key, value)
}

// Get returns the value stored under key and whether it was present.
func (c *Cache) Get(key string) (interface{}, bool) {
    return c.Load(key)
}

// Delete removes key. Fix over the original: it called c.Delete —
// this very method — instead of the embedded map's Delete, recursing
// until stack overflow. The promoted method must be qualified as
// c.Map.Delete because the wrapper shadows it.
func (c *Cache) Delete(key string) {
    c.Map.Delete(key)
}

// With type safety
// TypedCache wraps sync.Map behind a string-to-int typed interface so
// callers never handle interface{} values directly.
type TypedCache struct {
    sync.Map
}

// Set records an int value under key.
func (c *TypedCache) Set(key string, value int) {
    c.Store(key, value)
}

// Get fetches the int stored under key; the boolean reports whether
// the key was present.
func (c *TypedCache) Get(key string) (int, bool) {
    raw, found := c.Load(key)
    if found {
        return raw.(int), true
    }
    return 0, false
}

Best Practices

1. WaitGroup Patterns

// Worker Pool with WaitGroup
// WorkerPool fans jobs out to a fixed number of worker goroutines and
// funnels their results into a single results channel.
type WorkerPool struct {
    workers int
    jobs    chan Job
    results chan Result
    wg      sync.WaitGroup
}

// Start launches p.workers goroutines. Each worker drains p.jobs until
// that channel is closed, producing one result per job.
func (p *WorkerPool) Start() {
    for i := 0; i < p.workers; i++ {
        p.wg.Add(1)
        go func() {
            defer p.wg.Done()
            for job := range p.jobs {
                result := job.Process()
                p.results <- result
            }
        }()
    }
}

// Stop closes the job feed, waits for all workers to drain it, then
// closes results so downstream consumers see end-of-stream.
// NOTE(review): if nothing is draining p.results (and its buffer, if
// any, is full), workers block on the send and wg.Wait() never
// returns — confirm a consumer is running before Stop is called.
func (p *WorkerPool) Stop() {
    close(p.jobs)
    p.wg.Wait()
    close(p.results)
}

2. Once Patterns

// Service supports idempotent Start/Stop: each transition runs its
// underlying do* hook exactly once, and the outcome of that single
// attempt is reported to every caller.
type Service struct {
    startOnce sync.Once
    stopOnce  sync.Once
    running   atomic.Bool
    startErr  error // result of the one doStart attempt
    stopErr   error // result of the one doStop attempt
}

// Start runs doStart exactly once. Fix over the original: the error
// was held in a local, so every call after the first returned nil even
// when the single real start attempt had failed. It is now persisted
// and returned consistently (the Once's happens-before edge makes the
// field read safe).
func (s *Service) Start() error {
    s.startOnce.Do(func() {
        s.startErr = s.doStart()
        if s.startErr == nil {
            s.running.Store(true)
        }
    })
    return s.startErr
}

// Stop runs doStop exactly once and always clears the running flag;
// like Start, the one attempt's error is replayed to repeat callers.
func (s *Service) Stop() error {
    s.stopOnce.Do(func() {
        s.stopErr = s.doStop()
        s.running.Store(false)
    })
    return s.stopErr
}

3. Pool Patterns

// ResourcePool recycles *Resource values whose buffers stay at or
// below maxSize bytes of capacity.
type ResourcePool struct {
    pool    sync.Pool
    maxSize int
}

// NewResourcePool builds a pool whose misses produce a Resource with
// an empty buffer pre-allocated to maxSize capacity.
func NewResourcePool(maxSize int) *ResourcePool {
    p := &ResourcePool{maxSize: maxSize}
    p.pool.New = func() interface{} {
        return &Resource{
            buffer: make([]byte, 0, maxSize),
        }
    }
    return p
}

// Get returns a Resource whose buffer has zero length but keeps
// whatever capacity it has already grown to.
func (p *ResourcePool) Get() *Resource {
    res := p.pool.Get().(*Resource)
    res.buffer = res.buffer[:0]
    return res
}

// Put recycles r unless its buffer out-grew maxSize, in which case it
// is dropped for the GC so the pool never hoards oversized memory.
func (p *ResourcePool) Put(r *Resource) {
    if cap(r.buffer) > p.maxSize {
        return
    }
    p.pool.Put(r)
}

Common Patterns

1. Concurrent Map Operations

// ConcurrentCache is a sync.Map plus a counter of how many values this
// cache has inserted through LoadOrCompute.
type ConcurrentCache struct {
    sync.Map
    stats atomic.Int64
}

// LoadOrCompute returns the cached value for key, computing and
// inserting one on a miss. compute may run even when another goroutine
// wins the insert race; only the winner's value is kept and returned.
func (c *ConcurrentCache) LoadOrCompute(key string, compute func() interface{}) interface{} {
    // Fast path: the value already exists.
    if cached, hit := c.Load(key); hit {
        return cached
    }
    
    fresh := compute()
    
    existing, raced := c.LoadOrStore(key, fresh)
    if raced {
        // Another goroutine stored first; discard our computation.
        return existing
    }
    c.stats.Add(1)
    return fresh
}

// Range visits every entry, presenting keys with their string type.
func (c *ConcurrentCache) Range(f func(key string, value interface{}) bool) {
    c.Map.Range(func(key, value interface{}) bool {
        return f(key.(string), value)
    })
}

2. Resource Management

// ResourceManager owns a set of resources and a background goroutine
// that periodically evicts expired entries.
type ResourceManager struct {
    resources sync.Map
    wg        sync.WaitGroup
    once      sync.Once
    stopOnce  sync.Once // makes Stop idempotent (see Stop)
    done      chan struct{}
}

// Start launches the cleanup monitor exactly once; repeat calls are
// no-ops.
func (rm *ResourceManager) Start() {
    rm.once.Do(func() {
        rm.done = make(chan struct{})
        rm.wg.Add(1)
        go rm.monitor()
    })
}

// Stop signals the monitor and waits for it to exit. Fixes over the
// original: a second Stop no longer panics (double close of done), and
// Stop before Start no longer panics (close of a nil channel).
func (rm *ResourceManager) Stop() {
    rm.stopOnce.Do(func() {
        if rm.done == nil {
            return // Start was never called; nothing to stop
        }
        close(rm.done)
        rm.wg.Wait()
    })
}

// monitor runs cleanup once a minute until done is closed.
func (rm *ResourceManager) monitor() {
    defer rm.wg.Done()
    ticker := time.NewTicker(time.Minute)
    defer ticker.Stop()
    
    for {
        select {
        case <-rm.done:
            return
        case <-ticker.C:
            rm.cleanup()
        }
    }
}

// cleanup deletes every resource that reports itself expired.
// Deleting during Range is explicitly permitted by sync.Map.
func (rm *ResourceManager) cleanup() {
    rm.resources.Range(func(key, value interface{}) bool {
        resource := value.(*Resource)
        if resource.isExpired() {
            rm.resources.Delete(key)
        }
        return true
    })
}

3. Batch Processing

// Batch accumulates items and flushes them either when the batch
// reaches size items or when timeout elapses, whichever comes first.
// notify is ideally created with capacity 1 so a pending flush signal
// can be parked — TODO confirm how callers construct it.
type Batch struct {
    size    int
    timeout time.Duration
    items   []interface{}
    mu      sync.Mutex
    notify  chan struct{}
}

// Add appends item and signals Process when a full batch is ready.
func (b *Batch) Add(item interface{}) {
    b.mu.Lock()
    b.items = append(b.items, item)
    full := len(b.items) >= b.size
    b.mu.Unlock()
    
    if full {
        // Non-blocking send: if a flush signal cannot be delivered
        // right now, coalesce with the pending one. Fix over the
        // original, whose unconditional send blocked the producer
        // forever whenever Process was not running or was mid-flush.
        select {
        case b.notify <- struct{}{}:
        default:
        }
    }
}

// Process runs the flush loop, flushing on a size signal from Add or
// when the timeout expires. NOTE(review): there is no stop signal, so
// this loop runs for the life of the process — confirm that lifetime
// is intended or add a done channel.
func (b *Batch) Process() {
    timer := time.NewTimer(b.timeout)
    defer timer.Stop()
    
    for {
        select {
        case <-b.notify:
            b.processBatch()
            // Fix over the original: restart the timeout window after
            // a size-triggered flush, draining the timer first in case
            // it already fired (required before Reset).
            if !timer.Stop() {
                select {
                case <-timer.C:
                default:
                }
            }
            timer.Reset(b.timeout)
        case <-timer.C:
            b.processBatch()
            timer.Reset(b.timeout)
        }
    }
}

// processBatch swaps the accumulated items out under the lock and
// processes them outside it, so producers are never blocked by the
// potentially slow processing step.
func (b *Batch) processBatch() {
    b.mu.Lock()
    if len(b.items) == 0 {
        b.mu.Unlock()
        return
    }
    
    items := b.items
    b.items = make([]interface{}, 0, b.size)
    b.mu.Unlock()
    
    // Process items
    processBatchItems(items)
}

Performance Considerations

1. Pool Sizing

// Bad: Fixed size pool
// Every caller pays for 4096 bytes no matter how little it needs, and
// larger requests cannot be served at all. Note also that pooling a
// bare []byte boxes the slice header into an interface{} on every
// Get/Put, which itself allocates; pooling *[]byte (or a small struct)
// avoids that extra allocation.
var pool = sync.Pool{
    New: func() interface{} {
        return make([]byte, 4096)
    },
}

// Good: Adaptive sizing
// AdaptivePool keeps one sync.Pool per bucket size so each request is
// served by the smallest buffer class that fits.
type AdaptivePool struct {
    pools []*sync.Pool
    sizes []int
}

// NewAdaptivePool builds one pool per size class. sizes should be in
// ascending order, since Get scans them front to back for the first
// class that fits.
func NewAdaptivePool(sizes []int) *AdaptivePool {
    pools := make([]*sync.Pool, len(sizes))
    for i, size := range sizes {
        size := size // Capture per-iteration value for the closure (pre-Go 1.22)
        pools[i] = &sync.Pool{
            New: func() interface{} {
                return make([]byte, size)
            },
        }
    }
    return &AdaptivePool{pools: pools, sizes: sizes}
}

// Get returns a buffer with len == size. Fix over the original: the
// pooled path returned the whole bucket (len == bucket size) while the
// fallback returned len == size; the pooled buffer is now re-sliced to
// the requested length so callers see a consistent contract. The full
// bucket capacity is retained and can be recovered via buf[:cap(buf)].
func (p *AdaptivePool) Get(size int) []byte {
    // Find the smallest bucket that fits.
    for i, bucket := range p.sizes {
        if bucket >= size {
            buf := p.pools[i].Get().([]byte)
            return buf[:size]
        }
    }
    // No bucket is large enough; allocate exactly.
    return make([]byte, size)
}

2. Map Access Patterns

// Bad: Frequent LoadOrStore
// NOTE(review): the CAS retry loop below is the right lock-free shape
// for a concurrent counter, but this version has two real flaws worth
// calling out to readers:
//  1. The loaded result of LoadOrStore is ignored, so on a fresh key
//     the call stores 1 (that already IS the increment) and the CAS
//     then bumps it to 2 — the first Increment double-counts.
//  2. Each iteration boxes ints into interface{} and may retry under
//     contention.
type BadCache struct {
    sync.Map
}

func (c *BadCache) Increment(key string) {
    for {
        value, _ := c.LoadOrStore(key, 1)
        if value == nil {
            // Dead branch: LoadOrStore never returns nil here — it
            // yields either the existing value or the 1 just stored.
            return
        }
        if c.CompareAndSwap(key, value, value.(int)+1) {
            return
        }
    }
}

// Good: Use Load then Store
// NOTE(review): this advice is wrong as stated — Load followed by
// Store is NOT atomic. Two goroutines can Load the same value and
// both Store value+1, losing an increment. This form is safe only
// when each key is written by a single goroutine; with concurrent
// writers, a CompareAndSwap loop (checking LoadOrStore's `loaded`
// result) is the correct pattern.
type GoodCache struct {
    sync.Map
}

func (c *GoodCache) Increment(key string) {
    if value, ok := c.Load(key); ok {
        c.Store(key, value.(int)+1)
        return
    }
    // Also racy: two goroutines can both miss the Load and both
    // Store 1, collapsing two increments into one.
    c.Store(key, 1)
}

Common Mistakes

1. WaitGroup Misuse

// Wrong: WaitGroup race condition
// Two bugs: (1) wg.Add(1) runs inside the goroutine, so wg.Wait() can
// return before any goroutine has called Add — the classic
// Add-after-Wait race. (2) The closure captures the loop variable
// `item`; before Go 1.22 every goroutine shares one variable and will
// typically process the final item repeatedly.
func wrong(items []int) {
    var wg sync.WaitGroup
    for _, item := range items {
        go func() {
            wg.Add(1)  // Race condition
            defer wg.Done()
            process(item)
        }()
    }
    wg.Wait()
}

// Right: Add before goroutine
// The full worker count is registered before any goroutine starts, so
// Wait can never observe a transiently-zero counter, and each worker
// receives its item as an argument to avoid loop-variable capture.
func right(items []int) {
    var wg sync.WaitGroup
    wg.Add(len(items))
    for _, item := range items {
        go func(v int) {
            defer wg.Done()
            process(v)
        }(item)
    }
    wg.Wait()
}

2. Pool Misuse

// Wrong: Storing pointers to pooled objects
type Wrong struct {
    pool sync.Pool
    refs []*[]byte
}

// GetRef keeps a pointer to a buffer and then returns that buffer to
// the pool. The pool is free to hand the same buffer to another
// goroutine, which will then mutate memory still reachable through
// w.refs — a use-after-Put aliasing bug.
func (w *Wrong) GetRef() {
    buf := w.pool.Get().([]byte)
    w.refs = append(w.refs, &buf)  // Dangerous!
    w.pool.Put(buf)
}

// Right: Copy pooled objects
type Right struct {
    pool sync.Pool
    data [][]byte
}

// GetCopy snapshots the pooled buffer before returning it to the pool,
// so r.data never aliases pool-owned memory. Fix over the original:
// the local was named `copy`, shadowing the copy builtin and turning
// copy(copy, buf) into a compile error ("copy is not a function").
func (r *Right) GetCopy() {
    buf := r.pool.Get().([]byte)
    dup := make([]byte, len(buf))
    copy(dup, buf)
    r.data = append(r.data, dup)
    r.pool.Put(buf)
}

3. Once Misuse

// Wrong: Multiple initialization attempts
type Wrong struct {
    once sync.Once
    err  error
}

// Init tries to make initialization retryable by replacing the Once
// after a failure. This is broken: overwriting w.once while another
// goroutine may be inside once.Do is a data race, two goroutines can
// end up running initialize() concurrently, and the unsynchronized
// read of w.err races with the reset as well.
func (w *Wrong) Init() error {
    w.once.Do(func() {
        w.err = initialize()
    })
    if w.err != nil {
        w.once = sync.Once{}  // Don't do this!
        return w.err
    }
    return nil
}

// Right: Handle initialization error
// Right runs initialize() exactly once and replays the outcome of that
// single attempt to every caller of Init.
type Right struct {
    once        sync.Once
    err         error
    initialized atomic.Bool
}

// Init performs the one-time initialization. The error from the single
// initialize() attempt is remembered; callers keep receiving it until
// initialization has succeeded (which, with a Once, means forever if
// the first attempt failed).
func (r *Right) Init() error {
    r.once.Do(func() {
        r.err = initialize()
        r.initialized.Store(r.err == nil)
    })
    if r.initialized.Load() {
        return nil
    }
    return r.err
}

Next Steps

  1. Learn about mutexes
  2. Explore channels
  3. Study atomic operations
  4. Practice with worker pools

Additional Resources