Async Programming
Asynchronous programming lets you write concurrent code that handles many tasks efficiently without blocking a thread while it waits on I/O or timers. Rust's async/await syntax, combined with the futures ecosystem, provides powerful tools for building scalable applications.
Basic Async/Await Syntax
Simple Async Functions
// Note: This requires the tokio crate for the runtime and the rand crate for `rand::random`
// In Cargo.toml: tokio = { version = "1", features = ["full"] } and rand = "0.8"
use std::time::Duration;
use tokio::time::sleep;
/// Prints a fixed greeting; the smallest possible `async fn`.
async fn hello_async() {
    let message = "Hello from async function!";
    println!("{}", message);
}
/// Sleeps for `delay_ms` milliseconds, then returns a greeting for `name`
/// that mentions the delay.
async fn delayed_greeting(name: &str, delay_ms: u64) -> String {
    let pause = Duration::from_millis(delay_ms);
    sleep(pause).await;

    format!("Hello, {}! (after {}ms delay)", name, delay_ms)
}
/// Simulates a fetch that takes 100ms and fails at random.
///
/// Returns the fetched text on success, or a static error message on failure.
async fn fetch_data() -> Result<String, &'static str> {
    // Stand-in for real asynchronous work.
    sleep(Duration::from_millis(100)).await;

    // A random boolean decides whether this simulated fetch succeeds.
    let succeeded: bool = rand::random::<bool>();
    if !succeeded {
        return Err("Failed to fetch data");
    }
    Ok(String::from("Data fetched successfully"))
}
/// Runs the three example functions one after another and prints their results.
#[tokio::main]
async fn main() {
    hello_async().await;

    let message = delayed_greeting("World", 500).await;
    println!("{}", message);

    // Render the outcome first, then print it once.
    let report = match fetch_data().await {
        Ok(data) => format!("Success: {}", data),
        Err(error) => format!("Error: {}", error),
    };
    println!("{}", report);
}
Concurrent Execution with join!
use tokio::time::{sleep, Duration};
/// Logs start and completion around a 1000ms sleep and returns `42`.
async fn task_one() -> u32 {
    let work_time = Duration::from_millis(1000);

    println!("Task 1 starting...");
    sleep(work_time).await;
    println!("Task 1 completed!");

    42
}
/// Logs start and completion around an 800ms sleep and returns a fixed message.
async fn task_two() -> String {
    let work_time = Duration::from_millis(800);

    println!("Task 2 starting...");
    sleep(work_time).await;
    println!("Task 2 completed!");

    String::from("Hello from task 2")
}
/// Logs start and completion around a 600ms sleep and always returns `Ok(100)`.
async fn task_three() -> Result<i32, &'static str> {
    let work_time = Duration::from_millis(600);

    println!("Task 3 starting...");
    sleep(work_time).await;
    println!("Task 3 completed!");

    Ok(100)
}
/// Drives all three tasks with `tokio::join!` and prints their combined results.
#[tokio::main]
async fn main() {
    println!("Starting concurrent tasks...");

    // `join!` polls every future and resolves once all of them have finished.
    let outcomes = tokio::join!(task_one(), task_two(), task_three());
    let (number, message, status) = outcomes;

    println!("Results: {}, {}, {:?}", number, message, status);
}
Async Error Handling
Using try_join! for Early Exit
use tokio::time::{sleep, Duration};
/// Sleeps 500ms and then succeeds with a fixed message.
async fn successful_task() -> Result<String, &'static str> {
    let work_time = Duration::from_millis(500);
    sleep(work_time).await;
    Ok(String::from("Success!"))
}
/// Sleeps 300ms and then always fails with a fixed error message.
async fn failing_task() -> Result<i32, &'static str> {
    let work_time = Duration::from_millis(300);
    sleep(work_time).await;
    Err("Task failed!")
}
/// Sleeps 1000ms and then succeeds with `3.14`.
async fn slow_task() -> Result<f64, &'static str> {
    let work_time = Duration::from_millis(1000);
    sleep(work_time).await;
    Ok(3.14)
}
/// Runs three fallible tasks under `tokio::try_join!` and reports the outcome.
#[tokio::main]
async fn main() {
    println!("Running tasks with try_join...");

    // `try_join!` resolves with the first `Err` produced by any branch.
    let outcome = tokio::try_join!(successful_task(), failing_task(), slow_task());

    match outcome {
        Err(error) => {
            println!("At least one task failed: {}", error);
        }
        Ok((text, number, ratio)) => {
            println!("All succeeded: {}, {}, {}", text, number, ratio);
        }
    }
}
Error Propagation in Async Functions
use std::io;
use tokio::fs;
use tokio::time::{sleep, Duration};
/// Errors produced by the examples in this section.
#[derive(Debug)]
enum AppError {
    /// An underlying I/O failure.
    Io(io::Error),
    /// A network failure described by a message.
    Network(String),
    /// An operation did not finish within its time limit.
    Timeout,
}

