1. rust
  2. /concurrency
  3. /async-programming

Async Programming

Asynchronous programming allows you to write concurrent code that can handle many tasks efficiently without blocking. Rust's async/await syntax, combined with the futures ecosystem, provides powerful tools for building scalable applications.

Basic Async/Await Syntax

Simple Async Functions

// Note: This requires the tokio crate for the runtime
// In Cargo.toml: tokio = { version = "1", features = ["full"] }

use std::time::Duration;
use tokio::time::sleep;

async fn hello_async() {
    println!("Hello from async function!");
}

async fn delayed_greeting(name: &str, delay_ms: u64) -> String {
    sleep(Duration::from_millis(delay_ms)).await;
    format!("Hello, {}! (after {}ms delay)", name, delay_ms)
}

async fn fetch_data() -> Result<String, &'static str> {
    // Simulate async work
    sleep(Duration::from_millis(100)).await;
    
    // Simulate potential failure
    if rand::random::<bool>() {
        Ok("Data fetched successfully".to_string())
    } else {
        Err("Failed to fetch data")
    }
}

#[tokio::main]
async fn main() {
    hello_async().await;
    
    let greeting = delayed_greeting("World", 500).await;
    println!("{}", greeting);
    
    match fetch_data().await {
        Ok(data) => println!("Success: {}", data),
        Err(error) => println!("Error: {}", error),
    }
}

Concurrent Execution with join!

use tokio::time::{sleep, Duration};

async fn task_one() -> u32 {
    println!("Task 1 starting...");
    sleep(Duration::from_millis(1000)).await;
    println!("Task 1 completed!");
    42
}

async fn task_two() -> String {
    println!("Task 2 starting...");
    sleep(Duration::from_millis(800)).await;
    println!("Task 2 completed!");
    "Hello from task 2".to_string()
}

async fn task_three() -> Result<i32, &'static str> {
    println!("Task 3 starting...");
    sleep(Duration::from_millis(600)).await;
    println!("Task 3 completed!");
    Ok(100)
}

#[tokio::main]
async fn main() {
    println!("Starting concurrent tasks...");
    
    // All tasks run concurrently and we wait for all to complete
    let (result1, result2, result3) = tokio::join!(
        task_one(),
        task_two(),
        task_three()
    );
    
    println!("Results: {}, {}, {:?}", result1, result2, result3);
}

Async Error Handling

Using try_join! for Early Exit

use tokio::time::{sleep, Duration};

async fn successful_task() -> Result<String, &'static str> {
    sleep(Duration::from_millis(500)).await;
    Ok("Success!".to_string())
}

async fn failing_task() -> Result<i32, &'static str> {
    sleep(Duration::from_millis(300)).await;
    Err("Task failed!")
}

async fn slow_task() -> Result<f64, &'static str> {
    sleep(Duration::from_millis(1000)).await;
    Ok(3.14)
}

#[tokio::main]
async fn main() {
    println!("Running tasks with try_join...");
    
    // try_join! stops on first error
    match tokio::try_join!(
        successful_task(),
        failing_task(),
        slow_task()
    ) {
        Ok((result1, result2, result3)) => {
            println!("All succeeded: {}, {}, {}", result1, result2, result3);
        }
        Err(error) => {
            println!("At least one task failed: {}", error);
        }
    }
}

Error Propagation in Async Functions

use std::io;
use tokio::fs;
use tokio::time::{sleep, Duration};

#[derive(Debug)]
enum AppError {
    Io(io::Error),
    Network(String),
    Timeout,
}

impl From<io::Error> for AppError {
    fn from(error: io::Error) -> Self {
        AppError::Io(error)
    }
}

async fn read_config_file(path: &str) -> Result<String, AppError> {
    let content = fs::read_to_string(path).await?;
    Ok(content)
}

