Database Connection Pooling in Go
Connection pooling is essential for managing database connections efficiently in Go applications. This guide covers how to configure, monitor, and manage connection pools with `database/sql` and GORM.
Basic Pool Configuration
SQL Connection Pool
package main
import (
	"context"
	"database/sql"
	"log"
	"time"

	_ "github.com/lib/pq"
)
// DBConfig groups the connection-pool settings that NewDB applies to a
// freshly opened *sql.DB.
type DBConfig struct {
	// MaxOpenConns is handed to SetMaxOpenConns and MaxIdleConns to
	// SetMaxIdleConns.
	MaxOpenConns, MaxIdleConns int
	// ConnMaxLifetime is handed to SetConnMaxLifetime and ConnMaxIdleTime to
	// SetConnMaxIdleTime.
	ConnMaxLifetime, ConnMaxIdleTime time.Duration
}
// NewDB opens a pool for dsn with the "postgres" driver, applies the limits in
// config, and verifies connectivity with a ping bounded by a 5-second timeout.
//
// It returns the configured pool, or a nil pool together with the error from
// sql.Open or from the ping. When the ping fails the pool is closed before
// returning, so a failed call does not leave connections or the pool's
// background resources behind.
func NewDB(dsn string, config DBConfig) (*sql.DB, error) {
	db, err := sql.Open("postgres", dsn)
	if err != nil {
		return nil, err
	}

	// Set pool configuration
	db.SetMaxOpenConns(config.MaxOpenConns)
	db.SetMaxIdleConns(config.MaxIdleConns)
	db.SetConnMaxLifetime(config.ConnMaxLifetime)
	db.SetConnMaxIdleTime(config.ConnMaxIdleTime)

	// Verify connection
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		// The caller never receives db on this path, so it must be released
		// here. The ping error is the one worth reporting, so a Close error
		// is deliberately dropped.
		_ = db.Close()
		return nil, err
	}
	return db, nil
}
// Usage example: open a pool with explicit limits and close it when main
// returns. Any error from NewDB terminates the program via log.Fatal.
func main() {
	const dsn = "postgres://user:pass@localhost/dbname?sslmode=disable"

	var config DBConfig
	config.MaxOpenConns = 25
	config.MaxIdleConns = 10
	config.ConnMaxLifetime = 30 * time.Minute
	config.ConnMaxIdleTime = 10 * time.Minute

	db, err := NewDB(dsn, config)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
GORM Connection Pool
import (
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
// GormConfig carries the DSN and pool limits consumed by NewGormDB.
type GormConfig struct {
	// DSN is passed to postgres.Open.
	DSN string
	// MaxOpenConns is handed to SetMaxOpenConns and MaxIdleConns to
	// SetMaxIdleConns.
	MaxOpenConns, MaxIdleConns int
	// ConnMaxLifetime is handed to SetConnMaxLifetime and ConnMaxIdleTime to
	// SetConnMaxIdleTime.
	ConnMaxLifetime, ConnMaxIdleTime time.Duration
}
// NewGormDB opens a GORM handle for config.DSN using the postgres dialector
// and applies the pool limits from config to the handle returned by DB().
//
// It returns a nil handle and the error if either gorm.Open or DB() fails.
func NewGormDB(config GormConfig) (*gorm.DB, error) {
	gormDB, err := gorm.Open(postgres.Open(config.DSN), &gorm.Config{})
	if err != nil {
		return nil, err
	}

	// The pool setters are called on the handle obtained from DB(), not on
	// the *gorm.DB itself.
	pool, err := gormDB.DB()
	if err != nil {
		return nil, err
	}
	pool.SetMaxOpenConns(config.MaxOpenConns)
	pool.SetMaxIdleConns(config.MaxIdleConns)
	pool.SetConnMaxLifetime(config.ConnMaxLifetime)
	pool.SetConnMaxIdleTime(config.ConnMaxIdleTime)

	return gormDB, nil
}
Pool Management
1. Connection Monitor
// ConnectionMonitor periodically logs the pool statistics of a *sql.DB.
type ConnectionMonitor struct {
	db     *sql.DB       // pool whose Stats() are reported
	logger *log.Logger   // receives one stats line per tick
	done   chan struct{} // closed by Stop to end the reporting goroutine
}
// NewConnectionMonitor returns a monitor that will report statistics for db
// through logger. The monitor is idle until Start is called.
func NewConnectionMonitor(db *sql.DB, logger *log.Logger) *ConnectionMonitor {
	m := new(ConnectionMonitor)
	m.db = db
	m.logger = logger
	m.done = make(chan struct{}) // unbuffered: only ever closed, never sent on
	return m
}
// Start launches a goroutine that logs the pool's open, idle and in-use
// connection counts plus its wait count and wait duration once per interval.
// The goroutine stops its ticker and exits when the done channel is closed.
func (m *ConnectionMonitor) Start(interval time.Duration) {
	ticker := time.NewTicker(interval)

	// report writes a single snapshot of the pool statistics to the logger.
	report := func() {
		s := m.db.Stats()
		m.logger.Printf(
			"DB Stats - Open: %d, Idle: %d, InUse: %d, WaitCount: %d, WaitDuration: %s",
			s.OpenConnections, s.Idle, s.InUse, s.WaitCount, s.WaitDuration,
		)
	}

	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-m.done:
				return
			case <-ticker.C:
				report()
			}
		}
	}()
}
// Stop ends the reporting goroutine by closing the done channel.
//
// Calling Stop again after an earlier call has returned is a no-op instead of
// a "close of closed channel" panic. The check is not synchronized, so Stop
// must not be called from several goroutines at the same time.
func (m *ConnectionMonitor) Stop() {
	select {
	case <-m.done:
		// Already closed by an earlier Stop; nothing left to do.
	default:
		close(m.done)
	}
}
2. Health Check
// HealthChecker pings a pool on demand and tracks how many pings have failed
// since the last successful one.
type HealthChecker struct {
	db       *sql.DB       // pool that Check pings
	timeout  time.Duration // upper bound for a single ping
	failures int64         // failed pings since the last success; accessed atomically
}
// NewHealthChecker returns a checker for db whose pings are bounded by
// timeout. The failure counter starts at zero.
func NewHealthChecker(db *sql.DB, timeout time.Duration) *HealthChecker {
	hc := new(HealthChecker)
	hc.db = db
	hc.timeout = timeout
	return hc
}
// Check pings the pool with a context bounded by the checker's timeout.
//
// On failure it increments the failure counter and returns an error that
// wraps the ping error, so callers can still inspect the cause with
// errors.Is / errors.As. On success it resets the counter to zero and returns
// nil.
func (hc *HealthChecker) Check() error {
	ctx, cancel := context.WithTimeout(context.Background(), hc.timeout)
	defer cancel()

	if err := hc.db.PingContext(ctx); err != nil {
		atomic.AddInt64(&hc.failures, 1)
		return fmt.Errorf("database ping failed: %w", err)
	}

	// A successful ping ends the current run of failures.
	atomic.StoreInt64(&hc.failures, 0)
	return nil
}
// GetFailures returns the current value of the failure counter, read
// atomically.
func (hc *HealthChecker) GetFailures() int64 {
	current := atomic.LoadInt64(&hc.failures)
	return current
}
3. Connection Wrapper
// DBWrapper pairs a pool with the metrics recorded for queries issued through
// the wrapper.
type DBWrapper struct {
	db      *sql.DB            // pool that queries are forwarded to
	metrics *ConnectionMetrics // counters and durations updated per query
}
// ConnectionMetrics holds the query counters and timings maintained by
// DBWrapper.QueryContext.
type ConnectionMetrics struct {
	// ActiveQueries is raised when QueryContext is entered and lowered when
	// it returns.
	ActiveQueries int64
	// CompletedQueries is raised once per QueryContext call, whether or not
	// the query failed.
	CompletedQueries int64
	// ErrorCount is raised once per QueryContext call that returned an error.
	ErrorCount int64
	// QueryDurations receives one entry per QueryContext call.
	QueryDurations []time.Duration
}
// NewDBWrapper wraps db with a fresh set of metrics: all counters are zero and
// QueryDurations is an empty, non-nil slice.
func NewDBWrapper(db *sql.DB) *DBWrapper {
	metrics := &ConnectionMetrics{QueryDurations: []time.Duration{}}
	return &DBWrapper{db: db, metrics: metrics}
}
// QueryContext forwards the query to the wrapped pool and records metrics for
// it: ActiveQueries is raised for the duration of the call, CompletedQueries
// and QueryDurations are updated once the pool returns, and ErrorCount is
// raised when the pool reports an error. The rows and error from the pool are
// returned unchanged.
//
// NOTE(review): unlike the counters, the append to QueryDurations is not
// synchronized, so concurrent calls race on that slice, and nothing in this
// method ever trims it.
func (w *DBWrapper) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
	m := w.metrics

	atomic.AddInt64(&m.ActiveQueries, 1)
	defer atomic.AddInt64(&m.ActiveQueries, -1)

	began := time.Now()
	rows, err := w.db.QueryContext(ctx, query, args...)
	elapsed := time.Since(began)

	atomic.AddInt64(&m.CompletedQueries, 1)
	m.QueryDurations = append(m.QueryDurations, elapsed)
	if err != nil {
		atomic.AddInt64(&m.ErrorCount, 1)
	}
	return rows, err
}
Best Practices
1. Pool Configuration
// PoolConfig is the JSON-serialisable pool configuration used by
// ConnectionFactory. Duration fields have no custom marshalling, so
// encoding/json writes them as integer nanosecond counts.
type PoolConfig struct {
	// Basic settings: each is passed to the *sql.DB setter of the same name
	// by ConnectionFactory.CreatePool.
	MaxOpenConns    int           `json:"max_open_conns"`
	MaxIdleConns    int           `json:"max_idle_conns"`
	ConnMaxLifetime time.Duration `json:"conn_max_lifetime"`
	ConnMaxIdleTime time.Duration `json:"conn_max_idle_time"`

	// Advanced settings. HealthInterval is the period of the background
	// health-check ping; a value <= 0 disables it. MaxRetries, RetryInterval
	// and MetricsEnabled are not read by any code shown in this guide.
	MaxRetries     int           `json:"max_retries"`
	RetryInterval  time.Duration `json:"retry_interval"`
	HealthInterval time.Duration `json:"health_interval"`
	MetricsEnabled bool          `json:"metrics_enabled"`
}
// DefaultPoolConfig returns the guide's baseline configuration: 25 open and 10
// idle connections, a 30-minute lifetime, a 10-minute idle time, 3 retries one
// second apart, a 30-second health interval, and metrics enabled.
func DefaultPoolConfig() PoolConfig {
	var cfg PoolConfig

	// Pool sizing and connection recycling.
	cfg.MaxOpenConns = 25
	cfg.MaxIdleConns = 10
	cfg.ConnMaxLifetime = 30 * time.Minute
	cfg.ConnMaxIdleTime = 10 * time.Minute

	// Retry, health-check and metrics behaviour.
	cfg.MaxRetries = 3
	cfg.RetryInterval = time.Second
	cfg.HealthInterval = 30 * time.Second
	cfg.MetricsEnabled = true

	return cfg
}
// ValidatePoolConfig reports whether the pool limits in config are mutually
// consistent. It returns nil when they are and a descriptive error otherwise.
//
// database/sql treats MaxOpenConns <= 0 as "no limit on open connections" and
// ConnMaxLifetime <= 0 as "connections are not closed because of their age".
// Those unlimited values are therefore never reported as being smaller than
// the corresponding idle setting.
func ValidatePoolConfig(config PoolConfig) error {
	// Only a finite open-connection limit can be exceeded by the idle limit.
	if config.MaxOpenConns > 0 && config.MaxOpenConns < config.MaxIdleConns {
		return fmt.Errorf(
			"max_open_conns (%d) cannot be less than max_idle_conns (%d)",
			config.MaxOpenConns,
			config.MaxIdleConns,
		)
	}

	// Only a finite lifetime can be shorter than the idle timeout.
	if config.ConnMaxLifetime > 0 && config.ConnMaxLifetime < config.ConnMaxIdleTime {
		return fmt.Errorf(
			"conn_max_lifetime (%s) cannot be less than conn_max_idle_time (%s)",
			config.ConnMaxLifetime,
			config.ConnMaxIdleTime,
		)
	}

	return nil
}
2. Connection Factory
// ConnectionFactory creates configured pools for a single DSN and owns the
// context that bounds the lifetime of their health-check goroutines.
type ConnectionFactory struct {
	config    PoolConfig         // limits and health interval applied to each pool
	dsn       string             // data source name passed to sql.Open
	metrics   *ConnectionMetrics // allocated by the constructor; not read by the factory code shown here
	healthCtx context.Context    // cancelled by Close to stop the health checks
	cancel    context.CancelFunc // cancels healthCtx
}
// NewConnectionFactory returns a factory for dsn that applies config to every
// pool it creates. The factory gets a fresh, cancellable context derived from
// context.Background and an empty metrics value.
func NewConnectionFactory(dsn string, config PoolConfig) *ConnectionFactory {
	f := &ConnectionFactory{
		dsn:     dsn,
		config:  config,
		metrics: &ConnectionMetrics{},
	}
	f.healthCtx, f.cancel = context.WithCancel(context.Background())
	return f
}
// CreatePool opens a pool for the factory's DSN with the "postgres" driver and
// applies the factory's pool limits to it. When the configured HealthInterval
// is positive, a health-check goroutine is started for the new pool.
//
// It returns a nil pool and the error if sql.Open fails.
func (f *ConnectionFactory) CreatePool() (*sql.DB, error) {
	pool, err := sql.Open("postgres", f.dsn)
	if err != nil {
		return nil, err
	}

	cfg := f.config
	pool.SetMaxOpenConns(cfg.MaxOpenConns)
	pool.SetMaxIdleConns(cfg.MaxIdleConns)
	pool.SetConnMaxLifetime(cfg.ConnMaxLifetime)
	pool.SetConnMaxIdleTime(cfg.ConnMaxIdleTime)

	// A non-positive interval means health checks are switched off.
	if cfg.HealthInterval > 0 {
		go f.runHealthChecks(pool)
	}

	return pool, nil
}
// runHealthChecks pings db once per HealthInterval and logs every failed ping.
// It blocks until the factory's health context is cancelled, then stops its
// ticker and returns.
func (f *ConnectionFactory) runHealthChecks(db *sql.DB) {
	ticker := time.NewTicker(f.config.HealthInterval)
	defer ticker.Stop()

	stop := f.healthCtx.Done()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			// The ping shares the health context, so cancelling the factory
			// also interrupts a ping that is in flight.
			err := db.PingContext(f.healthCtx)
			if err == nil {
				continue
			}
			log.Printf("Health check failed: %v", err)
		}
	}
}
// Close cancels the factory's health context, which stops every health-check
// goroutine started by CreatePool.
//
// It is a no-op on a factory whose cancel function was never set (for example
// a zero-value ConnectionFactory), instead of panicking on a nil call.
func (f *ConnectionFactory) Close() {
	if f.cancel == nil {
		return
	}
	f.cancel()
}
3. Connection Manager
// ConnectionManager caches pools by name, creating them on demand through a
// ConnectionFactory.
type ConnectionManager struct {
	pools   map[string]*sql.DB // named pools; guarded by mu
	factory *ConnectionFactory // creates pools that are not cached yet
	mu      sync.RWMutex       // guards pools
	metrics *ConnectionMetrics // allocated by the constructor; not read by the manager code shown here
}
// NewConnectionManager returns a manager with an empty pool cache that
// creates pools through factory.
func NewConnectionManager(factory *ConnectionFactory) *ConnectionManager {
	m := new(ConnectionManager)
	m.factory = factory
	m.pools = map[string]*sql.DB{}
	m.metrics = new(ConnectionMetrics)
	return m
}
// GetPool returns the pool cached under name, creating and caching it through
// the factory when it does not exist yet. A creation error is returned as-is
// and nothing is cached for name.
func (m *ConnectionManager) GetPool(name string) (*sql.DB, error) {
	// Fast path: a read lock is enough when the pool already exists.
	m.mu.RLock()
	cached, ok := m.pools[name]
	m.mu.RUnlock()
	if ok {
		return cached, nil
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// The pool may have been stored between RUnlock and Lock, so look again
	// before creating a second one.
	if existing, found := m.pools[name]; found {
		return existing, nil
	}

	created, err := m.factory.CreatePool()
	if err != nil {
		return nil, err
	}
	m.pools[name] = created
	return created, nil
}
// Close closes every cached pool and then closes the factory. Pools are all
// attempted even if some fail; when any Close fails, the failures are reported
// together in a single error, otherwise Close returns nil.
func (m *ConnectionManager) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	var failures []error
	for name, pool := range m.pools {
		err := pool.Close()
		if err == nil {
			continue
		}
		failures = append(failures, fmt.Errorf("error closing pool %s: %v", name, err))
	}

	// The factory is closed regardless of whether any pool failed to close.
	m.factory.Close()

	if len(failures) == 0 {
		return nil
	}
	return fmt.Errorf("errors closing pools: %v", failures)
}
Common Patterns
1. Connection Retry
// RetryConfig describes the backoff schedule used by WithRetry.
type RetryConfig struct {
	MaxAttempts int           // total number of times the operation may be tried
	Interval    time.Duration // pause before the second attempt
	Multiplier  float64       // factor applied to the pause after each failed attempt
	MaxInterval time.Duration // ceiling for the growing pause
}
// WithRetry runs operation until it succeeds or the attempt budget is used up,
// sleeping between attempts with a multiplicative backoff.
//
// The pause starts at config.Interval, is multiplied by config.Multiplier
// after every failed attempt, and is capped at config.MaxInterval. Unset
// (zero or negative) fields get safe meanings rather than degenerate ones:
//   - MaxAttempts < 1 still runs operation once;
//   - Multiplier <= 0 keeps the pause constant instead of collapsing it to 0;
//   - MaxInterval <= 0 means the pause is not capped.
//
// It returns nil on the first success. Otherwise it returns an error that
// reports the number of attempts and wraps the last failure, so callers can
// inspect the cause with errors.Is / errors.As.
func WithRetry(config RetryConfig, operation func() error) error {
	attempts := config.MaxAttempts
	if attempts < 1 {
		attempts = 1
	}
	multiplier := config.Multiplier
	if multiplier <= 0 {
		multiplier = 1
	}

	var err error
	interval := config.Interval
	for attempt := 1; attempt <= attempts; attempt++ {
		if err = operation(); err == nil {
			return nil
		}
		// No point sleeping after the final attempt.
		if attempt == attempts {
			break
		}

		time.Sleep(interval)
		interval = time.Duration(float64(interval) * multiplier)
		if config.MaxInterval > 0 && interval > config.MaxInterval {
			interval = config.MaxInterval
		}
	}

	return fmt.Errorf("operation failed after %d attempts: %w", attempts, err)
}
// Usage example
//
// ConnectWithRetry opens a pool for dsn with the "postgres" driver and pings
// it, retrying the whole open-and-ping step according to config.
//
// It returns the first pool whose ping succeeds, or a nil pool and the error
// from WithRetry. A pool whose ping fails is closed before the next attempt,
// so failed attempts do not accumulate open pools.
func ConnectWithRetry(dsn string, config RetryConfig) (*sql.DB, error) {
	var db *sql.DB

	err := WithRetry(config, func() error {
		candidate, err := sql.Open("postgres", dsn)
		if err != nil {
			return err
		}
		if err := candidate.Ping(); err != nil {
			// This pool is about to be abandoned; release it. The ping error
			// is the one worth reporting, so a Close error is dropped.
			_ = candidate.Close()
			return err
		}
		db = candidate
		return nil
	})
	if err != nil {
		return nil, err
	}
	return db, nil
}
2. Connection Load Balancing
// LoadBalancer groups a set of pools with the strategy used to choose among
// them.
type LoadBalancer struct {
	pools    []*sql.DB             // candidate pools
	current  uint64                // not read or written by any code shown in this guide
	strategy LoadBalancingStrategy // picks the pool for the next request
}
// LoadBalancingStrategy selects one pool out of a set of candidates.
type LoadBalancingStrategy interface {
	// Next returns the pool to use from pools. The implementations in this
	// guide return nil when pools is empty.
	Next(pools []*sql.DB) *sql.DB
}
// RoundRobinStrategy cycles through the pools in order.
type RoundRobinStrategy struct {
	counter uint64 // number of selections made so far; updated atomically
}
// Next returns the pools in rotating order, or nil when pools is empty. The
// counter is incremented before it is used, so the first call on a fresh
// strategy yields index 1 (index 0 when there is only one pool). The counter
// is not touched when pools is empty.
func (s *RoundRobinStrategy) Next(pools []*sql.DB) *sql.DB {
	if len(pools) == 0 {
		return nil
	}
	ticket := atomic.AddUint64(&s.counter, 1)
	idx := ticket % uint64(len(pools))
	return pools[idx]
}
// LeastConnectionsStrategy picks the pool with the fewest in-use connections.
// It carries no state.
type LeastConnectionsStrategy struct{}
// Next returns the pool whose Stats().InUse is lowest, or nil when pools is
// empty. When several pools share the lowest value, the one that appears
// first in pools is returned.
func (s *LeastConnectionsStrategy) Next(pools []*sql.DB) *sql.DB {
	var (
		best      *sql.DB
		bestInUse int
	)
	for i, candidate := range pools {
		inUse := candidate.Stats().InUse
		// The first pool seeds the search; later ones must be strictly
		// less busy to replace it.
		if i == 0 || inUse < bestInUse {
			best, bestInUse = candidate, inUse
		}
	}
	return best
}
Next Steps
- Learn about Query Building
- Explore Transactions
- Study NoSQL