1. rust
  2. /web
  3. /async-web

Async Web Development

Async programming is fundamental to modern web development in Rust, enabling applications to handle thousands of concurrent connections efficiently. This guide covers async patterns, best practices, and advanced techniques for building high-performance web applications.

Async Fundamentals

Understanding Async/Await

use std::time::Duration;
use tokio::time::sleep;

/// Produces a placeholder payload for `id` after a simulated 100 ms network
/// round trip.
///
/// The body never returns `Err`; the `Result` return type only mirrors what a
/// real network call would expose.
async fn fetch_data(id: u32) -> Result<String, Box<dyn std::error::Error>> {
    // Stand-in for network latency; the task yields to the runtime while waiting.
    let simulated_latency = Duration::from_millis(100);
    sleep(simulated_latency).await;

    let payload = format!("Data for ID: {}", id);
    Ok(payload)
}

/// Fetches the data for `user_id` and prefixes it with `Processed: `, printing
/// a line to stdout before and after the work.
///
/// # Errors
/// Propagates any error returned by `fetch_data`.
async fn process_user_data(user_id: u32) -> Result<String, Box<dyn std::error::Error>> {
    println!("Starting to process user {}", user_id);

    // `?` returns early on failure, so the "Finished" line below is only
    // printed when the fetch succeeded.
    let processed = fetch_data(user_id)
        .await
        .map(|raw| format!("Processed: {}", raw))?;

    println!("Finished processing user {}", user_id);
    Ok(processed)
}

/// Entry point: `#[tokio::main]` wraps this async function so it can serve as
/// the program's `main`.
///
/// Processes user 42 and prints the result; any error is returned to the caller.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    process_user_data(42)
        .await
        .map(|output| println!("{}", output))
}

Async vs Sync Performance

use std::time::{Duration, Instant};
use tokio::time::sleep;

/// Builds a `"User {id}"` label for every id, in input order, blocking the
/// calling thread for 100 ms per id to stand in for synchronous I/O.
fn sync_process_multiple_users(user_ids: Vec<u32>) -> Vec<String> {
    user_ids
        .into_iter()
        .map(|id| {
            // `std::thread::sleep` parks the whole OS thread for the duration.
            std::thread::sleep(Duration::from_millis(100));
            format!("User {}", id)
        })
        .collect()
}

/// Async counterpart of the blocking version: same labels, same order, but
/// each 100 ms wait is awaited instead of blocking the thread.
///
/// The ids are still handled one after another, so total latency grows
/// linearly with the number of ids.
async fn async_process_multiple_users(user_ids: Vec<u32>) -> Vec<String> {
    let pause = Duration::from_millis(100);
    let mut labels = Vec::new();

    for user_id in user_ids {
        // Awaiting hands control back to the runtime so other tasks can run.
        sleep(pause).await;
        labels.push(format!("User {}", user_id));
    }

    labels
}

/// Labels every id concurrently: one future per id, all driven together by
/// `join_all`, which yields the results in input order.
async fn concurrent_process_users(user_ids: Vec<u32>) -> Vec<String> {
    use futures::future::join_all;

    // Futures are lazy: building them does no work until `join_all` polls them.
    let pending = user_ids.into_iter().map(|id| async move {
        sleep(Duration::from_millis(100)).await;
        format!("User {}", id)
    });

    join_all(pending).await
}

/// Times the three strategies (blocking, async sequential, async concurrent)
/// over the same five ids and prints how long each one took.
#[tokio::main]
async fn main() {
    let user_ids = vec![1, 2, 3, 4, 5];

    // Blocking baseline. This call blocks the thread running `main`.
    let sync_timer = Instant::now();
    let _sync_results = sync_process_multiple_users(user_ids.clone());
    println!("Sync took: {:?}", sync_timer.elapsed());

    // Async, but each id is still awaited one after another.
    let sequential_timer = Instant::now();
    let _async_results = async_process_multiple_users(user_ids.clone()).await;
    println!("Async sequential took: {:?}", sequential_timer.elapsed());

    // Async with all waits overlapping.
    let concurrent_timer = Instant::now();
    let _concurrent_results = concurrent_process_users(user_ids).await;
    println!("Async concurrent took: {:?}", concurrent_timer.elapsed());
}

Tokio Runtime and Configuration

Runtime Configuration

use tokio::runtime::{Builder, Runtime};
use std::time::Duration;

/// Builds a multi-threaded Tokio runtime with four workers named
/// `my-async-worker`, a 3 MiB stack per thread, and all drivers enabled.
///
/// # Panics
/// Panics with "Failed to create runtime" if the runtime cannot be built.
fn create_custom_runtime() -> Runtime {
    let mut builder = Builder::new_multi_thread();
    builder
        .worker_threads(4)
        .thread_name("my-async-worker")
        .thread_stack_size(3 * 1024 * 1024)
        .enable_all();

    builder.build().expect("Failed to create runtime")
}

/// Demonstrates both runtime flavours by building each one and running a
/// trivial future to completion with `block_on`.
///
/// # Panics
/// Panics if either runtime fails to build.
fn runtime_examples() {
    // Flavour 1: single-threaded, driven by the thread that calls `block_on`.
    let single_threaded = Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    single_threaded.block_on(async {
        println!("Running on current thread runtime");
    });

    // Flavour 2: a worker pool with explicit sizing and keep-alive settings.
    let mut multi_builder = Builder::new_multi_thread();
    multi_builder
        .worker_threads(8)
        .max_blocking_threads(16)
        .thread_keep_alive(Duration::from_secs(60))
        .enable_all();
    let multi_threaded = multi_builder.build().unwrap();
    multi_threaded.block_on(async {
        println!("Running on multi-thread runtime");
    });
}

/// Spawns a one-second task through the current runtime's `Handle`, waits for
/// it, and prints the string it returns.
///
/// # Panics
/// Panics if awaiting the spawned task yields an error (the `unwrap` below).
async fn spawn_tasks_example() {
    let task = tokio::runtime::Handle::current().spawn(async {
        tokio::time::sleep(Duration::from_secs(1)).await;
        "Task completed"
    });

    let message = task.await.unwrap();
    println!("{}", message);
}

Task Management

use tokio::task::{self, JoinHandle};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

/// Spawns ten tasks that each bump a shared atomic counter, sleep 100 ms, and
/// return their own index; then joins them in spawn order and prints the
/// final counter value.
///
/// # Panics
/// Panics if awaiting any task handle yields an error (the `unwrap` on join).
async fn task_management_example() {
    let counter = Arc::new(AtomicU64::new(0));

    // `tokio::spawn` starts each task immediately; the handles are only kept
    // so the results can be collected afterwards.
    let handles: Vec<JoinHandle<u64>> = (0..10)
        .map(|id| {
            let counter = Arc::clone(&counter);
            tokio::spawn(async move {
                counter.fetch_add(1, Ordering::SeqCst);

                // Simulated work.
                tokio::time::sleep(Duration::from_millis(100)).await;

                println!("Task {} completed", id);
                id
            })
        })
        .collect();

    for handle in handles {
        let task_id = handle.await.unwrap();
        println!("Joined task {}", task_id);
    }

    println!("Final counter: {}", counter.load(Ordering::SeqCst));
}