async fn fetch_from_api(url: &str) -> Result<String, AppError> {
    // Simulate network delay
    sleep(Duration::from_millis(200)).await;
    
    // Simulate network failure
    if url.contains("bad") {
        return Err(AppError::Network("Invalid URL".to_string()));
    }
    
    Ok(format!("Data from {}", url))
}

async fn process_with_timeout() -> Result<String, AppError> {
    let timeout_duration = Duration::from_millis(100);
    
    match tokio::time::timeout(timeout_duration, slow_operation()).await {
        Ok(result) => result,
        Err(_) => Err(AppError::Timeout),
    }
}

async fn slow_operation() -> Result<String, AppError> {
    sleep(Duration::from_millis(200)).await; // This will timeout
    Ok("Slow result".to_string())
}

#[tokio::main]
async fn main() {
    // This will succeed if the file exists
    match read_config_file("Cargo.toml").await {
        Ok(content) => println!("Config length: {}", content.len()),
        Err(e) => println!("Config error: {:?}", e),
    }
    
    // This will fail due to bad URL
    match fetch_from_api("bad-url").await {
        Ok(data) => println!("API data: {}", data),
        Err(e) => println!("API error: {:?}", e),
    }
    
    // This will timeout
    match process_with_timeout().await {
        Ok(result) => println!("Result: {}", result),
        Err(e) => println!("Timeout error: {:?}", e),
    }
}

Async Traits and Dynamic Dispatch

Async Traits with async-trait

// In Cargo.toml: async-trait = "0.1"
use async_trait::async_trait;
use std::time::Duration;
use tokio::time::sleep;

#[async_trait]
trait DataStore {
    async fn get(&self, key: &str) -> Result<String, String>;
    async fn set(&self, key: &str, value: String) -> Result<(), String>;
    async fn delete(&self, key: &str) -> Result<(), String>;
}

struct MemoryStore {
    data: std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, String>>>,
}

impl MemoryStore {
    fn new() -> Self {
        MemoryStore {
            data: std::sync::Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
        }
    }
}

#[async_trait]
impl DataStore for MemoryStore {
    async fn get(&self, key: &str) -> Result<String, String> {
        let data = self.data.lock().await;
        data.get(key)
            .cloned()
            .ok_or_else(|| format!("Key '{}' not found", key))
    }
    
    async fn set(&self, key: &str, value: String) -> Result<(), String> {
        let mut data = self.data.lock().await;
        data.insert(key.to_string(), value);
        Ok(())
    }
    
    async fn delete(&self, key: &str) -> Result<(), String> {
        let mut data = self.data.lock().await;
        data.remove(key)
            .map(|_| ())
            .ok_or_else(|| format!("Key '{}' not found", key))
    }
}

struct SlowStore;

#[async_trait]
impl DataStore for SlowStore {
    async fn get(&self, key: &str) -> Result<String, String> {
        sleep(Duration::from_millis(100)).await; // Simulate slow I/O
        Ok(format!("slow_value_for_{}", key))
    }
    
    async fn set(&self, key: &str, value: String) -> Result<(), String> {
        sleep(Duration::from_millis(150)).await;
        println!("SlowStore: Set {} = {}", key, value);
        Ok(())
    }
    
    async fn delete(&self, key: &str) -> Result<(), String> {
        sleep(Duration::from_millis(75)).await;
        println!("SlowStore: Deleted {}", key);
        Ok(())
    }
}

async fn use_data_store(store: &dyn DataStore) -> Result<(), String> {
    store.set("user:1", "Alice".to_string()).await?;
    store.set("user:2", "Bob".to_string()).await?;
    
    let user1 = store.get("user:1").await?;
    println!("Retrieved: {}", user1);
    
    store.delete("user:2").await?;
    
    match store.get("user:2").await {
        Ok(value) => println!("Unexpected value: {}", value),
        Err(e) => println!("Expected error: {}", e),
    }
    
    Ok(())
}

