1. rust
  2. /web
  3. /database-integration

Database Integration

Database integration is crucial for most web applications. Rust offers excellent database libraries that provide both safety and performance while maintaining ergonomic APIs for database operations.

Database Libraries Overview

SQLx - Async SQL Toolkit

SQLx is a modern, async SQL toolkit that provides compile-time checked queries and supports multiple databases.

Features:

  • Compile-time query verification
  • Async/await support
  • Connection pooling
  • Migration support
  • Type-safe query results
  • Multiple database backends (PostgreSQL, MySQL, SQLite)

Diesel - Safe ORM

Diesel is a safe, extensible ORM and query builder that uses Rust's type system to catch many classes of query errors at compile time instead of at runtime.

Features:

  • Compile-time query building
  • Schema migrations
  • Strong type safety
  • Custom SQL support
  • Connection pooling

SQLx Setup and Basic Usage

Project Setup

Add to Cargo.toml:

[dependencies]
# "migrate" is enabled for the application itself (not only for dev builds) because
# the server embeds and runs migrations at startup via `sqlx::migrate!()`.
sqlx = { version = "0.7", features = [ "runtime-tokio-rustls", "postgres", "chrono", "uuid", "migrate" ] }
tokio = { version = "1", features = ["full"] }
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
serde = { version = "1.0", features = ["derive"] }
# Loads DATABASE_URL from the .env file (`dotenv::dotenv()` in the connection example)
dotenv = "0.15"

Environment setup:

# .env file
# DATABASE_URL is the variable the Rust examples read via env::var("DATABASE_URL").
DATABASE_URL=postgresql://username:password@localhost/database_name

# Install SQLx CLI
# Provides the `sqlx` command used by the steps below.
cargo install sqlx-cli

# Create database
# Presumably creates the database named in DATABASE_URL -- confirm against the sqlx-cli docs.
sqlx database create

# Run migrations
sqlx migrate run

Basic Database Connection

use sqlx::{PgPool, Row};
use std::env;

/// Connects to the database named by `DATABASE_URL` and runs a trivial round-trip
/// query to confirm the connection works.
///
/// # Errors
/// Returns the `sqlx::Error` from connecting or from the test query.
///
/// # Panics
/// Panics if `DATABASE_URL` is not set, or if the echoed value is not `150`.
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // A missing .env file is not an error: the variable may come from the real environment.
    dotenv::dotenv().ok();

    let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let pool = PgPool::connect(&database_url).await?;

    // Round-trip a bound parameter through the server.
    let (echoed,): (i64,) = sqlx::query_as("SELECT $1")
        .bind(150_i64)
        .fetch_one(&pool)
        .await?;
    assert_eq!(echoed, 150);

    println!("Database connection successful!");
    Ok(())
}

Connection Pool Configuration

use sqlx::{PgPool, postgres::PgPoolOptions};
use std::time::Duration;

/// Builds a PostgreSQL connection pool from the `DATABASE_URL` environment variable.
///
/// Pool settings: 5-20 connections, 10 s to acquire a connection, idle connections
/// closed after 10 minutes, every connection recycled after 30 minutes, and each
/// connection tested before it is handed out.
///
/// # Errors
/// Returns the `sqlx::Error` produced while establishing the pool.
///
/// # Panics
/// Panics if `DATABASE_URL` is not set.
async fn create_database_pool() -> Result<PgPool, sqlx::Error> {
    let database_url = std::env::var("DATABASE_URL")
        .expect("DATABASE_URL must be set");

    PgPoolOptions::new()
        .max_connections(20)                     // Maximum connections
        .min_connections(5)                      // Minimum connections
        // `connect_timeout` was renamed to `acquire_timeout` and is not available in
        // sqlx 0.7: it bounds how long `acquire()` waits for a connection.
        .acquire_timeout(Duration::from_secs(10))
        .idle_timeout(Duration::from_secs(600))  // 10 minutes
        .max_lifetime(Duration::from_secs(1800)) // 30 minutes
        .test_before_acquire(true)               // Test connections before use
        .connect(&database_url)
        .await
}

// Use in application
/// Shared application state holding the database connection pool.
#[derive(Clone)]
struct AppState {
    /// Connection pool, as returned by `create_database_pool`.
    db: PgPool,
}

/// Creates the connection pool and wraps it in an [`AppState`].
///
/// # Panics
/// Panics with "Failed to create database pool" if the pool cannot be created.
async fn setup_app() -> AppState {
    let db = create_database_pool()
        .await
        .expect("Failed to create database pool");

    AppState { db }
}

Database Models and Schema

Defining Models

use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use chrono::{DateTime, Utc};
use uuid::Uuid;

/// A row of the `users` table as returned by the user queries in this guide.
///
/// The `password_hash` column defined in the users migration has no field here,
/// so it is never selected into or serialized from this type.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct User {
    /// Primary key.
    pub id: Uuid,
    /// Login name (declared UNIQUE in the migration).
    pub username: String,
    /// Email address (declared UNIQUE in the migration).
    pub email: String,
    /// Display name.
    pub full_name: String,
    /// Row creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Timestamp of the last update.
    pub updated_at: DateTime<Utc>,
    /// Account active flag.
    pub is_active: bool,
}

/// A row of the `posts` table.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct Post {
    /// Primary key.
    pub id: Uuid,
    /// Post title.
    pub title: String,
    /// Post body.
    pub content: String,
    /// `users.id` of the author.
    pub author_id: Uuid,
    /// Published flag.
    pub published: bool,
    /// Row creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Timestamp of the last update.
    pub updated_at: DateTime<Utc>,
    /// Tags attached to the post.
    pub tags: Vec<String>, // PostgreSQL array
}

