1. rust
  2. /concurrency
  3. /message-passing

Message Passing

Message passing is a powerful concurrent programming paradigm where threads communicate by sending messages rather than sharing memory. Rust's ownership system makes message passing particularly elegant and safe, eliminating many common concurrency bugs.

Basic Channel Communication

Single Producer, Single Consumer (SPSC)

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Create a channel
    let (tx, rx) = mpsc::channel();

    // Spawn a thread that sends messages
    thread::spawn(move || {
        let values = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in values {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(500));
        }
    });

    // Receive messages in the main thread
    for received in rx {
        println!("Received: {}", received);
    }
}

Multiple Producer, Single Consumer (MPSC)

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Clone the transmitter for multiple producers
    for i in 0..3 {
        let tx_clone = tx.clone();
        thread::spawn(move || {
            for j in 0..3 {
                let message = format!("Message {} from thread {}", j, i);
                tx_clone.send(message).unwrap();
                thread::sleep(Duration::from_millis(100));
            }
        });
    }

    // Drop the original transmitter
    drop(tx);

    // Receive all messages
    for received in rx {
        println!("Received: {}", received);
    }
}

Channel Types

Unbounded Channels

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    // Producer thread
    let producer = thread::spawn(move || {
        for i in 0..10 {
            println!("Sending: {}", i);
            tx.send(i).unwrap();
            // No blocking - unbounded channel
        }
    });

    // Consumer thread
    let consumer = thread::spawn(move || {
        while let Ok(value) = rx.recv() {
            println!("Received: {}", value);
            thread::sleep(std::time::Duration::from_millis(200));
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

Bounded Channels (Synchronous)

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Create a synchronous channel with buffer size 2
    let (tx, rx) = mpsc::sync_channel(2);

    let producer = thread::spawn(move || {
        for i in 0..5 {
            println!("Sending: {}", i);
            match tx.send(i) {
                Ok(_) => println!("Sent: {}", i),
                Err(e) => println!("Send failed: {}", e),
            }
            thread::sleep(Duration::from_millis(100));
        }
    });

    let consumer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(500)); // Delay to show buffering
        while let Ok(value) = rx.recv() {
            println!("Received: {}", value);
            thread::sleep(Duration::from_millis(300));
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

Advanced Channel Patterns

Request-Response Pattern

use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
enum Request {
    Add(i32, i32, mpsc::Sender<i32>),
    Multiply(i32, i32, mpsc::Sender<i32>),
    Quit,
}

fn calculator_service(rx: mpsc::Receiver<Request>) {
    while let Ok(request) = rx.recv() {
        match request {
            Request::Add(a, b, response_tx) => {
                let result = a + b;
                println!("Computing {} + {} = {}", a, b, result);
                response_tx.send(result).unwrap();
            }
            Request::Multiply(a, b, response_tx) => {
                let result = a * b;
                println!("Computing {} * {} = {}", a, b, result);
                response_tx.send(result).unwrap();
            }
            Request::Quit => {
                println!("Calculator service shutting down");
                break;
            }
        }
    }
}

fn main() {
    let (request_tx, request_rx) = mpsc::channel();

    // Start calculator service
    let service_handle = thread::spawn(move || {
        calculator_service(request_rx);
    });

    // Make requests
    let (response_tx, response_rx) = mpsc::channel();
    
    // Send add request
    request_tx.send(Request::Add(5, 3, response_tx.clone())).unwrap();
    let result = response_rx.recv().unwrap();
    println!("Add result: {}", result);

    // Send multiply request
    request_tx.send(Request::Multiply(4, 7, response_tx)).unwrap();
    let result = response_rx.recv().unwrap();
    println!("Multiply result: {}", result);

    // Shutdown service
    request_tx.send(Request::Quit).unwrap();
    service_handle.join().unwrap();
}

Fan-Out Pattern (Load Distribution)

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn worker(id: usize, rx: mpsc::Receiver<i32>) {
    while let Ok(work) = rx.recv() {
        println!("Worker {} processing work: {}", id, work);
        thread::sleep(Duration::from_millis(500)); // Simulate work
        println!("Worker {} completed work: {}", id, work);
    }
    println!("Worker {} shutting down", id);
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let num_workers = 3;

    // Create workers
    let mut handles = vec![];
    for i in 0..num_workers {
        let rx_clone = rx.clone(); // Not actually cloning - sharing ownership
        let handle = thread::spawn(move || {
            worker(i, rx_clone);
        });
        handles.push(handle);
    }

    // Drop the original receiver
    drop(rx);

    // Send work items
    for work_item in 0..10 {
        tx.send(work_item).unwrap();
        thread::sleep(Duration::from_millis(100));
    }

    // Signal shutdown by closing the channel
    drop(tx);

    // Wait for all workers to complete
    for handle in handles {
        handle.join().unwrap();
    }
}

Fan-In Pattern (Result Aggregation)

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn producer(id: usize, tx: mpsc::Sender<String>) {
    for i in 0..3 {
        let message = format!("Message {} from producer {}", i, id);
        tx.send(message).unwrap();
        thread::sleep(Duration::from_millis(200));
    }
    println!("Producer {} finished", id);
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let num_producers = 4;

    // Create producer threads
    let mut handles = vec![];
    for i in 0..num_producers {
        let tx_clone = tx.clone();
        let handle = thread::spawn(move || {
            producer(i, tx_clone);
        });
        handles.push(handle);
    }

    // Drop the original transmitter
    drop(tx);

    // Collect all messages
    let collector_handle = thread::spawn(move || {
        let mut messages = Vec::new();
        while let Ok(message) = rx.recv() {
            println!("Collected: {}", message);
            messages.push(message);
        }
        println!("Total messages collected: {}", messages.len());
        messages
    });

    // Wait for all producers to complete
    for handle in handles {
        handle.join().unwrap();
    }

    // Get collected results
    let all_messages = collector_handle.join().unwrap();
    println!("Final collection: {:?}", all_messages);
}

Error Handling with Channels

Handling Send Errors

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // Drop receiver early to simulate disconnection
    drop(rx);

    // Try to send - this will fail
    match tx.send(42) {
        Ok(_) => println!("Message sent successfully"),
        Err(e) => println!("Failed to send message: {}", e),
    }
}

Handling Receive Errors

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    let sender_handle = thread::spawn(move || {
        for i in 0..3 {
            tx.send(i).unwrap();
            thread::sleep(Duration::from_millis(100));
        }
        // tx is dropped here, closing the channel
    });

    // Receive with different error handling strategies
    loop {
        match rx.recv() {
            Ok(value) => println!("Received: {}", value),
            Err(e) => {
                println!("Receive error: {}", e);
                break;
            }
        }
    }

    sender_handle.join().unwrap();
}

Non-blocking Receive

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(1000));
        tx.send("Hello after delay").unwrap();
    });

    // Non-blocking attempts
    for i in 0..5 {
        match rx.try_recv() {
            Ok(message) => {
                println!("Received: {}", message);
                break;
            }
            Err(mpsc::TryRecvError::Empty) => {
                println!("Attempt {}: No message yet", i + 1);
                thread::sleep(Duration::from_millis(300));
            }
            Err(mpsc::TryRecvError::Disconnected) => {
                println!("Channel disconnected");
                break;
            }
        }
    }
}