#[tokio::main]
async fn main() {
    println!("Testing MemoryStore:");
    let memory_store = MemoryStore::new();
    use_data_store(&memory_store).await.unwrap();
    
    println!("\nTesting SlowStore:");
    let slow_store = SlowStore;
    use_data_store(&slow_store).await.unwrap();
}

Streams and Async Iteration

Creating and Consuming Streams

use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};

async fn number_stream() {
    // Create a stream from an iterator
    let mut stream = stream::iter(1..=10);
    
    while let Some(number) = stream.next().await {
        println!("Number: {}", number);
        sleep(Duration::from_millis(100)).await;
    }
}

async fn async_number_generator() -> impl futures::Stream<Item = i32> {
    stream::unfold(0, |state| async move {
        if state < 5 {
            sleep(Duration::from_millis(200)).await;
            Some((state, state + 1))
        } else {
            None
        }
    })
}

async fn process_stream() {
    let stream = async_number_generator().await;
    
    // Transform and filter the stream
    let processed: Vec<String> = stream
        .filter(|&x| async move { x % 2 == 0 })
        .map(|x| format!("Even number: {}", x))
        .collect()
        .await;
    
    println!("Processed stream: {:?}", processed);
}

#[tokio::main]
async fn main() {
    println!("Basic stream iteration:");
    number_stream().await;
    
    println!("\nProcessing async stream:");
    process_stream().await;
}

Custom Stream Implementation

use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{sleep, Duration, Instant};

struct TimerStream {
    interval: Duration,
    next_tick: Instant,
    count: usize,
    max_count: usize,
}

impl TimerStream {
    fn new(interval: Duration, max_count: usize) -> Self {
        TimerStream {
            interval,
            next_tick: Instant::now() + interval,
            count: 0,
            max_count,
        }
    }
}

impl Stream for TimerStream {
    type Item = usize;
    
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.count >= self.max_count {
            return Poll::Ready(None);
        }
        
        let now = Instant::now();
        if now >= self.next_tick {
            let current_count = self.count;
            self.count += 1;
            self.next_tick += self.interval;
            Poll::Ready(Some(current_count))
        } else {
            // Schedule wake-up
            let waker = cx.waker().clone();
            let sleep_duration = self.next_tick - now;
            tokio::spawn(async move {
                sleep(sleep_duration).await;
                waker.wake();
            });
            Poll::Pending
        }
    }
}

#[tokio::main]
async fn main() {
    let mut timer_stream = TimerStream::new(Duration::from_millis(500), 5);
    
    println!("Starting timer stream (500ms intervals, 5 ticks):");
    while let Some(tick) = timer_stream.next().await {
        println!("Tick {}: {:?}", tick, Instant::now());
    }
    println!("Timer stream completed");
}

HTTP Client with Async

Basic HTTP Requests

// In Cargo.toml: 
// tokio = { version = "1", features = ["full"] }
// reqwest = { version = "0.11", features = ["json"] }
// serde = { version = "1.0", features = ["derive"] }

use reqwest;
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Debug, Deserialize)]
struct Post {
    #[serde(rename = "userId")]
    user_id: u32,
    id: u32,
    title: String,
    body: String,
}

#[derive(Debug, Serialize)]
struct NewPost {
    title: String,
    body: String,
    #[serde(rename = "userId")]
    user_id: u32,
}

async fn fetch_post(id: u32) -> Result<Post, reqwest::Error> {
    let url = format!("https://jsonplaceholder.typicode.com/posts/{}", id);
    let response = reqwest::get(&url).await?;
    let post: Post = response.json().await?;
    Ok(post)
}

async fn fetch_multiple_posts(ids: Vec<u32>) -> Vec<Result<Post, reqwest::Error>> {
    let tasks: Vec<_> = ids
        .into_iter()
        .map(|id| tokio::spawn(fetch_post(id)))
        .collect();
    
    let mut results = Vec::new();
    for task in tasks {
        match task.await {
            Ok(result) => results.push(result),
            Err(e) => results.push(Err(reqwest::Error::from(e))),
        }
    }
    
    results
}