/// Lets `?` turn an `io::Error` into an `AppError::Io`.
impl From<io::Error> for AppError {
    fn from(source: io::Error) -> Self {
        Self::Io(source)
    }
}
async fn read_config_file(path: &str) -> Result<String, AppError> {
let content = fs::read_to_string(path).await?;
Ok(content)
}
/// Simulates an API call that takes 200ms.
///
/// Fails with `AppError::Network` when `url` contains the substring `"bad"`;
/// otherwise returns a string naming the URL.
async fn fetch_from_api(url: &str) -> Result<String, AppError> {
    // Stand-in for network latency.
    sleep(Duration::from_millis(200)).await;

    // Any URL containing "bad" is treated as a failed request.
    if url.contains("bad") {
        Err(AppError::Network(String::from("Invalid URL")))
    } else {
        Ok(format!("Data from {}", url))
    }
}
/// Runs `slow_operation` with a 100ms limit.
///
/// Returns the operation's own result if it finishes in time, or
/// `AppError::Timeout` if the limit elapses first.
async fn process_with_timeout() -> Result<String, AppError> {
    let limit = Duration::from_millis(100);

    tokio::time::timeout(limit, slow_operation())
        .await
        .unwrap_or_else(|_elapsed| Err(AppError::Timeout))
}
/// Sleeps 200ms and then succeeds; this is longer than the 100ms limit used by
/// `process_with_timeout`.
async fn slow_operation() -> Result<String, AppError> {
    let work_time = Duration::from_millis(200);
    sleep(work_time).await;
    Ok(String::from("Slow result"))
}
/// Exercises the three fallible operations and prints one line per outcome.
#[tokio::main]
async fn main() {
    // Succeeds only if `Cargo.toml` exists in the working directory.
    let config_report = read_config_file("Cargo.toml")
        .await
        .map(|content| format!("Config length: {}", content.len()))
        .unwrap_or_else(|e| format!("Config error: {:?}", e));
    println!("{}", config_report);

    // The URL contains "bad", so the simulated API rejects it.
    let api_report = fetch_from_api("bad-url")
        .await
        .map(|data| format!("API data: {}", data))
        .unwrap_or_else(|e| format!("API error: {:?}", e));
    println!("{}", api_report);

    // The slow operation needs 200ms but is only given 100ms.
    let timeout_report = process_with_timeout()
        .await
        .map(|result| format!("Result: {}", result))
        .unwrap_or_else(|e| format!("Timeout error: {:?}", e));
    println!("{}", timeout_report);
}
Async Traits and Dynamic Dispatch
Async Traits with async-trait
// In Cargo.toml: async-trait = "0.1"
use async_trait::async_trait;
use std::time::Duration;
use tokio::time::sleep;
/// Asynchronous string key-value store interface.
///
/// The `#[async_trait]` attribute is what allows `async fn` in this trait.
/// Every method reports failure as a `String` message.
#[async_trait]
trait DataStore {
/// Looks up `key` and returns the associated value.
async fn get(&self, key: &str) -> Result<String, String>;
/// Stores `value` under `key`.
async fn set(&self, key: &str, value: String) -> Result<(), String>;
/// Removes the entry for `key`.
async fn delete(&self, key: &str) -> Result<(), String>;
}
/// In-memory store: a `HashMap` behind a tokio `Mutex`, shared through an `Arc`.
struct MemoryStore {
    data: std::sync::Arc<tokio::sync::Mutex<std::collections::HashMap<String, String>>>,
}

