Async Web Development
Async programming is fundamental to modern web development in Rust, enabling applications to handle thousands of concurrent connections efficiently. This guide covers async patterns, best practices, and advanced techniques for building high-performance web applications.
Async Fundamentals
Understanding Async/Await
use std::time::Duration;
use tokio::time::sleep;
// Basic async function
/// Simulates fetching a record over the network.
///
/// Waits 100 ms (standing in for network latency) and then returns a string
/// that names the requested `id`. This stub never produces an error.
async fn fetch_data(id: u32) -> Result<String, Box<dyn std::error::Error>> {
    let simulated_latency = Duration::from_millis(100);
    sleep(simulated_latency).await;

    let payload = format!("Data for ID: {}", id);
    Ok(payload)
}
// Async function that calls other async functions
/// Fetches the data for `user_id` and wraps it in a "Processed: ..." string.
///
/// Logs a line before and after the work. If the fetch fails the error is
/// returned immediately and the "Finished" line is not printed.
async fn process_user_data(user_id: u32) -> Result<String, Box<dyn std::error::Error>> {
    println!("Starting to process user {}", user_id);

    // `?` after the map keeps the early-return-on-error behaviour.
    let processed = fetch_data(user_id)
        .await
        .map(|raw| format!("Processed: {}", raw))?;

    println!("Finished processing user {}", user_id);
    Ok(processed)
}
// Running async code
/// Entry point: processes user 42 on the Tokio runtime and prints the result.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let processed = process_user_data(42).await?;
    println!("{}", processed);
    Ok(())
}
Async vs Sync Performance
use std::time::{Duration, Instant};
use tokio::time::sleep;
// Synchronous approach (blocking)
/// Processes each user ID one after another, blocking the calling thread.
///
/// Every ID costs a 100 ms `std::thread::sleep`, so the total time grows
/// linearly with the number of IDs. Returns one `"User <id>"` string per
/// input, in input order.
fn sync_process_multiple_users(user_ids: Vec<u32>) -> Vec<String> {
    user_ids
        .into_iter()
        .map(|user_id| {
            // Blocks the whole OS thread; nothing else can run on it meanwhile.
            std::thread::sleep(Duration::from_millis(100));
            format!("User {}", user_id)
        })
        .collect()
}
// Asynchronous approach (non-blocking)
/// Processes each user ID sequentially without blocking the thread.
///
/// Each iteration awaits a 100 ms timer, yielding to the runtime while it
/// waits. The IDs are still handled one at a time, so the total latency is
/// the sum of the individual waits. Returns `"User <id>"` strings in input
/// order.
async fn async_process_multiple_users(user_ids: Vec<u32>) -> Vec<String> {
    let mut labels = Vec::new();
    let per_user_delay = Duration::from_millis(100);

    for user_id in user_ids {
        sleep(per_user_delay).await;
        labels.push(format!("User {}", user_id));
    }

    labels
}
// Concurrent processing with async
/// Processes all user IDs concurrently.
///
/// One future is created per ID (each awaiting a 100 ms timer) and all of
/// them are driven together with `join_all`, which yields the results in the
/// same order as the inputs.
async fn concurrent_process_users(user_ids: Vec<u32>) -> Vec<String> {
    use futures::future::join_all;

    let mut pending = Vec::new();
    for user_id in user_ids {
        pending.push(async move {
            sleep(Duration::from_millis(100)).await;
            format!("User {}", user_id)
        });
    }

    join_all(pending).await
}
/// Times the three processing strategies against the same five user IDs and
/// prints how long each one took.
#[tokio::main]
async fn main() {
    let user_ids = vec![1, 2, 3, 4, 5];

    // Blocking, one ID at a time.
    let sync_timer = Instant::now();
    let _sync_results = sync_process_multiple_users(user_ids.clone());
    println!("Sync took: {:?}", sync_timer.elapsed());

    // Non-blocking, but still one ID at a time.
    let sequential_timer = Instant::now();
    let _async_results = async_process_multiple_users(user_ids.clone()).await;
    println!("Async sequential took: {:?}", sequential_timer.elapsed());

    // Non-blocking, all IDs in flight together.
    let concurrent_timer = Instant::now();
    let _concurrent_results = concurrent_process_users(user_ids).await;
    println!("Async concurrent took: {:?}", concurrent_timer.elapsed());
}
Tokio Runtime and Configuration
Runtime Configuration
use tokio::runtime::{Builder, Runtime};
use std::time::Duration;
// Custom runtime configuration
/// Builds a multi-threaded Tokio runtime with explicit settings:
/// four worker threads named `my-async-worker`, a 3 MiB stack per thread,
/// and `enable_all()` applied to the builder.
///
/// # Panics
/// Panics with "Failed to create runtime" if the runtime cannot be built.
fn create_custom_runtime() -> Runtime {
    const STACK_SIZE_BYTES: usize = 3 * 1024 * 1024;

    let mut builder = Builder::new_multi_thread();
    builder
        .worker_threads(4)
        .thread_name("my-async-worker")
        .thread_stack_size(STACK_SIZE_BYTES)
        .enable_all();

    builder.build().expect("Failed to create runtime")
}
// Runtime with different configurations
/// Demonstrates two runtime flavours by building each one and running a
/// trivial future on it with `block_on`.
///
/// # Panics
/// Panics if either runtime fails to build.
fn runtime_examples() {
    // Single-threaded: everything runs on the calling thread.
    let current_thread_rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    current_thread_rt.block_on(async {
        println!("Running on current thread runtime");
    });

    // Multi-threaded: 8 workers, up to 16 blocking threads, 60 s keep-alive.
    let mut multi_thread_builder = Builder::new_multi_thread();
    multi_thread_builder
        .worker_threads(8)
        .max_blocking_threads(16)
        .thread_keep_alive(Duration::from_secs(60))
        .enable_all();
    let multi_thread_rt = multi_thread_builder.build().unwrap();
    multi_thread_rt.block_on(async {
        println!("Running on multi-thread runtime");
    });
}
// Runtime handle for spawning tasks
/// Spawns a task through the current runtime's `Handle`, waits for it, and
/// prints the value it returned.
///
/// # Panics
/// Panics if the spawned task fails to join.
async fn spawn_tasks_example() {
    let runtime_handle = tokio::runtime::Handle::current();

    let join_handle = runtime_handle.spawn(async {
        let one_second = Duration::from_secs(1);
        tokio::time::sleep(one_second).await;
        "Task completed"
    });

    let outcome = join_handle.await.unwrap();
    println!("{}", outcome);
}
Task Management
use tokio::task::{self, JoinHandle};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
// Spawning and managing tasks
/// Spawns ten tasks that each bump a shared atomic counter, then joins them
/// in spawn order and prints the final counter value.
///
/// # Panics
/// Panics if any spawned task fails to join.
async fn task_management_example() {
    // This snippet's imports do not bring `Duration` into scope, so import it
    // locally to keep the function self-contained.
    use std::time::Duration;

    let counter = Arc::new(AtomicU64::new(0));
    let mut handles: Vec<JoinHandle<u64>> = Vec::new();

    for id in 0..10 {
        let task_counter = Arc::clone(&counter);
        handles.push(tokio::spawn(async move {
            task_counter.fetch_add(1, Ordering::SeqCst);

            // Simulate work.
            tokio::time::sleep(Duration::from_millis(100)).await;
            println!("Task {} completed", id);
            id
        }));
    }

    // Joining in spawn order; the tasks themselves run concurrently.
    for handle in handles {
        let task_id = handle.await.unwrap();
        println!("Joined task {}", task_id);
    }

    println!("Final counter: {}", counter.load(Ordering::SeqCst));
}
// Task cancellation
/// Shows cooperative cancellation with a `CancellationToken`.
///
/// A spawned task races `long_running_operation()` against the token's
/// `cancelled()` future. The caller cancels the token after two seconds and
/// then waits for the task to finish.
///
/// # Panics
/// Panics if the spawned task fails to join.
async fn cancellation_example() {
    // Only `Duration` is needed here; the unused `timeout` import was dropped.
    use tokio::time::Duration;
    use tokio_util::sync::CancellationToken;

    let cancel_token = CancellationToken::new();
    let task_token = cancel_token.clone();

    let task = tokio::spawn(async move {
        tokio::select! {
            _ = long_running_operation() => {
                println!("Operation completed");
            }
            _ = task_token.cancelled() => {
                println!("Operation was cancelled");
            }
        }
    });

    // Cancel after 2 seconds.
    tokio::time::sleep(Duration::from_secs(2)).await;
    cancel_token.cancel();

    task.await.unwrap();
}
async fn long_running_operation() {
tokio::time::sleep(Duration::from_secs(10)).await;
}
Async Patterns in Web Applications
Request Processing Patterns
use axum::{
extract::{Query, State},
http::StatusCode,
response::Json,
routing::get,
Router,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::time::{Duration, sleep};
/// Query-string parameters accepted by the search handlers.
#[derive(Debug, Deserialize)]
struct SearchQuery {
// Search text; the handlers use it for the cache key and pass it to the database search.
q: String,
// Optional result cap; the handlers fall back to 10 when it is absent.
limit: Option<u32>,
}
/// A single search hit returned to the client as JSON.
///
/// `Clone` is derived because the search handlers clone the result list
/// (`results.clone()`) to store one copy in the cache and return the other.
#[derive(Debug, Clone, Serialize)]
struct SearchResult {
    /// Identifier of the matching item.
    id: u32,
    /// Title shown for the hit.
    title: String,
    /// Relevance score of the hit.
    relevance: f32,
}
/// Shared state handed to the handlers through the `State` extractor.
/// Both members sit behind `Arc`, so cloning the state only clones the pointers.
#[derive(Clone)]
struct AppState {
// Handle to the (simulated) database used for searches.
database_pool: Arc<DatabasePool>,
// Handle to the (simulated) result cache.
cache: Arc<Cache>,
}
// Simulated external services
// Stand-in for a database connection pool; it holds no data in this example.
struct DatabasePool;
// Stand-in for a cache client; it holds no data in this example.
struct Cache;
impl DatabasePool {
    /// Simulates a database search for `query`.
    ///
    /// Waits 100 ms, then returns canned results whose titles embed `query`,
    /// truncated to at most `limit` entries (previously `limit` was accepted
    /// but silently ignored). This stub never returns `Err`.
    async fn search(&self, query: &str, limit: u32) -> Result<Vec<SearchResult>, String> {
        // Simulate database query delay.
        sleep(Duration::from_millis(100)).await;

        let mut results = vec![
            SearchResult { id: 1, title: format!("Result for {}", query), relevance: 0.9 },
            SearchResult { id: 2, title: format!("Another {}", query), relevance: 0.7 },
        ];

        // Honour the caller's cap; fall back to "no cap" if u32 does not fit usize.
        let max_results = usize::try_from(limit).unwrap_or(usize::MAX);
        results.truncate(max_results);

        Ok(results)
    }
}
impl Cache {
    /// Simulates a cache lookup: waits 10 ms and reports a miss.
    ///
    /// This example cache never stores anything, so the result is always `None`.
    async fn get(&self, key: &str) -> Option<Vec<SearchResult>> {
        let lookup_latency = Duration::from_millis(10);
        sleep(lookup_latency).await;
        None
    }

    /// Simulates a cache write: waits 5 ms and logs the key.
    ///
    /// The `value` is accepted but not retained anywhere.
    async fn set(&self, key: &str, value: Vec<SearchResult>) {
        let write_latency = Duration::from_millis(5);
        sleep(write_latency).await;
        println!("Cached results for key: {}", key);
    }
}
// Sequential async processing
/// Search handler that performs its steps strictly one after another:
/// cache lookup, then (on a miss) database query, then cache write.
///
/// Returns the results as JSON, or `500 Internal Server Error` if the
/// database search fails.
async fn search_sequential(
    Query(params): Query<SearchQuery>,
    State(state): State<AppState>,
) -> Result<Json<Vec<SearchResult>>, StatusCode> {
    let cache_key = format!("search:{}", params.q);

    // Check cache first.
    if let Some(cached) = state.cache.get(&cache_key).await {
        return Ok(Json(cached));
    }

    // Query database. (The source had a mis-encoded `¶ms.q` here; the
    // intended expression is a borrow of `params.q`.)
    let results = state
        .database_pool
        .search(&params.q, params.limit.unwrap_or(10))
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    // Update cache; the response is not sent until the write has finished.
    state.cache.set(&cache_key, results.clone()).await;

    Ok(Json(results))
}
// Concurrent async processing
async fn search_concurrent(
Query(params): Query<SearchQuery>,
State(state): State<AppState>,
) -> Result<Json<Vec<SearchResult>>, StatusCode> {
let cache_key = format!("search:{}", params.q);
// Start cache and database operations concurrently
let cache_future = state.cache.get(&cache_key);
let db_future = state.database_pool.search(¶ms.q, params.limit.unwrap_or(10));
let (cache_result, db_result) = tokio::join!(cache_future, db_future);
// Use cache if available, otherwise use database result
let results = if let Some(cached) = cache_result {
cached
} else {
let db_results = db_result.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Update cache in background
let cache_clone = state.cache.clone();
let cache_key_clone = cache_key.clone();
let results_clone = db_results.clone();
tokio::spawn(async move {
cache_clone.set(&cache_key_clone, results_clone).await;
});
db_results
};
Ok(Json(results))
}
// Timeout handling
/// Search handler that gives the database query at most five seconds.
///
/// Returns the results as JSON, `500 Internal Server Error` if the search
/// fails, or `408 Request Timeout` if the five seconds elapse first.
async fn search_with_timeout(
    Query(params): Query<SearchQuery>,
    State(state): State<AppState>,
) -> Result<Json<Vec<SearchResult>>, StatusCode> {
    use tokio::time::timeout;

    // The source had a mis-encoded `¶ms.q` here; the intended expression is
    // a borrow of `params.q`.
    let search_future = state
        .database_pool
        .search(&params.q, params.limit.unwrap_or(10));

    match timeout(Duration::from_secs(5), search_future).await {
        Ok(Ok(results)) => Ok(Json(results)),
        Ok(Err(_)) => Err(StatusCode::INTERNAL_SERVER_ERROR),
        // NOTE(review): 408 normally means the *client* was too slow; for a
        // slow backend 504 Gateway Timeout may be the better fit. Left as-is
        // to preserve the existing response contract.
        Err(_) => Err(StatusCode::REQUEST_TIMEOUT),
    }
}
Error Handling in Async Code
use thiserror::Error;
use tokio::time::{timeout, Duration};
/// Errors produced by the async examples in this section.
/// The `#[error(...)]` attributes supply each variant's display message.
#[derive(Error, Debug)]
enum AsyncError {
// Returned by the simulated database call in `fetch_user_data`.
#[error("Database connection failed")]
DatabaseError,
// An operation exceeded its time budget; `seconds` is the budget that was exceeded.
#[error("Request timeout after {seconds} seconds")]
TimeoutError { seconds: u64 },
// A named dependency could not be reached.
#[error("Service unavailable: {service}")]
ServiceUnavailable { service: String },
// Input was rejected before any work was attempted.
#[error("Validation failed: {message}")]
ValidationError { message: String },
}
// Error propagation with async
async fn fetch_user_data(user_id: u32) -> Result<UserData, AsyncError> {
// Validate input
if user_id == 0 {
return Err(AsyncError::ValidationError {
message: "User ID cannot be zero".to_string(),
});
}
// Simulate database call with timeout
let db_future = async {
sleep(Duration::from_millis(2000)).await;
if user_id == 999 {
Err(AsyncError::DatabaseError)
} else {
Ok(UserData { id: user_id, name: "John".to_string() })
}
};
match timeout(Duration::from_secs(1), db_future).await {
Ok(result) => result,
Err(_) => Err(AsyncError::TimeoutError { seconds: 1 }),
}
}
/// A minimal user record.
///
/// `Clone` is derived because the caching helpers store one copy in an
/// `AsyncCache<u32, UserData>` (which requires `V: Clone`) and return another.
#[derive(Debug, Clone)]
struct UserData {
    /// Numeric user identifier.
    id: u32,
    /// Display name of the user.
    name: String,
}
// Retry pattern with exponential backoff
async fn retry_with_backoff<F, Fut, T, E>(
mut operation: F,
max_attempts: u32,
initial_delay: Duration,
) -> Result<T, E>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
{
let mut attempts = 0;
let mut delay = initial_delay;
loop {
attempts += 1;
match operation().await {
Ok(result) => return Ok(result),
Err(error) => {
if attempts >= max_attempts {
return Err(error);
}
println!("Attempt {} failed, retrying in {:?}", attempts, delay);
sleep(delay).await;
delay *= 2; // Exponential backoff
}
}
}
}
// Circuit breaker pattern
use std::sync::atomic::{AtomicU32, AtomicU64};
/// A minimal circuit breaker built on atomics.
///
/// Once `failure_threshold` consecutive failures have been recorded, calls
/// are rejected until `timeout_duration` has passed since the most recent
/// failure. Any success resets the failure count.
struct CircuitBreaker {
    /// Failures recorded since the last success.
    failure_count: AtomicU32,
    /// Time of the most recent failure, in milliseconds since the Unix epoch.
    last_failure_time: AtomicU64,
    /// Number of failures at which the circuit opens.
    failure_threshold: u32,
    /// How long the circuit stays open after the most recent failure.
    timeout_duration: Duration,
}

impl CircuitBreaker {
    /// Creates a closed breaker with no recorded failures.
    fn new(failure_threshold: u32, timeout_duration: Duration) -> Self {
        Self {
            failure_count: AtomicU32::new(0),
            last_failure_time: AtomicU64::new(0),
            failure_threshold,
            timeout_duration,
        }
    }

    /// Runs `operation` unless the circuit is open.
    ///
    /// # Errors
    /// * `CircuitBreakerError::CircuitOpen` when the circuit is open; the
    ///   operation is not invoked.
    /// * `CircuitBreakerError::OperationFailed` wrapping the operation's own
    ///   error; the failure is recorded first.
    async fn call<F, Fut, T, E>(&self, operation: F) -> Result<T, CircuitBreakerError<E>>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T, E>>,
    {
        if self.is_open() {
            return Err(CircuitBreakerError::CircuitOpen);
        }

        match operation().await {
            Ok(result) => {
                self.on_success();
                Ok(result)
            }
            Err(error) => {
                self.on_failure();
                Err(CircuitBreakerError::OperationFailed(error))
            }
        }
    }

    /// Reports whether calls should currently be rejected.
    fn is_open(&self) -> bool {
        use std::sync::atomic::Ordering;

        if self.failure_count.load(Ordering::Relaxed) < self.failure_threshold {
            return false;
        }

        let last_failure_ms = self.last_failure_time.load(Ordering::Relaxed);
        // Saturate so a wall clock that stepped backwards cannot underflow.
        let elapsed_ms = Self::now_millis().saturating_sub(last_failure_ms);

        // Compare in milliseconds: comparing whole seconds made any
        // sub-second timeout behave as zero, so the circuit never opened.
        u128::from(elapsed_ms) < self.timeout_duration.as_millis()
    }

    /// Clears the failure count after a successful call.
    fn on_success(&self) {
        use std::sync::atomic::Ordering;

        self.failure_count.store(0, Ordering::Relaxed);
    }

    /// Records a failed call: stamps the failure time, then bumps the count.
    fn on_failure(&self) {
        use std::sync::atomic::Ordering;

        // Stamp first so a reader that observes the new count is less likely
        // to pair it with a stale timestamp.
        self.last_failure_time
            .store(Self::now_millis(), Ordering::Relaxed);
        self.failure_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Current wall-clock time in milliseconds since the Unix epoch.
    ///
    /// Falls back to 0 if the clock reads earlier than the epoch instead of
    /// panicking, and saturates if the value does not fit in a `u64`.
    fn now_millis() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|since_epoch| u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX))
            .unwrap_or(0)
    }
}
/// Error returned by `CircuitBreaker::call`, generic over the wrapped operation's error type `E`.
#[derive(Error, Debug)]
enum CircuitBreakerError<E> {
// The call was rejected without running the operation because the circuit is open.
#[error("Circuit breaker is open")]
CircuitOpen,
// The operation ran and failed; the payload is its original error.
// Note that the display message does not include the inner error's text.
#[error("Operation failed")]
OperationFailed(E),
}
Streaming Responses
Server-Sent Events (SSE)
use axum::{
extract::Query,
response::sse::{Event, Sse},
routing::get,
Router,
};
use futures::stream::{self, Stream};
use serde::Deserialize;
use std::convert::Infallible;
use std::time::Duration;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
/// Query-string parameters for the SSE endpoint.
#[derive(Deserialize)]
struct SseQuery {
// Seconds between events; `sse_handler` defaults this to 1 when it is absent.
interval: Option<u64>,
}
// Simple SSE endpoint
async fn sse_handler(
Query(params): Query<SseQuery>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
let interval_duration = Duration::from_secs(params.interval.unwrap_or(1));
let interval_stream = IntervalStream::new(interval(interval_duration));
let stream = interval_stream.map(|_| {
let data = format!("Current time: {}", chrono::Utc::now().to_rfc3339());
Ok(Event::default().data(data))
});
Sse::new(stream)
}
// Live data streaming
/// Server-sent-events endpoint that pushes a randomly generated metrics
/// snapshot once per second.
///
/// Each event is named `metrics` and carries a JSON payload with a timestamp,
/// random CPU / memory / request-rate figures, and a counter that starts at 0
/// and increases by one per event. The stream never ends on its own.
async fn live_metrics_sse() -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::unfold(0u64, |counter| async move {
        tokio::time::sleep(Duration::from_secs(1)).await;

        let snapshot = serde_json::json!({
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "cpu_usage": rand::random::<f32>() * 100.0,
            "memory_usage": rand::random::<f32>() * 100.0,
            "requests_per_second": rand::random::<u32>() % 1000,
            "counter": counter
        });
        let payload = snapshot.to_string();

        let next_counter = counter + 1;
        Some((Ok(Event::default().event("metrics").data(payload)), next_counter))
    });

    Sse::new(stream)
}
// Progress tracking SSE
/// Server-sent-events endpoint that reports simulated progress for a task.
///
/// Every 100 ms the progress advances by a random 0-9 points (capped at 100)
/// and a `progress` event is emitted with the task ID, the new progress and a
/// status of `running` or `completed`. The stream ends after the event that
/// reports 100.
///
/// NOTE(review): `Query<u32>` extracts a bare integer from the query string;
/// confirm this deserializes as intended, since query extractors are usually
/// given a struct or map.
async fn task_progress_sse(
    Query(task_id): Query<u32>,
) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    let stream = stream::unfold(0u32, move |progress| async move {
        let already_done = progress >= 100;
        if already_done {
            return None;
        }

        // Simulate work.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let advanced = (progress + rand::random::<u32>() % 10).min(100);
        let status = if advanced >= 100 { "completed" } else { "running" };

        let payload = serde_json::json!({
            "task_id": task_id,
            "progress": advanced,
            "status": status
        })
        .to_string();

        Some((Ok(Event::default().event("progress").data(payload)), advanced))
    });

    Sse::new(stream)
}
Chunked Responses
use axum::{
body::{Body, Bytes},
extract::Path,
http::StatusCode,
response::Response,
};
use futures::stream::{self, StreamExt};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::io::ReaderStream;
// Stream large file downloads
/// Streams a file from the `uploads/` directory as an attachment.
///
/// `filename` comes straight from the request path, so it is validated before
/// touching the filesystem: it must be a single plain path component (no
/// separators, no `.` / `..`, not empty) and must not contain characters that
/// would break the quoted `Content-Disposition` value.
///
/// # Errors
/// * `400 Bad Request` when the name fails validation.
/// * `404 Not Found` when the file cannot be opened.
/// * `500 Internal Server Error` when the response cannot be built.
async fn download_file(Path(filename): Path<String>) -> Result<Response<Body>, StatusCode> {
    // Exactly one `Normal` component rules out "", ".", "..", absolute paths
    // and anything that would climb out of `uploads/`.
    let is_single_component = {
        let mut components = std::path::Path::new(&filename).components();
        matches!(
            (components.next(), components.next()),
            (Some(std::path::Component::Normal(_)), None)
        )
    };
    let has_unsafe_char = filename
        .chars()
        .any(|c| c.is_control() || matches!(c, '/' | '\\' | '"'));
    if !is_single_component || has_unsafe_char {
        return Err(StatusCode::BAD_REQUEST);
    }

    let file_path = std::path::Path::new("uploads").join(&filename);
    let file = File::open(&file_path)
        .await
        .map_err(|_| StatusCode::NOT_FOUND)?;

    // Stream the file instead of buffering it fully in memory.
    let body = Body::from_stream(ReaderStream::new(file));

    Response::builder()
        .header("content-type", "application/octet-stream")
        .header(
            "content-disposition",
            format!("attachment; filename=\"{}\"", filename),
        )
        .body(body)
        // A header the builder rejects becomes a 500 instead of a panic.
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}
// Generate CSV data on-the-fly
async fn export_csv() -> Response<Body> {
let stream = stream::iter(0..10000).map(|i| {
let csv_line = format!("row_{},value_{},data_{}\n", i, i * 2, i * 3);
Ok::<_, std::convert::Infallible>(Bytes::from(csv_line))
});
let body = Body::from_stream(stream);
Response::builder()
.header("content-type", "text/csv")
.header("content-disposition", "attachment; filename=\"export.csv\"")
.body(body)
.unwrap()
}
// Streaming JSON array
async fn stream_json_array() -> Response<Body> {
let header = stream::once(async { Ok::<_, std::convert::Infallible>(Bytes::from("[")) });
let data_stream = stream::iter(0..1000).map(|i| {
let json_item = serde_json::json!({
"id": i,
"name": format!("Item {}", i),
"value": i * 10
});
let separator = if i == 0 { "" } else { "," };
let json_str = format!("{}{}", separator, json_item.to_string());
Ok::<_, std::convert::Infallible>(Bytes::from(json_str))
});
let footer = stream::once(async { Ok::<_, std::convert::Infallible>(Bytes::from("]")) });
let combined_stream = header.chain(data_stream).chain(footer);
let body = Body::from_stream(combined_stream);
Response::builder()
.header("content-type", "application/json")
.body(body)
.unwrap()
}
WebSocket Implementation
Basic WebSocket Server
use axum::{
extract::{ws::WebSocketUpgrade, ws::WebSocket, Path, Query},
response::Response,
routing::get,
Router,
};
use futures::{stream::SplitSink, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};
/// A chat message as broadcast to every client in a room and serialized to JSON on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ChatMessage {
// Name of the sending user, taken from the connection's query parameters.
user: String,
// Text of the message as received from the client.
message: String,
// RFC 3339 timestamp assigned by the server when the message was received.
timestamp: String,
}
/// Query-string parameters supplied when opening a chat WebSocket.
#[derive(Debug, Deserialize)]
struct ChatParams {
// Name of the room to join; used as the key into the shared room map.
room: String,
// Name attached to every message this connection sends.
user: String,
}
type ChatRooms = Arc<Mutex<HashMap<String, broadcast::Sender<ChatMessage>>>>;
// WebSocket upgrade handler
/// HTTP handler that upgrades the request to a WebSocket and hands the socket
/// to `handle_websocket` together with the chat parameters and the shared
/// room map.
async fn websocket_handler(
    ws: WebSocketUpgrade,
    Query(params): Query<ChatParams>,
    axum::extract::State(rooms): axum::extract::State<ChatRooms>,
) -> Response {
    let on_connected = move |socket| handle_websocket(socket, params, rooms);
    ws.on_upgrade(on_connected)
}
// WebSocket connection handler
async fn handle_websocket(socket: WebSocket, params: ChatParams, rooms: ChatRooms) {
let (mut sender, mut receiver) = socket.split();
// Get or create chat room
let rx = {
let mut rooms = rooms.lock().await;
let (tx, rx) = rooms
.entry(params.room.clone())
.or_insert_with(|| broadcast::channel(100).0)
.subscribe();
rx
};
// Spawn task to handle incoming messages
let tx_task = {
let rooms = rooms.clone();
let room = params.room.clone();
let user = params.user.clone();
tokio::spawn(async move {
while let Some(msg) = receiver.next().await {
if let Ok(msg) = msg {
if let Ok(text) = msg.to_text() {
let chat_message = ChatMessage {
user: user.clone(),
message: text.to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
};
// Broadcast to all clients in the room
let rooms = rooms.lock().await;
if let Some(tx) = rooms.get(&room) {
let _ = tx.send(chat_message);
}
}
}
}
})
};
// Spawn task to handle outgoing messages
let rx_task = tokio::spawn(async move {
let mut rx = rx;
while let Ok(msg) = rx.recv().await {
let json_msg = serde_json::to_string(&msg).unwrap();
if sender.send(axum::extract::ws::Message::Text(json_msg)).await.is_err() {
break;
}
}
});
// Wait for either task to finish
tokio::select! {
_ = tx_task => {},
_ = rx_task => {},
}
}
// Real-time data WebSocket
/// WebSocket endpoint that pushes a randomly generated sensor reading
/// (temperature, humidity, pressure, timestamp) at a fixed interval until the
/// client goes away.
///
/// The interval is taken from the query in milliseconds and defaults to 1000.
/// A value of 0 is raised to 1, because `tokio::time::interval` panics when
/// given a zero period and this value is client-controlled.
///
/// NOTE(review): `Query<Option<u64>>` extracts a bare optional integer from
/// the query string; confirm this deserializes as intended, since query
/// extractors are usually given a struct or map.
async fn realtime_data_websocket(
    ws: WebSocketUpgrade,
    Query(interval): Query<Option<u64>>,
) -> Response {
    let interval_ms = interval.unwrap_or(1000).max(1);

    ws.on_upgrade(move |socket| async move {
        // The read half is kept alive (but unused) for the life of the loop.
        let (mut sender, _receiver) = socket.split();

        // `Duration` is not imported by this snippet, so use the full path.
        let mut ticker = tokio::time::interval(std::time::Duration::from_millis(interval_ms));

        loop {
            ticker.tick().await;

            let data = serde_json::json!({
                "timestamp": chrono::Utc::now().to_rfc3339(),
                "temperature": 20.0 + rand::random::<f32>() * 10.0,
                "humidity": 40.0 + rand::random::<f32>() * 20.0,
                "pressure": 1000.0 + rand::random::<f32>() * 50.0,
            });

            let message = axum::extract::ws::Message::Text(data.to_string());
            // A failed send means the client is gone; stop producing data.
            if sender.send(message).await.is_err() {
                break;
            }
        }
    })
}
WebSocket with Authentication
use axum::{
extract::{ws::WebSocketUpgrade, ConnectInfo},
headers::{authorization::Bearer, Authorization},
http::StatusCode,
response::Response,
TypedHeader,
};
use std::net::SocketAddr;
// WebSocket with JWT authentication
/// Upgrades to a WebSocket only for requests carrying an acceptable bearer
/// token.
///
/// The token is validated first and the user ID is then extracted from it;
/// either step failing yields `401 Unauthorized`. On success the peer address
/// and user ID are logged and the socket is handed to
/// `handle_authenticated_websocket`.
async fn authenticated_websocket(
    ws: WebSocketUpgrade,
    TypedHeader(auth): TypedHeader<Authorization<Bearer>>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
) -> Result<Response, StatusCode> {
    let bearer = auth.token();

    let token_is_valid = validate_jwt_token(bearer).await;
    if !token_is_valid {
        return Err(StatusCode::UNAUTHORIZED);
    }

    let user_id = match extract_user_id_from_token(bearer).await {
        Some(id) => id,
        None => return Err(StatusCode::UNAUTHORIZED),
    };

    println!("WebSocket connection from {} for user {}", addr, user_id);

    let response = ws.on_upgrade(move |socket| handle_authenticated_websocket(socket, user_id));
    Ok(response)
}
/// Placeholder token check: accepts any non-empty token.
///
/// No signature, expiry or claim verification is performed; real JWT
/// validation still has to be implemented here.
async fn validate_jwt_token(token: &str) -> bool {
    let is_blank = token.is_empty();
    !is_blank
}
/// Placeholder user-ID extraction: ignores `token` and always yields the
/// fixed ID 12345.
///
/// Real claim parsing still has to be implemented here.
async fn extract_user_id_from_token(token: &str) -> Option<u32> {
    // The token is not inspected yet.
    let _ = token;

    let placeholder_user_id: u32 = 12345;
    Some(placeholder_user_id)
}
async fn handle_authenticated_websocket(socket: WebSocket, user_id: u32) {
let (mut sender, mut receiver) = socket.split();
// Send welcome message
let welcome = serde_json::json!({
"type": "welcome",
"user_id": user_id,
"message": "Connected successfully"
});
let _ = sender.send(axum::extract::ws::Message::Text(welcome.to_string())).await;
// Handle messages
while let Some(msg) = receiver.next().await {
if let Ok(msg) = msg {
match msg {
axum::extract::ws::Message::Text(text) => {
println!("User {} sent: {}", user_id, text);
// Echo back with user ID
let response = serde_json::json!({
"type": "echo",
"user_id": user_id,
"original": text
});
let _ = sender.send(axum::extract::ws::Message::Text(response.to_string())).await;
}
axum::extract::ws::Message::Close(_) => break,
_ => {}
}
}
}
}
Concurrency and Performance
Connection Pooling
use sqlx::{PgPool, Row};
use std::time::Duration;
// Database connection pool configuration
/// Creates a PostgreSQL connection pool with explicit limits.
///
/// Settings: 5-20 connections, 10 s to acquire a connection, idle connections
/// closed after 10 min, connections recycled after 30 min, and each
/// connection tested before it is handed out.
///
/// Uses `PgPoolOptions`, the pool configuration API of current sqlx
/// releases. The original `PgPool::builder()` / `.build(url)` chain, with
/// these method names, does not match it. The acquire timeout takes the place
/// of the original `connect_timeout` setting.
///
/// NOTE(review): the connection URL, including credentials, is hard-coded
/// here; real code should read it from configuration.
///
/// # Errors
/// Returns the `sqlx::Error` raised while connecting.
async fn create_database_pool() -> Result<PgPool, sqlx::Error> {
    sqlx::postgres::PgPoolOptions::new()
        .max_connections(20) // Maximum connections
        .min_connections(5) // Minimum connections
        .acquire_timeout(Duration::from_secs(10))
        .idle_timeout(Duration::from_secs(600))
        .max_lifetime(Duration::from_secs(1800))
        .test_before_acquire(true) // Test connections before use
        .connect("postgresql://user:pass@localhost/db")
        .await
}
// HTTP client pool
use reqwest::Client;
/// Builds a shared `reqwest` client with connection pooling and timeouts:
/// up to 10 idle connections per host kept for 30 s, a 10 s connect timeout
/// and a 30 s overall request timeout.
///
/// # Panics
/// Panics with "Failed to create HTTP client" if the client cannot be built.
fn create_http_client() -> Client {
    let connect_timeout = Duration::from_secs(10);
    let request_timeout = Duration::from_secs(30);
    let idle_timeout = Duration::from_secs(30);

    let builder = Client::builder()
        .connect_timeout(connect_timeout)
        .timeout(request_timeout)
        .pool_idle_timeout(idle_timeout)
        .pool_max_idle_per_host(10);

    builder.build().expect("Failed to create HTTP client")
}
// Connection management in handlers
async fn database_intensive_handler(
State(pool): State<PgPool>,
) -> Result<Json<Vec<String>>, StatusCode> {
// Use connection from pool
let results: Vec<String> = sqlx::query("SELECT name FROM users LIMIT 100")
.fetch_all(&pool)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.into_iter()
.map(|row| row.get(0))
.collect();
Ok(Json(results))
}
Rate Limiting
use governor::{Quota, RateLimiter};
use nonzero::NonZeroU32;
use std::num::NonZeroU32;
use std::sync::Arc;
// Rate limiting middleware
struct RateLimitState {
limiter: Arc<RateLimiter<String, governor::state::InMemoryState, governor::clock::DefaultClock>>,
}
impl RateLimitState {
fn new() -> Self {
let quota = Quota::per_second(NonZeroU32::new(10).unwrap()); // 10 requests per second
let limiter = RateLimiter::keyed(quota);
Self {
limiter: Arc::new(limiter),
}
}
}
/// Handler that applies the per-client rate limit before doing any work.
///
/// The client's IP address is the limiter key. Requests within the quota get
/// a "Request processed" body; requests over the quota get
/// `429 Too Many Requests`.
async fn rate_limited_handler(
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    State(rate_limit): State<RateLimitState>,
    request: Request<Body>,
) -> Result<Response<Body>, StatusCode> {
    let client_key = addr.ip().to_string();

    let within_quota = rate_limit.limiter.check_key(&client_key).is_ok();
    if !within_quota {
        // Rate limit exceeded.
        return Err(StatusCode::TOO_MANY_REQUESTS);
    }

    // Process request normally.
    Ok(Response::new(Body::from("Request processed")))
}
// Advanced rate limiting with different limits per endpoint
use dashmap::DashMap;
/// Rate limiter with a separate quota per endpoint.
///
/// Limiters are created lazily, one per endpoint string, and each is keyed by
/// client so every client has its own bucket on every endpoint.
///
/// NOTE(review): the map grows by one entry per distinct `endpoint` string.
/// Pass route templates rather than raw request paths so clients cannot grow
/// it without bound.
struct AdvancedRateLimiter {
    limiters: DashMap<String, Arc<governor::DefaultKeyedRateLimiter<String>>>,
}