async fn create_post(new_post: NewPost) -> Result<Post, reqwest::Error> {
    let client = reqwest::Client::new();
    let response = client
        .post("https://jsonplaceholder.typicode.com/posts")
        .json(&new_post)
        .send()
        .await?;
    
    let created_post: Post = response.json().await?;
    Ok(created_post)
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Fetch a single post
    println!("Fetching post 1...");
    match fetch_post(1).await {
        Ok(post) => println!("Post: {}", post.title),
        Err(e) => println!("Error fetching post: {}", e),
    }
    
    // Fetch multiple posts concurrently
    println!("\nFetching posts 1, 2, 3 concurrently...");
    let results = fetch_multiple_posts(vec![1, 2, 3]).await;
    for (i, result) in results.into_iter().enumerate() {
        match result {
            Ok(post) => println!("Post {}: {}", i + 1, post.title),
            Err(e) => println!("Error fetching post {}: {}", i + 1, e),
        }
    }
    
    // Create a new post
    println!("\nCreating a new post...");
    let new_post = NewPost {
        title: "My Async Post".to_string(),
        body: "This is a post created with async Rust!".to_string(),
        user_id: 1,
    };
    
    match create_post(new_post).await {
        Ok(post) => println!("Created post with ID: {}", post.id),
        Err(e) => println!("Error creating post: {}", e),
    }
    
    Ok(())
}

Task Spawning and Management

Spawning Background Tasks

use tokio::time::{sleep, Duration, interval};
use tokio::sync::mpsc;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

async fn background_worker(id: usize, counter: Arc<AtomicUsize>) {
    let mut interval = interval(Duration::from_millis(1000));
    
    for i in 0..5 {
        interval.tick().await;
        let current = counter.fetch_add(1, Ordering::SeqCst);
        println!("Worker {} - iteration {}, global counter: {}", id, i, current + 1);
    }
    
    println!("Worker {} finished", id);
}

async fn message_processor(mut rx: mpsc::Receiver<String>) {
    while let Some(message) = rx.recv().await {
        println!("Processing message: {}", message);
        sleep(Duration::from_millis(100)).await;
    }
    println!("Message processor finished");
}

async fn message_producer(tx: mpsc::Sender<String>) {
    for i in 0..10 {
        let message = format!("Message {}", i);
        if tx.send(message).await.is_err() {
            println!("Failed to send message {}", i);
            break;
        }
        sleep(Duration::from_millis(150)).await;
    }
    println!("Message producer finished");
}

#[tokio::main]
async fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    
    // Spawn background workers
    let mut worker_handles = Vec::new();
    for i in 0..3 {
        let counter_clone = Arc::clone(&counter);
        let handle = tokio::spawn(background_worker(i, counter_clone));
        worker_handles.push(handle);
    }
    
    // Set up message passing
    let (tx, rx) = mpsc::channel(10);
    
    let processor_handle = tokio::spawn(message_processor(rx));
    let producer_handle = tokio::spawn(message_producer(tx));
    
    // Wait for all tasks to complete
    for handle in worker_handles {
        handle.await.unwrap();
    }
    
    producer_handle.await.unwrap();
    processor_handle.await.unwrap();
    
    println!("Final counter value: {}", counter.load(Ordering::SeqCst));
}

Task Cancellation and Timeouts

use tokio::time::{sleep, timeout, Duration};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

async fn cancellable_task(token: CancellationToken, id: usize) {
    let mut count = 0;
    
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                println!("Task {} was cancelled after {} iterations", id, count);
                break;
            }
            _ = sleep(Duration::from_millis(200)) => {
                count += 1;
                println!("Task {} - iteration {}", id, count);
                
                if count >= 10 {
                    println!("Task {} completed normally", id);
                    break;
                }
            }
        }
    }
}