impl MemoryStore {
    /// Creates a store with no entries.
    fn new() -> Self {
        let entries = std::collections::HashMap::new();
        let guarded = tokio::sync::Mutex::new(entries);
        Self {
            data: std::sync::Arc::new(guarded),
        }
    }
}
/// `DataStore` backed by the in-memory map; each call holds the lock only for
/// the duration of that call.
#[async_trait]
impl DataStore for MemoryStore {
    /// Returns a copy of the value stored under `key`, or a "not found" message.
    async fn get(&self, key: &str) -> Result<String, String> {
        let entries = self.data.lock().await;
        match entries.get(key) {
            Some(value) => Ok(value.clone()),
            None => Err(format!("Key '{}' not found", key)),
        }
    }

    /// Inserts or overwrites the value for `key`; never fails.
    async fn set(&self, key: &str, value: String) -> Result<(), String> {
        let mut entries = self.data.lock().await;
        entries.insert(key.to_string(), value);
        Ok(())
    }

    /// Removes `key`, returning a "not found" message if it was absent.
    async fn delete(&self, key: &str) -> Result<(), String> {
        let mut entries = self.data.lock().await;
        match entries.remove(key) {
            Some(_removed) => Ok(()),
            None => Err(format!("Key '{}' not found", key)),
        }
    }
}
/// Stateless store that simulates slow I/O: it sleeps before answering and
/// never keeps any data.
struct SlowStore;

#[async_trait]
impl DataStore for SlowStore {
    /// Sleeps 100ms, then returns a value synthesized from `key`.
    async fn get(&self, key: &str) -> Result<String, String> {
        // Simulated slow I/O.
        let latency = Duration::from_millis(100);
        sleep(latency).await;
        Ok(format!("slow_value_for_{}", key))
    }

    /// Sleeps 150ms, then logs the assignment without storing it.
    async fn set(&self, key: &str, value: String) -> Result<(), String> {
        let latency = Duration::from_millis(150);
        sleep(latency).await;
        println!("SlowStore: Set {} = {}", key, value);
        Ok(())
    }

    /// Sleeps 75ms, then logs the deletion; always succeeds.
    async fn delete(&self, key: &str) -> Result<(), String> {
        let latency = Duration::from_millis(75);
        sleep(latency).await;
        println!("SlowStore: Deleted {}", key);
        Ok(())
    }
}
/// Runs a fixed set/get/delete scenario against any `DataStore` through
/// dynamic dispatch, stopping at the first error from `set`, the first `get`,
/// or `delete`.
async fn use_data_store(store: &dyn DataStore) -> Result<(), String> {
    store.set("user:1", String::from("Alice")).await?;
    store.set("user:2", String::from("Bob")).await?;

    let first_user = store.get("user:1").await?;
    println!("Retrieved: {}", first_user);

    store.delete("user:2").await?;

    // After the delete, a lookup of the same key is reported either way.
    let after_delete = store.get("user:2").await;
    if let Err(e) = after_delete {
        println!("Expected error: {}", e);
    } else if let Ok(value) = after_delete {
        println!("Unexpected value: {}", value);
    }

    Ok(())
}
/// Runs the same scenario against both store implementations, panicking if
/// either run returns an error.
#[tokio::main]
async fn main() {
    println!("Testing MemoryStore:");
    let in_memory = MemoryStore::new();
    let memory_outcome = use_data_store(&in_memory).await;
    memory_outcome.unwrap();

    println!("\nTesting SlowStore:");
    let slow = SlowStore;
    let slow_outcome = use_data_store(&slow).await;
    slow_outcome.unwrap();
}
Streams and Async Iteration
Creating and Consuming Streams
use futures::stream::{self, StreamExt};
use tokio::time::{sleep, Duration};
/// Prints the numbers 1 through 10 from an iterator-backed stream, pausing
/// 100ms after each one.
async fn number_stream() {
    // Wrap a plain iterator so it can be consumed as a stream.
    let mut numbers = stream::iter(1..=10);

    loop {
        match numbers.next().await {
            Some(number) => {
                println!("Number: {}", number);
                sleep(Duration::from_millis(100)).await;
            }
            None => break,
        }
    }
}
/// Builds a stream that yields 0, 1, 2, 3, 4, sleeping 200ms before each item.
async fn async_number_generator() -> impl futures::Stream<Item = i32> {
    stream::unfold(0, |current| async move {
        // Returning `None` ends the stream.
        if current >= 5 {
            return None;
        }
        sleep(Duration::from_millis(200)).await;
        // Yield `current` and carry `current + 1` as the next state.
        Some((current, current + 1))
    })
}
/// Keeps the even numbers from the generated stream, formats each as a label,
/// and prints the collected list.
async fn process_stream() {
    let numbers = async_number_generator().await;

    // Each adaptor is named so the pipeline reads top to bottom.
    let evens = numbers.filter(|&x| async move { x % 2 == 0 });
    let labelled = evens.map(|x| format!("Even number: {}", x));
    let processed: Vec<String> = labelled.collect().await;

    println!("Processed stream: {:?}", processed);
}
/// Runs the basic iteration example, then the filtered/mapped stream example.
#[tokio::main]
async fn main() {
    let basic_heading = "Basic stream iteration:";
    println!("{}", basic_heading);
    number_stream().await;

    let processing_heading = "\nProcessing async stream:";
    println!("{}", processing_heading);
    process_stream().await;
}
Custom Stream Implementation
use futures::stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{sleep, Duration, Instant};
/// Stream state: yields tick indices `0..max_count`, one per `interval`.
struct TimerStream {
    /// Time between consecutive ticks.
    interval: Duration,
    /// Instant at which the next tick becomes due.
    next_tick: Instant,
    /// Number of ticks yielded so far.
    count: usize,
    /// Total number of ticks to yield before the stream ends.
    max_count: usize,
}

