Understanding Worker Pools in Go Programming
Worker pools are a common Go pattern for processing tasks concurrently with a fixed number of goroutines. This guide walks through implementing and using them effectively, from a minimal example through shutdown, error handling, prioritization, and performance tuning.
Worker Pool Basics
Basic Worker Pool
```go
package main

import "fmt"

// Job carries a unit of work into the pool.
type Job struct {
	ID   int
	Data interface{}
}

// Result carries the outcome of one job back to the caller.
type Result struct {
	JobID int
	Data  interface{}
	Error error
}

// process stands in for your real work; it is stubbed here so
// the example compiles and runs.
func process(data interface{}) interface{} {
	return fmt.Sprintf("processed %v", data)
}

// worker consumes jobs until the jobs channel is closed.
func worker(id int, jobs <-chan Job, results chan<- Result) {
	for job := range jobs {
		result := Result{
			JobID: job.ID,
			Data:  process(job.Data),
		}
		results <- result
	}
}

func main() {
	const numWorkers = 3
	jobs := make(chan Job, numWorkers)
	results := make(chan Result, numWorkers)

	// Start workers
	for w := 1; w <= numWorkers; w++ {
		go worker(w, jobs, results)
	}

	// Send jobs; closing the channel lets each worker's range loop end
	for j := 1; j <= 5; j++ {
		jobs <- Job{ID: j, Data: fmt.Sprintf("job-%d", j)}
	}
	close(jobs)

	// Collect one result per job
	for a := 1; a <= 5; a++ {
		result := <-results
		fmt.Printf("Result: %+v\n", result)
	}
}
```
Worker Pool with WaitGroup
```go
type WorkerPool struct {
	numWorkers int
	jobs       chan Job
	results    chan Result
	wg         sync.WaitGroup
}

func NewWorkerPool(numWorkers int) *WorkerPool {
	return &WorkerPool{
		numWorkers: numWorkers,
		jobs:       make(chan Job, numWorkers),
		results:    make(chan Result, numWorkers),
	}
}

// Start launches the workers; each runs until jobs is closed.
func (p *WorkerPool) Start() {
	for i := 0; i < p.numWorkers; i++ {
		p.wg.Add(1)
		go func(workerID int) {
			defer p.wg.Done()
			for job := range p.jobs {
				result := Result{
					JobID: job.ID,
					Data:  process(job.Data),
				}
				p.results <- result
			}
		}(i)
	}
}

// Stop closes the job channel, waits for in-flight work to
// finish, then closes results so consumers ranging over it end.
func (p *WorkerPool) Stop() {
	close(p.jobs)
	p.wg.Wait()
	close(p.results)
}
```
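A minimal usage sketch, reusing Job, Result, and process from the first example. Submission and shutdown happen on a separate goroutine so the main goroutine can keep draining results; otherwise a full results buffer could block the workers and deadlock Stop:

```go
func main() {
	pool := NewWorkerPool(3)
	pool.Start()

	// Submit and shut down concurrently with result draining.
	go func() {
		for j := 1; j <= 5; j++ {
			pool.jobs <- Job{ID: j, Data: fmt.Sprintf("job-%d", j)}
		}
		pool.Stop()
	}()

	// The range ends when Stop closes the results channel.
	for result := range pool.results {
		fmt.Printf("Result: %+v\n", result)
	}
}
```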
Best Practices
1. Resource Management
```go
// Worker owns its channels and a quit signal so the pool can
// shut it down individually.
type Worker struct {
	id      int
	jobs    <-chan Job
	results chan<- Result
	quit    chan struct{}
	pool    *Pool
}

func (w *Worker) Start() {
	go func() {
		// Always tell the pool this worker has exited.
		defer w.pool.workerStopped(w.id)
		for {
			select {
			case job, ok := <-w.jobs:
				if !ok {
					return // jobs channel closed
				}
				result := w.process(job)
				w.results <- result
			case <-w.quit:
				return // individually stopped
			}
		}
	}()
}

func (w *Worker) Stop() {
	close(w.quit)
}
```
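The worker above reports its exit through w.pool.workerStopped, which this guide never defines. A minimal sketch of the assumed bookkeeping, supposing the pool tracks live workers in a mutex-guarded map and counts them with a sync.WaitGroup (neither field appears in the Pool definition below, so adapt it to your own):

```go
func (p *Pool) workerStopped(id int) {
	p.mu.Lock()
	delete(p.liveWorkers, id) // hypothetical map[int]*Worker field
	p.mu.Unlock()
	p.wg.Done() // unblocks a Wait-based Stop
}
```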
2. Error Handling
```go
type Result struct {
	JobID int
	Data  interface{}
	Error error
}

func (w *Worker) process(job Job) Result {
	result := Result{JobID: job.ID}
	// Process with error handling; processJob is assumed to
	// return (data, error)
	data, err := processJob(job)
	if err != nil {
		result.Error = fmt.Errorf("processing job %d: %w", job.ID, err)
		return result
	}
	result.Data = data
	return result
}
```

```go
// Usage with error handling
for result := range pool.Results() {
	if result.Error != nil {
		log.Printf("Job %d failed: %v", result.JobID, result.Error)
		continue
	}
	// Handle successful result
}
```
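pool.Results() is not defined in this guide; a one-line read-only accessor is all it is assumed to be:

```go
// Results exposes the result channel read-only to consumers.
func (p *Pool) Results() <-chan Result {
	return p.results
}
```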
3. Context Support
```go
type Pool struct {
	workers []*Worker
	jobs    chan Job
	results chan Result
	ctx     context.Context
	cancel  context.CancelFunc
}

func NewPool(ctx context.Context, numWorkers int) *Pool {
	// Derive a cancellable context so the pool can stop its own
	// workers without cancelling the caller's context.
	ctx, cancel := context.WithCancel(ctx)
	return &Pool{
		workers: make([]*Worker, numWorkers),
		jobs:    make(chan Job),
		results: make(chan Result),
		ctx:     ctx,
		cancel:  cancel,
	}
}

// Start is the context-aware variant of the worker loop above.
func (w *Worker) Start() {
	go func() {
		defer w.pool.workerStopped(w.id)
		for {
			select {
			case job, ok := <-w.jobs:
				if !ok {
					return
				}
				result := w.process(job)
				w.results <- result
			case <-w.pool.ctx.Done():
				return // pool context cancelled
			}
		}
	}()
}
```
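A sketch of how a caller might drive this pool; the five-second deadline is illustrative. Cancelling the parent context, or letting it time out, fires every worker's ctx.Done() case:

```go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

pool := NewPool(ctx, 4)
// ... start workers and submit jobs as shown above ...
// On timeout (or an explicit cancel()), the workers return via
// ctx.Done() and the pool winds down.
```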
Common Patterns
1. Dynamic Pool Sizing
```go
type DynamicPool struct {
	minWorkers int
	maxWorkers int
	workers    map[int]*Worker
	jobs       chan Job    // shared by all workers
	results    chan Result // shared by all workers
	mu         sync.Mutex
}

func (p *DynamicPool) adjustWorkers(load int) {
	p.mu.Lock()
	defer p.mu.Unlock()

	targetWorkers := p.calculateTargetWorkers(load)
	currentWorkers := len(p.workers)

	if targetWorkers > currentWorkers {
		// Add workers
		for i := currentWorkers; i < targetWorkers; i++ {
			worker := NewWorker(i, p.jobs, p.results)
			p.workers[i] = worker
			worker.Start()
		}
	} else if targetWorkers < currentWorkers {
		// Remove workers
		for i := currentWorkers - 1; i >= targetWorkers; i-- {
			if worker, exists := p.workers[i]; exists {
				worker.Stop()
				delete(p.workers, i)
			}
		}
	}
}
```
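calculateTargetWorkers is left undefined above. One simple policy, assumed here purely for illustration, scales the worker count with queue depth and clamps it to the configured bounds:

```go
func (p *DynamicPool) calculateTargetWorkers(load int) int {
	const jobsPerWorker = 10 // illustrative ratio; tune for your workload
	target := load / jobsPerWorker
	if target < p.minWorkers {
		return p.minWorkers
	}
	if target > p.maxWorkers {
		return p.maxWorkers
	}
	return target
}
```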
2. Job Prioritization
```go
type PriorityPool struct {
	highPriority   chan Job
	normalPriority chan Job
	lowPriority    chan Job
	results        chan Result
}

// processJobs drains higher-priority queues first by nesting
// selects (w.pool is assumed to point at the PriorityPool here).
func (w *Worker) processJobs() {
	for {
		select {
		// Try high priority first
		case job := <-w.pool.highPriority:
			w.process(job)
		default:
			// Then high or normal priority
			select {
			case job := <-w.pool.highPriority:
				w.process(job)
			case job := <-w.pool.normalPriority:
				w.process(job)
			default:
				// Finally, block until any priority has a job
				select {
				case job := <-w.pool.highPriority:
					w.process(job)
				case job := <-w.pool.normalPriority:
					w.process(job)
				case job := <-w.pool.lowPriority:
					w.process(job)
				}
			}
		}
	}
}
```
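Note that this nesting only biases toward higher priorities: the outer two selects fall through immediately via default, and only the innermost select blocks. When several queues become ready at the same instant, Go picks among ready cases pseudo-randomly, so the pattern gives a strong preference rather than strict priority.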
3. Rate Limiting
```go
// RateLimitedPool shares one token-bucket limiter across all
// workers; rate.Limiter comes from golang.org/x/time/rate.
type RateLimitedPool struct {
	workers []*Worker
	jobs    chan Job
	results chan Result
	limiter *rate.Limiter
}

func NewRateLimitedPool(numWorkers int, rateLimit float64, burst int) *RateLimitedPool {
	return &RateLimitedPool{
		workers: make([]*Worker, numWorkers),
		jobs:    make(chan Job),
		results: make(chan Result),
		limiter: rate.NewLimiter(rate.Limit(rateLimit), burst),
	}
}

func (w *Worker) processWithRateLimit(job Job) {
	// Wait blocks until the limiter grants a token
	err := w.pool.limiter.Wait(context.Background())
	if err != nil {
		w.pool.results <- Result{
			JobID: job.ID,
			Error: fmt.Errorf("rate limit: %w", err),
		}
		return
	}
	// Process job
	result := w.process(job)
	w.pool.results <- result
}
```
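With context.Background(), Wait blocks indefinitely and cannot be interrupted. If the pool carries a context, as in the Context Support section, passing that instead lets shutdown reach workers that are parked on the limiter. A sketch of the changed call, assuming a ctx field on the pool:

```go
// Inside processWithRateLimit, replacing the Wait call above:
if err := w.pool.limiter.Wait(w.pool.ctx); err != nil {
	return // pool cancelled (or deadline hit) while throttled
}
```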
Performance Considerations
1. Pool Sizing
```go
// Choose a starting pool size from the workload type; benchmark
// with the real workload before settling on a number.
func calculatePoolSize(ioBound bool) int {
	numCPU := runtime.NumCPU()
	if ioBound {
		// I/O-bound tasks spend time waiting; extra workers keep the CPU busy
		return numCPU * 2
	}
	// CPU-bound tasks gain little beyond the core count
	return numCPU
}
```
```go
// Monitor pool performance
type PoolStats struct {
	ActiveWorkers  int
	QueuedJobs     int
	CompletedJobs  int
	AverageLatency time.Duration
}

// completedJobs and totalLatency are assumed counter fields on
// Pool, guarded by p.mu.
func (p *Pool) Stats() PoolStats {
	p.mu.Lock()
	defer p.mu.Unlock()

	stats := PoolStats{
		ActiveWorkers: len(p.workers),
		QueuedJobs:    len(p.jobs), // buffered jobs not yet picked up
		CompletedJobs: p.completedJobs,
	}
	// Guard against division by zero before any job completes
	if p.completedJobs > 0 {
		stats.AverageLatency = p.totalLatency / time.Duration(p.completedJobs)
	}
	return stats
}
```
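A sketch of how workers might feed those counters. recordCompletion is a hypothetical helper; completedJobs and totalLatency are the assumed fields used by Stats above:

```go
func (p *Pool) recordCompletion(start time.Time) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.completedJobs++
	p.totalLatency += time.Since(start)
}
```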
2. Channel Buffering
```go
// Optimize channel buffer sizes
func NewOptimizedPool(numWorkers int) *Pool {
	// Buffer sizes based on worker count
	jobBuffer := numWorkers * 2    // allow some job queuing
	resultBuffer := numWorkers * 2 // reduce worker blocking on sends
	return &Pool{
		workers: make([]*Worker, numWorkers),
		jobs:    make(chan Job, jobBuffer),
		results: make(chan Result, resultBuffer),
	}
}
```
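Buffer sizes are a throughput/backpressure trade-off: larger buffers absorb bursts, but they also delay the moment producers notice that workers have fallen behind, while unbuffered channels give the tightest backpressure at the cost of more blocking. Treat numWorkers * 2 as a starting point and tune under realistic load.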
Common Mistakes
1. Resource Leaks
```go
// Wrong: Not cleaning up workers
func (p *Pool) Stop() {
	close(p.jobs) // workers might still be running
}

// Right: Clean shutdown, in a fixed order
func (p *Pool) Stop() {
	// Signal shutdown to context-aware workers
	p.cancel()
	// Stop accepting new jobs
	close(p.jobs)
	// Wait for workers to finish
	p.wg.Wait()
	// Only now is it safe to close the results channel
	close(p.results)
}
```
2. Blocking Operations
```go
// Wrong: Blocking result send
func (w *Worker) process(job Job) {
	result := processJob(job)
	w.results <- result // blocks forever if no consumer is reading
}

// Right: Use select with timeout
func (w *Worker) process(job Job) {
	result := processJob(job)
	select {
	case w.results <- result:
		// Result sent successfully
	case <-time.After(time.Second):
		// The result is dropped; at least record that it happened
		log.Printf("Failed to send result for job %d: channel full", job.ID)
	}
}
```
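An alternative worth considering for the send itself: selecting on pool shutdown instead of a timer means a healthy-but-slow consumer never silently costs you a result. A sketch of the send inside the worker, where ctx is assumed to be the pool's context from the Context Support section:

```go
select {
case w.results <- result:
	// Result delivered
case <-w.pool.ctx.Done():
	return // pool is shutting down; stop trying to send
}
```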
3. Error Propagation
```go
// Wrong: Losing errors
func (w *Worker) process(job Job) {
	if _, err := processJob(job); err != nil {
		log.Printf("Error: %v", err) // error lost; consumer never sees it
		return
	}
}

// Right: Propagate errors through the Result
func (w *Worker) process(job Job) Result {
	result := Result{JobID: job.ID}
	data, err := processJob(job)
	if err != nil {
		result.Error = fmt.Errorf("job %d failed: %w", job.ID, err)
		return result
	}
	result.Data = data
	return result
}
```
Next Steps
- Learn about goroutines
- Explore channels
- Study the context package
- Practice with rate limiting