Timeout and Selection

Receive with Timeout

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(2000));
        tx.send("Late message").unwrap();
    });

    // Try to receive with timeout
    match rx.recv_timeout(Duration::from_millis(1000)) {
        Ok(message) => println!("Received: {}", message),
        Err(mpsc::RecvTimeoutError::Timeout) => println!("Timeout occurred"),
        Err(mpsc::RecvTimeoutError::Disconnected) => println!("Channel disconnected"),
    }
}

Channel Selection (using crossbeam)

// Note: This requires the crossbeam crate
// In Cargo.toml: crossbeam = "0.8"

/*
use crossbeam::select;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();

    // Spawn threads that send on different channels
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
        tx1.send("Message from channel 1").unwrap();
    });

    thread::spawn(move || {
        thread::sleep(Duration::from_millis(800));
        tx2.send("Message from channel 2").unwrap();
    });

    // Select from multiple channels
    loop {
        select! {
            recv(rx1) -> msg => {
                match msg {
                    Ok(m) => println!("Received from channel 1: {}", m),
                    Err(_) => println!("Channel 1 closed"),
                }
            }
            recv(rx2) -> msg => {
                match msg {
                    Ok(m) => println!("Received from channel 2: {}", m),
                    Err(_) => println!("Channel 2 closed"),
                }
            }
            default(Duration::from_millis(100)) => {
                println!("Waiting for messages...");
            }
        }
        
        // Break after receiving from both channels
        if rx1.try_recv().is_err() && rx2.try_recv().is_err() {
            break;
        }
    }
}
*/

