Go › Concurrency › Worker Pools

Understanding Worker Pools in Go Programming

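A worker pool is a common Go pattern for processing tasks concurrently with a fixed number of goroutines (workers) that all read from a shared job channel. Bounding the number of workers caps resource usage (CPU, memory, open connections) no matter how many tasks are queued. This guide shows how to build a basic pool, extend it with error handling, cancellation, dynamic sizing, prioritization, and rate limiting, and avoid the most common mistakes.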

Worker Pool Basics

Basic Worker Pool

// Job is one unit of work submitted to a pool.
type Job struct {
    ID   int         // identifier copied into Result.JobID by the workers
    Data interface{} // payload handed to the processing function
}

// Result carries the outcome of processing a single Job.
type Result struct {
    JobID int         // ID of the Job this result belongs to
    Data  interface{} // output of processing the Job's Data
    Error error       // non-nil if processing failed; the basic worker never sets it
}

// worker receives jobs until the jobs channel is closed and drained, sending
// one Result per Job to results. id identifies the worker to the caller; this
// basic version does not use it.
func worker(id int, jobs <-chan Job, results chan<- Result) {
    for {
        job, open := <-jobs
        if !open {
            // No more work will arrive: let the goroutine exit.
            return
        }
        results <- Result{JobID: job.ID, Data: process(job.Data)}
    }
}

// Usage
//
// main starts numWorkers workers, submits numJobs jobs, then collects exactly
// numJobs results.
func main() {
    const (
        numWorkers = 3
        numJobs    = 5
    )

    // Buffer both channels to numJobs. main enqueues every job before reading
    // any result. With buffers of only numWorkers, the workers block on a full
    // results channel, then main blocks on a full jobs channel. That deadlocks
    // as soon as numJobs exceeds what the buffers and busy workers can hold.
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    // Start workers
    for w := 1; w <= numWorkers; w++ {
        go worker(w, jobs, results)
    }

    // Send jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- Job{ID: j, Data: fmt.Sprintf("job-%d", j)}
    }
    close(jobs) // ends each worker's range loop once the queue is drained

    // Collect results; they arrive in completion order, not submission order.
    for a := 1; a <= numJobs; a++ {
        result := <-results
        fmt.Printf("Result: %+v\n", result)
    }
}

Worker Pool with WaitGroup

// WorkerPool runs a fixed number of worker goroutines that read from jobs and
// write to results; wg counts them so Stop can wait for all of them to exit.
type WorkerPool struct {
    numWorkers int            // number of goroutines launched by Start
    jobs       chan Job       // submitted work; closed by Stop
    results    chan Result    // one Result per processed Job; closed by Stop after all workers return
    wg         sync.WaitGroup // tracks running worker goroutines
}

// NewWorkerPool returns a WorkerPool configured for numWorkers goroutines.
// Both the job and result channels are buffered to numWorkers. No goroutines
// run until Start is called.
func NewWorkerPool(numWorkers int) *WorkerPool {
    pool := new(WorkerPool)
    pool.numWorkers = numWorkers
    pool.jobs = make(chan Job, numWorkers)
    pool.results = make(chan Result, numWorkers)
    return pool
}

// Start launches p.numWorkers goroutines. Each is registered with p.wg before
// it starts, so a later Stop cannot miss it.
func (p *WorkerPool) Start() {
    for id := 0; id < p.numWorkers; id++ {
        p.wg.Add(1)
        go p.runWorker(id)
    }
}

// runWorker is the body of one pool goroutine. It processes jobs until p.jobs
// is closed and drained, then marks itself done. workerID is currently unused.
func (p *WorkerPool) runWorker(workerID int) {
    defer p.wg.Done()
    for job := range p.jobs {
        p.results <- Result{JobID: job.ID, Data: process(job.Data)}
    }
}

// Stop shuts the pool down. Closing jobs ends each worker's range loop once
// the queue is drained. Wait then blocks until every worker has returned, and
// only after that is results closed, so no worker can send on a closed channel.
//
// Something must keep receiving from results while Stop runs. Otherwise a
// worker can block on a full results channel and Wait never returns.
// Calling Stop twice panics, because a channel can only be closed once.
func (p *WorkerPool) Stop() {
    close(p.jobs)
    p.wg.Wait()
    close(p.results)
}

Best Practices

1. Resource Management

// Worker is a single pool goroutine with its own stop signal.
type Worker struct {
    id      int           // passed to pool.workerStopped when the goroutine exits
    jobs    <-chan Job    // receive-only source of work
    results chan<- Result // send-only destination for results
    quit    chan struct{} // closed by Stop to ask the goroutine to return
    pool    *Pool         // owning pool, notified when the goroutine exits
}

// Start launches the worker's goroutine. The goroutine processes jobs until
// the jobs channel is closed or Stop is called, and always reports its exit
// through w.pool.workerStopped.
func (w *Worker) Start() {
    go func() {
        defer w.pool.workerStopped(w.id)

        for {
            select {
            case job, ok := <-w.jobs:
                if !ok {
                    return // jobs closed: no more work will arrive
                }
                result := w.process(job)
                // Watch quit during the send as well. Otherwise a worker blocked
                // on a full or unread results channel would never see Stop and
                // its goroutine would leak. If quit wins, this result is dropped.
                select {
                case w.results <- result:
                case <-w.quit:
                    return
                }
            case <-w.quit:
                return
            }
        }
    }()
}

// Stop signals the worker's goroutine to return by closing quit. It does not
// wait for the goroutine to exit. Calling it twice panics, because a channel
// can only be closed once.
func (w *Worker) Stop() {
    close(w.quit)
}

2. Error Handling

// Result carries the outcome of processing a single Job. Error lets a failed
// job travel back to the caller instead of being logged and lost.
type Result struct {
    JobID int         // ID of the Job this result belongs to
    Data  interface{} // processing output; left unset by process when Error is set
    Error error       // non-nil if processing failed
}

// process runs processJob on job and packages the outcome as a Result. On
// failure the error is wrapped with the job ID and Data is left unset. On
// success Data holds processJob's output and Error is nil.
func (w *Worker) process(job Job) Result {
    data, err := processJob(job)
    if err != nil {
        return Result{
            JobID: job.ID,
            Error: fmt.Errorf("processing job %d: %w", job.ID, err),
        }
    }
    return Result{JobID: job.ID, Data: data}
}

// Usage with error handling: consume every Result, logging failures and
// handling successes. The loop ends when the results channel is closed.
for res := range pool.Results() {
    switch {
    case res.Error != nil:
        log.Printf("Job %d failed: %v", res.JobID, res.Error)
    default:
        // Handle successful result
    }
}

3. Context Support

// Pool owns the shared job and result channels, plus a cancelable context
// that context-aware workers watch for shutdown.
type Pool struct {
    workers []*Worker          // worker handles, one slot per worker
    jobs    chan Job           // shared work queue
    results chan Result        // shared output channel
    ctx     context.Context    // done once cancel is called (or the parent ctx ends)
    cancel  context.CancelFunc // cancels ctx
}

// NewPool creates a Pool with numWorkers empty worker slots and unbuffered job
// and result channels. The pool's context is derived from ctx, so it is
// canceled when ctx is, or when the pool's own cancel func is called.
func NewPool(ctx context.Context, numWorkers int) *Pool {
    poolCtx, stop := context.WithCancel(ctx)
    p := &Pool{ctx: poolCtx, cancel: stop}
    p.workers = make([]*Worker, numWorkers)
    p.jobs = make(chan Job)
    p.results = make(chan Result)
    return p
}

// Start launches a context-aware worker goroutine. It processes jobs until the
// jobs channel is closed or the pool's context is canceled, and always reports
// its exit through w.pool.workerStopped.
func (w *Worker) Start() {
    go func() {
        defer w.pool.workerStopped(w.id)

        for {
            select {
            case job, ok := <-w.jobs:
                if !ok {
                    return // jobs closed: no more work will arrive
                }
                result := w.process(job)
                // Honor cancellation during the send too. Otherwise a worker
                // blocked on an unread results channel would outlive the pool.
                // If cancellation wins, this result is dropped.
                select {
                case w.results <- result:
                case <-w.pool.ctx.Done():
                    return
                }
            case <-w.pool.ctx.Done():
                return
            }
        }
    }()
}

Common Patterns

1. Dynamic Pool Sizing

// DynamicPool tracks a resizable set of workers keyed by worker ID.
//
// NOTE(review): adjustWorkers also reads p.jobs and p.results, which are not
// declared here. The struct needs job/result channel fields for that code to
// compile. workers must also be created with make before the first
// adjustWorkers call, because writing to a nil map panics.
type DynamicPool struct {
    minWorkers int             // presumably the lower bound on pool size; not read by adjustWorkers
    maxWorkers int             // presumably the upper bound on pool size; not read by adjustWorkers
    workers    map[int]*Worker // running workers by ID
    mu         sync.Mutex      // guards workers
}

// adjustWorkers resizes the pool to the target that calculateTargetWorkers
// derives from load. New workers get IDs continuing from the current count.
// When shrinking, the highest-numbered workers are stopped and removed first.
// This assumes IDs stay contiguous from 0, which this method preserves.
func (p *DynamicPool) adjustWorkers(load int) {
    p.mu.Lock()
    defer p.mu.Unlock()

    want := p.calculateTargetWorkers(load)
    have := len(p.workers)

    switch {
    case want > have:
        // Grow: start workers with IDs have..want-1.
        for id := have; id < want; id++ {
            w := NewWorker(id, p.jobs, p.results)
            p.workers[id] = w
            w.Start()
        }
    case want < have:
        // Shrink: stop IDs have-1 down to want.
        for id := have - 1; id >= want; id-- {
            w, ok := p.workers[id]
            if !ok {
                continue
            }
            w.Stop()
            delete(p.workers, id)
        }
    }
}

2. Job Prioritization

// PriorityPool keeps a separate job queue per priority level. Workers prefer
// high over normal over low (see processJobs).
type PriorityPool struct {
    highPriority   chan Job    // served first whenever a job is ready
    normalPriority chan Job    // served when no high-priority job is ready
    lowPriority    chan Job    // served only while waiting on all queues
    results        chan Result // output channel
}

// processJobs runs the worker loop for a priority pool. It takes a
// high-priority job if one is ready, otherwise a normal one. Only when neither
// is ready does it block, waiting on all three queues at once.
//
// A closed queue is set to nil so no select picks it again, and the loop
// returns once all three queues are closed. Without this, a closed queue would
// keep yielding zero-value Jobs and the worker would spin forever.
//
// NOTE(review): Worker.pool is declared as *Pool, which has no priority
// queues. This assumes the worker belongs to a PriorityPool-like type; confirm.
// NOTE(review): the Result returned by w.process is discarded, as in the
// original loop. Forward it to the pool's results channel if callers need it.
func (w *Worker) processJobs() {
    high := w.pool.highPriority
    normal := w.pool.normalPriority
    low := w.pool.lowPriority

    for high != nil || normal != nil || low != nil {
        var (
            job  Job
            ok   bool
            from *chan Job // queue the job came from, so it can be disabled if closed
        )

        select {
        // Try high priority first.
        case job, ok = <-high:
            from = &high
        default:
            // Then normal priority, still preferring high if it became ready.
            select {
            case job, ok = <-high:
                from = &high
            case job, ok = <-normal:
                from = &normal
            default:
                // Finally, block until any queue, including low, has work.
                select {
                case job, ok = <-high:
                    from = &high
                case job, ok = <-normal:
                    from = &normal
                case job, ok = <-low:
                    from = &low
                }
            }
        }

        if !ok {
            // A receive from a nil channel never proceeds, so this removes the
            // closed queue from every select above.
            *from = nil
            continue
        }
        w.process(job)
    }
}

3. Rate Limiting

// RateLimitedPool is a pool whose workers wait on a shared limiter before
// processing each job.
type RateLimitedPool struct {
    workers []*Worker     // worker handles, one slot per worker
    jobs    chan Job      // shared work queue
    results chan Result   // shared output channel
    limiter *rate.Limiter // shared by all workers, so the limit applies pool-wide
}

// NewRateLimitedPool creates a RateLimitedPool with numWorkers empty worker
// slots, unbuffered job and result channels, and a token-bucket limiter that
// allows rateLimit events per second with bursts of up to burst events.
func NewRateLimitedPool(numWorkers int, rateLimit float64, burst int) *RateLimitedPool {
    pool := &RateLimitedPool{}
    pool.workers = make([]*Worker, numWorkers)
    pool.jobs = make(chan Job)
    pool.results = make(chan Result)
    pool.limiter = rate.NewLimiter(rate.Limit(rateLimit), burst)
    return pool
}

// processWithRateLimit waits for the pool's limiter to admit one event, then
// processes job and sends its Result. If the limiter refuses (Wait returns an
// error), no processing happens and an error Result is sent instead.
//
// NOTE(review): the wait uses context.Background(), so pool shutdown cannot
// interrupt it; consider passing a pool context instead.
func (w *Worker) processWithRateLimit(job Job) {
    if err := w.pool.limiter.Wait(context.Background()); err != nil {
        w.pool.results <- Result{
            JobID: job.ID,
            Error: fmt.Errorf("rate limit: %w", err),
        }
        return
    }
    w.pool.results <- w.process(job)
}

Performance Considerations

1. Pool Sizing

// Calculate optimal pool size
//
// calculatePoolSize returns the recommended worker count for CPU-bound work:
// one worker per logical CPU. Use poolSizeFor(true) for I/O-bound work.
func calculatePoolSize() int {
    return poolSizeFor(false)
}

// poolSizeFor returns a starting worker count. CPU-bound work gets
// runtime.NumCPU() workers. I/O-bound work gets twice that, because its
// workers spend much of their time blocked and the extra goroutines keep the
// CPUs busy. These are starting points; measure and tune for the real workload.
func poolSizeFor(ioBound bool) int {
    n := runtime.NumCPU()
    if ioBound {
        return 2 * n
    }
    return n
}

// Monitor pool performance
//
// PoolStats is a point-in-time snapshot of pool metrics, as returned by Stats.
type PoolStats struct {
    ActiveWorkers  int           // len of the pool's worker slice, i.e. configured workers, not necessarily busy ones
    QueuedJobs     int           // jobs buffered in the jobs channel and not yet received by a worker
    CompletedJobs  int           // jobs finished so far
    AverageLatency time.Duration // mean latency over completed jobs
}

// Stats returns a snapshot of pool metrics taken under p.mu.
// AverageLatency is zero until at least one job has completed.
//
// NOTE(review): mu, completedJobs and totalLatency are not declared in the
// Pool struct shown in the Context Support section. This assumes the pool
// records them as jobs finish; confirm.
func (p *Pool) Stats() PoolStats {
    p.mu.Lock()
    defer p.mu.Unlock()

    stats := PoolStats{
        ActiveWorkers: len(p.workers),
        QueuedJobs:    len(p.jobs),
        CompletedJobs: p.completedJobs,
    }
    // Integer division by zero panics in Go, so only compute the average once
    // there is something to average.
    if p.completedJobs > 0 {
        stats.AverageLatency = p.totalLatency / time.Duration(p.completedJobs)
    }
    return stats
}

2. Channel Buffering

// Optimize channel buffer sizes
//
// NewOptimizedPool builds a Pool whose job and result channels are each
// buffered to twice the worker count. The job buffer lets producers queue work
// ahead of the workers. The result buffer lets workers hand off results without
// waiting for a reader every time.
//
// NOTE(review): ctx and cancel are left nil here. The context-aware
// Worker.Start and Pool.Stop would panic on them; confirm they are set
// elsewhere before use.
func NewOptimizedPool(numWorkers int) *Pool {
    const bufferFactor = 2
    depth := numWorkers * bufferFactor

    return &Pool{
        workers: make([]*Worker, numWorkers),
        jobs:    make(chan Job, depth),
        results: make(chan Result, depth),
    }
}

Common Mistakes

1. Resource Leaks

// Wrong: Not cleaning up workers
//
// Closing jobs only lets workers that watch the jobs channel finish. Stop
// returns immediately without waiting for them and never cancels the pool
// context. It also never closes results, so a consumer ranging over results
// never terminates.
func (p *Pool) Stop() {
    close(p.jobs)  // Workers might still be running
}

// Right: Clean shutdown
//
// Stop shuts the pool down in a fixed order. First it tells workers to stop.
// Then it waits for every worker goroutine to return. Only then does it close
// results, so no worker can send on a closed channel.
//
// NOTE(review): wg is not declared in the Pool struct shown in the Context
// Support section. This assumes Pool also has a sync.WaitGroup that counts
// each worker goroutine; confirm.
func (p *Pool) Stop() {
    p.beginShutdown()

    // Block until every worker goroutine has returned.
    p.wg.Wait()

    // Safe now: no sender on results remains.
    close(p.results)
}

// beginShutdown cancels the pool context and closes the jobs channel, which
// stops both context-aware workers and workers that exit when jobs is closed.
// Cancellation comes first, so context-aware workers may return before
// draining jobs still buffered in the channel.
func (p *Pool) beginShutdown() {
    p.cancel()
    close(p.jobs)
}

2. Blocking Operations

// Wrong: Blocking result send
//
// If no one is receiving from w.results (and its buffer, if any, is full), the
// send below blocks indefinitely. The worker then cannot pick up more jobs or
// notice that the pool is shutting down.
func (w *Worker) process(job Job) {
    result := processJob(job)
    w.results <- result  // Might block
}

// Right: Use select with timeout
//
// process runs job and tries to deliver its result, giving up after one
// second so a stalled consumer cannot pin the worker forever. A result that
// cannot be delivered in time is logged and dropped. The log message says
// "channel full", but the send also times out on an unbuffered channel with
// no ready receiver.
func (w *Worker) process(job Job) {
    outcome := processJob(job)
    deadline := time.After(time.Second)

    select {
    case w.results <- outcome:
        // Delivered.
    case <-deadline:
        log.Printf("Failed to send result for job %d: channel full", job.ID)
    }
}

3. Error Propagation

// Wrong: Losing errors
//
// The error is only logged here. The code that submitted the job gets no
// signal that it failed, so it cannot retry, count, or report the failure.
func (w *Worker) process(job Job) {
    if err := processJob(job); err != nil {
        log.Printf("Error: %v", err)  // Error lost
        return
    }
}

// Right: Propagate errors
//
// process runs job and returns its outcome as a Result instead of logging
// failures. On failure the error is wrapped with the job ID and stored in
// Error, leaving Data unset. On success processJob's output is stored in Data,
// so successful work is not discarded along with the error check.
//
// NOTE(review): assumes processJob returns (data, error), matching its use in
// the Error Handling example; confirm against the real signature.
func (w *Worker) process(job Job) Result {
    data, err := processJob(job)
    if err != nil {
        return Result{
            JobID: job.ID,
            Error: fmt.Errorf("job %d failed: %w", job.ID, err),
        }
    }
    return Result{JobID: job.ID, Data: data}
}

Next Steps

  1. Learn about goroutines
  2. Explore channels
  3. Study context
  4. Practice with rate limiting

Additional Resources