/// A row of the `comments` table.
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct Comment {
    /// Primary key.
    pub id: Uuid,
    /// `posts.id` of the post being commented on.
    pub post_id: Uuid,
    /// `users.id` of the comment author.
    pub author_id: Uuid,
    /// Comment text.
    pub content: String,
    /// Row creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Timestamp of the last update.
    pub updated_at: DateTime<Utc>,
}

// Request/Response DTOs
/// Payload for creating a user.
///
/// Carries no password: the repository's `create` receives the password hash as a
/// separate argument.
#[derive(Debug, Deserialize)]
pub struct CreateUserRequest {
    /// Desired login name.
    pub username: String,
    /// Email address.
    pub email: String,
    /// Display name.
    pub full_name: String,
}

/// Payload for a partial user update.
///
/// Each `None` field leaves the stored value unchanged (the update query wraps
/// every parameter in `COALESCE($n, column)`).
#[derive(Debug, Deserialize)]
pub struct UpdateUserRequest {
    /// New login name, if changing.
    pub username: Option<String>,
    /// New email address, if changing.
    pub email: Option<String>,
    /// New display name, if changing.
    pub full_name: Option<String>,
    /// New active flag, if changing.
    pub is_active: Option<bool>,
}

/// Payload for creating a post; the author id is supplied separately by the caller.
#[derive(Debug, Deserialize)]
pub struct CreatePostRequest {
    /// Post title.
    pub title: String,
    /// Post body.
    pub content: String,
    /// Published flag; the create queries treat `None` as `false`.
    pub published: Option<bool>,
    /// Tags; the create queries treat `None` as an empty list.
    pub tags: Option<Vec<String>>,
}

// Join result for complex queries
/// Result row of the posts-joined-with-users queries: post columns plus the
/// author's username and full name (selected as `author_username` and
/// `author_full_name`).
#[derive(Debug, Serialize, FromRow)]
pub struct PostWithAuthor {
    // Post fields
    /// Post primary key.
    pub id: Uuid,
    /// Post title.
    pub title: String,
    /// Post body.
    pub content: String,
    /// Published flag.
    pub published: bool,
    /// Post creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Timestamp of the last post update.
    pub updated_at: DateTime<Utc>,
    /// Tags attached to the post.
    pub tags: Vec<String>,
    // Author fields
    /// `users.id` of the author.
    pub author_id: Uuid,
    /// Author's `username`.
    pub author_username: String,
    /// Author's `full_name`.
    pub author_full_name: String,
}

Database Migrations

Creating Migrations

# Create a new migration
# (the resulting SQL file is edited by hand; see the migration files shown below)
sqlx migrate add create_users_table

# Create another migration
sqlx migrate add create_posts_table

Migration files:

migrations/001_create_users_table.sql:

-- Create users table
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    full_name VARCHAR(255) NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    -- Defaulted columns are also NOT NULL: the Rust `User` struct declares them as
    -- non-Option fields, and sqlx's compile-time checked macros map nullable
    -- columns to Option<T>.
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    is_active BOOLEAN NOT NULL DEFAULT TRUE
);

-- Create indexes
-- username and email need no explicit index: their UNIQUE constraints already
-- create unique indexes on those columns.
CREATE INDEX idx_users_created_at ON users(created_at);

-- Create updated_at trigger
-- Shared by every table that has an updated_at column.
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_users_updated_at BEFORE UPDATE
    ON users FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

migrations/002_create_posts_table.sql:

-- Create posts table
-- Defaulted columns are also NOT NULL: the Rust `Post` and `Comment` structs declare
-- them as non-Option fields, and sqlx's compile-time checked macros map nullable
-- columns to Option<T>.
CREATE TABLE posts (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    title VARCHAR(500) NOT NULL,
    content TEXT NOT NULL,
    author_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    published BOOLEAN NOT NULL DEFAULT FALSE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    tags TEXT[] NOT NULL DEFAULT '{}'
);

-- Create indexes
CREATE INDEX idx_posts_author_id ON posts(author_id);
CREATE INDEX idx_posts_published ON posts(published);
CREATE INDEX idx_posts_created_at ON posts(created_at);
-- GIN index over the tags array
CREATE INDEX idx_posts_tags ON posts USING GIN(tags);

-- Create updated_at trigger
-- update_updated_at_column() is defined in the users migration.
CREATE TRIGGER update_posts_updated_at BEFORE UPDATE
    ON posts FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- Create comments table
