Understanding Rate Limiting in Go Programming
Rate limiting is a crucial technique for controlling resource usage and maintaining system stability. This guide covers practical approaches to rate limiting in Go, from hand-rolled limiters to golang.org/x/time/rate and distributed, Redis-backed designs.
Rate Limiting Basics
Basic Rate Limiter
// Simple time-based rate limiter
func rateLimiter(requests <-chan int, limit time.Duration) <-chan int {
	throttled := make(chan int)
	ticker := time.NewTicker(limit)
	go func() {
		defer close(throttled)
		defer ticker.Stop() // stop the ticker even if we exit early
		for req := range requests {
			<-ticker.C // wait for the next tick
			throttled <- req
		}
	}()
	return throttled
}
// Usage: produce requests from one goroutine and drain the throttled
// channel from another; with nothing reading `limited`, the forwarding
// send would block forever.
requests := make(chan int)
limited := rateLimiter(requests, 200*time.Millisecond) // 5 requests per second
go func() {
	defer close(requests)
	for i := 0; i < 10; i++ {
		requests <- i
	}
}()
for req := range limited {
	fmt.Println("handling request", req)
}
A ticker-based limiter like this enforces a fixed gap between requests and allows no bursts; the token bucket below trades that strictness for burst capacity.
Token Bucket Rate Limiter
type TokenBucket struct {
	mu        sync.Mutex
	tokens    int64 // guarded by mu, so no atomics are needed
	capacity  int64
	rate      time.Duration // interval at which one token is replenished
	lastToken time.Time
}

func NewTokenBucket(capacity int64, rate time.Duration) *TokenBucket {
	return &TokenBucket{
		tokens:    capacity,
		capacity:  capacity,
		rate:      rate,
		lastToken: time.Now(), // must be set, or the first refill overflows
	}
}

func (tb *TokenBucket) Allow() bool {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	// Refill one token per whole interval elapsed since the last refill.
	now := time.Now()
	newTokens := int64(now.Sub(tb.lastToken) / tb.rate)
	if newTokens > 0 {
		tb.tokens += newTokens
		if tb.tokens > tb.capacity {
			tb.tokens = tb.capacity
		}
		// Advance by whole intervals only, so partial progress
		// toward the next token is not thrown away.
		tb.lastToken = tb.lastToken.Add(time.Duration(newTokens) * tb.rate)
	}
	if tb.tokens > 0 {
		tb.tokens--
		return true
	}
	return false
}
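A quick usage sketch; the capacity of 5, the 100 ms refill interval, and the 25 ms pacing are arbitrary values for illustration:
tb := NewTokenBucket(5, 100*time.Millisecond) // 1 token per 100ms, burst of 5
for i := 0; i < 20; i++ {
	if tb.Allow() {
		fmt.Println("request", i, "allowed")
	} else {
		fmt.Println("request", i, "rejected")
	}
	time.Sleep(25 * time.Millisecond)
}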
Best Practices
1. Using golang.org/x/time/rate
import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/time/rate"
)

type RateLimitedAPI struct {
	limiter *rate.Limiter
}

func NewRateLimitedAPI(rps float64, burst int) *RateLimitedAPI {
	return &RateLimitedAPI{
		limiter: rate.NewLimiter(rate.Limit(rps), burst),
	}
}

// Process blocks until a token is available or the context is done.
func (api *RateLimitedAPI) Process(ctx context.Context, request Request) error {
	if err := api.limiter.Wait(ctx); err != nil {
		return fmt.Errorf("rate limit: %w", err)
	}
	return api.processRequest(request)
}

// Alternative using Allow: fail fast instead of blocking.
func (api *RateLimitedAPI) TryProcess(request Request) error {
	if !api.limiter.Allow() {
		return errors.New("rate limit exceeded")
	}
	return api.processRequest(request)
}
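Because Wait respects context cancellation, callers can bound how long they are willing to queue for a token. A minimal sketch, assuming the type above (the 50 ms timeout and the 100 rps / burst 10 figures are arbitrary):
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
api := NewRateLimitedAPI(100, 10)
if err := api.Process(ctx, Request{}); err != nil {
	// Wait returns an error if the deadline would expire
	// before a token becomes available.
	log.Printf("request rejected: %v", err)
}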
2. Per-Client Rate Limiting
type ClientLimiter struct {
	limiters sync.Map // map[string]*rate.Limiter
	rps      float64
	burst    int
}

func NewClientLimiter(rps float64, burst int) *ClientLimiter {
	return &ClientLimiter{
		rps:   rps,
		burst: burst,
	}
}

func (cl *ClientLimiter) GetLimiter(clientID string) *rate.Limiter {
	limiter, exists := cl.limiters.Load(clientID)
	if !exists {
		// LoadOrStore returns the existing limiter if another
		// goroutine created one for this client in the meantime.
		limiter, _ = cl.limiters.LoadOrStore(
			clientID,
			rate.NewLimiter(rate.Limit(cl.rps), cl.burst),
		)
	}
	return limiter.(*rate.Limiter)
}
// Usage in HTTP middleware
func (cl *ClientLimiter) Middleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		clientID := r.Header.Get("X-Client-ID")
		if clientID == "" {
			clientID = r.RemoteAddr // fall back to the peer address
		}
		limiter := cl.GetLimiter(clientID)
		if !limiter.Allow() {
			http.Error(w, "rate limit exceeded", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, r)
	})
}
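Wiring the middleware into a server is then a one-liner; a sketch where the 10 rps / burst 20 figures, the /api route, and the port are placeholders:
func main() {
	cl := NewClientLimiter(10, 20) // 10 rps with a burst of 20, per client
	mux := http.NewServeMux()
	mux.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})
	log.Fatal(http.ListenAndServe(":8080", cl.Middleware(mux)))
}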
3. Distributed Rate Limiting
type DistributedLimiter struct {
	redis  *redis.Client
	key    string
	limit  int
	window time.Duration
}

// Allow implements a sliding-window log over a Redis sorted set.
func (dl *DistributedLimiter) Allow(ctx context.Context) (bool, error) {
	pipe := dl.redis.Pipeline()
	now := time.Now().UnixNano()
	windowStart := now - dl.window.Nanoseconds()
	// Remove entries that have aged out of the window.
	pipe.ZRemRangeByScore(ctx, dl.key, "0", strconv.FormatInt(windowStart, 10))
	// Record the current request. In production the member should be
	// unique (e.g. include an instance ID), so that concurrent requests
	// with identical timestamps are not collapsed into one entry.
	pipe.ZAdd(ctx, dl.key, &redis.Z{
		Score:  float64(now),
		Member: now,
	})
	// Count the requests remaining in the window.
	pipe.ZCard(ctx, dl.key)
	// Expire the key so idle clients do not leak memory.
	pipe.Expire(ctx, dl.key, dl.window)
	results, err := pipe.Exec(ctx)
	if err != nil {
		return false, err
	}
	count := results[2].(*redis.IntCmd).Val() // result of ZCard
	return count <= int64(dl.limit), nil
}
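A usage sketch, assuming a go-redis v8 client; the address, key name, and 100-requests-per-minute limit are placeholders:
rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
dl := &DistributedLimiter{
	redis:  rdb,
	key:    "ratelimit:api",
	limit:  100,
	window: time.Minute,
}
ok, err := dl.Allow(context.Background())
if err != nil {
	log.Printf("rate limit check failed: %v", err)
} else if !ok {
	log.Println("over the limit, rejecting request")
}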
Common Patterns
1. Adaptive Rate Limiting
type AdaptiveLimiter struct {
	limiter    *rate.Limiter
	metrics    *Metrics
	mu         sync.Mutex
	lastUpdate time.Time
}

func (al *AdaptiveLimiter) adjustRate() {
	al.mu.Lock()
	defer al.mu.Unlock()
	now := time.Now()
	if now.Sub(al.lastUpdate) < time.Minute {
		return // re-evaluate at most once per minute
	}
	// Adjust the rate based on observed health metrics.
	errorRate := al.metrics.ErrorRate()
	latency := al.metrics.AverageLatency()
	newLimit := al.calculateNewLimit(errorRate, latency)
	al.limiter.SetLimit(rate.Limit(newLimit))
	al.lastUpdate = now
}

func (al *AdaptiveLimiter) Allow() bool {
	al.adjustRate()
	return al.limiter.Allow()
}
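The calculateNewLimit helper is left undefined above. One plausible AIMD-style heuristic, purely illustrative (the error-rate and latency thresholds, step factors, and bounds are all assumptions):
// calculateNewLimit backs off sharply when the service looks unhealthy
// and ramps back up slowly when it looks healthy.
func (al *AdaptiveLimiter) calculateNewLimit(errorRate float64, latency time.Duration) float64 {
	current := float64(al.limiter.Limit())
	switch {
	case errorRate > 0.05 || latency > 500*time.Millisecond:
		current *= 0.5 // multiplicative decrease under pressure
	case errorRate < 0.01 && latency < 100*time.Millisecond:
		current *= 1.1 // gradual recovery
	}
	// Clamp to sane bounds.
	if current < 1 {
		current = 1
	}
	if current > 10000 {
		current = 10000
	}
	return current
}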
2. Burst Handling
type BurstLimiter struct {
	normal *rate.Limiter
	burst  *rate.Limiter
	// rate.Limiter is safe for concurrent use, so no extra lock is needed.
}

func (bl *BurstLimiter) Allow() bool {
	// Try the normal rate first.
	if bl.normal.Allow() {
		return true
	}
	// Fall back to the burst allowance.
	return bl.burst.Allow()
}

// Usage
limiter := &BurstLimiter{
	normal: rate.NewLimiter(rate.Limit(100), 10),   // 100 rps normally
	burst:  rate.NewLimiter(rate.Limit(1000), 100), // up to 1000 rps in bursts
}
3. Rate Limiting with Queuing
type QueuedLimiter struct {
	limiter *rate.Limiter
	queue   chan Request
	workers int
}

func (ql *QueuedLimiter) Start(ctx context.Context) {
	for i := 0; i < ql.workers; i++ {
		go ql.worker(ctx)
	}
}

func (ql *QueuedLimiter) worker(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-ql.queue:
			// Block until the limiter grants a token or the context ends.
			if err := ql.limiter.Wait(ctx); err != nil {
				req.Error <- err
				continue
			}
			result, err := processRequest(req)
			if err != nil {
				req.Error <- err
			} else {
				req.Result <- result
			}
		}
	}
}
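The Request type with its reply channels is assumed above; a minimal sketch of it and a Submit helper (the field names, string payload, and result type are illustrative):
// Request carries per-request reply channels; they are buffered so a
// worker never blocks if the caller has already given up.
type Request struct {
	Payload string
	Result  chan string
	Error   chan error
}

func (ql *QueuedLimiter) Submit(ctx context.Context, payload string) (string, error) {
	req := Request{
		Payload: payload,
		Result:  make(chan string, 1),
		Error:   make(chan error, 1),
	}
	select {
	case ql.queue <- req:
	case <-ctx.Done():
		return "", ctx.Err()
	}
	select {
	case res := <-req.Result:
		return res, nil
	case err := <-req.Error:
		return "", err
	case <-ctx.Done():
		return "", ctx.Err()
	}
}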
Performance Considerations
1. Memory Usage
// Bad: unbounded client storage
type BadLimiter struct {
	limiters map[string]*rate.Limiter // grows without bound
	mu       sync.Mutex
}

// Good: bounded storage with LRU eviction
// (using github.com/hashicorp/golang-lru)
type GoodLimiter struct {
	limiters *lru.Cache // fixed-size cache
	mu       sync.Mutex
}

func NewGoodLimiter(size int) *GoodLimiter {
	cache, err := lru.New(size)
	if err != nil {
		panic(err) // lru.New only fails for a non-positive size
	}
	return &GoodLimiter{
		limiters: cache,
	}
}

func (gl *GoodLimiter) GetLimiter(clientID string) *rate.Limiter {
	gl.mu.Lock()
	defer gl.mu.Unlock()
	if limiter, ok := gl.limiters.Get(clientID); ok {
		return limiter.(*rate.Limiter)
	}
	limiter := rate.NewLimiter(rate.Limit(10), 5)
	gl.limiters.Add(clientID, limiter)
	return limiter
}
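One trade-off with LRU eviction: when an entry is evicted, that client's limiter state is lost, and its next request gets a fresh, full bucket. Size the cache generously relative to the number of concurrently active clients so hot clients are not recycled.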
2. Lock Contention
// Bad: a single global lock serializes every lookup
type HighContentionLimiter struct {
	limiters map[string]*rate.Limiter
	mu       sync.Mutex
}

// Good: sharded locks spread contention across 256 shards
type LowContentionLimiter struct {
	shards [256]shard
	hashFn func(string) uint8
}

type shard struct {
	limiters map[string]*rate.Limiter
	mu       sync.Mutex
}

func (lcl *LowContentionLimiter) GetLimiter(clientID string) *rate.Limiter {
	s := &lcl.shards[lcl.hashFn(clientID)]
	s.mu.Lock()
	defer s.mu.Unlock()
	// Create the limiter lazily; returning the bare map lookup would
	// hand back nil for clients we have not seen before.
	limiter, ok := s.limiters[clientID]
	if !ok {
		limiter = rate.NewLimiter(rate.Limit(10), 5)
		s.limiters[clientID] = limiter
	}
	return limiter
}
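A constructor and hash function to go with it; this sketch uses FNV-1a from the standard library (the 10 rps / burst 5 defaults mirror the example above and are assumptions):
import "hash/fnv"

func NewLowContentionLimiter() *LowContentionLimiter {
	lcl := &LowContentionLimiter{
		hashFn: func(s string) uint8 {
			h := fnv.New32a()
			h.Write([]byte(s))
			return uint8(h.Sum32()) // low byte picks one of the 256 shards
		},
	}
	for i := range lcl.shards {
		lcl.shards[i].limiters = make(map[string]*rate.Limiter)
	}
	return lcl
}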
Common Mistakes
1. Not Cleaning Up
// Wrong: entries are never removed, so memory leaks
type LeakyLimiter struct {
	limiters map[string]*rate.Limiter
	mu       sync.Mutex
}

// Right: track last use and evict idle entries
type CleanLimiter struct {
	limiters        map[string]*limiterEntry
	mu              sync.Mutex
	cleanupInterval time.Duration
}

type limiterEntry struct {
	limiter *rate.Limiter
	lastUse time.Time
}

// cleanup should run in its own goroutine for the life of the limiter.
func (cl *CleanLimiter) cleanup() {
	ticker := time.NewTicker(cl.cleanupInterval)
	defer ticker.Stop()
	for range ticker.C {
		cl.mu.Lock()
		now := time.Now()
		for id, entry := range cl.limiters {
			if now.Sub(entry.lastUse) > cl.cleanupInterval {
				delete(cl.limiters, id)
			}
		}
		cl.mu.Unlock()
	}
}
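For the eviction to work, lastUse must be refreshed on every lookup. A sketch of the matching accessor (the 10 rps / burst 5 defaults are assumptions):
func (cl *CleanLimiter) GetLimiter(clientID string) *rate.Limiter {
	cl.mu.Lock()
	defer cl.mu.Unlock()
	entry, ok := cl.limiters[clientID]
	if !ok {
		entry = &limiterEntry{limiter: rate.NewLimiter(rate.Limit(10), 5)}
		cl.limiters[clientID] = entry
	}
	entry.lastUse = time.Now() // keep the entry alive
	return entry.limiter
}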
2. Incorrect Burst Configuration
// Wrong: burst too small
limiter := rate.NewLimiter(rate.Limit(1000), 1) // 1000 rps, but only one request at any instant
// Right: appropriate burst
limiter := rate.NewLimiter(rate.Limit(1000), 50) // can absorb short spikes
// Better: derive burst from requirements
func calculateBurst(rps float64) int {
	// Burst = 5% of the per-second rate, with a floor of 10.
	burst := int(rps * 0.05)
	if burst < 10 {
		burst = 10
	}
	return burst
}
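Usage of the helper:
limiter := rate.NewLimiter(rate.Limit(2000), calculateBurst(2000)) // burst = 100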
3. Not Handling Errors
// Wrong: ignoring rate limit errors
func (api *API) Process(req Request) {
	if api.limiter.Allow() {
		processRequest(req)
	}
	// Request silently dropped
}

// Right: report the limit and a retry hint.
// Note: don't call Allow and then Reserve; each call consumes from the
// limiter. A single Reserve both answers "now?" and yields the delay.
func (api *API) Process(req Request) error {
	r := api.limiter.Reserve()
	if delay := r.Delay(); delay > 0 {
		r.Cancel() // return the reserved token; we are rejecting, not waiting
		return &RateLimitError{RetryAfter: delay}
	}
	return processRequest(req)
}

type RateLimitError struct {
	RetryAfter time.Duration
}

func (e *RateLimitError) Error() string {
	return fmt.Sprintf("rate limit exceeded, retry after %v", e.RetryAfter)
}
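At an HTTP boundary this error can be translated into a 429 response with a Retry-After header. A hedged sketch, assuming the API type above:
func handler(api *API) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		err := api.Process(Request{})
		var rlErr *RateLimitError
		if errors.As(err, &rlErr) {
			// Retry-After is specified in whole seconds; round up.
			seconds := int(math.Ceil(rlErr.RetryAfter.Seconds()))
			w.Header().Set("Retry-After", strconv.Itoa(seconds))
			http.Error(w, err.Error(), http.StatusTooManyRequests)
			return
		}
		if err != nil {
			http.Error(w, "internal error", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}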
Next Steps
- Learn about timeouts
- Explore context
- Study worker pools
- Practice with distributed systems