/// Races a long-running operation against a `CancellationToken` inside a
/// spawned task, cancels the token after two seconds, and waits for the task
/// to finish.
///
/// # Panics
/// Panics if awaiting the spawned task yields an error.
async fn cancellation_example() {
    use tokio::time::Duration;
    use tokio_util::sync::CancellationToken;

    let cancel_token = CancellationToken::new();

    let worker = {
        // The task gets its own clone; the original stays here to trigger it.
        let child_token = cancel_token.clone();
        tokio::spawn(async move {
            tokio::select! {
                _ = long_running_operation() => {
                    println!("Operation completed");
                }
                _ = child_token.cancelled() => {
                    println!("Operation was cancelled");
                }
            }
        })
    };

    // Give the operation two seconds, then request cancellation.
    tokio::time::sleep(Duration::from_secs(2)).await;
    cancel_token.cancel();

    worker.await.unwrap();
}

/// Stand-in for slow work: completes after a ten-second timer.
async fn long_running_operation() {
    let ten_seconds = Duration::from_secs(10);
    tokio::time::sleep(ten_seconds).await;
}

Async Patterns in Web Applications

Request Processing Patterns

use axum::{
    extract::{Query, State},
    http::StatusCode,
    response::Json,
    routing::get,
    Router,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::time::{Duration, sleep};

/// Query-string parameters accepted by the search handlers.
#[derive(Debug, Deserialize)]
struct SearchQuery {
    /// Search text; also used to build the cache key.
    q: String,
    /// Optional result limit; the handlers fall back to 10 when it is absent.
    limit: Option<u32>,
}

/// One search hit as returned to the client.
///
/// `Clone` is required because the handlers copy the result list when they
/// store it in the cache (`results.clone()`); without it they do not compile.
#[derive(Debug, Clone, Serialize)]
struct SearchResult {
    /// Identifier of the matched item.
    id: u32,
    /// Display title of the matched item.
    title: String,
    /// Relevance score attached to the hit.
    relevance: f32,
}

/// Shared handler state. Both members sit behind `Arc`, so cloning the state
/// only bumps reference counts.
#[derive(Clone)]
struct AppState {
    /// Handle used by the handlers to run searches.
    database_pool: Arc<DatabasePool>,
    /// Handle used by the handlers to look up and store search results.
    cache: Arc<Cache>,
}

// Simulated external services
/// Stand-in for a database connection pool; carries no data.
struct DatabasePool;
/// Stand-in for a cache client; carries no data.
struct Cache;

impl DatabasePool {
    /// Simulates a database search for `query`, returning at most `limit`
    /// canned results after a 100 ms delay.
    ///
    /// # Errors
    /// The simulated query never fails; the `Result` mirrors a real driver.
    async fn search(&self, query: &str, limit: u32) -> Result<Vec<SearchResult>, String> {
        // Simulate database query delay
        sleep(Duration::from_millis(100)).await;

        let mut results = vec![
            SearchResult { id: 1, title: format!("Result for {}", query), relevance: 0.9 },
            SearchResult { id: 2, title: format!("Another {}", query), relevance: 0.7 },
        ];

        // Honour the caller's limit (previously accepted but ignored).
        // `u32 -> usize` is lossless on 32- and 64-bit targets.
        results.truncate(limit as usize);

        Ok(results)
    }
}

impl Cache {
    /// Simulated lookup: waits 10 ms and always reports a miss.
    async fn get(&self, key: &str) -> Option<Vec<SearchResult>> {
        let lookup_latency = Duration::from_millis(10);
        sleep(lookup_latency).await;

        // This example never stores anything, so every lookup misses.
        None
    }

    /// Simulated write: waits 5 ms, then logs the key. The value is dropped.
    async fn set(&self, key: &str, value: Vec<SearchResult>) {
        let write_latency = Duration::from_millis(5);
        sleep(write_latency).await;

        println!("Cached results for key: {}", key);
    }
}

/// Cache-aside search handler: consult the cache, and only on a miss query
/// the database and write the fresh results back before responding.
///
/// # Errors
/// Responds with `500 Internal Server Error` if the database search fails.
async fn search_sequential(
    Query(params): Query<SearchQuery>,
    State(state): State<AppState>,
) -> Result<Json<Vec<SearchResult>>, StatusCode> {
    let cache_key = format!("search:{}", params.q);

    let results = match state.cache.get(&cache_key).await {
        // Hit: the database is never touched.
        Some(cached) => cached,
        // Miss: query, then populate the cache before answering.
        None => {
            let limit = params.limit.unwrap_or(10);
            let fresh = state
                .database_pool
                .search(&params.q, limit)
                .await
                .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

            state.cache.set(&cache_key, fresh.clone()).await;
            fresh
        }
    };

    Ok(Json(results))
}

// Concurrent async processing
async fn search_concurrent(
    Query(params): Query<SearchQuery>,
    State(state): State<AppState>,
) -> Result<Json<Vec<SearchResult>>, StatusCode> {
    let cache_key = format!("search:{}", params.q);
    
    // Start cache and database operations concurrently
    let cache_future = state.cache.get(&cache_key);
    let db_future = state.database_pool.search(&params.q, params.limit.unwrap_or(10));
    
    let (cache_result, db_result) = tokio::join!(cache_future, db_future);
    
    // Use cache if available, otherwise use database result
    let results = if let Some(cached) = cache_result {
        cached
    } else {
        let db_results = db_result.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        
        // Update cache in background
        let cache_clone = state.cache.clone();
        let cache_key_clone = cache_key.clone();
        let results_clone = db_results.clone();
        tokio::spawn(async move {
            cache_clone.set(&cache_key_clone, results_clone).await;
        });
        
        db_results
    };
    
    Ok(Json(results))
}

/// Search handler that gives the database query at most five seconds.
///
/// # Errors
/// * `408 Request Timeout` if the query has not finished within five seconds.
/// * `500 Internal Server Error` if the query finished with an error.
async fn search_with_timeout(
    Query(params): Query<SearchQuery>,
    State(state): State<AppState>,
) -> Result<Json<Vec<SearchResult>>, StatusCode> {
    use tokio::time::timeout;

    let limit = params.limit.unwrap_or(10);
    let outcome = timeout(
        Duration::from_secs(5),
        state.database_pool.search(&params.q, limit),
    )
    .await;

    // Outer `Result`: did the timer fire? Inner `Result`: did the query succeed?
    match outcome {
        Err(_elapsed) => Err(StatusCode::REQUEST_TIMEOUT),
        Ok(Err(_)) => Err(StatusCode::INTERNAL_SERVER_ERROR),
        Ok(Ok(results)) => Ok(Json(results)),
    }
}

Error Handling in Async Code

use thiserror::Error;
use tokio::time::{timeout, Duration};

/// Failure modes used by the async examples. The `#[error]` attributes define
/// the text each variant displays.
#[derive(Error, Debug)]
enum AsyncError {
    /// The (simulated) database call failed.
    #[error("Database connection failed")]
    DatabaseError,
    /// An operation did not finish within `seconds` seconds.
    #[error("Request timeout after {seconds} seconds")]
    TimeoutError { seconds: u64 },
    /// The named downstream service could not be reached.
    #[error("Service unavailable: {service}")]
    ServiceUnavailable { service: String },
    /// Input was rejected; `message` explains why.
    #[error("Validation failed: {message}")]
    ValidationError { message: String },
}

// Error propagation with async
async fn fetch_user_data(user_id: u32) -> Result<UserData, AsyncError> {
    // Validate input
    if user_id == 0 {
        return Err(AsyncError::ValidationError {
            message: "User ID cannot be zero".to_string(),
        });
    }
    
    // Simulate database call with timeout
    let db_future = async {
        sleep(Duration::from_millis(2000)).await;
        if user_id == 999 {
            Err(AsyncError::DatabaseError)
        } else {
            Ok(UserData { id: user_id, name: "John".to_string() })
        }
    };
    
    match timeout(Duration::from_secs(1), db_future).await {
        Ok(result) => result,
        Err(_) => Err(AsyncError::TimeoutError { seconds: 1 }),
    }
}

/// Minimal user record returned by `fetch_user_data`.
#[derive(Debug)]
struct UserData {
    /// The id the record was requested with.
    id: u32,
    /// The user's name.
    name: String,
}

// Retry pattern with exponential backoff
async fn retry_with_backoff<F, Fut, T, E>(
    mut operation: F,
    max_attempts: u32,
    initial_delay: Duration,
) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut attempts = 0;
    let mut delay = initial_delay;
    
    loop {
        attempts += 1;
        
        match operation().await {
            Ok(result) => return Ok(result),
            Err(error) => {
                if attempts >= max_attempts {
                    return Err(error);
                }
                
                println!("Attempt {} failed, retrying in {:?}", attempts, delay);
                sleep(delay).await;
                delay *= 2; // Exponential backoff
            }
        }
    }
}