CREATE TABLE comments (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    post_id UUID NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
    author_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
    content TEXT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Create indexes for comments
CREATE INDEX idx_comments_post_id ON comments(post_id);
CREATE INDEX idx_comments_author_id ON comments(author_id);
CREATE INDEX idx_comments_created_at ON comments(created_at);

-- Create updated_at trigger for comments
CREATE TRIGGER update_comments_updated_at BEFORE UPDATE
    ON comments FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

Run migrations:

# Run all pending migrations
sqlx migrate run

# Check migration status
sqlx migrate info

# Revert last migration
# NOTE(review): revert needs a matching "down" script; the single-file migrations
# shown above have none. Reversible migrations are presumably created with
# `sqlx migrate add -r <name>` -- confirm against the sqlx-cli documentation.
sqlx migrate revert

CRUD Operations with SQLx

User Repository

use sqlx::{PgPool, Row};
use uuid::Uuid;
use chrono::Utc;

/// Data-access object for the `users` table.
///
/// `Clone` is derived so the repository can be stored in a `#[derive(Clone)]`
/// application state (as the Axum `AppState` in this guide does); `PgPool` is
/// itself `Clone`.
#[derive(Clone)]
pub struct UserRepository {
    /// Connection pool used by every query of this repository.
    pool: PgPool,
}

impl UserRepository {
    /// Wraps an existing connection pool.
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
    
    /// Inserts a user together with its password hash and returns the stored row.
    ///
    /// # Errors
    /// Returns the `sqlx::Error` produced by the insert.
    pub async fn create(&self, req: CreateUserRequest, password_hash: String) -> Result<User, sqlx::Error> {
        sqlx::query_as!(
            User,
            r#"
            INSERT INTO users (username, email, full_name, password_hash)
            VALUES ($1, $2, $3, $4)
            RETURNING id, username, email, full_name, created_at, updated_at, is_active
            "#,
            req.username,
            req.email,
            req.full_name,
            password_hash
        )
        .fetch_one(&self.pool)
        .await
    }
    
    /// Looks a user up by primary key; `Ok(None)` when no row matches.
    pub async fn get_by_id(&self, id: Uuid) -> Result<Option<User>, sqlx::Error> {
        sqlx::query_as!(
            User,
            r#"
            SELECT id, username, email, full_name, created_at, updated_at, is_active
            FROM users
            WHERE id = $1
            "#,
            id
        )
        .fetch_optional(&self.pool)
        .await
    }
    
    /// Looks a user up by exact username; `Ok(None)` when no row matches.
    pub async fn get_by_username(&self, username: &str) -> Result<Option<User>, sqlx::Error> {
        sqlx::query_as!(
            User,
            r#"
            SELECT id, username, email, full_name, created_at, updated_at, is_active
            FROM users
            WHERE username = $1
            "#,
            username
        )
        .fetch_optional(&self.pool)
        .await
    }
    
    /// Returns one page of users, newest first.
    ///
    /// `limit` and `offset` are passed straight to SQL `LIMIT` / `OFFSET`.
    pub async fn list(&self, limit: i64, offset: i64) -> Result<Vec<User>, sqlx::Error> {
        sqlx::query_as!(
            User,
            r#"
            SELECT id, username, email, full_name, created_at, updated_at, is_active
            FROM users
            ORDER BY created_at DESC
            LIMIT $1 OFFSET $2
            "#,
            limit,
            offset
        )
        .fetch_all(&self.pool)
        .await
    }
    
    /// Returns the total number of users (0 if the count comes back NULL).
    pub async fn count(&self) -> Result<i64, sqlx::Error> {
        sqlx::query!(
            "SELECT COUNT(*) as count FROM users"
        )
        .fetch_one(&self.pool)
        .await
        .map(|row| row.count.unwrap_or(0))
    }
    
    /// Applies a partial update and returns the updated row.
    ///
    /// `None` fields in `req` keep their stored value. Returns `Ok(None)` when no
    /// user has the given `id`.
    pub async fn update(&self, id: Uuid, req: UpdateUserRequest) -> Result<Option<User>, sqlx::Error> {
        sqlx::query_as!(
            User,
            r#"
            UPDATE users 
            SET username = COALESCE($2, username),
                email = COALESCE($3, email),
                full_name = COALESCE($4, full_name),
                is_active = COALESCE($5, is_active),
                updated_at = NOW()
            WHERE id = $1
            RETURNING id, username, email, full_name, created_at, updated_at, is_active
            "#,
            id,
            req.username,
            req.email,
            req.full_name,
            req.is_active
        )
        .fetch_optional(&self.pool)
        .await
    }
    
    /// Deletes a user; returns `true` when a row was actually removed.
    pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
        sqlx::query!(
            "DELETE FROM users WHERE id = $1",
            id
        )
        .execute(&self.pool)
        .await
        .map(|outcome| outcome.rows_affected() > 0)
    }
    
    /// Case-insensitive substring search over username, email and full name,
    /// newest first, paginated with `limit` / `offset`.
    pub async fn search(&self, query: &str, limit: i64, offset: i64) -> Result<Vec<User>, sqlx::Error> {
        // Surround the term with wildcards so ILIKE matches anywhere in the column.
        let pattern = ["%", query, "%"].concat();
        
        sqlx::query_as!(
            User,
            r#"
            SELECT id, username, email, full_name, created_at, updated_at, is_active
            FROM users
            WHERE username ILIKE $1 
               OR email ILIKE $1 
               OR full_name ILIKE $1
            ORDER BY created_at DESC
            LIMIT $2 OFFSET $3
            "#,
            pattern,
            limit,
            offset
        )
        .fetch_all(&self.pool)
        .await
    }
}

Post Repository with Complex Queries

/// Data-access object for the `posts` table.
///
/// `Clone` is derived so the repository can be stored in a `#[derive(Clone)]`
/// application state (as the Axum `AppState` in this guide does); `PgPool` is
/// itself `Clone`.
#[derive(Clone)]
pub struct PostRepository {
    /// Connection pool used by every query of this repository.
    pool: PgPool,
}

impl PostRepository {
    /// Wraps an existing connection pool.
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
    
    /// Inserts a post for `author_id` and returns the stored row.
    ///
    /// A missing `published` is stored as `false`, missing `tags` as an empty array.
    pub async fn create(&self, author_id: Uuid, req: CreatePostRequest) -> Result<Post, sqlx::Error> {
        sqlx::query_as!(
            Post,
            r#"
            INSERT INTO posts (title, content, author_id, published, tags)
            VALUES ($1, $2, $3, $4, $5)
            RETURNING id, title, content, author_id, published, created_at, updated_at, tags
            "#,
            req.title,
            req.content,
            author_id,
            req.published.unwrap_or(false),
            &req.tags.unwrap_or_default()
        )
        .fetch_one(&self.pool)
        .await
    }
    
    /// Fetches one post joined with its author's username and full name;
    /// `Ok(None)` when no post has the given `id`.
    pub async fn get_with_author(&self, id: Uuid) -> Result<Option<PostWithAuthor>, sqlx::Error> {
        sqlx::query_as!(
            PostWithAuthor,
            r#"
            SELECT 
                p.id,
                p.title,
                p.content,
                p.published,
                p.created_at,
                p.updated_at,
                p.tags,
                p.author_id,
                u.username as author_username,
                u.full_name as author_full_name
            FROM posts p
            INNER JOIN users u ON p.author_id = u.id
            WHERE p.id = $1
            "#,
            id
        )
        .fetch_optional(&self.pool)
        .await
    }
    