impl TimerStream {
    /// Creates a stream whose first tick is due one `interval` from now.
    fn new(interval: Duration, max_count: usize) -> Self {
        let first_deadline = Instant::now() + interval;
        Self {
            interval,
            next_tick: first_deadline,
            count: 0,
            max_count,
        }
    }
}
impl Stream for TimerStream {
    type Item = usize;

    /// Yields the next tick index once its deadline has passed, `None` after
    /// `max_count` ticks, and `Pending` otherwise.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Every field is `Unpin`, so the pin can be unwrapped into a plain
        // `&mut`. Working on that one reference avoids borrowing through the
        // `Pin` both mutably and immutably inside a single statement such as
        // `next_tick += interval`.
        let this = self.get_mut();

        if this.count >= this.max_count {
            return Poll::Ready(None);
        }

        let now = Instant::now();
        if now >= this.next_tick {
            let current_count = this.count;
            this.count += 1;
            // Advance from the previous deadline rather than from `now`, so
            // late polls do not push later ticks back.
            this.next_tick += this.interval;
            return Poll::Ready(Some(current_count));
        }

        // Not due yet: spawn a helper task that sleeps until the deadline and
        // then wakes the consumer so it polls again.
        // NOTE(review): a helper task is spawned on every pending poll; holding
        // a `tokio::time::Sleep` in the struct would avoid that.
        let waker = cx.waker().clone();
        let sleep_duration = this.next_tick - now;
        tokio::spawn(async move {
            sleep(sleep_duration).await;
            waker.wake();
        });
        Poll::Pending
    }
}
/// Consumes five ticks from a `TimerStream` at 500ms intervals, printing each
/// tick index with the instant it was observed.
#[tokio::main]
async fn main() {
    let tick_interval = Duration::from_millis(500);
    let mut timer_stream = TimerStream::new(tick_interval, 5);
    println!("Starting timer stream (500ms intervals, 5 ticks):");

    // `next` is an extension method from `futures::stream::StreamExt`; that
    // trait must be in scope in addition to `Stream` for this call to compile.
    while let Some(tick) = timer_stream.next().await {
        println!("Tick {}: {:?}", tick, Instant::now());
    }

    println!("Timer stream completed");
}
HTTP Client with Async
Basic HTTP Requests
// In Cargo.toml:
// tokio = { version = "1", features = ["full"] }
// reqwest = { version = "0.11", features = ["json"] }
// serde = { version = "1.0", features = ["derive"] }
use reqwest;
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// A post as returned by the API; built by deserialization only.
#[derive(Debug, Deserialize)]
struct Post {
// Read from the `userId` field of the serialized form.
#[serde(rename = "userId")]
user_id: u32,
id: u32,
title: String,
body: String,
}
/// Payload for creating a post; serialize-only, so it carries no `id`.
#[derive(Debug, Serialize)]
struct NewPost {
title: String,
body: String,
// Written as the `userId` field of the serialized form.
#[serde(rename = "userId")]
user_id: u32,
}
/// Fetches the post with the given `id` and decodes the JSON response body.
///
/// Any request or decoding failure is returned as a `reqwest::Error`.
async fn fetch_post(id: u32) -> Result<Post, reqwest::Error> {
    let endpoint = format!("https://jsonplaceholder.typicode.com/posts/{}", id);

    reqwest::get(&endpoint).await?.json::<Post>().await
}
/// Fetches every post in `ids` concurrently, one spawned task per id.
///
/// The returned vector has one entry per id, in the same order as `ids`;
/// each entry is that request's own success or `reqwest::Error`.
///
/// # Panics
///
/// Panics if a spawned fetch task itself panicked or was cancelled.
async fn fetch_multiple_posts(ids: Vec<u32>) -> Vec<Result<Post, reqwest::Error>> {
    // Spawn everything first so the requests are all in flight before any
    // handle is awaited.
    let tasks: Vec<_> = ids
        .into_iter()
        .map(|id| tokio::spawn(fetch_post(id)))
        .collect();

    let mut results = Vec::with_capacity(tasks.len());
    for task in tasks {
        // A `JoinError` means the task panicked or was cancelled. It cannot be
        // converted into a `reqwest::Error` (no such `From` impl exists), and
        // it signals a bug rather than a failed request, so surface it as a
        // panic instead of forging an HTTP error.
        let result = task
            .await
            .expect("fetch_post task panicked or was cancelled");
        results.push(result);
    }
    results
}
/// Sends `new_post` as a JSON POST request and decodes the created post from
/// the response body.
async fn create_post(new_post: NewPost) -> Result<Post, reqwest::Error> {
    let client = reqwest::Client::new();
    let request = client
        .post("https://jsonplaceholder.typicode.com/posts")
        .json(&new_post);

    let response = request.send().await?;
    response.json::<Post>().await
}
/// Demonstrates a single fetch, a concurrent batch of fetches, and a create
/// request. Individual failures are printed rather than propagated.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Single post.
    println!("Fetching post 1...");
    let single = fetch_post(1).await;
    match single {
        Err(e) => println!("Error fetching post: {}", e),
        Ok(post) => println!("Post: {}", post.title),
    }

    // Several posts at once.
    println!("\nFetching posts 1, 2, 3 concurrently...");
    let batch = fetch_multiple_posts(vec![1, 2, 3]).await;
    for (index, outcome) in batch.into_iter().enumerate() {
        // Results are reported with 1-based numbering.
        let number = index + 1;
        match outcome {
            Err(e) => println!("Error fetching post {}: {}", number, e),
            Ok(post) => println!("Post {}: {}", number, post.title),
        }
    }

    // New post.
    println!("\nCreating a new post...");
    let draft = NewPost {
        title: String::from("My Async Post"),
        body: String::from("This is a post created with async Rust!"),
        user_id: 1,
    };
    match create_post(draft).await {
        Err(e) => println!("Error creating post: {}", e),
        Ok(post) => println!("Created post with ID: {}", post.id),
    }

    Ok(())
}
Task Spawning and Management
Spawning Background Tasks
use tokio::time::{sleep, Duration, interval};
use tokio::sync::mpsc;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Waits on a 1000ms interval timer five times, incrementing the shared
/// `counter` and logging after each tick.
async fn background_worker(id: usize, counter: Arc<AtomicUsize>) {
    let mut ticker = interval(Duration::from_millis(1000));

    let mut iteration = 0;
    while iteration < 5 {
        ticker.tick().await;
        // `fetch_add` returns the value before the increment.
        let previous = counter.fetch_add(1, Ordering::SeqCst);
        println!(
            "Worker {} - iteration {}, global counter: {}",
            id,
            iteration,
            previous + 1
        );
        iteration += 1;
    }

    println!("Worker {} finished", id);
}
/// Receives messages until `recv` returns `None`, logging each one and
/// pausing 100ms after it.
async fn message_processor(mut rx: mpsc::Receiver<String>) {
    loop {
        let message = match rx.recv().await {
            Some(message) => message,
            None => break,
        };
        println!("Processing message: {}", message);
        sleep(Duration::from_millis(100)).await;
    }
    println!("Message processor finished");
}
/// Sends ten numbered messages 150ms apart, stopping early at the first
/// failed send.
async fn message_producer(tx: mpsc::Sender<String>) {
    for sequence in 0..10 {
        let payload = format!("Message {}", sequence);
        let delivery = tx.send(payload).await;
        if delivery.is_err() {
            println!("Failed to send message {}", sequence);
            break;
        }
        sleep(Duration::from_millis(150)).await;
    }
    println!("Message producer finished");
}
/// Spawns three counting workers plus a producer/processor pair, waits for
/// all of them, and prints the final counter value.
#[tokio::main]
async fn main() {
    let counter = Arc::new(AtomicUsize::new(0));

    // Background workers, each with its own handle to the shared counter.
    let worker_handles: Vec<_> = (0..3)
        .map(|worker_id| tokio::spawn(background_worker(worker_id, Arc::clone(&counter))))
        .collect();

    // Channel with room for 10 messages linking producer to processor.
    let (tx, rx) = mpsc::channel(10);
    let processor_handle = tokio::spawn(message_processor(rx));
    let producer_handle = tokio::spawn(message_producer(tx));

    // Wait for everything; a failed join panics here.
    for worker in worker_handles {
        worker.await.unwrap();
    }
    producer_handle.await.unwrap();
    processor_handle.await.unwrap();

    let total = counter.load(Ordering::SeqCst);
    println!("Final counter value: {}", total);
}
Task Cancellation and Timeouts
use tokio::time::{sleep, timeout, Duration};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Counts one iteration every 200ms until either ten iterations have run or
/// `token` is cancelled, logging which of the two ended the task.
async fn cancellable_task(token: CancellationToken, id: usize) {
    let mut iterations = 0;

    loop {
        // Whichever branch becomes ready first wins this round.
        tokio::select! {
            _ = token.cancelled() => {
                println!("Task {} was cancelled after {} iterations", id, iterations);
                break;
            }
            _ = sleep(Duration::from_millis(200)) => {
                iterations += 1;
                println!("Task {} - iteration {}", id, iterations);

                let finished = iterations >= 10;
                if finished {
                    println!("Task {} completed normally", id);
                    break;
                }
            }
        }
    }
}
/// Runs a 2000ms operation under a 1000ms limit and prints whether it
/// finished or timed out.
async fn timeout_example() {
    let long_operation = async {
        sleep(Duration::from_millis(2000)).await;
        "Operation completed"
    };

    let limit = Duration::from_millis(1000);
    let outcome = timeout(limit, long_operation).await;

    if let Ok(result) = outcome {
        println!("Success: {}", result);
    } else {
        println!("Operation timed out");
    }
}
/// Starts three cancellable tasks, lets them run for 1500ms, cancels them
/// through a shared token, and waits for each to exit.
async fn graceful_shutdown() {
    let token = CancellationToken::new();

    // Every task gets its own clone of the same token.
    let handles: Vec<_> = (0..3)
        .map(|task_id| tokio::spawn(cancellable_task(token.clone(), task_id)))
        .collect();

    // Give the tasks time to make progress.
    sleep(Duration::from_millis(1500)).await;

    // One call signals every clone.
    println!("Initiating shutdown...");
    token.cancel();

    // Join each task so shutdown is only reported once all have returned.
    for task in handles {
        task.await.unwrap();
    }

    println!("All tasks shut down gracefully");
}
/// Runs the timeout example followed by the graceful-shutdown example.
#[tokio::main]
async fn main() {
    let timeout_heading = "Testing timeout:";
    println!("{}", timeout_heading);
    timeout_example().await;

    let shutdown_heading = "\nTesting graceful shutdown:";
    println!("{}", shutdown_heading);
    graceful_shutdown().await;
}
Async File I/O
Reading and Writing Files Asynchronously
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt, AsyncBufReadExt, BufReader};
use std::path::Path;
/// Creates (or truncates) `async_example.txt`, writes three lines of text,
/// and flushes the file.
async fn write_file_example() -> Result<(), Box<dyn std::error::Error>> {
    let text = "Hello, async file I/O!\nThis is line 2.\nThis is line 3.";

    let mut output = File::create("async_example.txt").await?;
    output.write_all(text.as_bytes()).await?;
    output.flush().await?;

    println!("File written successfully");
    Ok(())
}
/// Reads `async_example.txt` twice: once in full, then line by line through a
/// buffered reader, printing numbered lines.
async fn read_file_example() -> Result<(), Box<dyn std::error::Error>> {
    // Whole file in one call.
    let whole = tokio::fs::read_to_string("async_example.txt").await?;
    println!("File contents:\n{}", whole);

    // Same file, one line at a time.
    let handle = File::open("async_example.txt").await?;
    let mut lines = BufReader::new(handle).lines();

    println!("\nReading line by line:");
    let mut line_number = 0;
    while let Some(line) = lines.next_line().await? {
        line_number += 1;
        println!("Line {}: {}", line_number, line);
    }

    Ok(())
}
/// Appends one line to `async_example.txt`, creating the file if needed.
///
/// Returns any I/O error from opening, writing, or flushing.
async fn append_to_file() -> Result<(), Box<dyn std::error::Error>> {
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open("async_example.txt")
        .await?;

    let additional_content = "\nThis line was appended asynchronously!";
    file.write_all(additional_content.as_bytes()).await?;
    // `write_all` on a tokio `File` can return before the data has reached the
    // file. Flush before the handle is dropped so a caller that re-reads the
    // file immediately sees the appended line, and so a failed write is
    // reported here instead of being lost.
    file.flush().await?;

    println!("Content appended to file");
    Ok(())
}
/// Writes a 1000-line file, reads it back in 1KB chunks while counting bytes
/// and newlines, prints the totals, and deletes the file.
async fn process_large_file() -> Result<(), Box<dyn std::error::Error>> {
    // Generate the demonstration file.
    let mut writer = File::create("large_file.txt").await?;
    for index in 0..1000 {
        let text = format!("This is line number {}\n", index);
        writer.write_all(text.as_bytes()).await?;
    }
    writer.flush().await?;

    // Read it back through a fixed 1KB buffer.
    let mut reader = File::open("large_file.txt").await?;
    let mut chunk = vec![0; 1024];
    let mut total_bytes = 0;
    let mut line_count = 0;

    loop {
        let filled = reader.read(&mut chunk).await?;
        // A zero-length read marks end of file.
        if filled == 0 {
            break;
        }

        // Only the first `filled` bytes of the buffer are valid for this read.
        let valid = &chunk[..filled];
        let newlines = valid.iter().filter(|byte| **byte == b'\n').count();

        total_bytes += filled;
        line_count += newlines;
    }

    println!("Processed {} bytes, {} lines", total_bytes, line_count);

    // Remove the demonstration file.
    tokio::fs::remove_file("large_file.txt").await?;
    Ok(())
}
/// Runs the write, read, append, and chunked-read examples in order, then
/// deletes the example file. The first error aborts the run.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    write_file_example().await?;
    read_file_example().await?;
    append_to_file().await?;

    // Show the file again now that a line has been appended.
    println!("\nAfter appending:");
    let after_append = tokio::fs::read_to_string("async_example.txt").await?;
    println!("{}", after_append);

    process_large_file().await?;

    // Remove the example file.
    tokio::fs::remove_file("async_example.txt").await?;

    Ok(())
}
Best Practices
1. Choose the Right Runtime
// For most applications, use tokio
/// Async entry point; the `#[tokio::main]` attribute supplies the runtime.
#[tokio::main]
async fn main() {
// Your async code here
}
// For specific needs, you can configure the runtime
/// Async entry point that requests the `current_thread` runtime flavor.
#[tokio::main(flavor = "current_thread")]
async fn single_threaded_main() {
// Single-threaded runtime
}
// Manual runtime creation for more control
/// Builds a multi-threaded runtime with four worker threads by hand and
/// blocks on an async block, instead of using `#[tokio::main]`.
///
/// Panics if the runtime cannot be built.
fn custom_runtime() {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.worker_threads(4).enable_all();

    let rt = builder.build().unwrap();
    rt.block_on(async {
        // Your async code here
    });
}
2. Avoid Blocking in Async Context
use tokio::task;
// Good: Use async alternatives
/// Reads `file.txt` with tokio's async file API instead of the blocking
/// `std::fs` one.
async fn good_file_read() -> Result<String, std::io::Error> {
    let contents = tokio::fs::read_to_string("file.txt").await?;
    Ok(contents)
}
// Good: Use spawn_blocking for CPU-intensive work
/// Sums the integers in `0..1_000_000` via `spawn_blocking`, so the
/// computation does not run on an async worker thread.
///
/// Panics if the blocking task fails to join.
async fn cpu_intensive_work() -> u64 {
    // CPU-intensive computation handed to the blocking pool.
    let handle = task::spawn_blocking(|| (0..1_000_000u64).sum::<u64>());
    handle.await.unwrap()
}
// Avoid: Blocking calls in async context
/// Placeholder showing what NOT to do: the commented-out calls below are
/// blocking std APIs. The body is intentionally empty.
async fn bad_example() {
// DON'T do this - blocks the async runtime
// std::thread::sleep(Duration::from_secs(1));
// std::fs::read_to_string("file.txt").unwrap();
}
3. Use Appropriate Synchronization Primitives
use tokio::sync::{Mutex, RwLock, Semaphore, broadcast, mpsc};
use std::sync::Arc;
// Async-aware primitives for async code
/// Counter guarded by an async-aware mutex and shared through an `Arc`.
struct AsyncCounter {
    value: Arc<Mutex<usize>>,
}