// Circuit breaker pattern
use std::sync::atomic::{AtomicU32, AtomicU64};

/// State for a simple circuit breaker built from lock-free atomic counters.
struct CircuitBreaker {
    /// Failures recorded since the last success.
    failure_count: AtomicU32,
    /// Time of the most recent failure, in whole seconds since the Unix epoch.
    last_failure_time: AtomicU64,
    /// Failure count at which the circuit is considered open.
    failure_threshold: u32,
    /// How long after the last failure the circuit stays open.
    timeout_duration: Duration,
}

impl CircuitBreaker {
    /// Creates a closed breaker that opens after `failure_threshold` failures
    /// without an intervening success, and stays open for `timeout_duration`
    /// after the most recent failure.
    ///
    /// Time is tracked in whole seconds, so a `timeout_duration` under one
    /// second never holds the circuit open.
    fn new(failure_threshold: u32, timeout_duration: Duration) -> Self {
        Self {
            failure_count: AtomicU32::new(0),
            last_failure_time: AtomicU64::new(0),
            failure_threshold,
            timeout_duration,
        }
    }

    /// Runs `operation` unless the circuit is open and records its outcome.
    ///
    /// # Errors
    /// * `CircuitBreakerError::CircuitOpen` if the circuit is open; the
    ///   operation is not invoked.
    /// * `CircuitBreakerError::OperationFailed` wrapping the operation's error.
    async fn call<F, Fut, T, E>(&self, operation: F) -> Result<T, CircuitBreakerError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        // Fail fast while the circuit is open.
        if self.is_open() {
            return Err(CircuitBreakerError::CircuitOpen);
        }

        match operation().await {
            Ok(result) => {
                self.on_success();
                Ok(result)
            }
            Err(error) => {
                self.on_failure();
                Err(CircuitBreakerError::OperationFailed(error))
            }
        }
    }

    /// Returns `true` while the failure threshold has been reached and the
    /// timeout since the last failure has not yet elapsed.
    fn is_open(&self) -> bool {
        let failure_count = self.failure_count.load(Ordering::Relaxed);
        if failure_count < self.failure_threshold {
            return false;
        }

        let last_failure_time = self.last_failure_time.load(Ordering::Relaxed);
        // The wall clock is not monotonic: if it moved backwards, treat the
        // elapsed time as zero instead of underflowing.
        let elapsed = Self::unix_now_secs().saturating_sub(last_failure_time);

        elapsed < self.timeout_duration.as_secs()
    }

    /// Resets the failure counter after a successful call.
    fn on_success(&self) {
        self.failure_count.store(0, Ordering::Relaxed);
    }

    /// Counts a failure and stamps the time it happened.
    fn on_failure(&self) {
        self.failure_count.fetch_add(1, Ordering::Relaxed);
        self.last_failure_time
            .store(Self::unix_now_secs(), Ordering::Relaxed);
    }

    /// Current wall-clock time in whole seconds since the Unix epoch, or `0`
    /// if the system clock reports a time before the epoch.
    fn unix_now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|since_epoch| since_epoch.as_secs())
            .unwrap_or(0)
    }
}

/// Error returned by `CircuitBreaker::call`, generic over the wrapped
/// operation's own error type `E`.
#[derive(Error, Debug)]
enum CircuitBreakerError<E> {
    /// The call was rejected because the circuit is open.
    #[error("Circuit breaker is open")]
    CircuitOpen,
    /// The operation ran and failed; carries its error.
    #[error("Operation failed")]
    OperationFailed(E),
}

Streaming Responses

Server-Sent Events (SSE)

use axum::{
    extract::Query,
    response::sse::{Event, Sse},
    routing::get,
    Router,
};
use futures::stream::{self, Stream};
use serde::Deserialize;
use std::convert::Infallible;
use std::time::Duration;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

/// Query-string parameters for the simple SSE endpoint.
#[derive(Deserialize)]
struct SseQuery {
    /// Seconds between events; the handler uses 1 when it is absent.
    interval: Option<u64>,
}

// Simple SSE endpoint
async fn sse_handler(
    Query(params): Query<SseQuery>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let interval_duration = Duration::from_secs(params.interval.unwrap_or(1));
    let interval_stream = IntervalStream::new(interval(interval_duration));
    
    let stream = interval_stream.map(|_| {
        let data = format!("Current time: {}", chrono::Utc::now().to_rfc3339());
        Ok(Event::default().data(data))
    });
    
    Sse::new(stream)
}

