1. go
  2. /concurrency
  3. /rate-limiting

Understanding Rate Limiting in Go Programming

Rate limiting is a crucial technique for controlling resource usage and maintaining system stability. This guide covers everything you need to know about implementing rate limiting effectively in Go.

Rate Limiting Basics

Basic Rate Limiter

// Simple time-based rate limiter.
//
// rateLimiter relays every value received on requests to the returned
// channel, releasing at most one value per limit interval. Each value,
// including the first, waits for the next ticker tick before it is
// forwarded. The returned channel is closed once requests has been closed
// and drained. time.NewTicker panics when limit is not positive.
func rateLimiter(requests <-chan int, limit time.Duration) <-chan int {
    out := make(chan int)
    tick := time.NewTicker(limit)

    forward := func() {
        // Deferred calls run last-in first-out: the ticker is stopped
        // first, then the output channel is closed.
        defer close(out)
        defer tick.Stop()

        for {
            req, ok := <-requests
            if !ok {
                return
            }
            <-tick.C // Hold the request until the next tick.
            out <- req
        }
    }
    go forward()

    return out
}

// Usage
requests := make(chan int)
limited := rateLimiter(requests, 200*time.Millisecond) // 5 requests per second

// Produce from a separate goroutine: requests is unbuffered, so sending and
// receiving on the same goroutine would block forever. Closing requests lets
// rateLimiter finish and close limited.
go func() {
    defer close(requests)
    for i := 0; i < 10; i++ {
        requests <- i
    }
}()

// Drain the throttled channel; the loop ends when limited is closed.
for req := range limited {
    _ = req // handle the throttled request here
}

Token Bucket Rate Limiter

// TokenBucket is a token-bucket rate limiter. The bucket starts full, every
// allowed request removes one token, and one token is added back for each
// rate interval that elapses, up to capacity.
//
// All state is guarded by mu; tokens stays an atomic.Int64 only so that its
// type is unchanged for any other code in the package that reads it.
type TokenBucket struct {
    tokens    atomic.Int64  // tokens currently available
    capacity  int64         // maximum number of tokens the bucket can hold
    rate      time.Duration // time needed to earn one token; <= 0 disables refilling
    lastToken time.Time     // instant up to which earned tokens have been credited
    mu        sync.Mutex
}

// NewTokenBucket returns a full bucket holding capacity tokens that earns one
// new token every rate interval. Refill accounting starts at creation time.
func NewTokenBucket(capacity int64, rate time.Duration) *TokenBucket {
    tb := &TokenBucket{
        capacity:  capacity,
        rate:      rate,
        lastToken: time.Now(),
    }
    tb.tokens.Store(capacity)
    return tb
}

// Allow reports whether a request may proceed right now. It credits any
// tokens earned since the last refill and, if at least one token is
// available, consumes it and returns true. It never blocks on time.
func (tb *TokenBucket) Allow() bool {
    tb.mu.Lock()
    defer tb.mu.Unlock()

    tb.refill(time.Now())

    if tb.tokens.Load() > 0 {
        tb.tokens.Add(-1)
        return true
    }
    return false
}

// refill credits the tokens earned between lastToken and now. The caller must
// hold tb.mu.
func (tb *TokenBucket) refill(now time.Time) {
    if tb.rate <= 0 {
        // A non-positive rate cannot be divided by; treat it as "no refill".
        return
    }

    earned := int64(now.Sub(tb.lastToken) / tb.rate)
    if earned <= 0 {
        return
    }

    current := tb.tokens.Load()
    if earned >= tb.capacity-current {
        // The bucket fills up. Compare against the free space instead of
        // adding first, so a very large earned count (tiny rate, long idle
        // period, zero-valued lastToken) cannot overflow the counter. The
        // surplus is discarded, so accounting restarts from now.
        tb.tokens.Store(tb.capacity)
        tb.lastToken = now
        return
    }

    tb.tokens.Store(current + earned)
    // Advance only by the time actually converted into tokens so that the
    // leftover fraction of an interval still counts toward the next token.
    tb.lastToken = tb.lastToken.Add(time.Duration(earned) * tb.rate)
}

