Threads & Parallelism
Rust provides excellent support for concurrent programming through its thread system. The ownership model and type system help prevent common concurrency bugs like data races, making concurrent Rust programs both safe and performant.
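For example, the compiler rejects any attempt to send a type that is not thread-safe across a thread boundary. As a minimal sketch, Rc (which uses non-atomic reference counting and does not implement Send) cannot be moved into a spawned thread, while its atomic counterpart Arc can:

use std::rc::Rc;
use std::thread;

fn main() {
    let shared = Rc::new(5);
    // This line does not compile: `Rc<i32>` cannot be sent between
    // threads safely. Switching to std::sync::Arc fixes it.
    // thread::spawn(move || println!("{}", shared));
    println!("{}", shared);
}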
Creating and Managing Threads
Basic Thread Creation
use std::thread;
use std::time::Duration;

fn main() {
    // Spawn a new thread
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("Hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    // Main thread continues execution
    for i in 1..5 {
        println!("Hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    // Wait for the spawned thread to finish
    handle.join().unwrap();
    println!("Main thread finished!");
}
Threads with Return Values
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        let mut sum = 0;
        for i in 1..=100 {
            sum += i;
        }
        sum // Return value
    });

    // Do other work in main thread
    println!("Waiting for calculation...");

    // Get the result from the thread
    let result = handle.join().unwrap();
    println!("Sum of 1-100: {}", result);
}
Multiple Threads
use std::thread;

fn main() {
    let mut handles = vec![];
    for i in 0..10 {
        let handle = thread::spawn(move || {
            println!("Thread {} is running", i);
            i * i // Return the square
        });
        handles.push(handle);
    }

    // Collect results from all threads
    let results: Vec<i32> = handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect();
    println!("Results: {:?}", results);
}
Moving Data into Threads
Using move Closures
use std::thread;

fn main() {
    let data = vec![1, 2, 3, 4, 5];
    let handle = thread::spawn(move || {
        // data is moved into the thread
        println!("Vector in thread: {:?}", data);
        data.iter().sum::<i32>()
    });

    // data is no longer available in the main thread
    // println!("{:?}", data); // This would cause a compile error

    let sum = handle.join().unwrap();
    println!("Sum: {}", sum);
}
Sharing Data Between Threads
use std::sync::Arc;
use std::thread;

fn main() {
    let data = Arc::new(vec![1, 2, 3, 4, 5]);
    let mut handles = vec![];
    for i in 0..3 {
        let data_clone = Arc::clone(&data);
        let handle = thread::spawn(move || {
            println!("Thread {}: {:?}", i, data_clone);
            data_clone.iter().sum::<i32>()
        });
        handles.push(handle);
    }

    for handle in handles {
        let sum = handle.join().unwrap();
        println!("Sum from thread: {}", sum);
    }
}
Thread Safety and Synchronization
Mutex for Shared Mutable State
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter_clone = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            for _ in 0..1000 {
                let mut num = counter_clone.lock().unwrap();
                *num += 1;
            }
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final counter value: {}", *counter.lock().unwrap());
}
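For a simple counter like this, a lock is heavier than necessary; an atomic integer gives the same result without blocking. A minimal alternative sketch using std::sync::atomic:

use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use std::thread;

fn main() {
    let counter = Arc::new(AtomicI32::new(0));
    let mut handles = vec![];
    for _ in 0..10 {
        let counter_clone = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            for _ in 0..1000 {
                // fetch_add performs the read-modify-write atomically
                counter_clone.fetch_add(1, Ordering::SeqCst);
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final counter value: {}", counter.load(Ordering::SeqCst));
}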
RwLock for Read-Heavy Workloads
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3, 4, 5]));
    let mut handles = vec![];

    // Spawn reader threads; any number of readers may hold the lock at once
    for i in 0..5 {
        let data_clone = Arc::clone(&data);
        let handle = thread::spawn(move || {
            let reader = data_clone.read().unwrap();
            println!("Reader {}: {:?}", i, *reader);
            thread::sleep(Duration::from_millis(100));
        });
        handles.push(handle);
    }

    // Spawn a writer thread; a writer needs exclusive access
    let data_clone = Arc::clone(&data);
    let writer_handle = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        let mut writer = data_clone.write().unwrap();
        writer.push(6);
        println!("Writer: Added 6 to the vector");
    });
    handles.push(writer_handle);

    for handle in handles {
        handle.join().unwrap();
    }
    println!("Final data: {:?}", *data.read().unwrap());
}
Thread Communication Patterns
Producer-Consumer Pattern
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;
use std::collections::VecDeque;

struct Buffer {
    queue: Mutex<VecDeque<i32>>,
    not_empty: Condvar,
    not_full: Condvar,
    capacity: usize,
}

impl Buffer {
    fn new(capacity: usize) -> Self {
        Buffer {
            queue: Mutex::new(VecDeque::new()),
            not_empty: Condvar::new(),
            not_full: Condvar::new(),
            capacity,
        }
    }

    fn produce(&self, item: i32) {
        let mut queue = self.queue.lock().unwrap();
        // Wait while the buffer is full; wait() atomically releases the
        // lock and re-acquires it when the condition variable is notified
        while queue.len() == self.capacity {
            queue = self.not_full.wait(queue).unwrap();
        }
        queue.push_back(item);
        println!("Produced: {}", item);
        // Notify consumers that the buffer is not empty
        self.not_empty.notify_one();
    }

    fn consume(&self) -> i32 {
        let mut queue = self.queue.lock().unwrap();
        // Wait while the buffer is empty
        while queue.is_empty() {
            queue = self.not_empty.wait(queue).unwrap();
        }
        let item = queue.pop_front().unwrap();
        println!("Consumed: {}", item);
        // Notify producers that the buffer is not full
        self.not_full.notify_one();
        item
    }
}

fn main() {
    let buffer = Arc::new(Buffer::new(5));
    let mut handles = vec![];

    // Producer thread
    let buffer_producer = Arc::clone(&buffer);
    let producer = thread::spawn(move || {
        for i in 0..10 {
            buffer_producer.produce(i);
            thread::sleep(Duration::from_millis(100));
        }
    });
    handles.push(producer);

    // Consumer threads: two consumers take 5 items each,
    // matching the 10 items produced
    for _ in 0..2 {
        let buffer_consumer = Arc::clone(&buffer);
        let consumer = thread::spawn(move || {
            for _ in 0..5 {
                buffer_consumer.consume();
                thread::sleep(Duration::from_millis(200));
            }
        });
        handles.push(consumer);
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
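The standard library can express the same bounded-buffer behavior without hand-written condition variables: mpsc::sync_channel creates a channel with a fixed capacity whose send blocks once the buffer is full. A simplified single-consumer sketch:

use std::sync::mpsc;
use std::thread;

fn main() {
    // A capacity of 5 makes send() block once 5 items are queued,
    // mirroring the bounded Buffer above
    let (tx, rx) = mpsc::sync_channel(5);

    let producer = thread::spawn(move || {
        for i in 0..10 {
            tx.send(i).unwrap();
            println!("Produced: {}", i);
        }
    });

    // The iterator ends when the producer drops its sender
    for item in rx {
        println!("Consumed: {}", item);
    }
    producer.join().unwrap();
}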
Work Distribution Pattern
use std::sync::{Arc, Mutex};
use std::thread;

struct WorkQueue {
    tasks: Mutex<Vec<i32>>,
}

impl WorkQueue {
    fn new(tasks: Vec<i32>) -> Self {
        WorkQueue {
            tasks: Mutex::new(tasks),
        }
    }

    fn get_task(&self) -> Option<i32> {
        let mut tasks = self.tasks.lock().unwrap();
        tasks.pop()
    }
}

fn worker(id: usize, queue: Arc<WorkQueue>) {
    // Each worker pulls tasks until the queue is empty
    while let Some(task) = queue.get_task() {
        println!("Worker {} processing task {}", id, task);
        // Simulate work
        let result = task * task;
        println!("Worker {} completed task {} with result {}", id, task, result);
        thread::sleep(std::time::Duration::from_millis(100));
    }
    println!("Worker {} finished", id);
}

fn main() {
    let tasks: Vec<i32> = (1..=20).collect();
    let queue = Arc::new(WorkQueue::new(tasks));
    let mut handles = vec![];

    // Spawn worker threads
    for i in 0..4 {
        let queue_clone = Arc::clone(&queue);
        let handle = thread::spawn(move || {
            worker(i, queue_clone);
        });
        handles.push(handle);
    }

    // Wait for all workers to complete
    for handle in handles {
        handle.join().unwrap();
    }
    println!("All work completed!");
}
Parallel Algorithms
Parallel Map-Reduce
use std::sync::{Arc, Mutex};
use std::thread;

fn parallel_map_reduce<T, U, F, R>(
    data: Vec<T>,
    map_fn: F,
    reduce_fn: R,
    num_threads: usize,
) -> U
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
    R: Fn(U, U) -> U,
{
    let data = Arc::new(Mutex::new(data));
    let results = Arc::new(Mutex::new(Vec::new()));
    let map_fn = Arc::new(map_fn);
    let mut handles = vec![];

    // Map phase: each thread pops items until the input is exhausted
    for _ in 0..num_threads {
        let data_clone = Arc::clone(&data);
        let results_clone = Arc::clone(&results);
        let map_fn_clone = Arc::clone(&map_fn);
        let handle = thread::spawn(move || {
            loop {
                let item = {
                    let mut data_guard = data_clone.lock().unwrap();
                    data_guard.pop()
                };
                match item {
                    Some(item) => {
                        let result = map_fn_clone(item);
                        results_clone.lock().unwrap().push(result);
                    }
                    None => break,
                }
            }
        });
        handles.push(handle);
    }

    // Wait for the map phase to complete
    for handle in handles {
        handle.join().unwrap();
    }

    // Reduce phase (single-threaded). All threads have been joined, so
    // try_unwrap succeeds; .ok() avoids requiring U: Debug for unwrap
    let results = Arc::try_unwrap(results).ok().unwrap().into_inner().unwrap();
    results.into_iter().reduce(reduce_fn).unwrap()
}

fn main() {
    let numbers: Vec<i32> = (1..=1000).collect();

    // Calculate sum of squares in parallel
    let sum_of_squares = parallel_map_reduce(
        numbers,
        |x| x * x,        // Map: square each number
        |acc, x| acc + x, // Reduce: sum all squares
        4,                // Use 4 threads
    );
    println!("Sum of squares: {}", sum_of_squares);

    // Verify with sequential calculation
    let expected: i32 = (1..=1000).map(|x| x * x).sum();
    println!("Expected: {}", expected);
    assert_eq!(sum_of_squares, expected);
}
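In practice, hand-rolled schemes like this are usually replaced by a data-parallelism crate such as rayon, which handles work distribution and reduction internally. A minimal sketch, assuming rayon has been added as a dependency (it is not part of the standard library):

// Assumes `rayon = "1"` in Cargo.toml
use rayon::prelude::*;

fn main() {
    let numbers: Vec<i32> = (1..=1000).collect();
    // par_iter() splits the map and the sum across rayon's thread pool
    let sum_of_squares: i32 = numbers.par_iter().map(|&x| x * x).sum();
    println!("Sum of squares: {}", sum_of_squares);
}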
Parallel Merge Sort
use std::thread;

fn merge<T: Clone + Ord>(left: &[T], right: &[T]) -> Vec<T> {
    let mut result = Vec::with_capacity(left.len() + right.len());
    let mut left_idx = 0;
    let mut right_idx = 0;

    while left_idx < left.len() && right_idx < right.len() {
        if left[left_idx] <= right[right_idx] {
            result.push(left[left_idx].clone());
            left_idx += 1;
        } else {
            result.push(right[right_idx].clone());
            right_idx += 1;
        }
    }
    // Append whatever remains of either half
    result.extend_from_slice(&left[left_idx..]);
    result.extend_from_slice(&right[right_idx..]);
    result
}

fn parallel_merge_sort<T: Clone + Ord + Send + 'static>(
    mut data: Vec<T>,
    min_chunk_size: usize,
) -> Vec<T> {
    // min_chunk_size bounds the recursion depth, and therefore
    // how many threads are spawned in total
    if data.len() <= min_chunk_size {
        data.sort();
        return data;
    }

    let mid = data.len() / 2;
    let right_half = data.split_off(mid);
    let left_half = data;

    let left_handle = thread::spawn(move || {
        parallel_merge_sort(left_half, min_chunk_size)
    });
    let right_handle = thread::spawn(move || {
        parallel_merge_sort(right_half, min_chunk_size)
    });

    let left_sorted = left_handle.join().unwrap();
    let right_sorted = right_handle.join().unwrap();
    merge(&left_sorted, &right_sorted)
}

fn main() {
    let data: Vec<i32> = (0..1000).rev().collect(); // Reverse sorted
    println!("Original data length: {}", data.len());

    let sorted = parallel_merge_sort(data, 100);

    // Verify sorting
    let is_sorted = sorted.windows(2).all(|w| w[0] <= w[1]);
    println!("Is sorted: {}", is_sorted);
    println!("First 10 elements: {:?}", &sorted[..10]);
    println!("Last 10 elements: {:?}", &sorted[sorted.len() - 10..]);
}
Thread Local Storage
use std::cell::RefCell;
use std::thread;

thread_local! {
    // Each thread gets its own independent copy of COUNTER
    static COUNTER: RefCell<u32> = RefCell::new(0);
}

fn increment_counter() {
    COUNTER.with(|c| {
        let mut counter = c.borrow_mut();
        *counter += 1;
        println!("Thread {:?} counter: {}", thread::current().id(), *counter);
    });
}

fn main() {
    let mut handles = vec![];
    for i in 0..3 {
        let handle = thread::spawn(move || {
            for j in 0..3 {
                increment_counter();
                println!("Thread {} iteration {}", i, j);
            }
        });
        handles.push(handle);
    }

    // Also increment in the main thread
    for _ in 0..2 {
        increment_counter();
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
Thread Pools
Simple Thread Pool Implementation
use std::sync::{Arc, Mutex, mpsc};
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }
        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Drop the sender first: once the channel is closed, each
        // worker's recv() returns Err after the queue drains, so the
        // joins below cannot deadlock
        drop(self.sender.take());
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            // Lock only long enough to receive one job
            let job = receiver.lock().unwrap().recv();
            match job {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Err(_) => {
                    println!("Worker {} disconnected; shutting down.", id);
                    break;
                }
            }
        });
        Worker {
            id,
            thread: Some(thread),
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);
    for i in 0..8 {
        pool.execute(move || {
            println!("Executing task {} on thread {:?}", i, thread::current().id());
            thread::sleep(std::time::Duration::from_millis(1000));
            println!("Task {} completed", i);
        });
    }
    // Dropping the pool at the end of main closes the channel; the
    // workers drain any queued jobs and then shut down cleanly
    println!("Shutting down.");
}
Best Practices
1. Prefer Message Passing Over Shared State
use std::sync::mpsc;
use std::thread;

// Good: Message passing
fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let values = vec![1, 2, 3, 4, 5];
        for val in values {
            tx.send(val).unwrap();
            thread::sleep(std::time::Duration::from_millis(100));
        }
    });

    // The receiver iterates until the sender is dropped
    for received in rx {
        println!("Received: {}", received);
    }
}
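Because mpsc::Sender implements Clone, the same pattern scales to several producers feeding one consumer; the receiving loop ends once every sender has been dropped:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();
    for id in 0..3 {
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(format!("hello from producer {}", id)).unwrap();
        });
    }
    // Drop the original sender so the loop below can terminate
    drop(tx);

    for msg in rx {
        println!("Received: {}", msg);
    }
}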
2. Use Scoped Threads for Borrowed Data
use std::thread;

fn main() {
    let data = vec![1, 2, 3, 4, 5];

    // std::thread::scope (stable since Rust 1.63) guarantees that all
    // scoped threads finish before the scope returns, so they may borrow
    // local data without 'static lifetimes or move closures
    thread::scope(|s| {
        s.spawn(|| {
            let sum: i32 = data.iter().sum();
            println!("Sum: {}", sum);
        });
        s.spawn(|| {
            println!("Length: {}", data.len());
        });
    });

    // data is still usable here because the threads have been joined
    println!("Data: {:?}", data);
}
3. Handle Thread Panics Gracefully
use std::thread;
use std::panic;

fn main() {
    // Note: the panic hook is process-global, not per-thread
    panic::set_hook(Box::new(|info| {
        println!("Custom panic hook: {:?}", info);
    }));

    let handle = thread::spawn(|| {
        panic!("Oops!");
    });

    // join() returns Err when the thread panicked, so the panic
    // does not propagate into the main thread
    match handle.join() {
        Ok(_) => println!("Thread completed successfully"),
        Err(e) => println!("Thread panicked: {:?}", e),
    }
}
4. Use Appropriate Synchronization Primitives
use std::sync::{Arc, Mutex, RwLock};

// Use Mutex for exclusive access
struct Counter {
    value: Arc<Mutex<i32>>,
}

// Use RwLock for read-heavy workloads
struct Cache {
    data: Arc<RwLock<std::collections::HashMap<String, String>>>,
}

// Use atomic types for simple operations
use std::sync::atomic::{AtomicUsize, Ordering};

static GLOBAL_COUNTER: AtomicUsize = AtomicUsize::new(0);

fn increment_global() {
    GLOBAL_COUNTER.fetch_add(1, Ordering::SeqCst);
}
5. Avoid Deadlocks
use std::sync::{Arc, Mutex};
use std::thread;

// Good: Consistent lock ordering
fn main() {
    let resource1 = Arc::new(Mutex::new(0));
    let resource2 = Arc::new(Mutex::new(0));

    let r1_clone = Arc::clone(&resource1);
    let r2_clone = Arc::clone(&resource2);
    let handle1 = thread::spawn(move || {
        // Always acquire locks in the same order
        let _guard1 = r1_clone.lock().unwrap();
        let _guard2 = r2_clone.lock().unwrap();
        // Do work...
    });

    let r1_clone = Arc::clone(&resource1);
    let r2_clone = Arc::clone(&resource2);
    let handle2 = thread::spawn(move || {
        // Same order here too
        let _guard1 = r1_clone.lock().unwrap();
        let _guard2 = r2_clone.lock().unwrap();
        // Do work...
    });

    handle1.join().unwrap();
    handle2.join().unwrap();
}
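When a global lock order is impractical, try_lock offers a non-blocking fallback: if the second lock is busy, the thread releases what it holds and retries instead of waiting forever. A minimal sketch of this back-off pattern:

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    let a = Arc::new(Mutex::new(0));
    let b = Arc::new(Mutex::new(0));

    let (a2, b2) = (Arc::clone(&a), Arc::clone(&b));
    let handle = thread::spawn(move || loop {
        let guard_a = a2.lock().unwrap();
        // try_lock never blocks; Err means `b` is currently held
        match b2.try_lock() {
            Ok(mut guard_b) => {
                *guard_b += *guard_a + 1;
                break;
            }
            Err(_) => {
                // Release `a` before retrying so no lock is held while waiting
                drop(guard_a);
                thread::sleep(Duration::from_millis(10));
            }
        }
    });

    handle.join().unwrap();
    println!("b = {}", *b.lock().unwrap());
}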
Rust's thread system provides powerful tools for concurrent programming while maintaining safety through the ownership system. The compiler prevents data races at compile time, allowing you to write efficient parallel code with confidence. Use these patterns to build robust, concurrent applications that scale with available hardware.