/// Endless SSE stream that emits one `metrics` event per second containing
/// randomly generated gauge values and a running counter.
async fn live_metrics_sse() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // `unfold` threads the counter from one event to the next.
    let ticks = stream::unfold(0u64, |counter| async move {
        tokio::time::sleep(Duration::from_secs(1)).await;

        let payload = serde_json::json!({
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "cpu_usage": rand::random::<f32>() * 100.0,
            "memory_usage": rand::random::<f32>() * 100.0,
            "requests_per_second": rand::random::<u32>() % 1000,
            "counter": counter
        })
        .to_string();

        let event = Event::default().event("metrics").data(payload);

        // Returning `Some` unconditionally keeps the stream running forever.
        Some((Ok(event), counter + 1))
    });

    Sse::new(ticks)
}

/// SSE stream of simulated progress for a task: every 100 ms the progress
/// grows by a random 0–9 points (capped at 100), and the stream ends once
/// 100 has been reported.
///
/// NOTE(review): the extractor is `Query<u32>`. Query strings are normally
/// deserialized into a struct or map of named fields, so a bare integer may
/// be rejected at runtime — confirm, and consider a params struct with a
/// `task_id` field.
async fn task_progress_sse(
    Query(task_id): Query<u32>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let updates = stream::unfold(0u32, move |progress| async move {
        // Finished on the previous tick: close the stream.
        if progress >= 100 {
            return None;
        }

        // Simulated unit of work.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let step = rand::random::<u32>() % 10;
        let new_progress = (progress + step).min(100);
        let status = if new_progress >= 100 { "completed" } else { "running" };

        let event_data = serde_json::json!({
            "task_id": task_id,
            "progress": new_progress,
            "status": status
        });

        let event = Event::default()
            .event("progress")
            .data(event_data.to_string());

        Some((Ok(event), new_progress))
    });

    Sse::new(updates)
}

Chunked Responses

use axum::{
    body::{Body, Bytes},
    extract::Path,
    http::StatusCode,
    response::Response,
};
use futures::stream::{self, StreamExt};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::io::ReaderStream;

/// Returns `true` when `name` is a single plain file name that is safe to
/// join onto the uploads directory and to embed in a quoted header value:
/// non-empty, not `.` or `..`, and free of path separators, double quotes,
/// and control characters.
fn is_safe_filename(name: &str) -> bool {
    !name.is_empty()
        && name != "."
        && name != ".."
        && !name.contains(|c: char| matches!(c, '/' | '\\' | '"') || c.is_control())
}

/// Streams `uploads/<filename>` to the client as an attachment without
/// loading the whole file into memory.
///
/// The path segment is client-controlled, so it is validated first. Without
/// that check a name such as `../../etc/passwd` would escape the uploads
/// directory, and a name containing `"` or a line break would corrupt the
/// `content-disposition` header.
///
/// # Errors
/// * `400 Bad Request` if `filename` is not a plain file name.
/// * `404 Not Found` if the file cannot be opened.
/// * `500 Internal Server Error` if the response cannot be assembled.
async fn download_file(Path(filename): Path<String>) -> Result<Response<Body>, StatusCode> {
    if !is_safe_filename(&filename) {
        return Err(StatusCode::BAD_REQUEST);
    }

    // `Path` in this scope is the axum extractor, hence the qualified std path.
    let file_path = std::path::Path::new("uploads").join(&filename);

    let file = File::open(&file_path)
        .await
        .map_err(|_| StatusCode::NOT_FOUND)?;

    let body = Body::from_stream(ReaderStream::new(file));

    Response::builder()
        .header("content-type", "application/octet-stream")
        .header("content-disposition", format!("attachment; filename=\"{}\"", filename))
        .body(body)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}

// Generate CSV data on-the-fly
async fn export_csv() -> Response<Body> {
    let stream = stream::iter(0..10000).map(|i| {
        let csv_line = format!("row_{},value_{},data_{}\n", i, i * 2, i * 3);
        Ok::<_, std::convert::Infallible>(Bytes::from(csv_line))
    });
    
    let body = Body::from_stream(stream);
    
    Response::builder()
        .header("content-type", "text/csv")
        .header("content-disposition", "attachment; filename=\"export.csv\"")
        .body(body)
        .unwrap()
}

// Streaming JSON array
async fn stream_json_array() -> Response<Body> {
    let header = stream::once(async { Ok::<_, std::convert::Infallible>(Bytes::from("[")) });
    
    let data_stream = stream::iter(0..1000).map(|i| {
        let json_item = serde_json::json!({
            "id": i,
            "name": format!("Item {}", i),
            "value": i * 10
        });
        
        let separator = if i == 0 { "" } else { "," };
        let json_str = format!("{}{}", separator, json_item.to_string());
        Ok::<_, std::convert::Infallible>(Bytes::from(json_str))
    });
    
    let footer = stream::once(async { Ok::<_, std::convert::Infallible>(Bytes::from("]")) });
    
    let combined_stream = header.chain(data_stream).chain(footer);
    let body = Body::from_stream(combined_stream);
    
    Response::builder()
        .header("content-type", "application/json")
        .body(body)
        .unwrap()
}

WebSocket Implementation

Basic WebSocket Server

use axum::{
    extract::{ws::WebSocketUpgrade, ws::WebSocket, Path, Query},
    response::Response,
    routing::get,
    Router,
};
use futures::{stream::SplitSink, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};

/// One chat message as broadcast to every subscriber of a room.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ChatMessage {
    /// Name of the sending user.
    user: String,
    /// Message text.
    message: String,
    /// Time the message was created, as an RFC 3339 string.
    timestamp: String,
}

/// Query-string parameters supplied when opening a chat WebSocket.
#[derive(Debug, Deserialize)]
struct ChatParams {
    /// Name of the room to join.
    room: String,
    /// Name the client's messages are sent under.
    user: String,
}

/// Shared registry mapping a room name to that room's broadcast sender,
/// guarded by an async mutex.
type ChatRooms = Arc<Mutex<HashMap<String, broadcast::Sender<ChatMessage>>>>;

/// Upgrades the HTTP request to a WebSocket and hands the socket, the chat
/// parameters, and the shared room registry to `handle_websocket`.
async fn websocket_handler(
    ws: WebSocketUpgrade,
    Query(params): Query<ChatParams>,
    axum::extract::State(rooms): axum::extract::State<ChatRooms>,
) -> Response {
    // The callback only runs once the protocol upgrade has completed.
    let on_connected = move |socket| handle_websocket(socket, params, rooms);
    ws.on_upgrade(on_connected)
}