Best Practices

1. Using golang.org/x/time/rate

import "golang.org/x/time/rate"

// RateLimitedAPI gates request processing behind one shared rate.Limiter.
type RateLimitedAPI struct {
    limiter *rate.Limiter
}

// NewRateLimitedAPI builds a RateLimitedAPI whose limiter is created from
// rps (events per second) and burst.
func NewRateLimitedAPI(rps float64, burst int) *RateLimitedAPI {
    api := new(RateLimitedAPI)
    api.limiter = rate.NewLimiter(rate.Limit(rps), burst)
    return api
}

// Process waits on the limiter using ctx and then hands the request to
// processRequest. A failed wait is returned wrapped with the "rate limit"
// prefix; otherwise the result of processRequest is returned.
func (api *RateLimitedAPI) Process(ctx context.Context, request Request) error {
    waitErr := api.limiter.Wait(ctx)
    if waitErr != nil {
        return fmt.Errorf("rate limit: %w", waitErr)
    }

    return api.processRequest(request)
}

// TryProcess is the alternative built on Allow: it processes the request only
// if the limiter admits it right now and otherwise returns an error without
// waiting.
func (api *RateLimitedAPI) TryProcess(request Request) error {
    if api.limiter.Allow() {
        return api.processRequest(request)
    }

    return errors.New("rate limit exceeded")
}

2. Per-Client Rate Limiting

// ClientLimiter hands out one rate.Limiter per client ID. Every limiter is
// created with the same rps and burst settings.
type ClientLimiter struct {
    limiters sync.Map // map[string]*rate.Limiter
    rps      float64
    burst    int
}

// NewClientLimiter returns a ClientLimiter with no limiters yet; they are
// created on demand by GetLimiter.
func NewClientLimiter(rps float64, burst int) *ClientLimiter {
    cl := new(ClientLimiter)
    cl.rps, cl.burst = rps, burst
    return cl
}

// GetLimiter returns the limiter stored for clientID, creating and storing
// one on first use. LoadOrStore keeps whichever limiter was stored first, so
// concurrent callers for the same ID end up with the same limiter.
func (cl *ClientLimiter) GetLimiter(clientID string) *rate.Limiter {
    if existing, found := cl.limiters.Load(clientID); found {
        return existing.(*rate.Limiter)
    }

    fresh := rate.NewLimiter(rate.Limit(cl.rps), cl.burst)
    stored, _ := cl.limiters.LoadOrStore(clientID, fresh)
    return stored.(*rate.Limiter)
}

// Middleware wraps next with per-client rate limiting. The client is
// identified by the X-Client-ID request header; a request the client's
// limiter does not admit is answered with 429 Too Many Requests and never
// reaches next.
func (cl *ClientLimiter) Middleware(next http.Handler) http.Handler {
    handle := func(w http.ResponseWriter, r *http.Request) {
        id := r.Header.Get("X-Client-ID")

        if cl.GetLimiter(id).Allow() {
            next.ServeHTTP(w, r)
            return
        }

        http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
    }

    return http.HandlerFunc(handle)
}

3. Distributed Rate Limiting

// DistributedLimiter is a sliding-window rate limiter backed by a Redis
// sorted set, so a single limit can be shared by several processes.
type DistributedLimiter struct {
    redis  *redis.Client
    key    string        // sorted-set key holding one entry per request
    limit  int           // maximum number of entries allowed inside the window
    window time.Duration // length of the sliding window
}

