Master Java Thread Safety and Concurrent Programming
Java Thread Safety
Thread safety is a critical concept in concurrent programming that ensures correct behavior when multiple threads access shared data simultaneously. A piece of code is thread-safe if it functions correctly during simultaneous execution by multiple threads, maintaining data integrity and avoiding race conditions.
Understanding Thread Safety
Thread safety issues arise when multiple threads access shared mutable state without proper coordination:
Thread Safety Problems:
- Race Conditions: Multiple threads modifying data simultaneously
- Data Corruption: Partial updates visible to other threads
- Visibility Issues: Changes made by one thread not seen by others
- Atomicity Violations: Compound operations interrupted mid-execution
- Deadlocks: Threads waiting for each other indefinitely
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadSafetyBasics {
// Unsafe counter - not thread-safe
static class UnsafeCounter {
private int count = 0;
public void increment() {
count++; // Read-modify-write operation (not atomic)
}
public int getCount() {
return count;
}
}
// Thread-safe counter using synchronization
static class SafeCounter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
// Thread-safe counter using atomic operations
static class AtomicCounter {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet();
}
public int getCount() {
return count.get();
}
}
public static void demonstrateThreadSafetyIssues() throws InterruptedException {
System.out.println("=== Thread Safety Demonstration ===");
UnsafeCounter unsafe = new UnsafeCounter();
SafeCounter safe = new SafeCounter();
AtomicCounter atomic = new AtomicCounter();
int numThreads = 10;
int incrementsPerThread = 1000;
int expectedTotal = numThreads * incrementsPerThread;
// Test unsafe counter
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < incrementsPerThread; j++) {
unsafe.increment();
}
});
}
long startTime = System.currentTimeMillis();
for (Thread thread : threads) thread.start();
for (Thread thread : threads) thread.join();
long unsafeTime = System.currentTimeMillis() - startTime;
System.out.println("Unsafe counter: " + unsafe.getCount() + " (expected: " + expectedTotal +
") Time: " + unsafeTime + "ms");
// Test safe counter
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < incrementsPerThread; j++) {
safe.increment();
}
});
}
startTime = System.currentTimeMillis();
for (Thread thread : threads) thread.start();
for (Thread thread : threads) thread.join();
long safeTime = System.currentTimeMillis() - startTime;
System.out.println("Safe counter: " + safe.getCount() + " (expected: " + expectedTotal +
") Time: " + safeTime + "ms");
// Test atomic counter
for (int i = 0; i < numThreads; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < incrementsPerThread; j++) {
atomic.increment();
}
});
}
startTime = System.currentTimeMillis();
for (Thread thread : threads) thread.start();
for (Thread thread : threads) thread.join();
long atomicTime = System.currentTimeMillis() - startTime;
System.out.println("Atomic counter: " + atomic.getCount() + " (expected: " + expectedTotal +
") Time: " + atomicTime + "ms");
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateThreadSafetyIssues();
}
}
Immutability for Thread Safety
Immutable objects are inherently thread-safe because they cannot be modified after creation:
import java.util.*;
public class ImmutabilityExample {
// Immutable class example
public static final class ImmutablePerson {
private final String name;
private final int age;
private final List<String> hobbies;
public ImmutablePerson(String name, int age, List<String> hobbies) {
this.name = name;
this.age = age;
// Defensive copy to ensure immutability
this.hobbies = Collections.unmodifiableList(new ArrayList<>(hobbies));
}
public String getName() { return name; }
public int getAge() { return age; }
public List<String> getHobbies() { return hobbies; }
// Builder pattern for complex immutable objects
public static class Builder {
private String name;
private int age;
private List<String> hobbies = new ArrayList<>();
public Builder setName(String name) { this.name = name; return this; }
public Builder setAge(int age) { this.age = age; return this; }
public Builder addHobby(String hobby) { this.hobbies.add(hobby); return this; }
public ImmutablePerson build() {
return new ImmutablePerson(name, age, hobbies);
}
}
@Override
public String toString() {
return "ImmutablePerson{name='" + name + "', age=" + age + ", hobbies=" + hobbies + "}";
}
}
// Effectively immutable class
public static class EffectivelyImmutablePoint {
private final int x, y;
public EffectivelyImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
public int getX() { return x; }
public int getY() { return y; }
public EffectivelyImmutablePoint move(int dx, int dy) {
return new EffectivelyImmutablePoint(x + dx, y + dy);
}
@Override
public String toString() {
return "Point(" + x + ", " + y + ")";
}
}
public static void demonstrateImmutability() throws InterruptedException {
System.out.println("=== Immutability for Thread Safety ===");
// Create immutable person
ImmutablePerson person = new ImmutablePerson.Builder()
.setName("John Doe")
.setAge(30)
.addHobby("Reading")
.addHobby("Programming")
.build();
// Multiple threads can safely read immutable objects
Thread[] readers = new Thread[5];
for (int i = 0; i < readers.length; i++) {
final int threadId = i;
readers[i] = new Thread(() -> {
for (int j = 0; j < 3; j++) {
System.out.println("Thread " + threadId + " reads: " + person);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
for (Thread reader : readers) reader.start();
for (Thread reader : readers) reader.join();
// Demonstrate effectively immutable operations
EffectivelyImmutablePoint point = new EffectivelyImmutablePoint(0, 0);
System.out.println("Original point: " + point);
EffectivelyImmutablePoint moved = point.move(5, 3);
System.out.println("Moved point: " + moved);
System.out.println("Original unchanged: " + point);
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateImmutability();
}
}
Atomic Operations and Variables
Java provides atomic classes for lock-free thread-safe operations:
import java.util.concurrent.atomic.*;
public class AtomicOperationsExample {
public static void demonstrateAtomicVariables() throws InterruptedException {
System.out.println("=== Atomic Variables ===");
AtomicInteger counter = new AtomicInteger(0);
AtomicLong longCounter = new AtomicLong(0);
AtomicBoolean flag = new AtomicBoolean(false);
AtomicReference<String> reference = new AtomicReference<>("initial");
Thread[] threads = new Thread[5];
// Atomic integer operations
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.incrementAndGet();
longCounter.addAndGet(2);
}
});
}
for (Thread thread : threads) thread.start();
for (Thread thread : threads) thread.join();
System.out.println("Counter: " + counter.get() + " (expected: 5000)");
System.out.println("Long counter: " + longCounter.get() + " (expected: 10000)");
// Compare-and-swap operations
boolean success = counter.compareAndSet(5000, 0);
System.out.println("CAS reset to 0: " + success + ", new value: " + counter.get());
// Atomic reference operations
String oldValue = reference.getAndSet("updated");
System.out.println("Reference updated: old=" + oldValue + ", new=" + reference.get());
System.out.println();
}
// Thread-safe accumulator using atomic operations
static class ThreadSafeAccumulator {
private final AtomicLong sum = new AtomicLong(0);
private final AtomicInteger count = new AtomicInteger(0);
public void add(long value) {
sum.addAndGet(value);
count.incrementAndGet();
}
public double getAverage() {
int currentCount = count.get();
return currentCount == 0 ? 0.0 : (double) sum.get() / currentCount;
}
public long getSum() { return sum.get(); }
public int getCount() { return count.get(); }
}
public static void demonstrateAtomicAccumulator() throws InterruptedException {
System.out.println("=== Atomic Accumulator ===");
ThreadSafeAccumulator accumulator = new ThreadSafeAccumulator();
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 1; j <= 100; j++) {
accumulator.add(threadId * 100 + j);
try {
Thread.sleep(1); // Simulate some work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
for (Thread thread : threads) thread.start();
for (Thread thread : threads) thread.join();
System.out.println("Total sum: " + accumulator.getSum());
System.out.println("Count: " + accumulator.getCount());
System.out.println("Average: " + String.format("%.2f", accumulator.getAverage()));
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateAtomicVariables();
demonstrateAtomicAccumulator();
}
}
Thread-Safe Design Patterns
Common patterns for achieving thread safety:
import java.util.concurrent.*;
import java.util.*;
public class ThreadSafeDesignPatterns {
// 1. Thread confinement pattern
static class ThreadLocalExample {
private static final ThreadLocal<Integer> threadLocalValue = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;
}
};
public static void increment() {
threadLocalValue.set(threadLocalValue.get() + 1);
}
public static int getValue() {
return threadLocalValue.get();
}
public static void cleanup() {
threadLocalValue.remove();
}
}
// 2. Safe publication pattern
static class SafePublication {
private final Object state;
// Safe publication through constructor
public SafePublication(Object state) {
this.state = state; // Published safely due to final field
}
public Object getState() {
return state;
}
// Safe publication through static factory
private static volatile SafePublication instance;
public static SafePublication getInstance(Object state) {
if (instance == null) {
synchronized (SafePublication.class) {
if (instance == null) {
instance = new SafePublication(state);
}
}
}
return instance;
}
}
// 3. Copy-on-write pattern
static class CopyOnWriteMap<K, V> {
private volatile Map<K, V> map = new HashMap<>();
public synchronized V put(K key, V value) {
Map<K, V> newMap = new HashMap<>(map);
V oldValue = newMap.put(key, value);
map = newMap; // Atomic reference update
return oldValue;
}
public V get(K key) {
return map.get(key); // No synchronization needed for reads
}
public Set<Map.Entry<K, V>> entrySet() {
return map.entrySet(); // Snapshot view
}
}
// 4. Double-checked locking pattern
static class Singleton {
private static volatile Singleton instance;
private Singleton() {}
public static Singleton getInstance() {
if (instance == null) { // First check (no locking)
synchronized (Singleton.class) {
if (instance == null) { // Second check (with locking)
instance = new Singleton();
}
}
}
return instance;
}
}
// 5. Producer-consumer with bounded buffer
static class BoundedBuffer<T> {
private final T[] buffer;
private int head = 0, tail = 0, count = 0;
@SuppressWarnings("unchecked")
public BoundedBuffer(int capacity) {
buffer = (T[]) new Object[capacity];
}
public synchronized void put(T item) throws InterruptedException {
while (count == buffer.length) {
wait(); // Buffer is full
}
buffer[tail] = item;
tail = (tail + 1) % buffer.length;
count++;
notifyAll(); // Wake up waiting consumers
}
public synchronized T take() throws InterruptedException {
while (count == 0) {
wait(); // Buffer is empty
}
T item = buffer[head];
buffer[head] = null; // Help GC
head = (head + 1) % buffer.length;
count--;
notifyAll(); // Wake up waiting producers
return item;
}
public synchronized int size() {
return count;
}
}
public static void demonstrateThreadLocalPattern() throws InterruptedException {
System.out.println("=== Thread Local Pattern ===");
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 5; j++) {
ThreadLocalExample.increment();
System.out.println("Thread " + threadId + " value: " +
ThreadLocalExample.getValue());
}
ThreadLocalExample.cleanup();
});
}
for (Thread thread : threads) thread.start();
for (Thread thread : threads) thread.join();
System.out.println();
}
public static void demonstrateBoundedBuffer() throws InterruptedException {
System.out.println("=== Bounded Buffer Pattern ===");
BoundedBuffer<String> buffer = new BoundedBuffer<>(3);
// Producer
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 7; i++) {
String item = "Item-" + i;
buffer.put(item);
System.out.println("Produced: " + item + " (buffer size: " + buffer.size() + ")");
Thread.sleep(300);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 7; i++) {
Thread.sleep(500); // Slower than producer
String item = buffer.take();
System.out.println("Consumed: " + item + " (buffer size: " + buffer.size() + ")");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateThreadLocalPattern();
demonstrateBoundedBuffer();
// Test singleton
System.out.println("=== Singleton Pattern ===");
Singleton s1 = Singleton.getInstance();
Singleton s2 = Singleton.getInstance();
System.out.println("Same instance: " + (s1 == s2));
System.out.println();
}
}
Best Practices and Guidelines
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ThreadSafetyBestPractices {
// Best Practice 1: Prefer immutability
public static final class ImmutableCache<K, V> {
private final ConcurrentHashMap<K, V> cache;
public ImmutableCache(ConcurrentHashMap<K, V> cache) {
this.cache = new ConcurrentHashMap<>(cache);
}
public V get(K key) {
return cache.get(key);
}
public ImmutableCache<K, V> put(K key, V value) {
ConcurrentHashMap<K, V> newCache = new ConcurrentHashMap<>(cache);
newCache.put(key, value);
return new ImmutableCache<>(newCache);
}
public int size() {
return cache.size();
}
}
// Best Practice 2: Use appropriate synchronization level
static class ReadWriteCache<K, V> {
private final Map<K, V> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
public V get(K key) {
lock.readLock().lock();
try {
return cache.get(key);
} finally {
lock.readLock().unlock();
}
}
public void put(K key, V value) {
lock.writeLock().lock();
try {
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
public int size() {
lock.readLock().lock();
try {
return cache.size();
} finally {
lock.readLock().unlock();
}
}
}
// Best Practice 3: Minimize synchronization scope
static class MinimalSyncExample {
private final Object lock = new Object();
private int counter = 0;
private String lastUpdate = "";
// BAD: Over-synchronization
public synchronized void badUpdate(String info) {
counter++;
performExpensiveOperation(); // Don't hold lock during expensive operations
lastUpdate = info;
}
// GOOD: Minimal synchronization
public void goodUpdate(String info) {
performExpensiveOperation(); // Do expensive work outside sync block
synchronized (lock) {
counter++;
lastUpdate = info;
}
}
private void performExpensiveOperation() {
try {
Thread.sleep(100); // Simulate expensive operation
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public synchronized int getCounter() { return counter; }
public synchronized String getLastUpdate() { return lastUpdate; }
}
// Best Practice 4: Document thread safety guarantees
/**
* Thread-safe statistics collector.
* All methods are thread-safe and can be called concurrently.
* Statistics are updated atomically.
*/
static class ThreadSafeStatistics {
private final AtomicLong count = new AtomicLong(0);
private final AtomicLong sum = new AtomicLong(0);
private final AtomicReference<Double> min = new AtomicReference<>(Double.MAX_VALUE);
private final AtomicReference<Double> max = new AtomicReference<>(Double.MIN_VALUE);
/**
* Adds a value to the statistics. Thread-safe.
* @param value the value to add
*/
public void addValue(double value) {
count.incrementAndGet();
sum.addAndGet((long) value);
updateMin(value);
updateMax(value);
}
private void updateMin(double value) {
min.updateAndGet(current -> Math.min(current, value));
}
private void updateMax(double value) {
max.updateAndGet(current -> Math.max(current, value));
}
/** Returns the count of values added. Thread-safe. */
public long getCount() { return count.get(); }
/** Returns the average of all values. Thread-safe. */
public double getAverage() {
long currentCount = count.get();
return currentCount == 0 ? 0.0 : (double) sum.get() / currentCount;
}
/** Returns the minimum value. Thread-safe. */
public double getMin() { return min.get(); }
/** Returns the maximum value. Thread-safe. */
public double getMax() { return max.get(); }
}
public static void demonstrateBestPractices() throws InterruptedException {
System.out.println("=== Thread Safety Best Practices ===");
// Test thread-safe statistics
ThreadSafeStatistics stats = new ThreadSafeStatistics();
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
stats.addValue(threadId * 100 + j);
}
});
}
for (Thread thread : threads) thread.start();
for (Thread thread : threads) thread.join();
System.out.println("Statistics:");
System.out.println(" Count: " + stats.getCount());
System.out.println(" Average: " + String.format("%.2f", stats.getAverage()));
System.out.println(" Min: " + stats.getMin());
System.out.println(" Max: " + stats.getMax());
System.out.println();
}
public static void printGuidelines() {
System.out.println("=== Thread Safety Guidelines ===");
System.out.println("1. Prefer immutable objects");
System.out.println("2. Use thread-safe collections (ConcurrentHashMap, etc.)");
System.out.println("3. Minimize shared mutable state");
System.out.println("4. Use atomic operations for simple state");
System.out.println("5. Synchronize access to shared mutable state");
System.out.println("6. Use appropriate synchronization primitives");
System.out.println("7. Avoid holding locks during expensive operations");
System.out.println("8. Document thread safety guarantees");
System.out.println("9. Use ThreadLocal for thread-specific data");
System.out.println("10. Test with multiple threads");
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateBestPractices();
printGuidelines();
}
}
Summary
Thread safety is essential for reliable concurrent Java applications:
Key Principles:
- Immutability: Immutable objects are inherently thread-safe
- Atomic Operations: Use atomic classes for lock-free thread safety
- Synchronization: Coordinate access to shared mutable state
- Confinement: Keep data thread-local when possible
- Safe Publication: Properly publish objects to other threads
Thread Safety Techniques:
- Synchronization: synchronized blocks/methods
- Atomic Variables: AtomicInteger, AtomicReference, etc.
- Concurrent Collections: ConcurrentHashMap, BlockingQueue
- Locks: ReentrantLock, ReadWriteLock for advanced scenarios
- ThreadLocal: Thread-specific storage
Design Patterns:
- Immutable Objects: No state changes after construction
- Thread Confinement: Data accessed by single thread only
- Safe Publication: Proper object publication to other threads
- Copy-on-Write: Create copies for modifications
- Producer-Consumer: Bounded buffers with blocking operations
Best Practices:
- Minimize Shared State: Reduce need for synchronization
- Use Appropriate Tools: Match synchronization to use case
- Document Thread Safety: Clear contracts and guarantees
- Test Thoroughly: Use stress testing and code review
- Avoid Common Pitfalls: Double-checked locking, improper publication
Thread safety requires careful design and implementation but is crucial for building robust concurrent applications.