// WebSocket connection handler
async fn handle_websocket(socket: WebSocket, params: ChatParams, rooms: ChatRooms) {
    let (mut sender, mut receiver) = socket.split();
    
    // Get or create chat room
    let rx = {
        let mut rooms = rooms.lock().await;
        let (tx, rx) = rooms
            .entry(params.room.clone())
            .or_insert_with(|| broadcast::channel(100).0)
            .subscribe();
        rx
    };
    
    // Spawn task to handle incoming messages
    let tx_task = {
        let rooms = rooms.clone();
        let room = params.room.clone();
        let user = params.user.clone();
        
        tokio::spawn(async move {
            while let Some(msg) = receiver.next().await {
                if let Ok(msg) = msg {
                    if let Ok(text) = msg.to_text() {
                        let chat_message = ChatMessage {
                            user: user.clone(),
                            message: text.to_string(),
                            timestamp: chrono::Utc::now().to_rfc3339(),
                        };
                        
                        // Broadcast to all clients in the room
                        let rooms = rooms.lock().await;
                        if let Some(tx) = rooms.get(&room) {
                            let _ = tx.send(chat_message);
                        }
                    }
                }
            }
        })
    };
    
    // Spawn task to handle outgoing messages
    let rx_task = tokio::spawn(async move {
        let mut rx = rx;
        while let Ok(msg) = rx.recv().await {
            let json_msg = serde_json::to_string(&msg).unwrap();
            if sender.send(axum::extract::ws::Message::Text(json_msg)).await.is_err() {
                break;
            }
        }
    });
    
    // Wait for either task to finish
    tokio::select! {
        _ = tx_task => {},
        _ = rx_task => {},
    }
}

/// WebSocket endpoint that pushes a JSON sample of simulated sensor readings
/// on a fixed period until a send fails.
///
/// The period is the query value in milliseconds (default 1000). Zero is
/// raised to 1 ms, because `tokio::time::interval` panics on a zero period
/// and the value is client-controlled.
///
/// NOTE(review): the extractor is `Query<Option<u64>>`. Query strings are
/// normally deserialized into named fields, so a bare optional integer may
/// not parse — confirm, and consider a params struct.
async fn realtime_data_websocket(
    ws: WebSocketUpgrade,
    Query(interval): Query<Option<u64>>,
) -> Response {
    let interval_ms = interval.unwrap_or(1000).max(1);

    ws.on_upgrade(move |socket| async move {
        // The receiving half is kept alive but never read in this example.
        let (mut sender, _receiver) = socket.split();
        let mut ticker = tokio::time::interval(std::time::Duration::from_millis(interval_ms));

        loop {
            ticker.tick().await;

            let data = serde_json::json!({
                "timestamp": chrono::Utc::now().to_rfc3339(),
                "temperature": 20.0 + rand::random::<f32>() * 10.0,
                "humidity": 40.0 + rand::random::<f32>() * 20.0,
                "pressure": 1000.0 + rand::random::<f32>() * 50.0,
            });

            let message = axum::extract::ws::Message::Text(data.to_string());
            // A failed send means the client is gone; stop the loop.
            if sender.send(message).await.is_err() {
                break;
            }
        }
    })
}

WebSocket with Authentication

use axum::{
    extract::{ws::WebSocketUpgrade, ConnectInfo},
    headers::{authorization::Bearer, Authorization},
    http::StatusCode,
    response::Response,
    TypedHeader,
};
use std::net::SocketAddr;

/// WebSocket upgrade endpoint guarded by a bearer token.
///
/// The token from the `Authorization` header is validated and resolved to a
/// user id before the upgrade is accepted; the peer address and user id are
/// printed to stdout.
///
/// # Errors
/// Responds with `401 Unauthorized` if the token is rejected or yields no
/// user id.
async fn authenticated_websocket(
    ws: WebSocketUpgrade,
    TypedHeader(auth): TypedHeader<Authorization<Bearer>>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
) -> Result<Response, StatusCode> {
    let token = auth.token();

    if !validate_jwt_token(token).await {
        return Err(StatusCode::UNAUTHORIZED);
    }

    let user_id = match extract_user_id_from_token(token).await {
        Some(id) => id,
        None => return Err(StatusCode::UNAUTHORIZED),
    };

    println!("WebSocket connection from {} for user {}", addr, user_id);

    let upgrade = ws.on_upgrade(move |socket| handle_authenticated_websocket(socket, user_id));
    Ok(upgrade)
}

/// Placeholder token check: reports `true` for any non-empty token.
///
/// No signature, expiry, or claim verification happens here; a real
/// implementation must replace this before the endpoint is exposed.
async fn validate_jwt_token(token: &str) -> bool {
    // Implement JWT validation logic
    !token.is_empty() // Simplified validation
}

/// Placeholder user lookup: ignores `token` and always yields user id 12345.
///
/// A real implementation would decode the id from the token's claims and
/// return `None` when the token carries none.
async fn extract_user_id_from_token(token: &str) -> Option<u32> {
    // Extract user ID from JWT token
    Some(12345) // Simplified extraction
}

/// Serves an authenticated connection: sends a welcome message, then echoes
/// every text frame back tagged with `user_id` until the client closes the
/// socket or the connection fails.
///
/// Send and receive errors end the session. They were previously ignored,
/// which kept the loop running against a dead socket.
async fn handle_authenticated_websocket(socket: WebSocket, user_id: u32) {
    use axum::extract::ws::Message;

    let (mut sender, mut receiver) = socket.split();

    // Send welcome message
    let welcome = serde_json::json!({
        "type": "welcome",
        "user_id": user_id,
        "message": "Connected successfully"
    });

    if sender.send(Message::Text(welcome.to_string())).await.is_err() {
        // The client vanished before the greeting; nothing left to serve.
        return;
    }

    // A receive error means the connection is unusable, so stop reading.
    while let Some(Ok(msg)) = receiver.next().await {
        match msg {
            Message::Text(text) => {
                println!("User {} sent: {}", user_id, text);

                // Echo back with user ID
                let response = serde_json::json!({
                    "type": "echo",
                    "user_id": user_id,
                    "original": text
                });

                if sender.send(Message::Text(response.to_string())).await.is_err() {
                    break;
                }
            }
            Message::Close(_) => break,
            // Binary and ping/pong frames are ignored.
            _ => {}
        }
    }
}

Concurrency and Performance

Connection Pooling

use sqlx::{PgPool, Row};
use std::time::Duration;

/// Creates a PostgreSQL connection pool holding between 5 and 20 connections,
/// with connect, idle, and lifetime limits, and a liveness test before each
/// acquire.
///
/// # Errors
/// Returns the `sqlx::Error` produced while building the pool.
///
/// NOTE(review): `PgPool::builder()` / `.build(url)` looks like an early
/// sqlx API; newer releases configure pools through `PgPoolOptions` — confirm
/// against the sqlx version in use.
/// NOTE(review): the connection string, credentials included, is hard-coded;
/// presumably this should come from configuration — confirm.
async fn create_database_pool() -> Result<PgPool, sqlx::Error> {
    PgPool::builder()
        .max_connections(20)              // Upper bound on open connections
        .min_connections(5)               // Connections kept open even when idle
        .connect_timeout(Duration::from_secs(10))
        .idle_timeout(Duration::from_secs(600))
        .max_lifetime(Duration::from_secs(1800))
        .test_before_acquire(true)        // Check a connection before handing it out
        .build("postgresql://user:pass@localhost/db")
        .await
}

// HTTP client pool
use reqwest::Client;