    /// Lists posts (with author info), newest first, applying only the filters
    /// that are `Some`.
    ///
    /// * `published` - exact match on the published flag
    /// * `author_id` - exact match on the author
    /// * `tag`       - the post's tag array must contain this tag
    /// * `search`    - case-insensitive substring match on title or content
    /// * `limit` / `offset` - pagination, passed to SQL `LIMIT` / `OFFSET`
    ///
    /// The SQL text is assembled dynamically, but every value is sent as a bind
    /// parameter through `QueryBuilder::push_bind`, which also numbers the `$n`
    /// placeholders, so no user input is interpolated into the SQL.
    pub async fn list_with_filters(
        &self,
        published: Option<bool>,
        author_id: Option<Uuid>,
        tag: Option<String>,
        search: Option<String>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<PostWithAuthor>, sqlx::Error> {
        // `WHERE 1=1` lets every optional filter be appended uniformly as " AND ...".
        let mut query = sqlx::QueryBuilder::<sqlx::Postgres>::new(
            "
            SELECT 
                p.id,
                p.title,
                p.content,
                p.published,
                p.created_at,
                p.updated_at,
                p.tags,
                p.author_id,
                u.username as author_username,
                u.full_name as author_full_name
            FROM posts p
            INNER JOIN users u ON p.author_id = u.id
            WHERE 1=1
        ",
        );
        
        if let Some(published) = published {
            query.push(" AND p.published = ").push_bind(published);
        }
        
        if let Some(author_id) = author_id {
            query.push(" AND p.author_id = ").push_bind(author_id);
        }
        
        if let Some(tag) = tag {
            query.push(" AND ").push_bind(tag).push(" = ANY(p.tags)");
        }
        
        if let Some(search) = search {
            let pattern = format!("%{}%", search);
            // Each push_bind emits its own placeholder, so the pattern is bound twice.
            query
                .push(" AND (p.title ILIKE ")
                .push_bind(pattern.clone())
                .push(" OR p.content ILIKE ")
                .push_bind(pattern)
                .push(")");
        }
        
        query
            .push(" ORDER BY p.created_at DESC LIMIT ")
            .push_bind(limit)
            .push(" OFFSET ")
            .push_bind(offset);
        
        query
            .build_query_as::<PostWithAuthor>()
            .fetch_all(&self.pool)
            .await
    }
    
    /// Returns one page of *published* posts whose tag array contains `tag`,
    /// newest first.
    pub async fn get_by_tag(&self, tag: &str, limit: i64, offset: i64) -> Result<Vec<Post>, sqlx::Error> {
        sqlx::query_as!(
            Post,
            r#"
            SELECT id, title, content, author_id, published, created_at, updated_at, tags
            FROM posts
            WHERE $1 = ANY(tags) AND published = true
            ORDER BY created_at DESC
            LIMIT $2 OFFSET $3
            "#,
            tag,
            limit,
            offset
        )
        .fetch_all(&self.pool)
        .await
    }
    
    /// Applies a partial update to a post owned by `author_id`.
    ///
    /// `None` fields in `req` keep their stored value. Returns `Ok(None)` when no
    /// post matches both `id` and `author_id`.
    pub async fn update(&self, id: Uuid, author_id: Uuid, req: UpdatePostRequest) -> Result<Option<Post>, sqlx::Error> {
        sqlx::query_as!(
            Post,
            r#"
            UPDATE posts 
            SET title = COALESCE($3, title),
                content = COALESCE($4, content),
                published = COALESCE($5, published),
                tags = COALESCE($6, tags),
                updated_at = NOW()
            WHERE id = $1 AND author_id = $2
            RETURNING id, title, content, author_id, published, created_at, updated_at, tags
            "#,
            id,
            author_id,
            req.title,
            req.content,
            req.published,
            // Optional array parameter: pass Option<&[String]>, not &Option<Vec<String>>.
            req.tags.as_deref()
        )
        .fetch_optional(&self.pool)
        .await
    }
}

/// Payload for a partial post update.
///
/// Each `None` field leaves the stored value unchanged (the update query wraps
/// every parameter in `COALESCE($n, column)`).
#[derive(Debug, Deserialize)]
pub struct UpdatePostRequest {
    /// New title, if changing.
    pub title: Option<String>,
    /// New body, if changing.
    pub content: Option<String>,
    /// New published flag, if changing.
    pub published: Option<bool>,
    /// Replacement tag list, if changing.
    pub tags: Option<Vec<String>>,
}

Transactions

Using Transactions

use sqlx::{PgPool, Postgres, Transaction};

/// Transfers a post from `old_owner_id` to `new_owner_id` inside one transaction
/// and records the transfer in `post_transfers`.
///
/// NOTE(review): the `post_transfers` table is not created by the migrations shown
/// in this guide; it must exist for this function to compile and run.
///
/// # Errors
/// Returns `sqlx::Error::RowNotFound` when the post does not exist, is not owned by
/// `old_owner_id`, or the new owner does not exist. Any other database error is
/// propagated. In every error case the transaction is rolled back.
pub async fn transfer_ownership(
    pool: &PgPool,
    post_id: Uuid,
    old_owner_id: Uuid,
    new_owner_id: Uuid,
) -> Result<(), sqlx::Error> {
    let mut tx: Transaction<'_, Postgres> = pool.begin().await?;
    
    // Verify old owner. FOR UPDATE locks the row so a concurrent transfer cannot
    // change the owner between this check and the UPDATE below.
    let post = sqlx::query!(
        "SELECT author_id FROM posts WHERE id = $1 FOR UPDATE",
        post_id
    )
    .fetch_optional(&mut *tx)
    .await?;
    
    let Some(post) = post else {
        tx.rollback().await?;
        return Err(sqlx::Error::RowNotFound);
    };
    
    if post.author_id != old_owner_id {
        tx.rollback().await?;
        return Err(sqlx::Error::RowNotFound);
    }
    
    // Verify new owner exists. A named column is selected because the query!
    // macro cannot map the unnamed column produced by `SELECT 1`.
    let new_owner_exists = sqlx::query!(
        "SELECT id FROM users WHERE id = $1",
        new_owner_id
    )
    .fetch_optional(&mut *tx)
    .await?
    .is_some();
    
    if !new_owner_exists {
        tx.rollback().await?;
        return Err(sqlx::Error::RowNotFound);
    }
    
    // Transfer ownership
    sqlx::query!(
        "UPDATE posts SET author_id = $1, updated_at = NOW() WHERE id = $2",
        new_owner_id,
        post_id
    )
    .execute(&mut *tx)
    .await?;
    
    // Log the transfer
    sqlx::query!(
        r#"
        INSERT INTO post_transfers (post_id, old_owner_id, new_owner_id, transferred_at)
        VALUES ($1, $2, $3, NOW())
        "#,
        post_id,
        old_owner_id,
        new_owner_id
    )
    .execute(&mut *tx)
    .await?;
    
    tx.commit().await?;
    Ok(())
}

// Complex transaction with multiple operations
/// Creates a post, links it to the given categories and bumps the author's
/// `post_count`, all in one transaction.
///
/// NOTE(review): the `post_categories` table and the `users.post_count` column are
/// not created by the migrations shown in this guide; they must exist for this
/// function to compile and run.
///
/// # Errors
/// Propagates any `sqlx::Error`. The transaction is dropped without a commit on
/// the error path, so it is not committed.
pub async fn create_post_with_tags(
    pool: &PgPool,
    author_id: Uuid,
    req: CreatePostRequest,
    category_ids: Vec<i32>,
) -> Result<Post, sqlx::Error> {
    let mut tx = pool.begin().await?;
    
    // Create the post
    let post = sqlx::query_as!(
        Post,
        r#"
        INSERT INTO posts (title, content, author_id, published, tags)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING id, title, content, author_id, published, created_at, updated_at, tags
        "#,
        req.title,
        req.content,
        author_id,
        req.published.unwrap_or(false),
        &req.tags.unwrap_or_default()
    )
    .fetch_one(&mut *tx)
    .await?;
    
    // Add categories: one statement for all rows instead of one round trip per
    // category. An empty list unnests to zero rows, so nothing is inserted.
    sqlx::query!(
        r#"
        INSERT INTO post_categories (post_id, category_id)
        SELECT $1::uuid, c.category_id
        FROM UNNEST($2::int4[]) AS c(category_id)
        "#,
        post.id,
        &category_ids[..]
    )
    .execute(&mut *tx)
    .await?;
    
    // Update user's post count
    sqlx::query!(
        "UPDATE users SET post_count = post_count + 1 WHERE id = $1",
        author_id
    )
    .execute(&mut *tx)
    .await?;
    
    tx.commit().await?;
    Ok(post)
}

Advanced Query Patterns

Query Builder Pattern

use sqlx::{PgPool, Postgres, QueryBuilder, Row};

pub struct PostQueryBuilder {
    query: QueryBuilder<'static, Postgres>,
    conditions: Vec<String>,
    params: Vec<String>,
}

impl PostQueryBuilder {
    pub fn new() -> Self {
        let query = QueryBuilder::new("
            SELECT 
                p.id,
                p.title,
                p.content,
                p.published,
                p.created_at,
                p.updated_at,
                p.tags,
                p.author_id,
                u.username as author_username,
                u.full_name as author_full_name
            FROM posts p
            INNER JOIN users u ON p.author_id = u.id
        ");
        
        Self {
            query,
            conditions: Vec::new(),
            params: Vec::new(),
        }
    }
    
    pub fn published(mut self, published: bool) -> Self {
        self.conditions.push(format!("p.published = ${}", self.params.len() + 1));
        self.params.push(published.to_string());
        self
    }
    
    pub fn author(mut self, author_id: Uuid) -> Self {
        self.conditions.push(format!("p.author_id = ${}", self.params.len() + 1));
        self.params.push(author_id.to_string());
        self
    }
    
    pub fn has_tag(mut self, tag: &str) -> Self {
        self.conditions.push(format!("${} = ANY(p.tags)", self.params.len() + 1));
        self.params.push(tag.to_string());
        self
    }
    
    pub fn search(mut self, search_term: &str) -> Self {
        let pattern = format!("%{}%", search_term);
        self.conditions.push(format!(
            "(p.title ILIKE ${} OR p.content ILIKE ${})",
            self.params.len() + 1,
            self.params.len() + 1
        ));
        self.params.push(pattern);
        self
    }
    
    pub fn order_by(mut self, field: &str, direction: &str) -> Self {
        self.query.push(" ORDER BY ");
        self.query.push(field);
        self.query.push(" ");
        self.query.push(direction);
        self
    }
    
    pub fn limit(mut self, limit: i64) -> Self {
        self.query.push(" LIMIT ");
        self.query.push_bind(limit);
        self
    }
    
    pub fn offset(mut self, offset: i64) -> Self {
        self.query.push(" OFFSET ");
        self.query.push_bind(offset);
        self
    }
    
    pub async fn execute(mut self, pool: &PgPool) -> Result<Vec<PostWithAuthor>, sqlx::Error> {
        if !self.conditions.is_empty() {
            self.query.push(" WHERE ");
            self.query.push(&self.conditions.join(" AND "));
        }
        
        let query = self.query.build_query_as::<PostWithAuthor>();
        query.fetch_all(pool).await
    }
}

// Usage
pub async fn search_posts(
    pool: &PgPool,
    filters: PostFilters,
) -> Result<Vec<PostWithAuthor>, sqlx::Error> {
    let mut builder = PostQueryBuilder::new();
    
    if let Some(published) = filters.published {
        builder = builder.published(published);
    }
    
    if let Some(author_id) = filters.author_id {
        builder = builder.author(author_id);
    }
    
    if let Some(tag) = &filters.tag {
        builder = builder.has_tag(tag);
    }
    
    if let Some(search) = &filters.search {
        builder = builder.search(search);
    }
    
    builder
        .order_by("p.created_at", "DESC")
        .limit(filters.limit.unwrap_or(20))
        .offset(filters.offset.unwrap_or(0))
        .execute(pool)
        .await
}

#[derive(Debug)]
pub struct PostFilters {
    pub published: Option<bool>,
    pub author_id: Option<Uuid>,
    pub tag: Option<String>,
    pub search: Option<String>,
    pub limit: Option<i64>,
    pub offset: Option<i64>,
}

Bulk Operations

// Bulk insert
pub async fn bulk_create_posts(
    pool: &PgPool,
    posts: Vec<CreatePostRequest>,
    author_id: Uuid,
) -> Result<Vec<Post>, sqlx::Error> {
    let mut query_builder = QueryBuilder::new(
        "INSERT INTO posts (title, content, author_id, published, tags) "
    );
    
    query_builder.push_values(posts.iter(), |mut b, post| {
        b.push_bind(&post.title)
         .push_bind(&post.content)
         .push_bind(author_id)
         .push_bind(post.published.unwrap_or(false))
         .push_bind(&post.tags.clone().unwrap_or_default());
    });
    
    query_builder.push(" RETURNING id, title, content, author_id, published, created_at, updated_at, tags");
    
    let query = query_builder.build_query_as::<Post>();
    query.fetch_all(pool).await
}

// Bulk update
/// Marks every post in `post_ids` that belongs to `author_id` as published and
/// returns the number of rows updated.
pub async fn bulk_publish_posts(
    pool: &PgPool,
    post_ids: &[Uuid],
    author_id: Uuid,
) -> Result<u64, sqlx::Error> {
    let mut statement = QueryBuilder::new(
        "UPDATE posts SET published = true, updated_at = NOW() WHERE author_id = "
    );
    // The whole id slice is bound as one array parameter for `= ANY(...)`.
    statement
        .push_bind(author_id)
        .push(" AND id = ANY(")
        .push_bind(post_ids)
        .push(")");

    statement
        .build()
        .execute(pool)
        .await
        .map(|outcome| outcome.rows_affected())
}

Raw SQL and Dynamic Queries

Complex Raw SQL

/// Computes posting and engagement statistics for one user in a single query.
///
/// Three CTEs aggregate the user's posts, the comments on those posts, and the
/// distinct commenters; each yields exactly one row, so the final `CROSS JOIN`
/// also yields exactly one row.
///
/// # Errors
/// Returns the `sqlx::Error` produced by the query.
pub async fn get_user_analytics(
    pool: &PgPool,
    user_id: Uuid,
) -> Result<UserAnalytics, sqlx::Error> {
    // AVG() over integer lengths returns NUMERIC in PostgreSQL; the ::float8 cast
    // makes the column map to the `Option<f64>` field of `UserAnalytics`.
    sqlx::query_as!(
        UserAnalytics,
        r#"
        WITH user_stats AS (
            SELECT 
                COUNT(CASE WHEN published = true THEN 1 END) as published_posts,
                COUNT(CASE WHEN published = false THEN 1 END) as draft_posts,
                AVG(CASE WHEN published = true THEN LENGTH(content) END)::float8 as avg_content_length,
                MAX(created_at) as last_post_date
            FROM posts 
            WHERE author_id = $1
        ),
        comment_stats AS (
            SELECT COUNT(*) as total_comments
            FROM comments c
            INNER JOIN posts p ON c.post_id = p.id
            WHERE p.author_id = $1
        ),
        engagement_stats AS (
            SELECT 
                COUNT(DISTINCT c.author_id) as unique_commenters,
                COUNT(c.id) as comments_received
            FROM comments c
            INNER JOIN posts p ON c.post_id = p.id
            WHERE p.author_id = $1
        )
        SELECT 
            us.published_posts as "published_posts!",
            us.draft_posts as "draft_posts!",
            us.avg_content_length as "avg_content_length",
            us.last_post_date,
            cs.total_comments as "total_comments!",
            es.unique_commenters as "unique_commenters!",
            es.comments_received as "comments_received!"
        FROM user_stats us
        CROSS JOIN comment_stats cs
        CROSS JOIN engagement_stats es
        "#,
        user_id
    )
    .fetch_one(pool)
    .await
}

/// Aggregated statistics for one user, as produced by `get_user_analytics`.
#[derive(Debug, Serialize, FromRow)]
pub struct UserAnalytics {
    /// Number of the user's posts with `published = true`.
    pub published_posts: i64,
    /// Number of the user's posts with `published = false`.
    pub draft_posts: i64,
    /// Average `LENGTH(content)` of the user's published posts; `None` when the
    /// query yields NULL.
    pub avg_content_length: Option<f64>,
    /// `MAX(created_at)` over the user's posts; `None` when the query yields NULL.
    pub last_post_date: Option<DateTime<Utc>>,
    /// Number of comments on the user's posts.
    pub total_comments: i64,
    /// Number of distinct comment authors on the user's posts.
    pub unique_commenters: i64,
    /// Count of comment ids on the user's posts.
    pub comments_received: i64,
}

Stored Procedures and Functions

-- Create a set-returning function: the most used tags across published posts,
-- most frequent first, capped at limit_count rows.
CREATE OR REPLACE FUNCTION get_popular_tags(
    limit_count INTEGER DEFAULT 10
) RETURNS TABLE(tag TEXT, post_count BIGINT) AS $$
BEGIN
    RETURN QUERY
    SELECT
        t.tag_value AS tag,
        COUNT(*) AS post_count
    FROM posts p
    -- Expand each post's tag array into one row per tag
    CROSS JOIN LATERAL unnest(p.tags) AS t(tag_value)
    WHERE p.published = true
    GROUP BY t.tag_value
    ORDER BY COUNT(*) DESC
    LIMIT limit_count;
END;
$$ LANGUAGE plpgsql;
// Call stored procedure
/// Calls the `get_popular_tags` SQL function and returns up to `limit`
/// tag/count rows.
///
/// # Errors
/// Returns the `sqlx::Error` produced by the query.
pub async fn get_popular_tags(
    pool: &PgPool,
    limit: i32,
) -> Result<Vec<TagCount>, sqlx::Error> {
    sqlx::query_as!(
        TagCount,
        "SELECT * FROM get_popular_tags($1)",
        limit
    )
    .fetch_all(pool)
    .await
}

/// One row returned by the `get_popular_tags` SQL function.
///
/// Both fields are `Option`. NOTE(review): presumably because sqlx treats function
/// result columns as nullable -- confirm.
#[derive(Debug, Serialize, FromRow)]
pub struct TagCount {
    /// Tag text.
    pub tag: Option<String>,
    /// Number of published posts carrying the tag.
    pub post_count: Option<i64>,
}

Error Handling and Logging

Custom Error Types

use thiserror::Error;

/// Application-level database error.
///
/// Any `sqlx::Error` converts into [`DatabaseError::Connection`] via `?` (through
/// the `#[from]` attribute), so despite its name that variant carries every sqlx
/// failure, not only connection problems.
#[derive(Debug, Error)]
pub enum DatabaseError {
    /// Wraps an underlying `sqlx::Error`.
    #[error("Database connection error: {0}")]
    Connection(#[from] sqlx::Error),
    
    /// The requested user does not exist.
    #[error("User not found")]
    UserNotFound,
    
    /// The requested post does not exist.
    #[error("Post not found")]
    PostNotFound,
    
    /// A uniqueness constraint was violated; `field` names the offending column.
    #[error("Duplicate entry: {field}")]
    DuplicateEntry { field: String },
    
    /// Input failed validation; the payload is the message.
    #[error("Validation error: {0}")]
    Validation(String),
    
    /// The caller may not perform the operation.
    #[error("Permission denied")]
    PermissionDenied,
}

impl From<DatabaseError> for sqlx::Error {
    fn from(err: DatabaseError) -> Self {
        match err {
            DatabaseError::Connection(e) => e,
            _ => sqlx::Error::RowNotFound,
        }
    }
}

// Repository with error handling
impl UserRepository {
    pub async fn create_safe(&self, req: CreateUserRequest, password_hash: String) -> Result<User, DatabaseError> {
        match self.create(req, password_hash).await {
            Ok(user) => Ok(user),
            Err(sqlx::Error::Database(db_err)) => {
                if db_err.code() == Some("23505".into()) { // Unique violation
                    if db_err.message().contains("username") {
                        Err(DatabaseError::DuplicateEntry { field: "username".to_string() })
                    } else if db_err.message().contains("email") {
                        Err(DatabaseError::DuplicateEntry { field: "email".to_string() })
                    } else {
                        Err(DatabaseError::DuplicateEntry { field: "unknown".to_string() })
                    }
                } else {
                    Err(DatabaseError::Connection(sqlx::Error::Database(db_err)))
                }
            }
            Err(e) => Err(DatabaseError::Connection(e)),
        }
    }
}

Database Logging

use tracing::{info, warn, error, instrument};

impl UserRepository {
    #[instrument(skip(self, password_hash))]
    pub async fn create_with_logging(&self, req: CreateUserRequest, password_hash: String) -> Result<User, DatabaseError> {
        info!("Creating user with username: {}", req.username);
        
        match self.create(req.clone(), password_hash).await {
            Ok(user) => {
                info!("Successfully created user with ID: {}", user.id);
                Ok(user)
            }
            Err(e) => {
                error!("Failed to create user {}: {}", req.username, e);
                Err(DatabaseError::Connection(e))
            }
        }
    }
    