// Allow records the current request under dl.key and reports whether the
// number of requests inside the window, including this one, is within
// dl.limit. It returns an error if the Redis commands fail.
//
// The commands run in a transactional pipeline (MULTI/EXEC) so that
// concurrent callers cannot interleave their trim/add/count steps.
//
// NOTE: the request is added before it is counted and is not removed when it
// is rejected, so rejected requests still occupy the window.
func (dl *DistributedLimiter) Allow(ctx context.Context) (bool, error) {
    now := time.Now().UnixNano()
    windowStart := now - dl.window.Nanoseconds()

    pipe := dl.redis.TxPipeline()

    // Remove old entries
    pipe.ZRemRangeByScore(ctx, dl.key, "0", strconv.FormatInt(windowStart, 10))

    // Add current request
    pipe.ZAdd(ctx, dl.key, &redis.Z{
        Score:  float64(now),
        Member: now,
    })

    // Get count in window. Keep the command handle rather than indexing into
    // the Exec results and type-asserting.
    count := pipe.ZCard(ctx, dl.key)

    // Set key expiration so an idle key disappears once the window has passed
    pipe.Expire(ctx, dl.key, dl.window)

    if _, err := pipe.Exec(ctx); err != nil {
        return false, err
    }

    return count.Val() <= int64(dl.limit), nil
}

Common Patterns

1. Adaptive Rate Limiting

// AdaptiveLimiter wraps a rate.Limiter whose limit is recalculated from
// metrics, at most once per minute.
type AdaptiveLimiter struct {
    limiter    *rate.Limiter
    metrics    *Metrics
    mu         sync.Mutex
    lastUpdate time.Time
}

// adjustRate recomputes the limiter's rate from the current error rate and
// average latency. It does nothing when less than a minute has passed since
// the previous adjustment. mu serializes concurrent callers.
func (al *AdaptiveLimiter) adjustRate() {
    al.mu.Lock()
    defer al.mu.Unlock()

    checkedAt := time.Now()
    if sinceLast := checkedAt.Sub(al.lastUpdate); sinceLast < time.Minute {
        return
    }

    // Derive the new limit from the metrics and apply it.
    target := al.calculateNewLimit(al.metrics.ErrorRate(), al.metrics.AverageLatency())
    al.limiter.SetLimit(rate.Limit(target))
    al.lastUpdate = checkedAt
}

// Allow gives adjustRate a chance to run and then reports whether the
// underlying limiter admits the request.
func (al *AdaptiveLimiter) Allow() bool {
    al.adjustRate()
    admitted := al.limiter.Allow()
    return admitted
}

2. Burst Handling

// BurstLimiter combines two limiters: normal is consulted first and burst
// serves as the fallback when normal rejects the request.
type BurstLimiter struct {
    normal *rate.Limiter
    burst  *rate.Limiter
    mu     sync.Mutex
}

// Allow reports whether either limiter admits the request. The burst limiter
// is only consulted, and therefore only charged, when the normal limiter says
// no. mu makes the two-step check a single critical section.
func (bl *BurstLimiter) Allow() bool {
    bl.mu.Lock()
    defer bl.mu.Unlock()

    // Short-circuit evaluation: normal rate first, burst rate as fallback.
    return bl.normal.Allow() || bl.burst.Allow()
}

// Usage: pair a steady-state limiter with a higher-rate fallback limiter.
// The mu field is not set here and keeps its zero value.
limiter := &BurstLimiter{
    normal: rate.NewLimiter(rate.Limit(100), 10),  // 100 rps normally, burst size 10
    burst:  rate.NewLimiter(rate.Limit(1000), 100), // 1000 rps burst, burst size 100
}

3. Rate Limiting with Queuing

// QueuedLimiter processes queued requests with a pool of worker goroutines
// that all share one rate limiter.
type QueuedLimiter struct {
    limiter *rate.Limiter
    queue   chan Request
    workers int
}

// Start launches ql.workers worker goroutines. Each one runs until ctx is
// done.
func (ql *QueuedLimiter) Start(ctx context.Context) {
    for remaining := ql.workers; remaining > 0; remaining-- {
        go ql.worker(ctx)
    }
}

// worker takes requests off the queue, waits for the limiter, processes each
// request and reports the outcome on the request's Result or Error channel.
// It returns once ctx is done.
func (ql *QueuedLimiter) worker(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return

        case req := <-ql.queue:
            // A failed wait is reported to the requester and the request is
            // not processed.
            if waitErr := ql.limiter.Wait(ctx); waitErr != nil {
                req.Error <- waitErr
                continue
            }

            result, err := processRequest(req)
            if err == nil {
                req.Result <- result
                continue
            }
            req.Error <- err
        }
    }
}