/// Builds a `reqwest` client for reuse across requests: at most 10 idle
/// connections per host kept for 30 s, a 30 s total request timeout, and a
/// 10 s connect timeout.
///
/// # Panics
/// Panics with "Failed to create HTTP client" if the client cannot be built.
fn create_http_client() -> Client {
    // Connection-pool settings.
    let pooled = Client::builder()
        .pool_max_idle_per_host(10)
        .pool_idle_timeout(Duration::from_secs(30));

    // Timeout settings.
    let configured = pooled
        .timeout(Duration::from_secs(30))
        .connect_timeout(Duration::from_secs(10));

    configured.build().expect("Failed to create HTTP client")
}

/// Returns up to 100 user names as JSON. The query runs against the shared
/// pool passed in as handler state.
///
/// # Errors
/// Responds with `500 Internal Server Error` if the query fails.
async fn database_intensive_handler(
    State(pool): State<PgPool>,
) -> Result<Json<Vec<String>>, StatusCode> {
    let rows = sqlx::query("SELECT name FROM users LIMIT 100")
        .fetch_all(&pool)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // Column 0 is the only selected column, `name`.
    let names: Vec<String> = rows.into_iter().map(|row| row.get(0)).collect();

    Ok(Json(names))
}

Rate Limiting

use governor::{Quota, RateLimiter};
use nonzero::NonZeroU32;
use std::num::NonZeroU32;
use std::sync::Arc;

/// Shared state for the rate-limited handler: one keyed limiter that tracks a
/// separate quota per client key.
///
/// The state parameter is the keyed store, which is what
/// `RateLimiter::keyed` produces; `InMemoryState` belongs to the un-keyed
/// limiter and does not type-check here. `Clone` is derived because the type
/// is extracted with `State<RateLimitState>`; cloning only bumps the `Arc`.
#[derive(Clone)]
struct RateLimitState {
    limiter: Arc<
        RateLimiter<
            String,
            governor::state::keyed::DefaultKeyedStateStore<String>,
            governor::clock::DefaultClock,
        >,
    >,
}

impl RateLimitState {
    /// Creates a limiter that allows 10 requests per second per key.
    fn new() -> Self {
        // The literal 10 is non-zero, so this `unwrap` cannot fail.
        let quota = Quota::per_second(NonZeroU32::new(10).unwrap());

        Self {
            limiter: Arc::new(RateLimiter::keyed(quota)),
        }
    }
}

/// Handler that applies the per-client quota before doing any work. Clients
/// are keyed by the IP address of the peer socket.
///
/// # Errors
/// Responds with `429 Too Many Requests` when the client's quota is exhausted.
async fn rate_limited_handler(
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    State(rate_limit): State<RateLimitState>,
    request: Request<Body>,
) -> Result<Response<Body>, StatusCode> {
    let client_key = addr.ip().to_string();

    // Guard clause: reject early when the limiter refuses the key.
    if rate_limit.limiter.check_key(&client_key).is_err() {
        return Err(StatusCode::TOO_MANY_REQUESTS);
    }

    // Within quota: process the request normally.
    Ok(Response::new(Body::from("Request processed")))
}

// Advanced rate limiting with different limits per endpoint
use dashmap::DashMap;

/// Per-endpoint rate limiting: each endpoint path gets its own keyed limiter
/// (created on first use), and each limiter tracks quotas per client key.
struct AdvancedRateLimiter {
    limiters: DashMap<
        String,
        Arc<
            RateLimiter<
                String,
                governor::state::keyed::DefaultKeyedStateStore<String>,
                governor::clock::DefaultClock,
            >,
        >,
    >,
}

impl AdvancedRateLimiter {
    /// Creates a limiter set with no endpoints registered yet.
    fn new() -> Self {
        Self {
            limiters: DashMap::new(),
        }
    }

    /// Returns the limiter for `endpoint`, creating it with that endpoint's
    /// quota on first use.
    ///
    /// The state parameter is the keyed store produced by
    /// `RateLimiter::keyed`; the un-keyed `InMemoryState` does not type-check.
    fn get_limiter(
        &self,
        endpoint: &str,
    ) -> Arc<
        RateLimiter<
            String,
            governor::state::keyed::DefaultKeyedStateStore<String>,
            governor::clock::DefaultClock,
        >,
    > {
        self.limiters.entry(endpoint.to_string()).or_insert_with(|| {
            // The literals below are non-zero, so the `unwrap`s cannot fail.
            let quota = match endpoint {
                "/api/auth/login" => Quota::per_minute(NonZeroU32::new(5).unwrap()),  // Strict limit for auth
                "/api/search" => Quota::per_second(NonZeroU32::new(2).unwrap()),      // Moderate limit for search
                _ => Quota::per_second(NonZeroU32::new(10).unwrap()),                 // Default limit
            };
            Arc::new(RateLimiter::keyed(quota))
        }).clone()
    }

    /// Returns `true` when `client_key` is still within its quota for
    /// `endpoint`. The check itself consumes one unit of that quota.
    async fn check_rate_limit(&self, client_key: &str, endpoint: &str) -> bool {
        let limiter = self.get_limiter(endpoint);
        // `check_key` takes `&K` with `K = String`, so a `&str` must be
        // converted to an owned key first.
        limiter.check_key(&client_key.to_string()).is_ok()
    }
}

Performance Monitoring

use std::time::Instant;
use std::sync::atomic::{AtomicU64, Ordering};

/// Lock-free request counters shared between the monitoring middleware and
/// the health endpoint.
#[derive(Default)]
struct RequestMetrics {
    /// Number of requests recorded.
    total_requests: AtomicU64,
    /// Sum of all recorded response times, in milliseconds.
    total_response_time: AtomicU64,
    /// Number of recorded requests flagged as errors.
    error_count: AtomicU64,
}