impl AdvancedRateLimiter {
    /// Creates a limiter set with no endpoints registered yet.
    fn new() -> Self {
        Self {
            limiters: DashMap::new(),
        }
    }

    /// Returns the limiter for `endpoint`, creating it on first use.
    ///
    /// Quotas: 5 per minute for `/api/auth/login`, 2 per second for
    /// `/api/search`, and 10 per second for everything else.
    fn get_limiter(&self, endpoint: &str) -> Arc<governor::DefaultKeyedRateLimiter<String>> {
        // Fast path: a read lookup, with no allocation and no write lock.
        if let Some(existing) = self.limiters.get(endpoint) {
            return Arc::clone(existing.value());
        }

        let entry = self.limiters.entry(endpoint.to_string()).or_insert_with(|| {
            let quota = match endpoint {
                "/api/auth/login" => Quota::per_minute(NonZeroU32::new(5).unwrap()), // Strict limit for auth
                "/api/search" => Quota::per_second(NonZeroU32::new(2).unwrap()), // Moderate limit for search
                _ => Quota::per_second(NonZeroU32::new(10).unwrap()), // Default limit
            };
            Arc::new(RateLimiter::keyed(quota))
        });
        Arc::clone(entry.value())
    }

    /// Returns `true` when `client_key` may make another request to
    /// `endpoint`, consuming one unit of its quota.
    async fn check_rate_limit(&self, client_key: &str, endpoint: &str) -> bool {
        let limiter = self.get_limiter(endpoint);
        // The limiter is keyed by `String`, so `check_key` needs a `&String`.
        limiter.check_key(&client_key.to_string()).is_ok()
    }
}
Performance Monitoring
use std::time::Instant;
use std::sync::atomic::{AtomicU64, Ordering};
// Request metrics
/// Lock-free request counters shared between the monitoring middleware and
/// the health-check endpoint.
#[derive(Default)]
struct RequestMetrics {
    /// Number of requests recorded.
    total_requests: AtomicU64,
    /// Sum of all recorded response times, in whole milliseconds.
    total_response_time: AtomicU64,
    /// Number of recorded requests that were flagged as errors.
    error_count: AtomicU64,
}

