Shared State
While message passing is often preferred, shared state concurrency is sometimes necessary or more efficient. Rust provides several synchronization primitives that allow safe access to shared data across threads while preventing data races at compile time.
Mutex<T> - Mutual Exclusion
Mutex<T>
provides mutual exclusion, ensuring only one thread can access the data at a time.
Basic Mutex Usage
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter_clone.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
Mutex with Error Handling
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
#[derive(Debug)]
enum BankError {
InsufficientFunds,
AccountLocked,
}
struct BankAccount {
balance: Mutex<u64>,
id: u32,
}
impl BankAccount {
fn new(id: u32, initial_balance: u64) -> Self {
BankAccount {
balance: Mutex::new(initial_balance),
id,
}
}
fn deposit(&self, amount: u64) -> Result<(), BankError> {
let mut balance = self.balance.lock().map_err(|_| BankError::AccountLocked)?;
*balance += amount;
println!("Deposited {} to account {}. New balance: {}", amount, self.id, *balance);
Ok(())
}
fn withdraw(&self, amount: u64) -> Result<(), BankError> {
let mut balance = self.balance.lock().map_err(|_| BankError::AccountLocked)?;
if *balance >= amount {
*balance -= amount;
println!("Withdrew {} from account {}. New balance: {}", amount, self.id, *balance);
Ok(())
} else {
Err(BankError::InsufficientFunds)
}
}
fn get_balance(&self) -> Result<u64, BankError> {
let balance = self.balance.lock().map_err(|_| BankError::AccountLocked)?;
Ok(*balance)
}
}
fn main() {
let account = Arc::new(BankAccount::new(1, 1000));
let mut handles = vec![];
// Spawn deposit threads
for i in 0..5 {
let account_clone = Arc::clone(&account);
let handle = thread::spawn(move || {
if let Err(e) = account_clone.deposit(100) {
println!("Deposit failed: {:?}", e);
}
thread::sleep(Duration::from_millis(100));
});
handles.push(handle);
}
// Spawn withdraw threads
for i in 0..3 {
let account_clone = Arc::clone(&account);
let handle = thread::spawn(move || {
if let Err(e) = account_clone.withdraw(200) {
println!("Withdrawal failed: {:?}", e);
}
thread::sleep(Duration::from_millis(150));
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
if let Ok(final_balance) = account.get_balance() {
println!("Final balance: {}", final_balance);
}
}
RwLock<T> - Reader-Writer Lock
RwLock<T>
allows multiple readers or one writer, optimizing for read-heavy workloads.
Basic RwLock Usage
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
fn main() {
let data = Arc::new(RwLock::new(vec![1, 2, 3, 4, 5]));
let mut handles = vec![];
// Spawn reader threads
for i in 0..5 {
let data_clone = Arc::clone(&data);
let handle = thread::spawn(move || {
let reader = data_clone.read().unwrap();
println!("Reader {}: data = {:?}", i, *reader);
thread::sleep(Duration::from_millis(100));
});
handles.push(handle);
}
// Spawn writer threads
for i in 0..2 {
let data_clone = Arc::clone(&data);
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
let mut writer = data_clone.write().unwrap();
writer.push(i + 10);
println!("Writer {}: added {}", i, i + 10);
thread::sleep(Duration::from_millis(50));
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Final data: {:?}", *data.read().unwrap());
}
Cache Implementation with RwLock
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
struct Cache<K, V> {
data: RwLock<HashMap<K, (V, Instant)>>,
ttl: Duration,
}
impl<K, V> Cache<K, V>
where
K: Clone + Eq + std::hash::Hash,
V: Clone,
{
fn new(ttl: Duration) -> Self {
Cache {
data: RwLock::new(HashMap::new()),
ttl,
}
}
fn get(&self, key: &K) -> Option<V> {
let data = self.data.read().unwrap();
if let Some((value, timestamp)) = data.get(key) {
if timestamp.elapsed() < self.ttl {
Some(value.clone())
} else {
None
}
} else {
None
}
}
fn insert(&self, key: K, value: V) {
let mut data = self.data.write().unwrap();
data.insert(key, (value, Instant::now()));
}
fn cleanup_expired(&self) {
let mut data = self.data.write().unwrap();
data.retain(|_, (_, timestamp)| timestamp.elapsed() < self.ttl);
}
fn len(&self) -> usize {
let data = self.data.read().unwrap();
data.len()
}
}
fn main() {
let cache = Arc::new(Cache::new(Duration::from_millis(500)));
let mut handles = vec![];
// Writer threads
for i in 0..3 {
let cache_clone = Arc::clone(&cache);
let handle = thread::spawn(move || {
for j in 0..5 {
let key = format!("key_{}_{}", i, j);
let value = format!("value_{}_{}", i, j);
cache_clone.insert(key.clone(), value);
println!("Inserted: {} -> value", key);
thread::sleep(Duration::from_millis(100));
}
});
handles.push(handle);
}
// Reader threads
for i in 0..5 {
let cache_clone = Arc::clone(&cache);
let handle = thread::spawn(move || {
for _ in 0..10 {
let key = format!("key_{}_{}", i % 3, (i + 1) % 5);
match cache_clone.get(&key) {
Some(value) => println!("Reader {}: Found {} -> {}", i, key, value),
None => println!("Reader {}: Key {} not found or expired", i, key),
}
thread::sleep(Duration::from_millis(80));
}
});
handles.push(handle);
}
// Cleanup thread
let cache_cleanup = Arc::clone(&cache);
let cleanup_handle = thread::spawn(move || {
for _ in 0..5 {
thread::sleep(Duration::from_millis(300));
let before = cache_cleanup.len();
cache_cleanup.cleanup_expired();
let after = cache_cleanup.len();
println!("Cleanup: {} -> {} entries", before, after);
}
});
for handle in handles {
handle.join().unwrap();
}
cleanup_handle.join().unwrap();
println!("Final cache size: {}", cache.len());
}
Atomic Types
Atomic types provide lock-free synchronization for simple operations.
Basic Atomic Operations
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let counter = Arc::new(AtomicUsize::new(0));
let flag = Arc::new(AtomicBool::new(false));
let mut handles = vec![];
// Spawn incrementer threads
for i in 0..10 {
let counter_clone = Arc::clone(&counter);
let flag_clone = Arc::clone(&flag);
let handle = thread::spawn(move || {
for _ in 0..100 {
counter_clone.fetch_add(1, Ordering::SeqCst);
// Set flag when reaching certain values
if counter_clone.load(Ordering::SeqCst) == 500 {
flag_clone.store(true, Ordering::SeqCst);
println!("Thread {} set flag at count 500", i);
}
}
});
handles.push(handle);
}
// Monitor thread
let counter_monitor = Arc::clone(&counter);
let flag_monitor = Arc::clone(&flag);
let monitor_handle = thread::spawn(move || {
while !flag_monitor.load(Ordering::SeqCst) {
let current = counter_monitor.load(Ordering::SeqCst);
println!("Current count: {}", current);
thread::sleep(std::time::Duration::from_millis(100));
}
println!("Flag detected! Final monitoring...");
});
for handle in handles {
handle.join().unwrap();
}
monitor_handle.join().unwrap();
println!("Final counter value: {}", counter.load(Ordering::SeqCst));
println!("Final flag value: {}", flag.load(Ordering::SeqCst));
}
Memory Ordering Examples
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
fn demonstrate_memory_ordering() {
let x = Arc::new(AtomicUsize::new(0));
let y = Arc::new(AtomicUsize::new(0));
// Different memory orderings
let examples = vec![
("Relaxed", Ordering::Relaxed),
("Acquire", Ordering::Acquire),
("Release", Ordering::Release),
("AcqRel", Ordering::AcqRel),
("SeqCst", Ordering::SeqCst),
];
for (name, ordering) in examples {
println!("\n=== {} Ordering ===", name);
x.store(0, Ordering::SeqCst);
y.store(0, Ordering::SeqCst);
let x_clone = Arc::clone(&x);
let y_clone = Arc::clone(&y);
let writer = thread::spawn(move || {
x_clone.store(1, ordering);
y_clone.store(1, ordering);
});
let reader = thread::spawn(move || {
while y.load(ordering) == 0 {
// Spin until y is set
}
let x_val = x.load(ordering);
println!("Reader saw x = {} when y became 1", x_val);
});
writer.join().unwrap();
reader.join().unwrap();
}
}
fn main() {
demonstrate_memory_ordering();
}
Lock-Free Data Structures
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::ptr;
struct Node<T> {
data: T,
next: AtomicPtr<Node<T>>,
}
struct LockFreeStack<T> {
head: AtomicPtr<Node<T>>,
size: AtomicUsize,
}
impl<T> LockFreeStack<T> {
fn new() -> Self {
LockFreeStack {
head: AtomicPtr::new(ptr::null_mut()),
size: AtomicUsize::new(0),
}
}
fn push(&self, data: T) {
let new_node = Box::into_raw(Box::new(Node {
data,
next: AtomicPtr::new(ptr::null_mut()),
}));
loop {
let head = self.head.load(Ordering::Acquire);
unsafe {
(*new_node).next.store(head, Ordering::Relaxed);
}
match self.head.compare_exchange_weak(
head,
new_node,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => {
self.size.fetch_add(1, Ordering::Relaxed);
break;
}
Err(_) => continue,
}
}
}
fn pop(&self) -> Option<T> {
loop {
let head = self.head.load(Ordering::Acquire);
if head.is_null() {
return None;
}
let next = unsafe { (*head).next.load(Ordering::Relaxed) };
match self.head.compare_exchange_weak(
head,
next,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => {
self.size.fetch_sub(1, Ordering::Relaxed);
let data = unsafe { Box::from_raw(head).data };
return Some(data);
}
Err(_) => continue,
}
}
}
fn len(&self) -> usize {
self.size.load(Ordering::Relaxed)
}
}
// Note: This is a simplified implementation for demonstration
// Real lock-free data structures require careful handling of memory reclamation
unsafe impl<T: Send> Send for LockFreeStack<T> {}
unsafe impl<T: Send> Sync for LockFreeStack<T> {}
fn main() {
let stack = Arc::new(LockFreeStack::new());
let mut handles = vec![];
// Producer threads
for i in 0..5 {
let stack_clone = Arc::clone(&stack);
let handle = thread::spawn(move || {
for j in 0..10 {
let value = i * 10 + j;
stack_clone.push(value);
println!("Pushed: {}", value);
}
});
handles.push(handle);
}
// Consumer threads
for _ in 0..3 {
let stack_clone = Arc::clone(&stack);
let handle = thread::spawn(move || {
for _ in 0..15 {
if let Some(value) = stack_clone.pop() {
println!("Popped: {}", value);
}
thread::sleep(std::time::Duration::from_millis(10));
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Remaining items in stack: {}", stack.len());
while let Some(value) = stack.pop() {
println!("Final pop: {}", value);
}
}
Condition Variables
Condvar
allows threads to wait for certain conditions to be met.
Basic Condvar Usage
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;
fn main() {
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let pair_clone = Arc::clone(&pair);
// Waiting thread
let waiter = thread::spawn(move || {
let (lock, cvar) = &*pair_clone;
let mut started = lock.lock().unwrap();
while !*started {
println!("Waiting for condition...");
started = cvar.wait(started).unwrap();
}
println!("Condition met! Continuing execution.");
});
// Signaling thread
let signaler = thread::spawn(move || {
thread::sleep(Duration::from_millis(1000));
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
println!("Condition signaled!");
});
waiter.join().unwrap();
signaler.join().unwrap();
}
Producer-Consumer with Condvar
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time::Duration;
struct BoundedQueue<T> {
queue: Mutex<VecDeque<T>>,
not_empty: Condvar,
not_full: Condvar,
capacity: usize,
}
impl<T> BoundedQueue<T> {
fn new(capacity: usize) -> Self {
BoundedQueue {
queue: Mutex::new(VecDeque::new()),
not_empty: Condvar::new(),
not_full: Condvar::new(),
capacity,
}
}
fn push(&self, item: T) {
let mut queue = self.queue.lock().unwrap();
while queue.len() == self.capacity {
queue = self.not_full.wait(queue).unwrap();
}
queue.push_back(item);
self.not_empty.notify_one();
}
fn pop(&self) -> T {
let mut queue = self.queue.lock().unwrap();
while queue.is_empty() {
queue = self.not_empty.wait(queue).unwrap();
}
let item = queue.pop_front().unwrap();
self.not_full.notify_one();
item
}
fn try_pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
if let Some(item) = queue.pop_front() {
self.not_full.notify_one();
Some(item)
} else {
None
}
}
fn len(&self) -> usize {
let queue = self.queue.lock().unwrap();
queue.len()
}
}
fn main() {
let queue = Arc::new(BoundedQueue::new(5));
let mut handles = vec![];
// Producer threads
for i in 0..3 {
let queue_clone = Arc::clone(&queue);
let handle = thread::spawn(move || {
for j in 0..10 {
let item = format!("item_{}_{}", i, j);
println!("Producing: {}", item);
queue_clone.push(item);
thread::sleep(Duration::from_millis(100));
}
println!("Producer {} finished", i);
});
handles.push(handle);
}
// Consumer threads
for i in 0..2 {
let queue_clone = Arc::clone(&queue);
let handle = thread::spawn(move || {
for _ in 0..15 {
let item = queue_clone.pop();
println!("Consumer {} consumed: {}", i, item);
thread::sleep(Duration::from_millis(150));
}
println!("Consumer {} finished", i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Final queue length: {}", queue.len());
}
Barrier Synchronization
Barrier
allows multiple threads to synchronize at a specific point.
Basic Barrier Usage
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
fn main() {
let num_threads = 5;
let barrier = Arc::new(Barrier::new(num_threads));
let mut handles = vec![];
for i in 0..num_threads {
let barrier_clone = Arc::clone(&barrier);
let handle = thread::spawn(move || {
// Phase 1: Setup work
println!("Thread {} doing setup work...", i);
thread::sleep(Duration::from_millis((i as u64) * 100 + 500));
println!("Thread {} finished setup", i);
// Wait for all threads to complete setup
let barrier_result = barrier_clone.wait();
if barrier_result.is_leader() {
println!("Thread {} is the leader after setup phase", i);
}
// Phase 2: Main work (all threads start together)
println!("Thread {} starting main work...", i);
thread::sleep(Duration::from_millis(300));
println!("Thread {} finished main work", i);
// Wait for all threads to complete main work
let barrier_result = barrier_clone.wait();
if barrier_result.is_leader() {
println!("Thread {} is the leader after main phase", i);
}
// Phase 3: Cleanup (all threads start together)
println!("Thread {} doing cleanup...", i);
thread::sleep(Duration::from_millis(100));
println!("Thread {} finished cleanup", i);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("All threads completed all phases!");
}
Parallel Algorithm with Barriers
use std::sync::{Arc, Barrier, Mutex};
use std::thread;
fn parallel_sum_with_barriers(data: Vec<i32>, num_threads: usize) -> i32 {
let chunk_size = (data.len() + num_threads - 1) / num_threads;
let data = Arc::new(data);
let barrier = Arc::new(Barrier::new(num_threads));
let partial_sums = Arc::new(Mutex::new(vec![0; num_threads]));
let mut handles = vec![];
for thread_id in 0..num_threads {
let data_clone = Arc::clone(&data);
let barrier_clone = Arc::clone(&barrier);
let partial_sums_clone = Arc::clone(&partial_sums);
let handle = thread::spawn(move || {
let start = thread_id * chunk_size;
let end = ((thread_id + 1) * chunk_size).min(data_clone.len());
// Phase 1: Compute partial sum
let mut local_sum = 0;
for i in start..end {
local_sum += data_clone[i];
}
println!("Thread {} computed partial sum: {} for range {}..{}",
thread_id, local_sum, start, end);
// Store partial sum
{
let mut sums = partial_sums_clone.lock().unwrap();
sums[thread_id] = local_sum;
}
// Wait for all threads to complete their partial sums
barrier_clone.wait();
// Phase 2: One thread computes final sum
if thread_id == 0 {
let sums = partial_sums_clone.lock().unwrap();
let total_sum: i32 = sums.iter().sum();
println!("Final sum: {}", total_sum);
return total_sum;
}
0 // Other threads return 0
});
handles.push(handle);
}
// Collect results and return the sum from thread 0
let results: Vec<i32> = handles.into_iter()
.map(|h| h.join().unwrap())
.collect();
results[0] // Thread 0 computed the final sum
}
fn main() {
let data: Vec<i32> = (1..=100).collect();
let num_threads = 4;
println!("Computing sum of 1..=100 using {} threads", num_threads);
let result = parallel_sum_with_barriers(data.clone(), num_threads);
let expected: i32 = data.iter().sum();
println!("Parallel result: {}", result);
println!("Expected result: {}", expected);
assert_eq!(result, expected);
}
Advanced Synchronization Patterns
Read-Copy-Update (RCU) Pattern
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
struct RcuData<T> {
data: Arc<RwLock<Arc<T>>>,
}
impl<T> RcuData<T>
where
T: Clone,
{
fn new(initial_data: T) -> Self {
RcuData {
data: Arc::new(RwLock::new(Arc::new(initial_data))),
}
}
fn read(&self) -> Arc<T> {
let guard = self.data.read().unwrap();
Arc::clone(&*guard)
}
fn update<F>(&self, updater: F)
where
F: FnOnce(&T) -> T,
{
let current = {
let guard = self.data.read().unwrap();
Arc::clone(&*guard)
};
let new_data = updater(&*current);
let new_arc = Arc::new(new_data);
{
let mut guard = self.data.write().unwrap();
*guard = new_arc;
}
}
}
fn main() {
let config = Arc::new(RcuData::new(vec![1, 2, 3, 4, 5]));
let mut handles = vec![];
// Reader threads
for i in 0..5 {
let config_clone = Arc::clone(&config);
let handle = thread::spawn(move || {
for j in 0..10 {
let data = config_clone.read();
println!("Reader {} iteration {}: {:?}", i, j, *data);
thread::sleep(Duration::from_millis(100));
}
});
handles.push(handle);
}
// Writer thread
let config_writer = Arc::clone(&config);
let writer_handle = thread::spawn(move || {
for i in 0..5 {
thread::sleep(Duration::from_millis(200));
config_writer.update(|current| {
let mut new_data = current.clone();
new_data.push(i + 10);
println!("Writer: Added {}", i + 10);
new_data
});
}
});
for handle in handles {
handle.join().unwrap();
}
writer_handle.join().unwrap();
let final_data = config.read();
println!("Final data: {:?}", *final_data);
}
Best Practices
1. Choose the Right Synchronization Primitive
use std::sync::{Arc, Mutex, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};
// Use Mutex for exclusive access
struct Counter {
value: Mutex<usize>,
}
// Use RwLock for read-heavy workloads
struct Config {
data: RwLock<std::collections::HashMap<String, String>>,
}
// Use atomics for simple counters/flags
struct Stats {
requests: AtomicUsize,
errors: AtomicUsize,
}
impl Stats {
fn increment_requests(&self) {
self.requests.fetch_add(1, Ordering::Relaxed);
}
fn increment_errors(&self) {
self.errors.fetch_add(1, Ordering::Relaxed);
}
fn get_stats(&self) -> (usize, usize) {
(
self.requests.load(Ordering::Relaxed),
self.errors.load(Ordering::Relaxed),
)
}
}
2. Avoid Deadlocks
use std::sync::{Arc, Mutex};
use std::thread;
// Good: Consistent lock ordering
fn transfer_funds(
from: &Arc<Mutex<u64>>,
to: &Arc<Mutex<u64>>,
amount: u64,
) -> Result<(), &'static str> {
// Always acquire locks in the same order (by address)
let (first, second) = if Arc::as_ptr(from) < Arc::as_ptr(to) {
(from, to)
} else {
(to, from)
};
let mut first_guard = first.lock().unwrap();
let mut second_guard = second.lock().unwrap();
// Determine which is from and which is to
let (from_balance, to_balance) = if Arc::ptr_eq(from, first) {
(&mut *first_guard, &mut *second_guard)
} else {
(&mut *second_guard, &mut *first_guard)
};
if *from_balance >= amount {
*from_balance -= amount;
*to_balance += amount;
Ok(())
} else {
Err("Insufficient funds")
}
}
fn main() {
let account1 = Arc::new(Mutex::new(1000));
let account2 = Arc::new(Mutex::new(500));
let mut handles = vec![];
// Concurrent transfers in both directions
for i in 0..5 {
let acc1 = Arc::clone(&account1);
let acc2 = Arc::clone(&account2);
let handle = thread::spawn(move || {
if i % 2 == 0 {
transfer_funds(&acc1, &acc2, 100).ok();
} else {
transfer_funds(&acc2, &acc1, 50).ok();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Account 1: {}", *account1.lock().unwrap());
println!("Account 2: {}", *account2.lock().unwrap());
}
3. Use RAII for Lock Management
use std::sync::{Arc, Mutex, MutexGuard};
struct SafeContainer<T> {
data: Arc<Mutex<T>>,
}
impl<T> SafeContainer<T> {
fn new(data: T) -> Self {
SafeContainer {
data: Arc::new(Mutex::new(data)),
}
}
fn with_lock<F, R>(&self, operation: F) -> R
where
F: FnOnce(&mut T) -> R,
{
let mut guard = self.data.lock().unwrap();
operation(&mut *guard)
// Lock is automatically released when guard goes out of scope
}
fn try_with_lock<F, R>(&self, operation: F) -> Option<R>
where
F: FnOnce(&mut T) -> R,
{
if let Ok(mut guard) = self.data.try_lock() {
Some(operation(&mut *guard))
} else {
None
}
}
}
fn main() {
let container = SafeContainer::new(vec![1, 2, 3]);
container.with_lock(|data| {
data.push(4);
data.push(5);
});
let result = container.with_lock(|data| data.len());
println!("Container length: {}", result);
}
4. Handle Poisoned Mutexes
use std::sync::{Arc, Mutex, PoisonError};
use std::thread;
fn safe_increment(counter: &Arc<Mutex<i32>>) -> Result<i32, String> {
match counter.lock() {
Ok(mut guard) => {
*guard += 1;
Ok(*guard)
}
Err(poisoned) => {
// Recover from poisoned mutex
let mut guard = poisoned.into_inner();
*guard += 1;
Err(format!("Mutex was poisoned, but recovered. New value: {}", *guard))
}
}
}
fn main() {
let counter = Arc::new(Mutex::new(0));
// Thread that will panic and poison the mutex
let counter_clone = Arc::clone(&counter);
let panic_handle = thread::spawn(move || {
let _guard = counter_clone.lock().unwrap();
panic!("This will poison the mutex!");
});
// Wait for the panic
let _ = panic_handle.join();
// Try to use the poisoned mutex
match safe_increment(&counter) {
Ok(value) => println!("Successfully incremented to: {}", value),
Err(error) => println!("Error: {}", error),
}
}
5. Prefer Smaller Critical Sections
use std::sync::{Arc, Mutex};
use std::thread;
struct Statistics {
data: Mutex<(usize, usize)>, // (count, sum)
}
impl Statistics {
fn new() -> Self {
Statistics {
data: Mutex::new((0, 0)),
}
}
// Good: Minimal critical section
fn add_value(&self, value: usize) {
let (new_count, new_sum) = {
let mut data = self.data.lock().unwrap();
data.0 += 1;
data.1 += value;
*data
}; // Lock released here
// Expensive operations outside the lock
if new_count % 100 == 0 {
println!("Processed {} values, sum: {}", new_count, new_sum);
}
}
fn get_average(&self) -> f64 {
let (count, sum) = {
let data = self.data.lock().unwrap();
*data
}; // Lock released immediately
if count > 0 {
sum as f64 / count as f64
} else {
0.0
}
}
}
fn main() {
let stats = Arc::new(Statistics::new());
let mut handles = vec![];
for i in 0..10 {
let stats_clone = Arc::clone(&stats);
let handle = thread::spawn(move || {
for j in 0..50 {
stats_clone.add_value(i * 50 + j);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Final average: {:.2}", stats.get_average());
}
Shared state concurrency in Rust provides powerful tools for coordination between threads while maintaining safety through the type system. Choose the appropriate synchronization primitive based on your access patterns and performance requirements. Remember that Rust's ownership system prevents data races at compile time, making concurrent programming safer and more reliable.