impl RequestMetrics {
    /// Records one finished request: its duration and whether it was an error.
    fn record_request(&self, duration: Duration, is_error: bool) {
        // `as_millis` returns `u128`. Clamp before narrowing so an oversized
        // duration saturates at `u64::MAX` instead of silently wrapping.
        let millis = duration.as_millis().min(u128::from(u64::MAX)) as u64;

        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.total_response_time.fetch_add(millis, Ordering::Relaxed);

        if is_error {
            self.error_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Returns `(total requests, average response time in ms, error count)`.
    ///
    /// The average is `0.0` when nothing has been recorded. The three counters
    /// are read independently, so a snapshot taken while requests are being
    /// recorded may be slightly inconsistent.
    fn get_stats(&self) -> (u64, f64, u64) {
        let total = self.total_requests.load(Ordering::Relaxed);
        let total_time = self.total_response_time.load(Ordering::Relaxed);
        let errors = self.error_count.load(Ordering::Relaxed);

        let avg_time = if total > 0 { total_time as f64 / total as f64 } else { 0.0 };

        (total, avg_time, errors)
    }
}

/// Middleware that times the rest of the request pipeline and records the
/// duration, plus whether the response status was 4xx or 5xx, in the shared
/// metrics.
///
/// NOTE(review): axum's function middleware expects extractors such as
/// `State` to come first, with the request and `Next` as the final two
/// parameters. Here `State` is last, so this function may not be accepted by
/// `middleware::from_fn_with_state` — confirm against the axum version in use.
async fn monitoring_middleware(
    request: Request<Body>,
    next: Next<Body>,
    State(metrics): State<Arc<RequestMetrics>>,
) -> Response<Body> {
    let started_at = Instant::now();
    let response = next.run(request).await;
    let elapsed = started_at.elapsed();

    // Client and server errors are both counted as errors.
    let status = response.status();
    let failed = status.is_client_error() || status.is_server_error();
    metrics.record_request(elapsed, failed);

    response
}

/// Health endpoint: always reports `"healthy"` and attaches the current
/// request metrics (totals, average latency in ms, error rate in percent)
/// and a timestamp.
async fn health_check(
    State(metrics): State<Arc<RequestMetrics>>,
) -> Json<serde_json::Value> {
    let (total_requests, avg_response_time, error_count) = metrics.get_stats();

    // No traffic yet means a 0 % error rate and avoids dividing by zero.
    let error_rate = match total_requests {
        0 => 0.0,
        total => error_count as f64 / total as f64 * 100.0,
    };

    let report = serde_json::json!({
        "status": "healthy",
        "metrics": {
            "total_requests": total_requests,
            "average_response_time_ms": avg_response_time,
            "error_count": error_count,
            "error_rate_percent": error_rate,
        },
        "timestamp": chrono::Utc::now().to_rfc3339()
    });

    Json(report)
}

Advanced Async Patterns

Fan-out/Fan-in Pattern

use futures::future::{join_all, try_join_all};

// Fan-out to multiple services, fan-in results
/// Builds a `UserProfile` for `user_id` by running the four lookups concurrently.
///
/// The lookups are driven together by `tokio::try_join!`; the first error returned by
/// any of them is propagated to the caller.
async fn aggregate_user_data(user_id: u32) -> Result<UserProfile, Box<dyn std::error::Error>> {
    // Fan-out and fan-in in one step: the futures are lazy, so nothing runs until
    // `try_join!` polls them together.
    let (basic_info, preferences, recent_activity, social_connections) = tokio::try_join!(
        fetch_user_profile(user_id),
        fetch_user_preferences(user_id),
        fetch_user_activity(user_id),
        fetch_social_connections(user_id)
    )?;

    Ok(UserProfile {
        basic_info,
        preferences,
        recent_activity,
        social_connections,
    })
}

// Batch processing with controlled concurrency
/// Runs `processor` over every item, keeping at most `concurrency_limit` futures in
/// flight at once.
///
/// Results are collected in completion order, which may differ from the order of
/// `items`. A `concurrency_limit` of `0` is treated as `1`.
async fn process_batch_with_concurrency<T, F, Fut, R>(
    items: Vec<T>,
    concurrency_limit: usize,
    processor: F,
) -> Vec<R>
where
    F: Fn(T) -> Fut + Clone,
    Fut: std::future::Future<Output = R>,
{
    use futures::stream::{self, StreamExt};

    // A zero-sized buffer never pulls a future from the underlying stream, so the
    // `collect` below would wait forever; clamp to one in-flight future instead.
    let in_flight = concurrency_limit.max(1);

    stream::iter(items)
        .map(processor)
        .buffer_unordered(in_flight)
        .collect()
        .await
}

/// Combined view of one user, as assembled by `aggregate_user_data`.
#[derive(Debug)]
struct UserProfile {
    /// Result of `fetch_user_profile`.
    basic_info: BasicInfo,
    /// Result of `fetch_user_preferences`.
    preferences: UserPreferences,
    /// Result of `fetch_user_activity`.
    recent_activity: Vec<Activity>,
    /// Result of `fetch_social_connections`.
    social_connections: Vec<Connection>,
}

// Placeholder types and functions
/// Placeholder for basic profile data; holds only a name.
#[derive(Debug)] struct BasicInfo { name: String }
/// Placeholder for user preferences; holds only a theme name.
#[derive(Debug)] struct UserPreferences { theme: String }
/// Placeholder for one activity record; holds only the action name.
#[derive(Debug)] struct Activity { action: String }
/// Placeholder for one social connection; holds only the friend's id.
#[derive(Debug)] struct Connection { friend_id: u32 }

/// Placeholder profile lookup: waits 100 ms, then returns a `BasicInfo` named after
/// `user_id`. Never fails.
async fn fetch_user_profile(user_id: u32) -> Result<BasicInfo, Box<dyn std::error::Error>> {
    let simulated_latency = Duration::from_millis(100);
    sleep(simulated_latency).await;

    let name = format!("User {}", user_id);
    Ok(BasicInfo { name })
}

/// Placeholder preferences lookup: waits 80 ms, then returns the `"dark"` theme.
/// `user_id` is not consulted. Never fails.
async fn fetch_user_preferences(user_id: u32) -> Result<UserPreferences, Box<dyn std::error::Error>> {
    let simulated_latency = Duration::from_millis(80);
    sleep(simulated_latency).await;

    let theme = String::from("dark");
    Ok(UserPreferences { theme })
}

/// Placeholder activity lookup: waits 120 ms, then returns a single `"login"` activity.
/// `user_id` is not consulted. Never fails.
async fn fetch_user_activity(user_id: u32) -> Result<Vec<Activity>, Box<dyn std::error::Error>> {
    let simulated_latency = Duration::from_millis(120);
    sleep(simulated_latency).await;

    let login = Activity { action: String::from("login") };
    Ok(vec![login])
}

/// Placeholder connections lookup: waits 90 ms, then returns one connection whose
/// `friend_id` is `user_id + 1`. Never returns an error.
async fn fetch_social_connections(user_id: u32) -> Result<Vec<Connection>, Box<dyn std::error::Error>> {
    let simulated_latency = Duration::from_millis(90);
    sleep(simulated_latency).await;

    let friend_id = user_id + 1;
    Ok(vec![Connection { friend_id }])
}

Async Caching

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};

// Generic async cache with TTL
/// Key/value cache whose entries carry an expiry time.
pub struct AsyncCache<K, V> {
    /// Entries keyed by `K`, behind an async read/write lock.
    data: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
    /// Time-to-live applied by `set` when the caller does not supply one.
    default_ttl: Duration,
}

/// One cached value together with the instant at which it stops being valid.
struct CacheEntry<V> {
    /// The cached value.
    value: V,
    /// The entry is treated as expired once `Instant::now()` reaches this point.
    expires_at: Instant,
}

impl<K, V> AsyncCache<K, V>
where
    K: Eq + Hash + Clone,
    V: Clone,
{
    /// Creates an empty cache whose entries expire `default_ttl` after insertion
    /// unless a TTL is supplied through [`set_with_ttl`](Self::set_with_ttl).
    pub fn new(default_ttl: Duration) -> Self {
        Self {
            data: Arc::new(RwLock::new(HashMap::new())),
            default_ttl,
        }
    }

    /// Returns a clone of the value stored under `key`, or `None` when the key is
    /// absent or its entry has expired.
    ///
    /// Only a read lock is taken, so an expired entry is left in the map; it is removed
    /// by [`cleanup_expired`](Self::cleanup_expired) or replaced by a later `set`.
    pub async fn get(&self, key: &K) -> Option<V> {
        let data = self.data.read().await;

        data.get(key)
            .filter(|entry| entry.expires_at > Instant::now())
            .map(|entry| entry.value.clone())
    }

    /// Stores `value` under `key` with the cache's default TTL.
    ///
    /// Returns the value previously stored under `key`, if any.
    pub async fn set(&self, key: K, value: V) -> Option<V> {
        self.set_with_ttl(key, value, self.default_ttl).await
    }

    /// Stores `value` under `key`, expiring `ttl` after the write lock is acquired.
    ///
    /// Returns the value previously stored under `key`, if any — including one whose
    /// entry had already expired.
    pub async fn set_with_ttl(&self, key: K, value: V, ttl: Duration) -> Option<V> {
        let mut data = self.data.write().await;
        let expires_at = Instant::now() + ttl;

        // `value` is owned here, so it is moved into the entry rather than cloned.
        data.insert(key, CacheEntry { value, expires_at })
            .map(|previous| previous.value)
    }

    /// Returns the cached value for `key`, or runs `factory`, caches its result with
    /// the default TTL, and returns it.
    ///
    /// No lock is held while `factory` runs, so concurrent callers that all miss may
    /// each run their own factory; the last write wins.
    pub async fn get_or_set<F, Fut>(&self, key: K, factory: F) -> V
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = V>,
    {
        // Try to get from cache first
        if let Some(value) = self.get(&key).await {
            return value;
        }

        // Not in cache, compute value
        let value = factory().await;
        self.set(key, value.clone()).await;
        value
    }

    /// Removes every entry whose expiry time has been reached.
    ///
    /// Takes the write lock for the duration of the sweep. Intended to be called
    /// periodically, e.g. from a background task.
    pub async fn cleanup_expired(&self) {
        let mut data = self.data.write().await;
        let now = Instant::now();

        data.retain(|_, entry| entry.expires_at > now);
    }
}

// Cache-aside pattern implementation
/// Cache-aside lookup: returns the cached user when present, otherwise loads it from
/// the database and stores a copy in the cache before returning it.
///
/// A database error is propagated and leaves the cache untouched.
async fn cached_user_lookup(
    user_id: u32,
    cache: &AsyncCache<u32, UserData>,
    database: &DatabasePool,
) -> Result<UserData, DatabaseError> {
    match cache.get(&user_id).await {
        // Cache hit: no database round-trip.
        Some(hit) => Ok(hit),
        // Cache miss: load, remember, return.
        None => {
            let fetched = database.get_user(user_id).await?;
            cache.set(user_id, fetched.clone()).await;
            Ok(fetched)
        }
    }
}

// Write-through cache pattern
/// Write-through update: writes `user_data` to the database, then stores it in the
/// cache under `user_id`.
///
/// A database error is propagated before the cache is touched.
async fn update_user_write_through(
    user_id: u32,
    user_data: UserData,
    cache: &AsyncCache<u32, UserData>,
    database: &DatabasePool,
) -> Result<(), DatabaseError> {
    // Persist first: a database failure returns early and leaves the cache as it was.
    database.update_user(user_id, &user_data).await?;

    // The write succeeded, so the cache takes ownership of the new value; whatever
    // was cached before is discarded.
    let _ = cache.set(user_id, user_data).await;

    Ok(())
}

Best Practices

Resource Management

// Proper resource cleanup with RAII
/// A connection paired with a background task; both are released in `Drop`.
pub struct ManagedResource {
    /// The connection established in `new`; taken (left as `None`) when the value is dropped.
    connection: Option<Connection>,
    /// Handle of the background task spawned in `new`; aborted when the value is dropped.
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

impl ManagedResource {
    /// Establishes the connection and spawns the periodic background task.
    ///
    /// Returns the error from `Connection::establish` if it fails; in that case no
    /// task is spawned.
    pub async fn new() -> Result<Self, ResourceError> {
        let connection = Connection::establish().await?;

        // Background task: runs until aborted, doing its work on a 60-second interval.
        let cleanup_handle = tokio::spawn(async {
            let mut ticker = tokio::time::interval(Duration::from_secs(60));

            loop {
                ticker.tick().await;
                // Perform periodic cleanup
                println!("Performing cleanup...");
            }
        });

        Ok(Self {
            connection: Some(connection),
            cleanup_handle: Some(cleanup_handle),
        })
    }