Performance Considerations

1. Memory Usage

// Bad: Unlimited client storage
// BadLimiter stores limiters in a plain map under string keys (presumably
// client IDs). The type carries no size bound or eviction state, so the map
// only grows as new keys are added.
type BadLimiter struct {
    limiters map[string]*rate.Limiter  // Unbounded growth
    mu       sync.Mutex // presumably guards limiters; no usage is shown here
}

// Good: Bounded storage with cleanup
// GoodLimiter keeps per-client limiters in a fixed-size LRU cache, so the
// number of limiters held in memory is bounded by the cache size.
type GoodLimiter struct {
    limiters *lru.Cache // Fixed size cache
    mu       sync.Mutex // makes the lookup-then-insert in GetLimiter one step
}

// NewGoodLimiter returns a GoodLimiter that remembers at most size clients.
// A size below 1 is raised to 1, because lru.New rejects non-positive sizes
// and a discarded error would leave a nil cache that panics on first use.
func NewGoodLimiter(size int) *GoodLimiter {
    if size < 1 {
        size = 1
    }

    cache, err := lru.New(size)
    if err != nil {
        // Not expected for size >= 1. Fail here, at construction, rather
        // than with a nil-pointer panic on a later GetLimiter call.
        panic("creating limiter cache: " + err.Error())
    }

    return &GoodLimiter{
        limiters: cache,
    }
}

// GetLimiter returns the cached limiter for clientID, or creates, caches and
// returns a new one (10 events per second, burst of 5) when none is cached.
func (gl *GoodLimiter) GetLimiter(clientID string) *rate.Limiter {
    gl.mu.Lock()
    defer gl.mu.Unlock()

    if limiter, ok := gl.limiters.Get(clientID); ok {
        return limiter.(*rate.Limiter)
    }

    limiter := rate.NewLimiter(rate.Limit(10), 5)
    gl.limiters.Add(clientID, limiter)
    return limiter
}

2. Lock Contention

// Bad: Global lock
// HighContentionLimiter holds every limiter in one map with a single mutex
// next to it, so all operations that take mu contend on the same lock.
type HighContentionLimiter struct {
    limiters map[string]*rate.Limiter
    mu       sync.Mutex  // Single lock for all operations
}

// Good: Sharded locks
// LowContentionLimiter spreads limiters over 256 independently locked shards
// so that lookups for different clients rarely contend on the same mutex.
// hashFn selects the shard for a client ID and must be set before use.
type LowContentionLimiter struct {
    shards [256]shard
    hashFn func(string) uint8
}

// shard is one partition of the limiter table, guarded by its own mutex.
type shard struct {
    limiters map[string]*rate.Limiter
    mu       sync.Mutex
}

// GetLimiter returns the limiter for clientID, locking only the shard that
// hashFn selects. The shard's map and the client's limiter (10 events per
// second, burst of 5) are created on first use, so the result is never nil
// for a client that has not been seen before.
func (lcl *LowContentionLimiter) GetLimiter(clientID string) *rate.Limiter {
    s := &lcl.shards[lcl.hashFn(clientID)]
    s.mu.Lock()
    defer s.mu.Unlock()

    if limiter, ok := s.limiters[clientID]; ok {
        return limiter
    }

    // The zero-value shard has a nil map; writing to it would panic.
    if s.limiters == nil {
        s.limiters = make(map[string]*rate.Limiter)
    }

    limiter := rate.NewLimiter(rate.Limit(10), 5)
    s.limiters[clientID] = limiter
    return limiter
}

Common Mistakes

1. Not Cleaning Up

// Wrong: Memory leak
// LeakyLimiter stores limiters in a map under string keys and records nothing
// about when an entry was last used, so there is no basis for removing idle
// entries.
type LeakyLimiter struct {
    limiters map[string]*rate.Limiter
    mu       sync.Mutex
}

