Message Passing
Message passing is a powerful concurrency paradigm in which threads communicate by sending each other messages rather than by sharing memory. Rust's ownership system makes the pattern particularly elegant and safe: ownership of a value moves with the message, so the sending thread can no longer touch it, which rules out many common concurrency bugs at compile time.
Basic Channel Communication
Single Producer, Single Consumer (SPSC)
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// Create a channel
let (tx, rx) = mpsc::channel();
// Spawn a thread that sends messages
thread::spawn(move || {
let values = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in values {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(500));
}
});
// Receive messages in the main thread
for received in rx {
println!("Received: {}", received);
}
}
Multiple Producer, Single Consumer (MPSC)
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// Clone the transmitter for multiple producers
for i in 0..3 {
let tx_clone = tx.clone();
thread::spawn(move || {
for j in 0..3 {
let message = format!("Message {} from thread {}", j, i);
tx_clone.send(message).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
}
// Drop the original transmitter so the receiving loop ends once every clone is dropped
drop(tx);
// Receive all messages
for received in rx {
println!("Received: {}", received);
}
}
Channel Types
Unbounded Channels
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel::<i32>();
// Producer thread
let producer = thread::spawn(move || {
for i in 0..10 {
println!("Sending: {}", i);
tx.send(i).unwrap();
// No blocking - unbounded channel
}
});
// Consumer thread
let consumer = thread::spawn(move || {
while let Ok(value) = rx.recv() {
println!("Received: {}", value);
thread::sleep(std::time::Duration::from_millis(200));
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
Bounded Channels (Synchronous)
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// Create a synchronous channel with buffer size 2
let (tx, rx) = mpsc::sync_channel(2);
let producer = thread::spawn(move || {
for i in 0..5 {
println!("Sending: {}", i);
match tx.send(i) {
Ok(_) => println!("Sent: {}", i),
Err(e) => println!("Send failed: {}", e),
}
thread::sleep(Duration::from_millis(100));
}
});
let consumer = thread::spawn(move || {
thread::sleep(Duration::from_millis(500)); // Delay to show buffering
while let Ok(value) = rx.recv() {
println!("Received: {}", value);
thread::sleep(Duration::from_millis(300));
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
Advanced Channel Patterns
Request-Response Pattern
use std::sync::mpsc;
use std::thread;
#[derive(Debug)]
enum Request {
Add(i32, i32, mpsc::Sender<i32>),
Multiply(i32, i32, mpsc::Sender<i32>),
Quit,
}
fn calculator_service(rx: mpsc::Receiver<Request>) {
while let Ok(request) = rx.recv() {
match request {
Request::Add(a, b, response_tx) => {
let result = a + b;
println!("Computing {} + {} = {}", a, b, result);
response_tx.send(result).unwrap();
}
Request::Multiply(a, b, response_tx) => {
let result = a * b;
println!("Computing {} * {} = {}", a, b, result);
response_tx.send(result).unwrap();
}
Request::Quit => {
println!("Calculator service shutting down");
break;
}
}
}
}
fn main() {
let (request_tx, request_rx) = mpsc::channel();
// Start calculator service
let service_handle = thread::spawn(move || {
calculator_service(request_rx);
});
// Make requests
let (response_tx, response_rx) = mpsc::channel();
// Send add request
request_tx.send(Request::Add(5, 3, response_tx.clone())).unwrap();
let result = response_rx.recv().unwrap();
println!("Add result: {}", result);
// Send multiply request
request_tx.send(Request::Multiply(4, 7, response_tx)).unwrap();
let result = response_rx.recv().unwrap();
println!("Multiply result: {}", result);
// Shutdown service
request_tx.send(Request::Quit).unwrap();
service_handle.join().unwrap();
}
Fan-Out Pattern (Load Distribution)
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

// std::sync::mpsc::Receiver cannot be cloned, so the workers share a single
// receiver behind Arc<Mutex<_>> and take turns pulling work items from it.
fn worker(id: usize, rx: Arc<Mutex<mpsc::Receiver<i32>>>) {
    loop {
        // The lock is held only while waiting for the next item; it is
        // released before the item is processed, so other workers can pull work.
        let work = match rx.lock().unwrap().recv() {
            Ok(work) => work,
            Err(_) => break, // channel closed and drained
        };
        println!("Worker {} processing work: {}", id, work);
        thread::sleep(Duration::from_millis(500)); // Simulate work
        println!("Worker {} completed work: {}", id, work);
    }
    println!("Worker {} shutting down", id);
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let rx = Arc::new(Mutex::new(rx));
    let num_workers = 3;

    // Create workers that share the receiver
    let mut handles = vec![];
    for i in 0..num_workers {
        let rx = Arc::clone(&rx);
        let handle = thread::spawn(move || {
            worker(i, rx);
        });
        handles.push(handle);
    }

    // Send work items
    for work_item in 0..10 {
        tx.send(work_item).unwrap();
        thread::sleep(Duration::from_millis(100));
    }

    // Signal shutdown by closing the channel
    drop(tx);

    // Wait for all workers to complete
    for handle in handles {
        handle.join().unwrap();
    }
}
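The Arc<Mutex<_>> sharing above is needed because std::sync::mpsc::Receiver is single-consumer and cannot be cloned. If adding a dependency is acceptable, the crossbeam crate (assumed here as crossbeam = "0.8" in Cargo.toml) provides channels whose receivers are cloneable, which makes fan-out more direct. A minimal sketch under that assumption:
use crossbeam::channel::unbounded;
use std::thread;

fn main() {
    // crossbeam receivers are genuinely multi-consumer, so each worker
    // gets its own clone instead of sharing one receiver behind a mutex
    let (tx, rx) = unbounded::<i32>();

    let mut handles = vec![];
    for id in 0..3 {
        let rx = rx.clone();
        handles.push(thread::spawn(move || {
            while let Ok(work) = rx.recv() {
                println!("Worker {} processing work: {}", id, work);
            }
            println!("Worker {} shutting down", id);
        }));
    }
    drop(rx); // drop the original so only the worker clones remain

    for work_item in 0..10 {
        tx.send(work_item).unwrap();
    }
    drop(tx); // closing the channel lets every worker exit

    for handle in handles {
        handle.join().unwrap();
    }
}
Each message still goes to exactly one worker; dropping the sender closes the channel for all of them.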
Fan-In Pattern (Result Aggregation)
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn producer(id: usize, tx: mpsc::Sender<String>) {
for i in 0..3 {
let message = format!("Message {} from producer {}", i, id);
tx.send(message).unwrap();
thread::sleep(Duration::from_millis(200));
}
println!("Producer {} finished", id);
}
fn main() {
let (tx, rx) = mpsc::channel();
let num_producers = 4;
// Create producer threads
let mut handles = vec![];
for i in 0..num_producers {
let tx_clone = tx.clone();
let handle = thread::spawn(move || {
producer(i, tx_clone);
});
handles.push(handle);
}
// Drop the original transmitter so the collector's loop ends once every producer finishes
drop(tx);
// Collect all messages
let collector_handle = thread::spawn(move || {
let mut messages = Vec::new();
while let Ok(message) = rx.recv() {
println!("Collected: {}", message);
messages.push(message);
}
println!("Total messages collected: {}", messages.len());
messages
});
// Wait for all producers to complete
for handle in handles {
handle.join().unwrap();
}
// Get collected results
let all_messages = collector_handle.join().unwrap();
println!("Final collection: {:?}", all_messages);
}
Error Handling with Channels
Handling Send Errors
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
// Drop receiver early to simulate disconnection
drop(rx);
// Try to send - this will fail
match tx.send(42) {
Ok(_) => println!("Message sent successfully"),
Err(e) => println!("Failed to send message: {}", e),
}
}
Handling Receive Errors
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel::<i32>();
let sender_handle = thread::spawn(move || {
for i in 0..3 {
tx.send(i).unwrap();
thread::sleep(Duration::from_millis(100));
}
// tx is dropped here, closing the channel
});
// Receive in a loop, handling the error returned when the channel is closed
loop {
match rx.recv() {
Ok(value) => println!("Received: {}", value),
Err(e) => {
println!("Receive error: {}", e);
break;
}
}
}
sender_handle.join().unwrap();
}
Non-blocking Receive
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(1000));
tx.send("Hello after delay").unwrap();
});
// Non-blocking attempts
for i in 0..5 {
match rx.try_recv() {
Ok(message) => {
println!("Received: {}", message);
break;
}
Err(mpsc::TryRecvError::Empty) => {
println!("Attempt {}: No message yet", i + 1);
thread::sleep(Duration::from_millis(300));
}
Err(mpsc::TryRecvError::Disconnected) => {
println!("Channel disconnected");
break;
}
}
}
}
Timeout and Selection
Receive with Timeout
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
thread::sleep(Duration::from_millis(2000));
tx.send("Late message").unwrap();
});
// Try to receive with timeout
match rx.recv_timeout(Duration::from_millis(1000)) {
Ok(message) => println!("Received: {}", message),
Err(mpsc::RecvTimeoutError::Timeout) => println!("Timeout occurred"),
Err(mpsc::RecvTimeoutError::Disconnected) => println!("Channel disconnected"),
}
}
Channel Selection (using crossbeam)
// Note: This requires the crossbeam crate
// In Cargo.toml: crossbeam = "0.8"
/*
use crossbeam::channel::unbounded;
use crossbeam::select;
use std::thread;
use std::time::Duration;

fn main() {
    // crossbeam's select! works with crossbeam channels, not std::sync::mpsc
    let (tx1, rx1) = unbounded();
    let (tx2, rx2) = unbounded();

    // Spawn threads that send on different channels. Main keeps a clone of
    // each sender alive so neither channel disconnects while we wait.
    let tx1_thread = tx1.clone();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500));
        tx1_thread.send("Message from channel 1").unwrap();
    });
    let tx2_thread = tx2.clone();
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(800));
        tx2_thread.send("Message from channel 2").unwrap();
    });

    // Select from multiple channels until both expected messages arrive
    let mut remaining = 2;
    while remaining > 0 {
        select! {
            recv(rx1) -> msg => {
                match msg {
                    Ok(m) => println!("Received from channel 1: {}", m),
                    Err(_) => println!("Channel 1 closed"),
                }
                remaining -= 1;
            }
            recv(rx2) -> msg => {
                match msg {
                    Ok(m) => println!("Received from channel 2: {}", m),
                    Err(_) => println!("Channel 2 closed"),
                }
                remaining -= 1;
            }
            default(Duration::from_millis(100)) => {
                println!("Waiting for messages...");
            }
        }
    }
}
*/
Structured Messages
Sending Complex Data
use std::sync::mpsc;
use std::thread;
#[derive(Debug, Clone)]
struct Task {
id: u32,
description: String,
priority: u8,
}
#[derive(Debug)]
enum Message {
NewTask(Task),
CompleteTask(u32),
GetStatus(mpsc::Sender<String>),
Shutdown,
}
fn task_manager(rx: mpsc::Receiver<Message>) {
let mut tasks = Vec::new();
let mut completed = Vec::new();
while let Ok(message) = rx.recv() {
match message {
Message::NewTask(task) => {
println!("Adding task: {:?}", task);
tasks.push(task);
}
Message::CompleteTask(id) => {
if let Some(pos) = tasks.iter().position(|t| t.id == id) {
let task = tasks.remove(pos);
println!("Completed task: {:?}", task);
completed.push(task);
}
}
Message::GetStatus(response_tx) => {
let status = format!(
"Active tasks: {}, Completed tasks: {}",
tasks.len(),
completed.len()
);
response_tx.send(status).unwrap();
}
Message::Shutdown => {
println!("Task manager shutting down");
break;
}
}
}
}
fn main() {
let (tx, rx) = mpsc::channel();
let manager_handle = thread::spawn(move || {
task_manager(rx);
});
// Send some tasks
let task1 = Task {
id: 1,
description: "Write documentation".to_string(),
priority: 1,
};
let task2 = Task {
id: 2,
description: "Fix bug #123".to_string(),
priority: 2,
};
tx.send(Message::NewTask(task1)).unwrap();
tx.send(Message::NewTask(task2)).unwrap();
// Complete a task
tx.send(Message::CompleteTask(1)).unwrap();
// Get status
let (status_tx, status_rx) = mpsc::channel();
tx.send(Message::GetStatus(status_tx)).unwrap();
let status = status_rx.recv().unwrap();
println!("Status: {}", status);
// Shutdown
tx.send(Message::Shutdown).unwrap();
manager_handle.join().unwrap();
}
Performance Considerations
Batch Message Processing
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
fn batch_processor(rx: mpsc::Receiver<i32>) {
let mut batch = Vec::new();
let batch_size = 5;
let timeout = Duration::from_millis(100);
let mut last_process = Instant::now();
loop {
match rx.recv_timeout(timeout) {
Ok(item) => {
batch.push(item);
// Process batch if it's full or timeout elapsed
if batch.len() >= batch_size || last_process.elapsed() >= timeout {
println!("Processing batch of {} items: {:?}", batch.len(), batch);
batch.clear();
last_process = Instant::now();
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// Process any remaining items on timeout
if !batch.is_empty() {
println!("Timeout - processing partial batch: {:?}", batch);
batch.clear();
last_process = Instant::now();
}
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
// Process any remaining items before shutdown
if !batch.is_empty() {
println!("Channel closed - processing final batch: {:?}", batch);
}
break;
}
}
}
}
fn main() {
let (tx, rx) = mpsc::channel();
let processor_handle = thread::spawn(move || {
batch_processor(rx);
});
// Send items with varying timing
for i in 0..12 {
tx.send(i).unwrap();
if i % 3 == 0 {
thread::sleep(Duration::from_millis(150)); // Longer delay
} else {
thread::sleep(Duration::from_millis(50)); // Short delay
}
}
drop(tx);
processor_handle.join().unwrap();
}
Zero-Copy Message Passing
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
let producer = thread::spawn(move || {
// Create data on producer side
let mut data = vec![1, 2, 3, 4, 5];
// Modify data
for item in &mut data {
*item *= 2;
}
println!("Producer: sending data of length {}", data.len());
// Send the Vec through the channel: ownership moves and the heap buffer is never copied
tx.send(data).unwrap();
});
let consumer = thread::spawn(move || {
// Receive ownership of data
let received_data = rx.recv().unwrap();
println!("Consumer: received data: {:?}", received_data);
// Consumer now owns the data and can modify it
let sum: i32 = received_data.iter().sum();
println!("Consumer: sum = {}", sum);
});
producer.join().unwrap();
consumer.join().unwrap();
}
Best Practices
1. Prefer Message Passing Over Shared State
use std::sync::mpsc;
use std::thread;
// Good: Message passing
#[derive(Debug)]
enum BankMessage {
Deposit(u32, u64), // account_id, amount
Withdraw(u32, u64), // account_id, amount
GetBalance(u32, mpsc::Sender<u64>), // account_id, response_channel
}
fn bank_service(rx: mpsc::Receiver<BankMessage>) {
let mut balances = std::collections::HashMap::new();
while let Ok(message) = rx.recv() {
match message {
BankMessage::Deposit(account, amount) => {
let balance = balances.entry(account).or_insert(0);
*balance += amount;
println!("Deposited {} to account {}. New balance: {}", amount, account, balance);
}
BankMessage::Withdraw(account, amount) => {
let balance = balances.entry(account).or_insert(0);
if *balance >= amount {
*balance -= amount;
println!("Withdrew {} from account {}. New balance: {}", amount, account, balance);
} else {
println!("Insufficient funds in account {}", account);
}
}
BankMessage::GetBalance(account, response_tx) => {
let balance = balances.get(&account).copied().unwrap_or(0);
response_tx.send(balance).unwrap();
}
}
}
}
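A minimal driver for bank_service might look like the following sketch (the account number and amounts are arbitrary, and the same mpsc and thread imports as above are assumed):
fn main() {
    let (tx, rx) = mpsc::channel();
    let service = thread::spawn(move || bank_service(rx));

    tx.send(BankMessage::Deposit(1, 100)).unwrap();
    tx.send(BankMessage::Withdraw(1, 40)).unwrap();

    // Query the balance through a one-shot response channel
    let (balance_tx, balance_rx) = mpsc::channel();
    tx.send(BankMessage::GetBalance(1, balance_tx)).unwrap();
    println!("Balance: {}", balance_rx.recv().unwrap());

    // Dropping the sender closes the channel and lets the service loop end
    drop(tx);
    service.join().unwrap();
}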
2. Use Bounded Channels for Backpressure
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// Bounded channel prevents unlimited memory growth
let (tx, rx) = mpsc::sync_channel(10);
let producer = thread::spawn(move || {
for i in 0..100 {
// This will block when channel is full, providing backpressure
match tx.send(i) {
Ok(_) => println!("Sent: {}", i),
Err(_) => break,
}
}
});
let consumer = thread::spawn(move || {
while let Ok(item) = rx.recv() {
// Slow consumer
thread::sleep(Duration::from_millis(100));
println!("Processed: {}", item);
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
3. Handle Channel Closure Gracefully
use std::sync::mpsc;
use std::thread;
fn robust_worker(rx: mpsc::Receiver<String>) {
loop {
match rx.recv() {
Ok(work) => {
println!("Processing: {}", work);
// Do work...
}
Err(mpsc::RecvError) => {
println!("Channel closed, worker shutting down gracefully");
break;
}
}
}
}
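For completeness, a small driver for robust_worker could look like this sketch (the job strings are placeholders):
fn main() {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || robust_worker(rx));

    for job in ["build", "test", "deploy"] {
        tx.send(job.to_string()).unwrap();
    }

    // Dropping the sender closes the channel, which triggers the
    // graceful shutdown branch inside robust_worker
    drop(tx);
    worker.join().unwrap();
}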
4. Use Typed Messages for Safety
use std::sync::mpsc;
// Good: Typed messages
#[derive(Debug)]
enum UserMessage {
Create { name: String, email: String },
Update { id: u32, name: Option<String>, email: Option<String> },
Delete { id: u32 },
Get { id: u32, response: mpsc::Sender<Option<User>> },
}
#[derive(Debug, Clone)]
struct User {
id: u32,
name: String,
email: String,
}
// Avoid: Untyped messages
// type GenericMessage = (String, Vec<String>); // Hard to understand and error-prone
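A service loop for these typed messages might look like the sketch below; user_service and its in-memory HashMap store are illustrative, not part of any library:
use std::collections::HashMap;

fn user_service(rx: mpsc::Receiver<UserMessage>) {
    let mut users: HashMap<u32, User> = HashMap::new();
    let mut next_id: u32 = 1;

    while let Ok(message) = rx.recv() {
        match message {
            UserMessage::Create { name, email } => {
                users.insert(next_id, User { id: next_id, name, email });
                next_id += 1;
            }
            UserMessage::Update { id, name, email } => {
                if let Some(user) = users.get_mut(&id) {
                    // Only overwrite the fields that were provided
                    if let Some(name) = name { user.name = name; }
                    if let Some(email) = email { user.email = email; }
                }
            }
            UserMessage::Delete { id } => {
                users.remove(&id);
            }
            UserMessage::Get { id, response } => {
                // Ignore the error if the requester has already gone away
                let _ = response.send(users.get(&id).cloned());
            }
        }
    }
}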
5. Consider Using Higher-Level Abstractions
// For complex scenarios, consider using:
// - tokio::sync::mpsc for async code
// - crossbeam-channel for additional features
// - flume for better performance
// - async-channel for async environments
use std::sync::mpsc;
// Example of wrapping channels in higher-level abstractions
struct EventBus<T> {
senders: Vec<mpsc::Sender<T>>,
}
impl<T: Clone> EventBus<T> {
fn new() -> Self {
EventBus {
senders: Vec::new(),
}
}
fn subscribe(&mut self) -> mpsc::Receiver<T> {
let (tx, rx) = mpsc::channel();
self.senders.push(tx);
rx
}
fn publish(&mut self, event: T) {
    // Send to every subscriber, dropping senders whose receiver has been dropped
    self.senders.retain(|sender| sender.send(event.clone()).is_ok());
}
}
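Used roughly like this (a sketch; the event string is arbitrary), every subscriber receives its own clone of each published event:
fn main() {
    let mut bus = EventBus::new();
    let metrics_rx = bus.subscribe();
    let logging_rx = bus.subscribe();

    bus.publish(String::from("deploy started"));

    // Each subscriber gets its own copy of the event
    println!("metrics saw: {}", metrics_rx.recv().unwrap());
    println!("logging saw: {}", logging_rx.recv().unwrap());
}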
Message passing provides a powerful and safe way to coordinate threads in Rust. By avoiding shared mutable state, you eliminate many classes of concurrency bugs and end up with systems that are easier to reason about and debug. Because a value sent through a channel is moved, the ownership system guarantees the sender can no longer access it, so the sender and receiver cannot race over the same data.