    /// Runs `SELECT 1` on the connection and returns its result, or
    /// `ResourceError::ConnectionClosed` when no connection is held.
    pub async fn operation(&self) -> Result<String, ResourceError> {
        match self.connection.as_ref() {
            Some(conn) => conn.query("SELECT 1").await,
            None => Err(ResourceError::ConnectionClosed),
        }
    }
}

impl Drop for ManagedResource {
    /// Aborts the background task and releases the connection, if still held.
    fn drop(&mut self) {
        // Stop the periodic task so it does not outlive this value.
        if let Some(task) = self.cleanup_handle.take() {
            task.abort();
        }

        // The connection is dropped at the end of this block, after the message is printed.
        if let Some(_connection) = self.connection.take() {
            // Note: In real code, you might want to spawn a task to close async resources
            println!("Cleaning up connection");
        }
    }
}

// Graceful shutdown handling
/// Serves a single `/` route on `0.0.0.0:3000` until `shutdown_signal` resolves.
///
/// Returns an error if the listener cannot be bound or if serving fails.
async fn run_server_with_graceful_shutdown() -> Result<(), Box<dyn std::error::Error>> {
    let hello = || async { "Hello, World!" };
    let app = Router::new().route("/", get(hello));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;

    println!("Server starting on port 3000");

    // Serve until the shutdown future completes.
    let server = axum::serve(listener, app).with_graceful_shutdown(shutdown_signal());
    server.await?;

    println!("Server shut down gracefully");
    Ok(())
}

/// Resolves once Ctrl+C is received or, on Unix, once a terminate signal is received.
///
/// # Panics
///
/// Panics if either signal handler cannot be installed.
async fn shutdown_signal() {
    use tokio::signal;

    let interrupt = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let sigterm = async {
        let mut stream = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler");
        stream.recv().await;
    };

    // Off Unix there is no terminate signal to wait for, so this branch never completes.
    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    // Whichever signal arrives first ends the wait.
    tokio::select! {
        _ = interrupt => {},
        _ = sigterm => {},
    }

    println!("Shutdown signal received");
}

Async web development in Rust provides powerful tools for building high-performance, concurrent applications. Key principles include proper error handling, resource management, understanding async patterns, and leveraging Rust's ownership system for safe concurrency. Always measure performance and use appropriate patterns for your specific use case.