Database Integration
Database integration is crucial for most web applications. Rust offers excellent database libraries that provide both safety and performance while maintaining ergonomic APIs for database operations.
Database Libraries Overview
SQLx - Async SQL Toolkit
SQLx is a modern, async SQL toolkit that provides compile-time checked queries and supports multiple databases.
Features:
- Compile-time query verification
- Async/await support
- Connection pooling
- Migration support
- Type-safe query results
- Multiple database backends (PostgreSQL, MySQL, SQLite)
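For example, here is a sketch of a compile-time checked lookup (assuming a users table with username and email columns, and a DATABASE_URL reachable at build time):
use sqlx::PgPool;

// Hypothetical lookup: the SQL below is verified against the database schema at
// compile time, so a misspelled column becomes a compile error, not a runtime one.
async fn find_email(pool: &PgPool, username: &str) -> Result<Option<String>, sqlx::Error> {
    let row = sqlx::query!(
        "SELECT email FROM users WHERE username = $1",
        username
    )
    .fetch_optional(pool)
    .await?;

    Ok(row.map(|r| r.email))
}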
Diesel - Safe ORM
Diesel is a safe, extensible ORM that prevents runtime errors through Rust's type system.
Features:
- Compile-time query building
- Schema migrations
- Strong type safety
- Custom SQL support
- Connection pooling
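For comparison, a minimal Diesel sketch might look like the following (assuming Diesel 2.x with the postgres feature; the schema DSL is normally generated by diesel print-schema). The rest of this chapter focuses on SQLx.
use diesel::prelude::*;

// Schema DSL, normally generated from migrations via `diesel print-schema`.
diesel::table! {
    users (id) {
        id -> Int4,
        username -> Varchar,
    }
}

#[derive(Queryable)]
struct User {
    id: i32,
    username: String,
}

// Type-checked query: column names and types are verified by the Rust compiler.
fn find_user(conn: &mut PgConnection, name: &str) -> QueryResult<Option<User>> {
    users::table
        .filter(users::username.eq(name))
        .first::<User>(conn)
        .optional()
}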
SQLx Setup and Basic Usage
Project Setup
Add to Cargo.toml:
[dependencies]
sqlx = { version = "0.7", features = [ "runtime-tokio-rustls", "postgres", "chrono", "uuid", "migrate" ] }
tokio = { version = "1", features = ["full"] }
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
serde = { version = "1.0", features = ["derive"] }
# dotenv loads DATABASE_URL from the .env file in the examples below
dotenv = "0.15"
Environment setup:
# .env file
DATABASE_URL=postgresql://username:password@localhost/database_name
# Install SQLx CLI
cargo install sqlx-cli
# Create database
sqlx database create
# Run migrations
sqlx migrate run
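The query! family of macros used throughout this chapter validates SQL against a live database at compile time, so DATABASE_URL must also be reachable when the project builds. For CI or offline builds, sqlx-cli can cache the required metadata:
# Cache query metadata (written to .sqlx/ in SQLx 0.7) so the macros compile without a live database
cargo sqlx prepare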
Basic Database Connection
use sqlx::{PgPool, Row};
use std::env;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
// Load environment variables
dotenv::dotenv().ok();
// Create connection pool
let database_url = env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
let pool = PgPool::connect(&database_url).await?;
// Test connection
let row: (i64,) = sqlx::query_as("SELECT $1")
.bind(150_i64)
.fetch_one(&pool)
.await?;
assert_eq!(row.0, 150);
println!("Database connection successful!");
Ok(())
}
Connection Pool Configuration
use sqlx::{PgPool, postgres::PgPoolOptions};
use std::time::Duration;
async fn create_database_pool() -> Result<PgPool, sqlx::Error> {
let database_url = std::env::var("DATABASE_URL")
.expect("DATABASE_URL must be set");
PgPoolOptions::new()
.max_connections(20) // Maximum connections
.min_connections(5) // Minimum connections
.acquire_timeout(Duration::from_secs(10)) // Wait up to 10s for a connection from the pool
.idle_timeout(Duration::from_secs(600)) // 10 minutes
.max_lifetime(Duration::from_secs(1800)) // 30 minutes
.test_before_acquire(true) // Test connections before use
.connect(&database_url)
.await
}
// Use in application
#[derive(Clone)]
struct AppState {
db: PgPool,
}
async fn setup_app() -> AppState {
let pool = create_database_pool().await
.expect("Failed to create database pool");
AppState { db: pool }
}
Database Models and Schema
Defining Models
use serde::{Deserialize, Serialize};
use sqlx::FromRow;
use chrono::{DateTime, Utc};
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct User {
pub id: Uuid,
pub username: String,
pub email: String,
pub full_name: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub is_active: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct Post {
pub id: Uuid,
pub title: String,
pub content: String,
pub author_id: Uuid,
pub published: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub tags: Vec<String>, // PostgreSQL array
}
#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct Comment {
pub id: Uuid,
pub post_id: Uuid,
pub author_id: Uuid,
pub content: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
// Request/Response DTOs
#[derive(Debug, Clone, Deserialize)]
pub struct CreateUserRequest {
pub username: String,
pub email: String,
pub full_name: String,
}
#[derive(Debug, Deserialize)]
pub struct UpdateUserRequest {
pub username: Option<String>,
pub email: Option<String>,
pub full_name: Option<String>,
pub is_active: Option<bool>,
}
#[derive(Debug, Deserialize)]
pub struct CreatePostRequest {
pub title: String,
pub content: String,
pub published: Option<bool>,
pub tags: Option<Vec<String>>,
}
// Join result for complex queries
#[derive(Debug, Serialize, FromRow)]
pub struct PostWithAuthor {
// Post fields
pub id: Uuid,
pub title: String,
pub content: String,
pub published: bool,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub tags: Vec<String>,
// Author fields
pub author_id: Uuid,
pub author_username: String,
pub author_full_name: String,
}
Database Migrations
Creating Migrations
# Create a new migration
sqlx migrate add create_users_table
# Create another migration
sqlx migrate add create_posts_table
Migration files:
migrations/001_create_users_table.sql:
-- Create users table
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
full_name VARCHAR(255) NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
is_active BOOLEAN NOT NULL DEFAULT TRUE
);
-- Create indexes
CREATE INDEX idx_users_username ON users(username);
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_users_created_at ON users(created_at);
-- Create updated_at trigger
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_users_updated_at BEFORE UPDATE
ON users FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
migrations/002_create_posts_table.sql:
-- Create posts table
CREATE TABLE posts (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
title VARCHAR(500) NOT NULL,
content TEXT NOT NULL,
author_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
published BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
tags TEXT[] NOT NULL DEFAULT '{}'
);
-- Create indexes
CREATE INDEX idx_posts_author_id ON posts(author_id);
CREATE INDEX idx_posts_published ON posts(published);
CREATE INDEX idx_posts_created_at ON posts(created_at);
CREATE INDEX idx_posts_tags ON posts USING GIN(tags);
-- Create updated_at trigger
CREATE TRIGGER update_posts_updated_at BEFORE UPDATE
ON posts FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
-- Create comments table
CREATE TABLE comments (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
post_id UUID NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
author_id UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
content TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Create indexes for comments
CREATE INDEX idx_comments_post_id ON comments(post_id);
CREATE INDEX idx_comments_author_id ON comments(author_id);
CREATE INDEX idx_comments_created_at ON comments(created_at);
-- Create updated_at trigger for comments
CREATE TRIGGER update_comments_updated_at BEFORE UPDATE
ON comments FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
Run migrations:
# Run all pending migrations
sqlx migrate run
# Check migration status
sqlx migrate info
# Revert last migration
sqlx migrate revert
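Migrations can also be embedded in the binary and applied at startup, which is how the Axum example later in this chapter runs them. A minimal sketch using the migrate feature enabled earlier:
// Embed the ./migrations directory into the binary and apply any pending migrations.
async fn run_migrations(pool: &sqlx::PgPool) -> Result<(), sqlx::migrate::MigrateError> {
    sqlx::migrate!("./migrations").run(pool).await
}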
CRUD Operations with SQLx
User Repository
use sqlx::{PgPool, Row};
use uuid::Uuid;
use chrono::Utc;
#[derive(Clone)]
pub struct UserRepository {
pool: PgPool,
}
impl UserRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
// Create user
pub async fn create(&self, req: CreateUserRequest, password_hash: String) -> Result<User, sqlx::Error> {
let user = sqlx::query_as!(
User,
r#"
INSERT INTO users (username, email, full_name, password_hash)
VALUES ($1, $2, $3, $4)
RETURNING id, username, email, full_name, created_at, updated_at, is_active
"#,
req.username,
req.email,
req.full_name,
password_hash
)
.fetch_one(&self.pool)
.await?;
Ok(user)
}
// Get user by ID
pub async fn get_by_id(&self, id: Uuid) -> Result<Option<User>, sqlx::Error> {
let user = sqlx::query_as!(
User,
r#"
SELECT id, username, email, full_name, created_at, updated_at, is_active
FROM users
WHERE id = $1
"#,
id
)
.fetch_optional(&self.pool)
.await?;
Ok(user)
}
// Get user by username
pub async fn get_by_username(&self, username: &str) -> Result<Option<User>, sqlx::Error> {
let user = sqlx::query_as!(
User,
r#"
SELECT id, username, email, full_name, created_at, updated_at, is_active
FROM users
WHERE username = $1
"#,
username
)
.fetch_optional(&self.pool)
.await?;
Ok(user)
}
// List users with pagination
pub async fn list(&self, limit: i64, offset: i64) -> Result<Vec<User>, sqlx::Error> {
let users = sqlx::query_as!(
User,
r#"
SELECT id, username, email, full_name, created_at, updated_at, is_active
FROM users
ORDER BY created_at DESC
LIMIT $1 OFFSET $2
"#,
limit,
offset
)
.fetch_all(&self.pool)
.await?;
Ok(users)
}
// Count total users
pub async fn count(&self) -> Result<i64, sqlx::Error> {
let row = sqlx::query!(
"SELECT COUNT(*) as count FROM users"
)
.fetch_one(&self.pool)
.await?;
Ok(row.count.unwrap_or(0))
}
// Update user
pub async fn update(&self, id: Uuid, req: UpdateUserRequest) -> Result<Option<User>, sqlx::Error> {
let user = sqlx::query_as!(
User,
r#"
UPDATE users
SET username = COALESCE($2, username),
email = COALESCE($3, email),
full_name = COALESCE($4, full_name),
is_active = COALESCE($5, is_active),
updated_at = NOW()
WHERE id = $1
RETURNING id, username, email, full_name, created_at, updated_at, is_active
"#,
id,
req.username,
req.email,
req.full_name,
req.is_active
)
.fetch_optional(&self.pool)
.await?;
Ok(user)
}
// Delete user
pub async fn delete(&self, id: Uuid) -> Result<bool, sqlx::Error> {
let result = sqlx::query!(
"DELETE FROM users WHERE id = $1",
id
)
.execute(&self.pool)
.await?;
Ok(result.rows_affected() > 0)
}
// Search users
pub async fn search(&self, query: &str, limit: i64, offset: i64) -> Result<Vec<User>, sqlx::Error> {
let search_pattern = format!("%{}%", query);
let users = sqlx::query_as!(
User,
r#"
SELECT id, username, email, full_name, created_at, updated_at, is_active
FROM users
WHERE username ILIKE $1
OR email ILIKE $1
OR full_name ILIKE $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
"#,
search_pattern,
limit,
offset
)
.fetch_all(&self.pool)
.await?;
Ok(users)
}
}
Post Repository with Complex Queries
#[derive(Clone)]
pub struct PostRepository {
pool: PgPool,
}
impl PostRepository {
pub fn new(pool: PgPool) -> Self {
Self { pool }
}
// Create post
pub async fn create(&self, author_id: Uuid, req: CreatePostRequest) -> Result<Post, sqlx::Error> {
let post = sqlx::query_as!(
Post,
r#"
INSERT INTO posts (title, content, author_id, published, tags)
VALUES ($1, $2, $3, $4, $5)
RETURNING id, title, content, author_id, published, created_at, updated_at, tags
"#,
req.title,
req.content,
author_id,
req.published.unwrap_or(false),
&req.tags.unwrap_or_default()
)
.fetch_one(&self.pool)
.await?;
Ok(post)
}
// Get post with author information
pub async fn get_with_author(&self, id: Uuid) -> Result<Option<PostWithAuthor>, sqlx::Error> {
let post = sqlx::query_as!(
PostWithAuthor,
r#"
SELECT
p.id,
p.title,
p.content,
p.published,
p.created_at,
p.updated_at,
p.tags,
p.author_id,
u.username as author_username,
u.full_name as author_full_name
FROM posts p
INNER JOIN users u ON p.author_id = u.id
WHERE p.id = $1
"#,
id
)
.fetch_optional(&self.pool)
.await?;
Ok(post)
}
// List posts with filters, built with QueryBuilder so every value is bound
// as a parameter rather than interpolated into the SQL string
pub async fn list_with_filters(
    &self,
    published: Option<bool>,
    author_id: Option<Uuid>,
    tag: Option<String>,
    search: Option<String>,
    limit: i64,
    offset: i64,
) -> Result<Vec<PostWithAuthor>, sqlx::Error> {
    let mut builder = sqlx::QueryBuilder::<sqlx::Postgres>::new(
        r#"
        SELECT
            p.id,
            p.title,
            p.content,
            p.published,
            p.created_at,
            p.updated_at,
            p.tags,
            p.author_id,
            u.username AS author_username,
            u.full_name AS author_full_name
        FROM posts p
        INNER JOIN users u ON p.author_id = u.id
        WHERE 1=1
        "#,
    );

    if let Some(published) = published {
        builder.push(" AND p.published = ").push_bind(published);
    }
    if let Some(author_id) = author_id {
        builder.push(" AND p.author_id = ").push_bind(author_id);
    }
    if let Some(tag) = tag {
        builder.push(" AND ").push_bind(tag).push(" = ANY(p.tags)");
    }
    if let Some(search) = search {
        let pattern = format!("%{}%", search);
        builder
            .push(" AND (p.title ILIKE ")
            .push_bind(pattern.clone())
            .push(" OR p.content ILIKE ")
            .push_bind(pattern)
            .push(")");
    }

    builder.push(" ORDER BY p.created_at DESC");
    builder.push(" LIMIT ").push_bind(limit);
    builder.push(" OFFSET ").push_bind(offset);

    let posts = builder
        .build_query_as::<PostWithAuthor>()
        .fetch_all(&self.pool)
        .await?;

    Ok(posts)
}
// Get posts by tag
pub async fn get_by_tag(&self, tag: &str, limit: i64, offset: i64) -> Result<Vec<Post>, sqlx::Error> {
let posts = sqlx::query_as!(
Post,
r#"
SELECT id, title, content, author_id, published, created_at, updated_at, tags
FROM posts
WHERE $1 = ANY(tags) AND published = true
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
"#,
tag,
limit,
offset
)
.fetch_all(&self.pool)
.await?;
Ok(posts)
}
// Update post
pub async fn update(&self, id: Uuid, author_id: Uuid, req: UpdatePostRequest) -> Result<Option<Post>, sqlx::Error> {
let post = sqlx::query_as!(
Post,
r#"
UPDATE posts
SET title = COALESCE($3, title),
content = COALESCE($4, content),
published = COALESCE($5, published),
tags = COALESCE($6, tags),
updated_at = NOW()
WHERE id = $1 AND author_id = $2
RETURNING id, title, content, author_id, published, created_at, updated_at, tags
"#,
id,
author_id,
req.title,
req.content,
req.published,
&req.tags
)
.fetch_optional(&self.pool)
.await?;
Ok(post)
}
}
#[derive(Debug, Deserialize)]
pub struct UpdatePostRequest {
pub title: Option<String>,
pub content: Option<String>,
pub published: Option<bool>,
pub tags: Option<Vec<String>>,
}
Transactions
Using Transactions
use sqlx::{PgPool, Postgres, Transaction};
pub async fn transfer_ownership(
pool: &PgPool,
post_id: Uuid,
old_owner_id: Uuid,
new_owner_id: Uuid,
) -> Result<(), sqlx::Error> {
let mut tx: Transaction<Postgres> = pool.begin().await?;
// Verify old owner
let post = sqlx::query!(
"SELECT author_id FROM posts WHERE id = $1",
post_id
)
.fetch_optional(&mut *tx)
.await?;
let post = match post {
Some(post) => post,
None => {
tx.rollback().await?;
return Err(sqlx::Error::RowNotFound);
}
};
if post.author_id != old_owner_id {
tx.rollback().await?;
return Err(sqlx::Error::RowNotFound);
}
// Verify new owner exists
let new_owner_exists = sqlx::query!(
"SELECT 1 FROM users WHERE id = $1",
new_owner_id
)
.fetch_optional(&mut *tx)
.await?
.is_some();
if !new_owner_exists {
tx.rollback().await?;
return Err(sqlx::Error::RowNotFound);
}
// Transfer ownership
sqlx::query!(
"UPDATE posts SET author_id = $1, updated_at = NOW() WHERE id = $2",
new_owner_id,
post_id
)
.execute(&mut *tx)
.await?;
// Log the transfer (assumes a post_transfers audit table exists)
sqlx::query!(
r#"
INSERT INTO post_transfers (post_id, old_owner_id, new_owner_id, transferred_at)
VALUES ($1, $2, $3, NOW())
"#,
post_id,
old_owner_id,
new_owner_id
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
// Complex transaction with multiple operations
pub async fn create_post_with_tags(
pool: &PgPool,
author_id: Uuid,
req: CreatePostRequest,
category_ids: Vec<i32>,
) -> Result<Post, sqlx::Error> {
let mut tx = pool.begin().await?;
// Create the post
let post = sqlx::query_as!(
Post,
r#"
INSERT INTO posts (title, content, author_id, published, tags)
VALUES ($1, $2, $3, $4, $5)
RETURNING id, title, content, author_id, published, created_at, updated_at, tags
"#,
req.title,
req.content,
author_id,
req.published.unwrap_or(false),
&req.tags.unwrap_or_default()
)
.fetch_one(&mut *tx)
.await?;
// Add categories (assumes a post_categories join table exists)
for category_id in category_ids {
sqlx::query!(
"INSERT INTO post_categories (post_id, category_id) VALUES ($1, $2)",
post.id,
category_id
)
.execute(&mut *tx)
.await?;
}
// Update the user's post count (assumes a post_count column on users)
sqlx::query!(
"UPDATE users SET post_count = post_count + 1 WHERE id = $1",
author_id
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(post)
}
Advanced Query Patterns
Query Builder Pattern
use sqlx::{PgPool, Postgres, QueryBuilder};
use uuid::Uuid;

// Collect the filters first and build the SQL once in `execute`, so the WHERE
// clause always comes before ORDER BY / LIMIT / OFFSET and every filter value
// is bound as a parameter.
#[derive(Debug, Default)]
pub struct PostQueryBuilder {
    published: Option<bool>,
    author_id: Option<Uuid>,
    tag: Option<String>,
    search: Option<String>,
    order_by: Option<(String, String)>,
    limit: Option<i64>,
    offset: Option<i64>,
}

impl PostQueryBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn published(mut self, published: bool) -> Self {
        self.published = Some(published);
        self
    }

    pub fn author(mut self, author_id: Uuid) -> Self {
        self.author_id = Some(author_id);
        self
    }

    pub fn has_tag(mut self, tag: &str) -> Self {
        self.tag = Some(tag.to_string());
        self
    }

    pub fn search(mut self, search_term: &str) -> Self {
        self.search = Some(format!("%{}%", search_term));
        self
    }

    // Only pass trusted, hard-coded identifiers here: column names and sort
    // directions cannot be bound as parameters and are pushed into the SQL as-is.
    pub fn order_by(mut self, field: &str, direction: &str) -> Self {
        self.order_by = Some((field.to_string(), direction.to_string()));
        self
    }

    pub fn limit(mut self, limit: i64) -> Self {
        self.limit = Some(limit);
        self
    }

    pub fn offset(mut self, offset: i64) -> Self {
        self.offset = Some(offset);
        self
    }

    pub async fn execute(self, pool: &PgPool) -> Result<Vec<PostWithAuthor>, sqlx::Error> {
        let mut query: QueryBuilder<Postgres> = QueryBuilder::new(
            r#"
            SELECT
                p.id,
                p.title,
                p.content,
                p.published,
                p.created_at,
                p.updated_at,
                p.tags,
                p.author_id,
                u.username AS author_username,
                u.full_name AS author_full_name
            FROM posts p
            INNER JOIN users u ON p.author_id = u.id
            WHERE 1=1
            "#,
        );

        if let Some(published) = self.published {
            query.push(" AND p.published = ").push_bind(published);
        }
        if let Some(author_id) = self.author_id {
            query.push(" AND p.author_id = ").push_bind(author_id);
        }
        if let Some(tag) = self.tag {
            query.push(" AND ").push_bind(tag).push(" = ANY(p.tags)");
        }
        if let Some(pattern) = self.search {
            query
                .push(" AND (p.title ILIKE ")
                .push_bind(pattern.clone())
                .push(" OR p.content ILIKE ")
                .push_bind(pattern)
                .push(")");
        }
        if let Some((field, direction)) = self.order_by {
            query.push(" ORDER BY ").push(field).push(" ").push(direction);
        }
        if let Some(limit) = self.limit {
            query.push(" LIMIT ").push_bind(limit);
        }
        if let Some(offset) = self.offset {
            query.push(" OFFSET ").push_bind(offset);
        }

        query
            .build_query_as::<PostWithAuthor>()
            .fetch_all(pool)
            .await
    }
}
// Usage
pub async fn search_posts(
pool: &PgPool,
filters: PostFilters,
) -> Result<Vec<PostWithAuthor>, sqlx::Error> {
let mut builder = PostQueryBuilder::new();
if let Some(published) = filters.published {
builder = builder.published(published);
}
if let Some(author_id) = filters.author_id {
builder = builder.author(author_id);
}
if let Some(tag) = &filters.tag {
builder = builder.has_tag(tag);
}
if let Some(search) = &filters.search {
builder = builder.search(search);
}
builder
.order_by("p.created_at", "DESC")
.limit(filters.limit.unwrap_or(20))
.offset(filters.offset.unwrap_or(0))
.execute(pool)
.await
}
#[derive(Debug, Deserialize)]
pub struct PostFilters {
pub published: Option<bool>,
pub author_id: Option<Uuid>,
pub tag: Option<String>,
pub search: Option<String>,
pub limit: Option<i64>,
pub offset: Option<i64>,
}
Bulk Operations
// Bulk insert
pub async fn bulk_create_posts(
pool: &PgPool,
posts: Vec<CreatePostRequest>,
author_id: Uuid,
) -> Result<Vec<Post>, sqlx::Error> {
let mut query_builder = QueryBuilder::new(
"INSERT INTO posts (title, content, author_id, published, tags) "
);
query_builder.push_values(posts.iter(), |mut b, post| {
b.push_bind(&post.title)
.push_bind(&post.content)
.push_bind(author_id)
.push_bind(post.published.unwrap_or(false))
.push_bind(post.tags.clone().unwrap_or_default());
});
query_builder.push(" RETURNING id, title, content, author_id, published, created_at, updated_at, tags");
let query = query_builder.build_query_as::<Post>();
query.fetch_all(pool).await
}
// Bulk update
pub async fn bulk_publish_posts(
pool: &PgPool,
post_ids: &[Uuid],
author_id: Uuid,
) -> Result<u64, sqlx::Error> {
let mut query_builder = QueryBuilder::new(
"UPDATE posts SET published = true, updated_at = NOW() WHERE author_id = "
);
query_builder.push_bind(author_id);
query_builder.push(" AND id = ANY(");
query_builder.push_bind(post_ids);
query_builder.push(")");
let result = query_builder.build().execute(pool).await?;
Ok(result.rows_affected())
}
Raw SQL and Dynamic Queries
Complex Raw SQL
pub async fn get_user_analytics(
pool: &PgPool,
user_id: Uuid,
) -> Result<UserAnalytics, sqlx::Error> {
let analytics = sqlx::query_as!(
UserAnalytics,
r#"
WITH user_stats AS (
SELECT
COUNT(CASE WHEN published = true THEN 1 END) as published_posts,
COUNT(CASE WHEN published = false THEN 1 END) as draft_posts,
AVG(CASE WHEN published = true THEN LENGTH(content) END)::DOUBLE PRECISION as avg_content_length,
MAX(created_at) as last_post_date
FROM posts
WHERE author_id = $1
),
comment_stats AS (
SELECT COUNT(*) as total_comments
FROM comments c
INNER JOIN posts p ON c.post_id = p.id
WHERE p.author_id = $1
),
engagement_stats AS (
SELECT
COUNT(DISTINCT c.author_id) as unique_commenters,
COUNT(c.id) as comments_received
FROM comments c
INNER JOIN posts p ON c.post_id = p.id
WHERE p.author_id = $1
)
SELECT
us.published_posts as "published_posts!",
us.draft_posts as "draft_posts!",
us.avg_content_length as "avg_content_length",
us.last_post_date,
cs.total_comments as "total_comments!",
es.unique_commenters as "unique_commenters!",
es.comments_received as "comments_received!"
FROM user_stats us
CROSS JOIN comment_stats cs
CROSS JOIN engagement_stats es
"#,
user_id
)
.fetch_one(pool)
.await?;
Ok(analytics)
}
#[derive(Debug, Serialize, FromRow)]
pub struct UserAnalytics {
pub published_posts: i64,
pub draft_posts: i64,
pub avg_content_length: Option<f64>,
pub last_post_date: Option<DateTime<Utc>>,
pub total_comments: i64,
pub unique_commenters: i64,
pub comments_received: i64,
}
Stored Procedures and Functions
-- Create a SQL function that returns the most popular tags
CREATE OR REPLACE FUNCTION get_popular_tags(
limit_count INTEGER DEFAULT 10
) RETURNS TABLE(tag TEXT, post_count BIGINT) AS $$
BEGIN
RETURN QUERY
SELECT
unnest(p.tags) as tag,
COUNT(*) as post_count
FROM posts p
WHERE p.published = true
GROUP BY unnest(p.tags)
ORDER BY COUNT(*) DESC
LIMIT limit_count;
END;
$$ LANGUAGE plpgsql;
// Call stored procedure
pub async fn get_popular_tags(
pool: &PgPool,
limit: i32,
) -> Result<Vec<TagCount>, sqlx::Error> {
let tags = sqlx::query_as!(
TagCount,
"SELECT * FROM get_popular_tags($1)",
limit
)
.fetch_all(pool)
.await?;
Ok(tags)
}
#[derive(Debug, Serialize, FromRow)]
pub struct TagCount {
pub tag: Option<String>,
pub post_count: Option<i64>,
}
Error Handling and Logging
Custom Error Types
use thiserror::Error;
#[derive(Debug, Error)]
pub enum DatabaseError {
#[error("Database connection error: {0}")]
Connection(#[from] sqlx::Error),
#[error("User not found")]
UserNotFound,
#[error("Post not found")]
PostNotFound,
#[error("Duplicate entry: {field}")]
DuplicateEntry { field: String },
#[error("Validation error: {0}")]
Validation(String),
#[error("Permission denied")]
PermissionDenied,
}
impl From<DatabaseError> for sqlx::Error {
fn from(err: DatabaseError) -> Self {
match err {
DatabaseError::Connection(e) => e,
_ => sqlx::Error::RowNotFound,
}
}
}
// Repository with error handling
impl UserRepository {
pub async fn create_safe(&self, req: CreateUserRequest, password_hash: String) -> Result<User, DatabaseError> {
match self.create(req, password_hash).await {
Ok(user) => Ok(user),
Err(sqlx::Error::Database(db_err)) => {
if db_err.code() == Some("23505".into()) { // Unique violation
if db_err.message().contains("username") {
Err(DatabaseError::DuplicateEntry { field: "username".to_string() })
} else if db_err.message().contains("email") {
Err(DatabaseError::DuplicateEntry { field: "email".to_string() })
} else {
Err(DatabaseError::DuplicateEntry { field: "unknown".to_string() })
}
} else {
Err(DatabaseError::Connection(sqlx::Error::Database(db_err)))
}
}
Err(e) => Err(DatabaseError::Connection(e)),
}
}
}
Database Logging
use tracing::{info, warn, error, instrument};
impl UserRepository {
#[instrument(skip(self, password_hash))]
pub async fn create_with_logging(&self, req: CreateUserRequest, password_hash: String) -> Result<User, DatabaseError> {
info!("Creating user with username: {}", req.username);
match self.create(req.clone(), password_hash).await {
Ok(user) => {
info!("Successfully created user with ID: {}", user.id);
Ok(user)
}
Err(e) => {
error!("Failed to create user {}: {}", req.username, e);
Err(DatabaseError::Connection(e))
}
}
}
#[instrument(skip(self))]
pub async fn get_by_id_with_logging(&self, id: Uuid) -> Result<Option<User>, DatabaseError> {
match self.get_by_id(id).await {
Ok(Some(user)) => {
info!("Found user: {}", user.username);
Ok(Some(user))
}
Ok(None) => {
warn!("User not found with ID: {}", id);
Ok(None)
}
Err(e) => {
error!("Database error while fetching user {}: {}", id, e);
Err(DatabaseError::Connection(e))
}
}
}
}
Integration with Web Frameworks
Axum Integration
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
#[derive(Clone)]
struct AppState {
db: PgPool,
user_repo: UserRepository,
post_repo: PostRepository,
}
async fn create_user(
State(state): State<AppState>,
Json(req): Json<CreateUserRequest>,
) -> Result<Json<User>, StatusCode> {
let password_hash = "hashed_password".to_string(); // Hash the password properly
match state.user_repo.create_safe(req, password_hash).await {
Ok(user) => Ok(Json(user)),
Err(DatabaseError::DuplicateEntry { .. }) => {
// Return 409 Conflict
Err(StatusCode::CONFLICT)
}
Err(DatabaseError::Validation(_)) => {
Err(StatusCode::BAD_REQUEST)
}
Err(_) => {
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
async fn get_user(
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> Result<Json<User>, StatusCode> {
match state.user_repo.get_by_id(id).await {
Ok(Some(user)) => Ok(Json(user)),
Ok(None) => Err(StatusCode::NOT_FOUND),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
async fn list_posts(
State(state): State<AppState>,
Query(filters): Query<PostFilters>,
) -> Result<Json<Vec<PostWithAuthor>>, StatusCode> {
match search_posts(&state.db, filters).await {
Ok(posts) => Ok(Json(posts)),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}
fn create_router(state: AppState) -> Router {
Router::new()
.route("/users", post(create_user))
.route("/users/:id", get(get_user))
.route("/posts", get(list_posts))
.with_state(state)
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Setup database
let pool = create_database_pool().await?;
// Run migrations
sqlx::migrate!().run(&pool).await?;
// Create repositories
let user_repo = UserRepository::new(pool.clone());
let post_repo = PostRepository::new(pool.clone());
let state = AppState {
db: pool,
user_repo,
post_repo,
};
let app = create_router(state);
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;
Ok(())
}
Testing Database Code
Integration Tests
#[cfg(test)]
mod tests {
use super::*;
use sqlx::PgPool;
use testcontainers::{clients, images, Container};
// The container handle must stay alive for the whole test: dropping it stops
// the database, so it is returned alongside the pool.
async fn setup_test_db(
    docker: &clients::Cli,
) -> (Container<'_, images::postgres::Postgres>, PgPool) {
    let postgres = docker.run(images::postgres::Postgres::default());
    let connection_string = format!(
        "postgres://postgres:postgres@127.0.0.1:{}/postgres",
        postgres.get_host_port_ipv4(5432)
    );
    let pool = PgPool::connect(&connection_string).await.unwrap();
    // Run migrations
    sqlx::migrate!().run(&pool).await.unwrap();
    (postgres, pool)
}
#[tokio::test]
async fn test_user_crud() {
let docker = clients::Cli::default();
let (_container, pool) = setup_test_db(&docker).await;
let repo = UserRepository::new(pool);
// Test create
let req = CreateUserRequest {
username: "testuser".to_string(),
email: "[email protected]".to_string(),
full_name: "Test User".to_string(),
};
let user = repo.create(req.clone(), "password_hash".to_string()).await.unwrap();
assert_eq!(user.username, "testuser");
assert_eq!(user.email, "test@example.com");
// Test get by ID
let found_user = repo.get_by_id(user.id).await.unwrap();
assert!(found_user.is_some());
assert_eq!(found_user.unwrap().id, user.id);
// Test get by username
let found_user = repo.get_by_username("testuser").await.unwrap();
assert!(found_user.is_some());
// Test update
let update_req = UpdateUserRequest {
username: None,
email: Some("[email protected]".to_string()),
full_name: Some("New Name".to_string()),
is_active: None,
};
let updated_user = repo.update(user.id, update_req).await.unwrap();
assert!(updated_user.is_some());
assert_eq!(updated_user.unwrap().email, "new@example.com");
// Test delete
let deleted = repo.delete(user.id).await.unwrap();
assert!(deleted);
let not_found = repo.get_by_id(user.id).await.unwrap();
assert!(not_found.is_none());
}
#[tokio::test]
async fn test_transaction_rollback() {
let docker = clients::Cli::default();
let (_container, pool) = setup_test_db(&docker).await;
let repo = UserRepository::new(pool.clone());
// Create a user
let req = CreateUserRequest {
username: "testuser".to_string(),
email: "[email protected]".to_string(),
full_name: "Test User".to_string(),
};
let user = repo.create(req, "password_hash".to_string()).await.unwrap();
// Test transaction that should fail
let result = transfer_ownership(
&pool,
Uuid::new_v4(), // Non-existent post
user.id,
Uuid::new_v4(), // Non-existent new owner
).await;
assert!(result.is_err());
// Verify user still exists (transaction was rolled back properly)
let found_user = repo.get_by_id(user.id).await.unwrap();
assert!(found_user.is_some());
}
}
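As an alternative to managing containers by hand, SQLx ships a #[sqlx::test] attribute that provisions an isolated database per test from DATABASE_URL and, with the migrate feature enabled, applies ./migrations automatically. A sketch of what such a test could look like inside the same tests module (it assumes a reachable Postgres server):
// Each test receives its own freshly migrated database and a ready-to-use pool.
#[sqlx::test]
async fn creates_and_fetches_user(pool: PgPool) {
    let repo = UserRepository::new(pool);
    let req = CreateUserRequest {
        username: "sqlxtest".to_string(),
        email: "sqlxtest@example.com".to_string(),
        full_name: "SQLx Test".to_string(),
    };
    let user = repo.create(req, "password_hash".to_string()).await.unwrap();
    assert!(repo.get_by_id(user.id).await.unwrap().is_some());
}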
Database integration in Rust provides excellent type safety, performance, and ergonomics. SQLx's compile-time query checking helps catch errors early, while the async support enables high-performance applications. Always use transactions for multi-step operations, implement proper error handling, and write comprehensive tests for your database code.