    #[instrument(skip(self))]
    pub async fn get_by_id_with_logging(&self, id: Uuid) -> Result<Option<User>, DatabaseError> {
        match self.get_by_id(id).await {
            Ok(Some(user)) => {
                info!("Found user: {}", user.username);
                Ok(Some(user))
            }
            Ok(None) => {
                warn!("User not found with ID: {}", id);
                Ok(None)
            }
            Err(e) => {
                error!("Database error while fetching user {}: {}", id, e);
                Err(DatabaseError::Connection(e))
            }
        }
    }
}

Integration with Web Frameworks

Axum Integration

use axum::{
    extract::{Path, Query, State},
    http::StatusCode,
    response::Json,
    routing::{get, post, put, delete},
    Router,
};

/// Shared application state handed to the Axum handlers via `State<AppState>`.
///
/// Derives `Clone` so it can be passed to `Router::with_state`.
#[derive(Clone)]
struct AppState {
    /// Connection pool, used directly by handlers that run free-standing queries.
    db: PgPool,
    /// Repository for user records.
    user_repo: UserRepository,
    /// Repository for post records.
    post_repo: PostRepository,
}

async fn create_user(
    State(state): State<AppState>,
    Json(req): Json<CreateUserRequest>,
) -> Result<Json<User>, StatusCode> {
    let password_hash = "hashed_password".to_string(); // Hash the password properly
    
    match state.user_repo.create_safe(req, password_hash).await {
        Ok(user) => Ok(Json(user)),
        Err(DatabaseError::DuplicateEntry { field }) => {
            // Return 409 Conflict
            Err(StatusCode::CONFLICT)
        }
        Err(DatabaseError::Validation(_)) => {
            Err(StatusCode::BAD_REQUEST)
        }
        Err(_) => {
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}

async fn get_user(
    State(state): State<AppState>,
    Path(id): Path<Uuid>,
) -> Result<Json<User>, StatusCode> {
    match state.user_repo.get_by_id(id).await {
        Ok(Some(user)) => Ok(Json(user)),
        Ok(None) => Err(StatusCode::NOT_FOUND),
        Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
    }
}

async fn list_posts(
    State(state): State<AppState>,
    Query(filters): Query<PostFilters>,
) -> Result<Json<Vec<PostWithAuthor>>, StatusCode> {
    match search_posts(&state.db, filters).await {
        Ok(posts) => Ok(Json(posts)),
        Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
    }
}

/// Builds the application router and attaches the shared state to it.
///
/// Routes: `POST /users`, `GET /users/:id`, and `GET /posts`.
fn create_router(state: AppState) -> Router {
    let routes = Router::new()
        .route("/users", post(create_user))
        .route("/users/:id", get(get_user))
        .route("/posts", get(list_posts));

    routes.with_state(state)
}

/// Application entry point: prepares the database, builds the router, and
/// serves it on `0.0.0.0:3000`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect and bring the schema up to date before accepting any traffic.
    let pool = create_database_pool().await?;
    sqlx::migrate!().run(&pool).await?;

    // Each repository gets its own handle to the pool; the pool itself is
    // moved into the state last, after the clones have been taken.
    let state = AppState {
        user_repo: UserRepository::new(pool.clone()),
        post_repo: PostRepository::new(pool.clone()),
        db: pool,
    };

    let router = create_router(state);

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    axum::serve(listener, router).await?;

    Ok(())
}

Testing Database Code

Integration Tests

#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::PgPool;
    use testcontainers::{clients, images, Container};

    /// Starts a throwaway Postgres container, connects to it, and runs the
    /// migrations.
    ///
    /// The container guard is returned together with the pool: the caller must
    /// keep it alive for as long as the pool is used, because dropping the
    /// guard tears the container down.
    async fn setup_test_db(
        docker: &clients::Cli,
    ) -> (Container<'_, images::postgres::Postgres>, PgPool) {
        let postgres = docker.run(images::postgres::Postgres::default());

        // NOTE(review): user/password `postgres` assumed to be accepted by the
        // default test image — confirm against the image configuration.
        let connection_string = format!(
            "postgres://postgres:postgres@127.0.0.1:{}/postgres",
            postgres.get_host_port_ipv4(5432)
        );

        let pool = PgPool::connect(&connection_string).await.unwrap();

        // Run migrations
        sqlx::migrate!().run(&pool).await.unwrap();

        (postgres, pool)
    }

    /// Exercises create, lookup by ID and username, update, and delete.
    #[tokio::test]
    async fn test_user_crud() {
        let docker = clients::Cli::default();
        // Bind the guard to a named variable (not `_`) so it lives until the
        // end of the test.
        let (_container, pool) = setup_test_db(&docker).await;
        let repo = UserRepository::new(pool);

        // Test create
        let req = CreateUserRequest {
            username: "testuser".to_string(),
            email: "test@example.com".to_string(),
            full_name: "Test User".to_string(),
        };

        let user = repo.create(req, "password_hash".to_string()).await.unwrap();
        assert_eq!(user.username, "testuser");
        assert_eq!(user.email, "test@example.com");

        // Test get by ID
        let found_user = repo.get_by_id(user.id).await.unwrap();
        assert!(found_user.is_some());
        assert_eq!(found_user.unwrap().id, user.id);

        // Test get by username
        let found_user = repo.get_by_username("testuser").await.unwrap();
        assert!(found_user.is_some());

        // Test update: the new e-mail differs from the original so the
        // assertion below actually proves the row changed.
        let update_req = UpdateUserRequest {
            username: None,
            email: Some("new@example.com".to_string()),
            full_name: Some("New Name".to_string()),
            is_active: None,
        };

        let updated_user = repo.update(user.id, update_req).await.unwrap();
        assert!(updated_user.is_some());
        assert_eq!(updated_user.unwrap().email, "new@example.com");

        // Test delete
        let deleted = repo.delete(user.id).await.unwrap();
        assert!(deleted);

        let not_found = repo.get_by_id(user.id).await.unwrap();
        assert!(not_found.is_none());
    }

    /// Checks that a failing `transfer_ownership` call leaves existing data intact.
    #[tokio::test]
    async fn test_transaction_rollback() {
        let docker = clients::Cli::default();
        // Keep the container guard alive for the whole test.
        let (_container, pool) = setup_test_db(&docker).await;
        let repo = UserRepository::new(pool.clone());

        // Create a user
        let req = CreateUserRequest {
            username: "testuser".to_string(),
            email: "test@example.com".to_string(),
            full_name: "Test User".to_string(),
        };

        let user = repo.create(req, "password_hash".to_string()).await.unwrap();

        // Test transaction that should fail
        let result = transfer_ownership(
            &pool,
            Uuid::new_v4(), // Non-existent post
            user.id,
            Uuid::new_v4(), // Non-existent new owner
        ).await;

        assert!(result.is_err());

        // Verify user still exists (transaction was rolled back properly)
        let found_user = repo.get_by_id(user.id).await.unwrap();
        assert!(found_user.is_some());
    }
}

Database integration in Rust provides excellent type safety, performance, and ergonomics. SQLx's compile-time query checking helps catch errors early, while the async support enables high-performance applications. Always use transactions for multi-step operations, implement proper error handling, and write comprehensive tests for your database code.