impl RequestMetrics {
    /// Records one finished request: its duration (truncated to whole
    /// milliseconds) and whether it counted as an error.
    fn record_request(&self, duration: Duration, is_error: bool) {
        let elapsed_ms = duration.as_millis() as u64;

        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.total_response_time.fetch_add(elapsed_ms, Ordering::Relaxed);
        if is_error {
            self.error_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Returns `(total requests, mean response time in ms, error count)`.
    ///
    /// The mean is `0.0` when nothing has been recorded yet.
    fn get_stats(&self) -> (u64, f64, u64) {
        let request_count = self.total_requests.load(Ordering::Relaxed);
        let cumulative_ms = self.total_response_time.load(Ordering::Relaxed);
        let failures = self.error_count.load(Ordering::Relaxed);

        let mean_ms = match request_count {
            0 => 0.0,
            n => cumulative_ms as f64 / n as f64,
        };

        (request_count, mean_ms, failures)
    }
}
// Monitoring middleware
/// Middleware that times each request and records the outcome in the shared
/// `RequestMetrics`.
///
/// A response counts as an error when its status is 4xx or 5xx.
///
/// NOTE(review): the parameter order here is request, `next`, then `State`;
/// confirm it matches what the middleware adapter in use expects.
async fn monitoring_middleware(
    request: Request<Body>,
    next: Next<Body>,
    State(metrics): State<Arc<RequestMetrics>>,
) -> Response<Body> {
    let started_at = Instant::now();
    let response = next.run(request).await;
    let elapsed = started_at.elapsed();

    let status = response.status();
    let failed = status.is_server_error() || status.is_client_error();
    metrics.record_request(elapsed, failed);

    response
}
// Health check endpoint with metrics
/// Health endpoint that reports a fixed `healthy` status together with the
/// collected request metrics and the current UTC time.
///
/// The error rate is a percentage of all recorded requests, or `0.0` when
/// nothing has been recorded yet.
async fn health_check(
    State(metrics): State<Arc<RequestMetrics>>,
) -> Json<serde_json::Value> {
    let (total_requests, avg_response_time, error_count) = metrics.get_stats();

    let error_rate = match total_requests {
        0 => 0.0,
        total => error_count as f64 / total as f64 * 100.0,
    };

    let report = serde_json::json!({
        "status": "healthy",
        "metrics": {
            "total_requests": total_requests,
            "average_response_time_ms": avg_response_time,
            "error_count": error_count,
            "error_rate_percent": error_rate,
        },
        "timestamp": chrono::Utc::now().to_rfc3339()
    });

    Json(report)
}
Advanced Async Patterns
Fan-out/Fan-in Pattern
use futures::future::{join_all, try_join_all};
// Fan-out to multiple services, fan-in results
/// Builds a complete `UserProfile` by querying four data sources
/// concurrently.
///
/// Fan-out: the four fetches are started together. Fan-in:
/// `tokio::try_join!` waits for all of them and returns early with the first
/// error any of them produces.
async fn aggregate_user_data(user_id: u32) -> Result<UserProfile, Box<dyn std::error::Error>> {
    let (basic_info, preferences, recent_activity, social_connections) = tokio::try_join!(
        fetch_user_profile(user_id),
        fetch_user_preferences(user_id),
        fetch_user_activity(user_id),
        fetch_social_connections(user_id)
    )?;

    let profile = UserProfile {
        basic_info,
        preferences,
        recent_activity,
        social_connections,
    };
    Ok(profile)
}
// Batch processing with controlled concurrency
/// Runs `processor` over every item with at most `concurrency_limit` futures
/// in flight at once.
///
/// Results are collected in completion order (`buffer_unordered`), which is
/// not necessarily the order of `items`.
async fn process_batch_with_concurrency<T, F, Fut, R>(
    items: Vec<T>,
    concurrency_limit: usize,
    processor: F,
) -> Vec<R>
where
    F: Fn(T) -> Fut + Clone,
    Fut: std::future::Future<Output = R>,
{
    use futures::stream::{self, StreamExt};

    let pending = stream::iter(items).map(|item| processor(item));
    let bounded = pending.buffer_unordered(concurrency_limit);
    bounded.collect::<Vec<R>>().await
}
/// Aggregated view of a user, assembled by `aggregate_user_data` from four separate fetches.
#[derive(Debug)]
struct UserProfile {
// Result of `fetch_user_profile`.
basic_info: BasicInfo,
// Result of `fetch_user_preferences`.
preferences: UserPreferences,
// Result of `fetch_user_activity`.
recent_activity: Vec<Activity>,
// Result of `fetch_social_connections`.
social_connections: Vec<Connection>,
}
// Placeholder types and functions
// Placeholder: basic identity data for a user.
#[derive(Debug)] struct BasicInfo { name: String }
// Placeholder: a user's UI preferences.
#[derive(Debug)] struct UserPreferences { theme: String }
// Placeholder: one recorded user action.
#[derive(Debug)] struct Activity { action: String }
// Placeholder: a link from the user to another user.
#[derive(Debug)] struct Connection { friend_id: u32 }
async fn fetch_user_profile(user_id: u32) -> Result<BasicInfo, Box<dyn std::error::Error>> {
sleep(Duration::from_millis(100)).await;
Ok(BasicInfo { name: format!("User {}", user_id) })
}
async fn fetch_user_preferences(user_id: u32) -> Result<UserPreferences, Box<dyn std::error::Error>> {
sleep(Duration::from_millis(80)).await;
Ok(UserPreferences { theme: "dark".to_string() })
}
async fn fetch_user_activity(user_id: u32) -> Result<Vec<Activity>, Box<dyn std::error::Error>> {
sleep(Duration::from_millis(120)).await;
Ok(vec![Activity { action: "login".to_string() }])
}
async fn fetch_social_connections(user_id: u32) -> Result<Vec<Connection>, Box<dyn std::error::Error>> {
sleep(Duration::from_millis(90)).await;
Ok(vec![Connection { friend_id: user_id + 1 }])
}
Async Caching
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
// Generic async cache with TTL
/// Generic in-memory cache with per-entry time-to-live, safe to share between
/// async tasks.
///
/// Entries live in a `HashMap` behind a Tokio `RwLock`. Expired entries are
/// treated as missing by `get`, but they are only removed when
/// `cleanup_expired` runs or when they are overwritten.
pub struct AsyncCache<K, V> {
    data: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
    default_ttl: Duration,
}

/// A cached value together with the instant at which it stops being valid.
struct CacheEntry<V> {
    value: V,
    expires_at: Instant,
}

impl<K, V> AsyncCache<K, V>
where
    K: Eq + Hash + Clone,
    V: Clone,
{
    /// Creates an empty cache whose entries live for `default_ttl` unless a
    /// different TTL is given on insertion.
    pub fn new(default_ttl: Duration) -> Self {
        Self {
            data: Arc::new(RwLock::new(HashMap::new())),
            default_ttl,
        }
    }

    /// Returns a clone of the value stored under `key`, or `None` if there is
    /// no entry or the entry has expired.
    pub async fn get(&self, key: &K) -> Option<V> {
        let data = self.data.read().await;
        data.get(key)
            .filter(|entry| entry.expires_at > Instant::now())
            .map(|entry| entry.value.clone())
    }

    /// Stores `value` under `key` with the default TTL.
    ///
    /// Returns the value previously stored under `key`, if any (even if it
    /// had already expired).
    pub async fn set(&self, key: K, value: V) -> Option<V> {
        self.set_with_ttl(key, value, self.default_ttl).await
    }

    /// Stores `value` under `key`, valid for `ttl` from now.
    ///
    /// Returns the value previously stored under `key`, if any (even if it
    /// had already expired).
    pub async fn set_with_ttl(&self, key: K, value: V, ttl: Duration) -> Option<V> {
        // Build the entry before taking the write lock, and move `value` into
        // it; the original cloned a value it already owned.
        let entry = CacheEntry {
            value,
            expires_at: Instant::now() + ttl,
        };

        let mut data = self.data.write().await;
        data.insert(key, entry).map(|old_entry| old_entry.value)
    }

    /// Returns the cached value for `key`, or computes it with `factory`,
    /// stores it with the default TTL, and returns it.
    ///
    /// No lock is held while `factory` runs, so concurrent callers that miss
    /// on the same key may each run their factory; the last write wins.
    pub async fn get_or_set<F, Fut>(&self, key: K, factory: F) -> V
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = V>,
    {
        // Try to get from cache first.
        if let Some(value) = self.get(&key).await {
            return value;
        }

        // Not in cache: compute, store a copy, and hand back the original.
        let value = factory().await;
        self.set(key, value.clone()).await;
        value
    }

    /// Removes every expired entry. Intended to be called periodically from a
    /// background task.
    pub async fn cleanup_expired(&self) {
        let now = Instant::now();
        let mut data = self.data.write().await;
        data.retain(|_, entry| entry.expires_at > now);
    }
}
// Cache-aside pattern implementation
/// Cache-aside lookup: serve the user from the cache when possible, otherwise
/// load it from the database and remember it for next time.
///
/// # Errors
/// Propagates the database error when the user has to be loaded and the load
/// fails; nothing is cached in that case.
async fn cached_user_lookup(
    user_id: u32,
    cache: &AsyncCache<u32, UserData>,
    database: &DatabasePool,
) -> Result<UserData, DatabaseError> {
    match cache.get(&user_id).await {
        // Cache hit.
        Some(cached_user) => Ok(cached_user),
        // Cache miss: fetch, store a copy, return the original.
        None => {
            let fresh_user = database.get_user(user_id).await?;
            cache.set(user_id, fresh_user.clone()).await;
            Ok(fresh_user)
        }
    }
}
// Write-through cache pattern
/// Write-through update: persist the user to the database and, only if that
/// succeeds, refresh the cached copy.
///
/// # Errors
/// Propagates the database error; the cache is left untouched in that case.
async fn update_user_write_through(
    user_id: u32,
    user_data: UserData,
    cache: &AsyncCache<u32, UserData>,
    database: &DatabasePool,
) -> Result<(), DatabaseError> {
    // The database is the source of truth, so it is written first.
    let persisted = database.update_user(user_id, &user_data).await;
    persisted?;

    // The previous cache entry, if any, is discarded.
    let _previous = cache.set(user_id, user_data).await;
    Ok(())
}
Best Practices
Resource Management
// Proper resource cleanup with RAII
/// A connection bundled with a periodic background task, both released when
/// the value is dropped.
pub struct ManagedResource {
    /// The live connection; taken out when the resource is dropped.
    connection: Option<Connection>,
    /// Handle of the periodic background task; aborted on drop.
    cleanup_handle: Option<tokio::task::JoinHandle<()>>,
}

impl ManagedResource {
    /// Establishes the connection and starts a background task that logs
    /// "Performing cleanup..." on a 60-second interval.
    ///
    /// # Errors
    /// Returns the error from `Connection::establish`; the background task is
    /// not started in that case.
    pub async fn new() -> Result<Self, ResourceError> {
        let connection = Connection::establish().await?;

        let periodic_cleanup = async {
            let mut ticker = tokio::time::interval(Duration::from_secs(60));
            loop {
                ticker.tick().await;
                // Perform periodic cleanup
                println!("Performing cleanup...");
            }
        };
        let cleanup_handle = tokio::spawn(periodic_cleanup);

        Ok(Self {
            connection: Some(connection),
            cleanup_handle: Some(cleanup_handle),
        })
    }

    /// Runs a `SELECT 1` query over the connection.
    ///
    /// # Errors
    /// `ResourceError::ConnectionClosed` when no connection is held, or
    /// whatever error the query itself reports.
    pub async fn operation(&self) -> Result<String, ResourceError> {
        match &self.connection {
            Some(conn) => conn.query("SELECT 1").await,
            None => Err(ResourceError::ConnectionClosed),
        }
    }
}

impl Drop for ManagedResource {
    /// Aborts the background task and releases the connection.
    fn drop(&mut self) {
        if let Some(task) = self.cleanup_handle.take() {
            task.abort();
        }

        // The binding keeps the connection alive until the end of this block,
        // so it is dropped after the log line.
        if let Some(_connection) = self.connection.take() {
            // Note: In real code, you might want to spawn a task to close async resources
            println!("Cleaning up connection");
        }
    }
}
// Graceful shutdown handling
/// Serves a one-route application on port 3000 until a shutdown signal
/// arrives, then returns once the server has stopped.
///
/// # Errors
/// Returns an error if the listener cannot be bound or the server fails.
async fn run_server_with_graceful_shutdown() -> Result<(), Box<dyn std::error::Error>> {
    let hello = || async { "Hello, World!" };
    let app = Router::new().route("/", get(hello));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    println!("Server starting on port 3000");

    // `serve` resolves only after `shutdown_signal()` has completed.
    let server = axum::serve(listener, app).with_graceful_shutdown(shutdown_signal());
    server.await?;

    println!("Server shut down gracefully");
    Ok(())
}
/// Completes when the process is asked to stop: on Ctrl+C everywhere, and
/// additionally on SIGTERM on Unix.
///
/// # Panics
/// Panics if a signal handler cannot be installed.
async fn shutdown_signal() {
    use tokio::signal;

    let interrupt = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler");
        sigterm.recv().await;
    };

    // On non-Unix targets there is no SIGTERM; use a future that never
    // resolves so the `select!` below stays the same.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = interrupt => {},
        _ = terminate => {},
    }

    println!("Shutdown signal received");
}
Async web development in Rust provides powerful tools for building high-performance, concurrent applications. Key principles include proper error handling, resource management, understanding async patterns, and leveraging Rust's ownership system for safe concurrency. Always measure performance and use appropriate patterns for your specific use case.