// Right: With cleanup
// CleanLimiter stores limiters together with their last-use time so that idle
// entries can be evicted periodically by cleanup.
type CleanLimiter struct {
    limiters        map[string]*limiterEntry
    mu              sync.Mutex
    cleanupInterval time.Duration // sweep period and maximum idle time
    done            chan struct{} // optional: close to make cleanup return
}

// limiterEntry pairs a limiter with the time it was last used.
type limiterEntry struct {
    limiter *rate.Limiter
    lastUse time.Time
}

// cleanup evicts idle limiters every cleanupInterval. It is meant to run in
// its own goroutine and returns when done is closed; with a nil done channel
// it runs until the process exits. A non-positive cleanupInterval makes
// cleanup return immediately, because time.NewTicker would panic on it.
func (cl *CleanLimiter) cleanup() {
    if cl.cleanupInterval <= 0 {
        return
    }

    ticker := time.NewTicker(cl.cleanupInterval)
    defer ticker.Stop()

    for {
        select {
        case <-cl.done:
            return
        case <-ticker.C:
            cl.evictIdle()
        }
    }
}

// evictIdle deletes every entry that has been unused for longer than
// cleanupInterval.
func (cl *CleanLimiter) evictIdle() {
    cl.mu.Lock()
    defer cl.mu.Unlock()

    now := time.Now()
    for id, entry := range cl.limiters {
        if now.Sub(entry.lastUse) > cl.cleanupInterval {
            delete(cl.limiters, id)
        }
    }
}

2. Incorrect Burst Configuration

// Wrong: Burst too small
// The second argument is the burst size. With a burst of 1, two requests
// arriving at the same instant cannot both be admitted.
limiter := rate.NewLimiter(rate.Limit(1000), 1)  // 1000 rps but can't handle spikes

// Right: Appropriate burst
// This is an alternative to the declaration above, not a second declaration
// in the same scope. A burst of 50 admits up to 50 requests at once.
limiter := rate.NewLimiter(rate.Limit(1000), 50)  // Can handle short bursts

// Better: Calculate burst based on requirements
// calculateBurst derives a burst size from the per-second rate: 5% of rps,
// truncated to an integer, but never less than 10.
func calculateBurst(rps float64) int {
    const minBurst = 10

    if scaled := int(rps * 0.05); scaled >= minBurst {
        return scaled
    }
    return minBurst
}

3. Not Handling Errors

// Wrong: Ignoring rate limit errors
// Process calls processRequest only when the limiter admits the request. It
// has no return value, so the caller cannot tell a processed request from a
// rejected one, and whatever processRequest returns is discarded.
func (api *API) Process(req Request) {
    if api.limiter.Allow() {
        processRequest(req)
    }
    // Request silently dropped
}

// Right: Proper error handling
// Process handles req if the limiter admits it right now and returns the
// result of processRequest. Otherwise it returns a *RateLimitError whose
// RetryAfter says how long the caller should wait before trying again.
func (api *API) Process(req Request) error {
    if api.limiter.Allow() {
        return processRequest(req)
    }

    // Reserve is used only to learn the delay. A reservation claims a token,
    // so it is cancelled straight away; otherwise every rejected call would
    // push the limiter further into debt.
    reservation := api.limiter.Reserve()
    retryAfter := reservation.Delay()
    reservation.Cancel()

    return &RateLimitError{
        RetryAfter: retryAfter,
    }
}

// RateLimitError is returned when a request is rejected by the rate limiter.
type RateLimitError struct {
    // RetryAfter is how long the caller should wait before retrying.
    RetryAfter time.Duration
}

// Error implements the error interface and includes the retry delay in the
// message.
func (e *RateLimitError) Error() string {
    const format = "rate limit exceeded, retry after %v"
    return fmt.Sprintf(format, e.RetryAfter)
}

Next Steps

  1. Learn about timeouts
  2. Explore context
  3. Study worker pools
  4. Practice with distributed systems

Additional Resources