impl AsyncCounter {
    /// Creates a counter starting at zero.
    fn new() -> Self {
        let start = Mutex::new(0);
        Self {
            value: Arc::new(start),
        }
    }

    /// Adds one to the counter and returns the new value.
    async fn increment(&self) -> usize {
        let mut current = self.value.lock().await;
        let updated = *current + 1;
        *current = updated;
        updated
    }
}
// Rate limiting with semaphore
/// Performs 100ms of simulated work while holding one permit from
/// `semaphore`, so at most as many calls run at once as there are permits.
///
/// # Errors
///
/// Returns `"semaphore closed"` if the semaphore has been closed and no
/// permit can be acquired.
async fn rate_limited_operation(semaphore: Arc<Semaphore>) -> Result<(), &'static str> {
    // The function already returns a `Result`, so report a closed semaphore to
    // the caller instead of panicking. The permit is released when `_permit`
    // is dropped at the end of the function.
    let _permit = semaphore
        .acquire()
        .await
        .map_err(|_closed| "semaphore closed")?;

    // Do rate-limited work
    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    Ok(())
}
4. Handle Errors Appropriately
use tokio::time::{timeout, Duration};
/// Runs `slow_network_call` with a five-second limit.
///
/// Both an elapsed timeout and the call's own error are returned as a boxed
/// error.
async fn robust_operation() -> Result<String, Box<dyn std::error::Error>> {
    // The limit keeps a stalled call from hanging forever.
    let guarded = timeout(Duration::from_secs(5), slow_network_call());

    // First `?` handles the timeout, second `?` the call's own result.
    let response = guarded.await??;
    Ok(response)
}
/// Simulated network call that immediately succeeds with a fixed response.
async fn slow_network_call() -> Result<String, reqwest::Error> {
    let response = String::from("Network response");
    Ok(response)
}
5. Structure Async Applications Well
use tokio::sync::mpsc;
use std::sync::Arc;
// Good: Separate concerns into different async components
/// Top-level application that owns its two components.
struct Application {
message_processor: MessageProcessor,
api_server: ApiServer,
}
/// Component that owns the receiving half of a `String` channel.
struct MessageProcessor {
receiver: mpsc::Receiver<String>,
}
/// Component that holds the port number for the API server.
struct ApiServer {
port: u16,
}
impl Application {
async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
tokio::select! {
result = self.message_processor.run() => {
println!("Message processor finished: {:?}", result);
}
result = self.api_server.run() => {
println!("API server finished: {:?}", result);
}
}
Ok(())
}
}
impl MessageProcessor {
    /// Processes messages until the channel yields `None`, stopping early with
    /// the error if processing a message fails.
    async fn run(mut self) -> Result<(), Box<dyn std::error::Error>> {
        while let Some(message) = self.receiver.recv().await {
            self.process_message(message).await?;
        }
        Ok(())
    }

    /// Logs `message` and simulates 10ms of work; currently never fails.
    async fn process_message(&self, message: String) -> Result<(), Box<dyn std::error::Error>> {
        println!("Processing: {}", message);
        // `Duration` is not imported in this snippet, so use the full path.
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
        Ok(())
    }
}
impl ApiServer {
    /// Logs the configured port, simulates a server by sleeping ten seconds,
    /// then returns `Ok(())`.
    async fn run(self) -> Result<(), Box<dyn std::error::Error>> {
        println!("API server running on port {}", self.port);
        // Simulate server running. `Duration` is not imported in this snippet,
        // so use the full path.
        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        Ok(())
    }
}
Asynchronous programming in Rust provides powerful tools for building scalable, efficient applications. The async/await syntax, combined with the rich ecosystem of async libraries, enables you to write concurrent code that handles I/O and other asynchronous operations efficiently. Remember to use async-specific synchronization primitives and avoid blocking operations in async contexts to maintain the benefits of asynchronous execution.