async fn timeout_example() {
    let long_operation = async {
        sleep(Duration::from_millis(2000)).await;
        "Operation completed"
    };
    
    match timeout(Duration::from_millis(1000), long_operation).await {
        Ok(result) => println!("Success: {}", result),
        Err(_) => println!("Operation timed out"),
    }
}

async fn graceful_shutdown() {
    let token = CancellationToken::new();
    
    // Spawn multiple cancellable tasks
    let mut handles = Vec::new();
    for i in 0..3 {
        let token_clone = token.clone();
        let handle = tokio::spawn(cancellable_task(token_clone, i));
        handles.push(handle);
    }
    
    // Let tasks run for a while
    sleep(Duration::from_millis(1500)).await;
    
    // Cancel all tasks
    println!("Initiating shutdown...");
    token.cancel();
    
    // Wait for all tasks to handle cancellation
    for handle in handles {
        handle.await.unwrap();
    }
    
    println!("All tasks shut down gracefully");
}

#[tokio::main]
async fn main() {
    println!("Testing timeout:");
    timeout_example().await;
    
    println!("\nTesting graceful shutdown:");
    graceful_shutdown().await;
}

Async File I/O

Reading and Writing Files Asynchronously

use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReadExt, BufReader};
use std::path::Path;

async fn write_file_example() -> Result<(), Box<dyn std::error::Error>> {
    let mut file = File::create("async_example.txt").await?;
    
    let content = "Hello, async file I/O!\nThis is line 2.\nThis is line 3.";
    file.write_all(content.as_bytes()).await?;
    file.flush().await?;
    
    println!("File written successfully");
    Ok(())
}

async fn read_file_example() -> Result<(), Box<dyn std::error::Error>> {
    // Read entire file into string
    let contents = tokio::fs::read_to_string("async_example.txt").await?;
    println!("File contents:\n{}", contents);
    
    // Read file line by line
    let file = File::open("async_example.txt").await?;
    let reader = BufReader::new(file);
    let mut lines = reader.lines();
    
    println!("\nReading line by line:");
    let mut line_number = 1;
    while let Some(line) = lines.next_line().await? {
        println!("Line {}: {}", line_number, line);
        line_number += 1;
    }
    
    Ok(())
}

async fn append_to_file() -> Result<(), Box<dyn std::error::Error>> {
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open("async_example.txt")
        .await?;
    
    let additional_content = "\nThis line was appended asynchronously!";
    file.write_all(additional_content.as_bytes()).await?;
    
    println!("Content appended to file");
    Ok(())
}

async fn process_large_file() -> Result<(), Box<dyn std::error::Error>> {
    // Create a large file for demonstration
    let mut file = File::create("large_file.txt").await?;
    for i in 0..1000 {
        let line = format!("This is line number {}\n", i);
        file.write_all(line.as_bytes()).await?;
    }
    file.flush().await?;
    
    // Process the file in chunks
    let mut file = File::open("large_file.txt").await?;
    let mut buffer = vec![0; 1024]; // 1KB buffer
    let mut total_bytes = 0;
    let mut line_count = 0;
    
    loop {
        let bytes_read = file.read(&mut buffer).await?;
        if bytes_read == 0 {
            break; // End of file
        }
        
        total_bytes += bytes_read;
        // Count newlines in this chunk
        line_count += buffer[..bytes_read].iter().filter(|&&b| b == b'\n').count();
    }
    
    println!("Processed {} bytes, {} lines", total_bytes, line_count);
    
    // Clean up
    tokio::fs::remove_file("large_file.txt").await?;
    
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    write_file_example().await?;
    read_file_example().await?;
    append_to_file().await?;
    
    // Read the file again to see the appended content
    println!("\nAfter appending:");
    let final_contents = tokio::fs::read_to_string("async_example.txt").await?;
    println!("{}", final_contents);
    
    process_large_file().await?;
    
    // Clean up
    tokio::fs::remove_file("async_example.txt").await?;
    
    Ok(())
}

Best Practices