Message Serialization

Sending Complex Data

use std::sync::mpsc;
use std::thread;

#[derive(Debug, Clone)]
struct Task {
    id: u32,
    description: String,
    priority: u8,
}

#[derive(Debug)]
enum Message {
    NewTask(Task),
    CompleteTask(u32),
    GetStatus(mpsc::Sender<String>),
    Shutdown,
}

fn task_manager(rx: mpsc::Receiver<Message>) {
    let mut tasks = Vec::new();
    let mut completed = Vec::new();

    while let Ok(message) = rx.recv() {
        match message {
            Message::NewTask(task) => {
                println!("Adding task: {:?}", task);
                tasks.push(task);
            }
            Message::CompleteTask(id) => {
                if let Some(pos) = tasks.iter().position(|t| t.id == id) {
                    let task = tasks.remove(pos);
                    println!("Completed task: {:?}", task);
                    completed.push(task);
                }
            }
            Message::GetStatus(response_tx) => {
                let status = format!(
                    "Active tasks: {}, Completed tasks: {}",
                    tasks.len(),
                    completed.len()
                );
                response_tx.send(status).unwrap();
            }
            Message::Shutdown => {
                println!("Task manager shutting down");
                break;
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();

    let manager_handle = thread::spawn(move || {
        task_manager(rx);
    });

    // Send some tasks
    let task1 = Task {
        id: 1,
        description: "Write documentation".to_string(),
        priority: 1,
    };

    let task2 = Task {
        id: 2,
        description: "Fix bug #123".to_string(),
        priority: 2,
    };

    tx.send(Message::NewTask(task1)).unwrap();
    tx.send(Message::NewTask(task2)).unwrap();

    // Complete a task
    tx.send(Message::CompleteTask(1)).unwrap();

    // Get status
    let (status_tx, status_rx) = mpsc::channel();
    tx.send(Message::GetStatus(status_tx)).unwrap();
    let status = status_rx.recv().unwrap();
    println!("Status: {}", status);

    // Shutdown
    tx.send(Message::Shutdown).unwrap();
    manager_handle.join().unwrap();
}

Performance Considerations

Batch Message Processing

use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

fn batch_processor(rx: mpsc::Receiver<i32>) {
    let mut batch = Vec::new();
    let batch_size = 5;
    let timeout = Duration::from_millis(100);
    let mut last_process = Instant::now();

    loop {
        match rx.recv_timeout(timeout) {
            Ok(item) => {
                batch.push(item);
                
                // Process batch if it's full or timeout elapsed
                if batch.len() >= batch_size || last_process.elapsed() >= timeout {
                    println!("Processing batch of {} items: {:?}", batch.len(), batch);
                    batch.clear();
                    last_process = Instant::now();
                }
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                // Process any remaining items on timeout
                if !batch.is_empty() {
                    println!("Timeout - processing partial batch: {:?}", batch);
                    batch.clear();
                    last_process = Instant::now();
                }
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                // Process any remaining items before shutdown
                if !batch.is_empty() {
                    println!("Channel closed - processing final batch: {:?}", batch);
                }
                break;
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();

    let processor_handle = thread::spawn(move || {
        batch_processor(rx);
    });

    // Send items with varying timing
    for i in 0..12 {
        tx.send(i).unwrap();
        if i % 3 == 0 {
            thread::sleep(Duration::from_millis(150)); // Longer delay
        } else {
            thread::sleep(Duration::from_millis(50)); // Short delay
        }
    }

    drop(tx);
    processor_handle.join().unwrap();
}

Zero-Copy Message Passing

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        // Create data on producer side
        let mut data = vec![1, 2, 3, 4, 5];
        
        // Modify data
        for item in &mut data {
            *item *= 2;
        }
        
        println!("Producer: sending data of length {}", data.len());
        
        // Move data through channel (zero-copy transfer of ownership)
        tx.send(data).unwrap();
    });

    let consumer = thread::spawn(move || {
        // Receive ownership of data
        let received_data = rx.recv().unwrap();
        println!("Consumer: received data: {:?}", received_data);
        
        // Consumer now owns the data and can modify it
        let sum: i32 = received_data.iter().sum();
        println!("Consumer: sum = {}", sum);
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

Best Practices

1. Prefer Message Passing Over Shared State

use std::sync::mpsc;
use std::thread;

// Good: Message passing
#[derive(Debug)]
enum BankMessage {
    Deposit(u32, u64),     // account_id, amount
    Withdraw(u32, u64),    // account_id, amount
    GetBalance(u32, mpsc::Sender<u64>), // account_id, response_channel
}

fn bank_service(rx: mpsc::Receiver<BankMessage>) {
    let mut balances = std::collections::HashMap::new();
    
    while let Ok(message) = rx.recv() {
        match message {
            BankMessage::Deposit(account, amount) => {
                let balance = balances.entry(account).or_insert(0);
                *balance += amount;
                println!("Deposited {} to account {}. New balance: {}", amount, account, balance);
            }
            BankMessage::Withdraw(account, amount) => {
                let balance = balances.entry(account).or_insert(0);
                if *balance >= amount {
                    *balance -= amount;
                    println!("Withdrew {} from account {}. New balance: {}", amount, account, balance);
                } else {
                    println!("Insufficient funds in account {}", account);
                }
            }
            BankMessage::GetBalance(account, response_tx) => {
                let balance = balances.get(&account).copied().unwrap_or(0);
                response_tx.send(balance).unwrap();
            }
        }
    }
}

2. Use Bounded Channels for Backpressure

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // Bounded channel prevents unlimited memory growth
    let (tx, rx) = mpsc::sync_channel(10);

    let producer = thread::spawn(move || {
        for i in 0..100 {
            // This will block when channel is full, providing backpressure
            match tx.send(i) {
                Ok(_) => println!("Sent: {}", i),
                Err(_) => break,
            }
        }
    });

    let consumer = thread::spawn(move || {
        while let Ok(item) = rx.recv() {
            // Slow consumer
            thread::sleep(Duration::from_millis(100));
            println!("Processed: {}", item);
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

3. Handle Channel Closure Gracefully

use std::sync::mpsc;
use std::thread;

fn robust_worker(rx: mpsc::Receiver<String>) {
    loop {
        match rx.recv() {
            Ok(work) => {
                println!("Processing: {}", work);
                // Do work...
            }
            Err(mpsc::RecvError) => {
                println!("Channel closed, worker shutting down gracefully");
                break;
            }
        }
    }
}

4. Use Typed Messages for Safety

use std::sync::mpsc;

// Good: Typed messages
#[derive(Debug)]
enum UserMessage {
    Create { name: String, email: String },
    Update { id: u32, name: Option<String>, email: Option<String> },
    Delete { id: u32 },
    Get { id: u32, response: mpsc::Sender<Option<User>> },
}

#[derive(Debug, Clone)]
struct User {
    id: u32,
    name: String,
    email: String,
}

// Avoid: Untyped messages
// type GenericMessage = (String, Vec<String>); // Hard to understand and error-prone

5. Consider Using Higher-Level Abstractions

// For complex scenarios, consider using:
// - tokio::sync::mpsc for async code
// - crossbeam-channel for additional features
// - flume for better performance
// - async-channel for async environments

use std::sync::mpsc;

// Example of wrapping channels in higher-level abstractions
struct EventBus<T> {
    senders: Vec<mpsc::Sender<T>>,
}

impl<T: Clone> EventBus<T> {
    fn new() -> Self {
        EventBus {
            senders: Vec::new(),
        }
    }
    
    fn subscribe(&mut self) -> mpsc::Receiver<T> {
        let (tx, rx) = mpsc::channel();
        self.senders.push(tx);
        rx
    }
    
    fn publish(&self, event: T) {
        // Remove closed senders and send to active ones
        for sender in &self.senders {
            let _ = sender.send(event.clone());
        }
    }
}

Message passing provides a powerful and safe way to coordinate between threads in Rust. By avoiding shared mutable state, you eliminate many classes of concurrency bugs while creating systems that are easier to reason about and debug. The ownership system ensures that data races are impossible when using message passing correctly.