1. Choose the Right Runtime

// For most applications, use tokio
#[tokio::main]
async fn main() {
    // Your async code here
}

// For specific needs, you can configure the runtime
#[tokio::main(flavor = "current_thread")]
async fn single_threaded_main() {
    // Single-threaded runtime
}

// Manual runtime creation for more control
fn custom_runtime() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()
        .unwrap();
    
    runtime.block_on(async {
        // Your async code here
    });
}

2. Avoid Blocking in Async Context

use tokio::task;

// Good: Use async alternatives
async fn good_file_read() -> Result<String, std::io::Error> {
    tokio::fs::read_to_string("file.txt").await
}

// Good: Use spawn_blocking for CPU-intensive work
async fn cpu_intensive_work() -> u64 {
    task::spawn_blocking(|| {
        // CPU-intensive computation
        (0..1_000_000).sum()
    }).await.unwrap()
}

// Avoid: Blocking calls in async context
async fn bad_example() {
    // DON'T do this - blocks the async runtime
    // std::thread::sleep(Duration::from_secs(1));
    // std::fs::read_to_string("file.txt").unwrap();
}

3. Use Appropriate Synchronization Primitives

use tokio::sync::{Mutex, RwLock, Semaphore, broadcast, mpsc};
use std::sync::Arc;

// Async-aware primitives for async code
struct AsyncCounter {
    value: Arc<Mutex<usize>>,
}

impl AsyncCounter {
    fn new() -> Self {
        AsyncCounter {
            value: Arc::new(Mutex::new(0)),
        }
    }
    
    async fn increment(&self) -> usize {
        let mut guard = self.value.lock().await;
        *guard += 1;
        *guard
    }
}

// Rate limiting with semaphore
async fn rate_limited_operation(semaphore: Arc<Semaphore>) -> Result<(), &'static str> {
    let _permit = semaphore.acquire().await.unwrap();
    // Do rate-limited work
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    Ok(())
}

4. Handle Errors Appropriately

use tokio::time::{timeout, Duration};

async fn robust_operation() -> Result<String, Box<dyn std::error::Error>> {
    // Use timeout to prevent hanging
    let result = timeout(Duration::from_secs(5), async {
        // Some potentially slow operation
        slow_network_call().await
    }).await??; // Note the double ? for timeout and inner result
    
    Ok(result)
}

async fn slow_network_call() -> Result<String, reqwest::Error> {
    // Simulated network call
    Ok("Network response".to_string())
}

5. Structure Async Applications Well

use tokio::sync::mpsc;
use std::sync::Arc;

// Good: Separate concerns into different async components
struct Application {
    message_processor: MessageProcessor,
    api_server: ApiServer,
}

struct MessageProcessor {
    receiver: mpsc::Receiver<String>,
}

struct ApiServer {
    port: u16,
}

impl Application {
    async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
        tokio::select! {
            result = self.message_processor.run() => {
                println!("Message processor finished: {:?}", result);
            }
            result = self.api_server.run() => {
                println!("API server finished: {:?}", result);
            }
        }
        Ok(())
    }
}

impl MessageProcessor {
    async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
        while let Some(message) = self.receiver.recv().await {
            self.process_message(message).await?;
        }
        Ok(())
    }
    
    async fn process_message(&self, message: String) -> Result<(), Box<dyn std::error::Error>> {
        println!("Processing: {}", message);
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok(())
    }
}

impl ApiServer {
    async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
        println!("API server running on port {}", self.port);
        // Simulate server running
        tokio::time::sleep(Duration::from_secs(10)).await;
        Ok(())
    }
}

Asynchronous programming in Rust provides powerful tools for building scalable, efficient applications. The async/await syntax, combined with the rich ecosystem of async libraries, enables you to write concurrent code that handles I/O and other asynchronous operations efficiently. Remember to use async-specific synchronization primitives and avoid blocking operations in async contexts to maintain the benefits of asynchronous execution.