Master Java Concurrent Collections and Thread-Safe Data Structures
Java Concurrent Collections
Concurrent collections are thread-safe data structures designed for high-performance multi-threaded environments. Unlike synchronized collections that use simple locking, concurrent collections employ sophisticated algorithms like lock-free programming, compare-and-swap operations, and segment-based locking to achieve better scalability and performance.
Why Concurrent Collections?
Traditional collections are not thread-safe, and naive synchronization approaches have significant limitations:
Problems with Traditional Approaches:
- Not Thread-Safe: ArrayList, HashMap, etc. can become corrupted under concurrent access
- Synchronized Wrappers: Collections.synchronizedMap() provides safety but poor performance
- Coarse-Grained Locking: Entire collection locked during any operation
- Iteration Issues: ConcurrentModificationException during concurrent iteration/modification
- Blocking Operations: No built-in support for producer-consumer patterns
Concurrent Collections Benefits:
- Thread-Safe: Designed for concurrent access from multiple threads
- High Performance: Fine-grained locking or lock-free algorithms
- Scalable: Performance doesn't degrade significantly with thread count
- Fail-Safe Iteration: Iterators work on snapshots, no ConcurrentModificationException
- Rich APIs: Specialized methods for concurrent operations
import java.util.*;
import java.util.concurrent.*;
public class ConcurrentCollectionsBasics {
public static void demonstrateProblems() throws InterruptedException {
System.out.println("=== Problems with Non-Concurrent Collections ===");
// Unsafe HashMap
Map<String, Integer> unsafeMap = new HashMap<>();
// Multiple threads modifying HashMap can corrupt it
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
// This can cause infinite loops or data corruption
unsafeMap.put("key" + threadId + "-" + j, j);
}
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("Unsafe map size: " + unsafeMap.size() + " (expected: 5000)");
// Synchronized wrapper - safe but slow
Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
long startTime = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
syncMap.put("sync-key" + threadId + "-" + j, j);
}
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
long syncTime = System.currentTimeMillis() - startTime;
System.out.println("Synchronized map size: " + syncMap.size() + " (time: " + syncTime + "ms)");
System.out.println();
}
public static void demonstrateConcurrentSolution() throws InterruptedException {
System.out.println("=== Concurrent Collections Solution ===");
ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
Thread[] threads = new Thread[5];
long startTime = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
concurrentMap.put("concurrent-key" + threadId + "-" + j, j);
}
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
long concurrentTime = System.currentTimeMillis() - startTime;
System.out.println("Concurrent map size: " + concurrentMap.size() + " (time: " + concurrentTime + "ms)");
System.out.println("Performance improvement: Much faster and always correct");
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateProblems();
demonstrateConcurrentSolution();
}
}
ConcurrentHashMap
ConcurrentHashMap is the most commonly used concurrent collection, offering thread-safe key-value storage with excellent performance:
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentHashMapExample {
public static void demonstrateBasicOperations() {
System.out.println("=== ConcurrentHashMap Basic Operations ===");
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// Basic operations are thread-safe
map.put("apple", 10);
map.put("banana", 20);
map.put("cherry", 30);
System.out.println("Initial map: " + map);
// Atomic operations
Integer oldValue = map.replace("apple", 15);
System.out.println("Replaced apple: old=" + oldValue + ", new=" + map.get("apple"));
// Conditional operations
boolean replaced = map.replace("banana", 20, 25);
System.out.println("Conditional replace banana: " + replaced);
Integer absent = map.putIfAbsent("date", 40);
System.out.println("Put if absent 'date': " + absent + ", map=" + map);
Integer present = map.putIfAbsent("apple", 100);
System.out.println("Put if absent 'apple': " + present + " (should be 15)");
// Remove operations
boolean removed = map.remove("cherry", 30);
System.out.println("Conditional remove cherry: " + removed);
System.out.println("Final map: " + map);
System.out.println();
}
public static void demonstrateAtomicOperations() throws InterruptedException {
System.out.println("=== Atomic Operations ===");
ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<>();
counters.put("counter1", 0);
counters.put("counter2", 0);
// Multiple threads incrementing counters
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
// Atomic increment using compute methods
counters.compute("counter1", (key, val) -> val == null ? 1 : val + 1);
// Alternative using merge
counters.merge("counter2", 1, Integer::sum);
}
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("Counter1 (compute): " + counters.get("counter1"));
System.out.println("Counter2 (merge): " + counters.get("counter2"));
System.out.println("Expected: 10000 each");
System.out.println();
}
public static void demonstrateComputeMethods() {
System.out.println("=== Compute Methods ===");
ConcurrentHashMap<String, List<String>> groupedData = new ConcurrentHashMap<>();
// compute() - always computes new value
groupedData.compute("fruits", (key, list) -> {
List<String> newList = list == null ? new ArrayList<>() : new ArrayList<>(list);
newList.add("apple");
return newList;
});
// computeIfAbsent() - only compute if key is absent
groupedData.computeIfAbsent("vegetables", k -> new ArrayList<>()).add("carrot");
groupedData.computeIfAbsent("fruits", k -> new ArrayList<>()).add("banana");
// computeIfPresent() - only compute if key is present
groupedData.computeIfPresent("fruits", (key, list) -> {
list.add("cherry");
return list;
});
groupedData.computeIfPresent("meat", (key, list) -> {
list.add("beef"); // Won't execute because "meat" key doesn't exist
return list;
});
System.out.println("Grouped data: " + groupedData);
// Practical example: word frequency counting
String text = "apple banana apple cherry banana apple";
ConcurrentHashMap<String, Integer> wordCount = new ConcurrentHashMap<>();
Arrays.stream(text.split(" "))
.parallel() // Can be safely processed in parallel
.forEach(word -> wordCount.merge(word, 1, Integer::sum));
System.out.println("Word frequencies: " + wordCount);
System.out.println();
}
public static void demonstrateIterationSafety() throws InterruptedException {
System.out.println("=== Safe Iteration ===");
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// Populate map
for (int i = 0; i < 100; i++) {
map.put("key" + i, i);
}
// Start a thread that modifies the map during iteration
Thread modifier = new Thread(() -> {
try {
Thread.sleep(100);
for (int i = 100; i < 200; i++) {
map.put("newkey" + i, i);
Thread.sleep(1); // Slow down to overlap with iteration
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
modifier.start();
// Iterate while map is being modified - no ConcurrentModificationException
System.out.println("Iterating while map is being modified:");
int count = 0;
for (Map.Entry<String, Integer> entry : map.entrySet()) {
count++;
if (count % 20 == 0) {
System.out.println("Processed " + count + " entries, current: " + entry.getKey());
try {
Thread.sleep(50); // Give modifier time to add entries
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
modifier.join();
System.out.println("Final map size: " + map.size());
System.out.println("Iteration completed without exceptions");
System.out.println();
}
public static void demonstratePerformanceComparison() throws InterruptedException {
System.out.println("=== Performance Comparison ===");
int numThreads = 8;
int operationsPerThread = 10000;
// Test synchronized HashMap
Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
long syncTime = testMapPerformance(syncMap, numThreads, operationsPerThread, "Synchronized HashMap");
// Test ConcurrentHashMap
ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
long concurrentTime = testMapPerformance(concurrentMap, numThreads, operationsPerThread, "ConcurrentHashMap");
System.out.println("Performance improvement: " +
(double) syncTime / concurrentTime + "x faster");
System.out.println();
}
private static long testMapPerformance(Map<String, Integer> map, int numThreads,
int operationsPerThread, String mapType) throws InterruptedException {
Thread[] threads = new Thread[numThreads];
long startTime = System.currentTimeMillis();
for (int i = 0; i < numThreads; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < operationsPerThread; j++) {
String key = "key" + threadId + "-" + j;
map.put(key, j);
map.get(key);
if (j % 2 == 0) {
map.remove(key);
}
}
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
long endTime = System.currentTimeMillis();
System.out.println(mapType + " time: " + (endTime - startTime) + "ms");
return endTime - startTime;
}
public static void main(String[] args) throws InterruptedException {
demonstrateBasicOperations();
demonstrateAtomicOperations();
demonstrateComputeMethods();
demonstrateIterationSafety();
demonstratePerformanceComparison();
}
}
Blocking Queues
Blocking queues are essential for producer-consumer patterns and thread communication:
import java.util.concurrent.*;
import java.util.*;
public class BlockingQueuesExample {
public static void demonstrateArrayBlockingQueue() throws InterruptedException {
System.out.println("=== ArrayBlockingQueue ===");
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // Fixed capacity
// Producer
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String item = "Item-" + i;
System.out.println("Producing: " + item);
queue.put(item); // Blocks if queue is full
System.out.println("Produced: " + item + " (queue size: " + queue.size() + ")");
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
Thread.sleep(1000); // Slower than producer
String item = queue.take(); // Blocks if queue is empty
System.out.println("Consumed: " + item + " (queue size: " + queue.size() + ")");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
System.out.println();
}
public static void demonstrateLinkedBlockingQueue() throws InterruptedException {
System.out.println("=== LinkedBlockingQueue ===");
BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); // Unbounded by default
// Multiple producers
Thread[] producers = new Thread[3];
for (int i = 0; i < producers.length; i++) {
final int producerId = i;
producers[i] = new Thread(() -> {
try {
for (int j = 1; j <= 3; j++) {
int item = producerId * 10 + j;
queue.put(item);
System.out.println("Producer " + producerId + " added: " + item);
Thread.sleep(300);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// Single consumer
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 9; i++) { // 3 producers × 3 items each
Integer item = queue.take();
System.out.println("Consumer got: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Start all threads
for (Thread producer : producers) {
producer.start();
}
consumer.start();
// Wait for completion
for (Thread producer : producers) {
producer.join();
}
consumer.join();
System.out.println();
}
public static void demonstratePriorityBlockingQueue() throws InterruptedException {
System.out.println("=== PriorityBlockingQueue ===");
// Queue that orders tasks by priority
BlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();
// Task class with priority
class Task implements Comparable<Task> {
final String name;
final int priority;
Task(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public int compareTo(Task other) {
return Integer.compare(other.priority, this.priority); // Higher priority first
}
@Override
public String toString() {
return name + "(priority:" + priority + ")";
}
}
// Add tasks in random order
Thread taskAdder = new Thread(() -> {
try {
taskQueue.put(new Task("Low priority task", 1));
Thread.sleep(100);
taskQueue.put(new Task("High priority task", 5));
Thread.sleep(100);
taskQueue.put(new Task("Medium priority task", 3));
Thread.sleep(100);
taskQueue.put(new Task("Critical task", 10));
Thread.sleep(100);
taskQueue.put(new Task("Another medium task", 3));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Process tasks in priority order
Thread taskProcessor = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
Task task = taskQueue.take();
System.out.println("Processing: " + task);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
taskAdder.start();
Thread.sleep(50); // Let some tasks accumulate
taskProcessor.start();
taskAdder.join();
taskProcessor.join();
System.out.println();
}
public static void demonstrateDelayQueue() throws InterruptedException {
System.out.println("=== DelayQueue ===");
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
// Delayed task implementation
class DelayedTask implements Delayed {
final String name;
final long executeTime;
DelayedTask(String name, long delayMs) {
this.name = name;
this.executeTime = System.currentTimeMillis() + delayMs;
}
@Override
public long getDelay(TimeUnit unit) {
long remaining = executeTime - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed other) {
return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
}
@Override
public String toString() {
return name;
}
}
// Schedule tasks with different delays
Thread scheduler = new Thread(() -> {
delayQueue.put(new DelayedTask("Task 1 (2s delay)", 2000));
delayQueue.put(new DelayedTask("Task 2 (1s delay)", 1000));
delayQueue.put(new DelayedTask("Task 3 (3s delay)", 3000));
delayQueue.put(new DelayedTask("Task 4 (500ms delay)", 500));
System.out.println("All tasks scheduled");
});
// Execute tasks when their delay expires
Thread executor = new Thread(() -> {
try {
for (int i = 0; i < 4; i++) {
DelayedTask task = delayQueue.take(); // Blocks until delay expires
System.out.println("Executing: " + task + " at " + new Date());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
long startTime = System.currentTimeMillis();
scheduler.start();
executor.start();
scheduler.join();
executor.join();
long totalTime = System.currentTimeMillis() - startTime;
System.out.println("Total execution time: " + totalTime + "ms");
System.out.println();
}
public static void demonstrateNonBlockingOperations() {
System.out.println("=== Non-blocking Operations ===");
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
// Fill the queue
queue.offer("Item1");
queue.offer("Item2");
System.out.println("Queue after offers: " + queue);
// Try non-blocking operations
boolean added = queue.offer("Item3"); // Won't block, returns false
System.out.println("Offer Item3 (queue full): " + added);
String item = queue.poll(); // Won't block, returns item or null
System.out.println("Poll: " + item);
added = queue.offer("Item3"); // Now should succeed
System.out.println("Offer Item3 (after poll): " + added);
// Timed operations
try {
boolean addedWithTimeout = queue.offer("Item4", 1, TimeUnit.SECONDS);
System.out.println("Offer with timeout: " + addedWithTimeout);
String itemWithTimeout = queue.poll(1, TimeUnit.SECONDS);
System.out.println("Poll with timeout: " + itemWithTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Final queue: " + queue);
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateArrayBlockingQueue();
demonstrateLinkedBlockingQueue();
demonstratePriorityBlockingQueue();
demonstrateDelayQueue();
demonstrateNonBlockingOperations();
}
}
Other Concurrent Collections
Java provides additional concurrent collections for specific use cases:
import java.util.concurrent.*;
import java.util.*;
public class OtherConcurrentCollections {
public static void demonstrateConcurrentLinkedQueue() throws InterruptedException {
System.out.println("=== ConcurrentLinkedQueue ===");
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// Multiple producers adding to queue
Thread[] producers = new Thread[3];
for (int i = 0; i < producers.length; i++) {
final int producerId = i;
producers[i] = new Thread(() -> {
for (int j = 0; j < 5; j++) {
String item = "Producer" + producerId + "-Item" + j;
queue.offer(item); // Non-blocking
System.out.println("Added: " + item);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
// Consumer polling from queue
Thread consumer = new Thread(() -> {
int consumed = 0;
while (consumed < 15) { // 3 producers × 5 items
String item = queue.poll(); // Non-blocking
if (item != null) {
System.out.println("Consumed: " + item);
consumed++;
} else {
try {
Thread.sleep(50); // Wait a bit if queue is empty
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
});
for (Thread producer : producers) {
producer.start();
}
consumer.start();
for (Thread producer : producers) {
producer.join();
}
consumer.join();
System.out.println("Final queue size: " + queue.size());
System.out.println();
}
public static void demonstrateCopyOnWriteArrayList() throws InterruptedException {
System.out.println("=== CopyOnWriteArrayList ===");
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// Populate initial data
list.add("Item1");
list.add("Item2");
list.add("Item3");
// Reader threads (frequent reads)
Thread[] readers = new Thread[3];
for (int i = 0; i < readers.length; i++) {
final int readerId = i;
readers[i] = new Thread(() -> {
for (int j = 0; j < 10; j++) {
System.out.println("Reader " + readerId + " sees: " + list);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
// Writer thread (infrequent writes)
Thread writer = new Thread(() -> {
try {
Thread.sleep(500);
list.add("NewItem1");
System.out.println("*** Added NewItem1 ***");
Thread.sleep(1000);
list.add("NewItem2");
System.out.println("*** Added NewItem2 ***");
Thread.sleep(1000);
list.remove("Item1");
System.out.println("*** Removed Item1 ***");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
for (Thread reader : readers) {
reader.start();
}
writer.start();
for (Thread reader : readers) {
reader.join();
}
writer.join();
System.out.println("Final list: " + list);
System.out.println();
}
public static void demonstrateConcurrentSkipListMap() {
System.out.println("=== ConcurrentSkipListMap ===");
ConcurrentSkipListMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();
// Add elements in random order
int[] keys = {15, 3, 8, 1, 12, 6, 20, 10, 4, 18};
for (int key : keys) {
skipListMap.put(key, "Value" + key);
}
System.out.println("Skip list map (sorted): " + skipListMap);
// Navigation methods
System.out.println("First entry: " + skipListMap.firstEntry());
System.out.println("Last entry: " + skipListMap.lastEntry());
System.out.println("Lower than 10: " + skipListMap.lowerEntry(10));
System.out.println("Floor of 11: " + skipListMap.floorEntry(11));
System.out.println("Ceiling of 11: " + skipListMap.ceilingEntry(11));
System.out.println("Higher than 10: " + skipListMap.higherEntry(10));
// Range operations
System.out.println("Submap [5, 15): " + skipListMap.subMap(5, 15));
System.out.println("Head map (<10): " + skipListMap.headMap(10));
System.out.println("Tail map (>=12): " + skipListMap.tailMap(12));
// Concurrent modification-safe iteration
System.out.print("Iterating while modifying: ");
for (Map.Entry<Integer, String> entry : skipListMap.entrySet()) {
System.out.print(entry.getKey() + " ");
if (entry.getKey() == 8) {
skipListMap.put(25, "Value25"); // Safe to modify during iteration
}
}
System.out.println();
System.out.println("After iteration: " + skipListMap);
System.out.println();
}
public static void demonstrateTransferQueue() throws InterruptedException {
System.out.println("=== LinkedTransferQueue ===");
LinkedTransferQueue<String> transferQueue = new LinkedTransferQueue<>();
// Producer that transfers directly to waiting consumer
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String item = "TransferItem" + i;
System.out.println("Transferring: " + item);
if (i % 2 == 0) {
// transfer() waits for a consumer to be ready
transferQueue.transfer(item);
System.out.println("Transferred directly: " + item);
} else {
// put() adds to queue normally
transferQueue.put(item);
System.out.println("Queued: " + item);
}
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Consumer that sometimes waits
Thread consumer = new Thread(() -> {
try {
Thread.sleep(1000); // Start consuming after some items are produced
for (int i = 1; i <= 5; i++) {
String item = transferQueue.take();
System.out.println("Consumed: " + item);
Thread.sleep(800); // Slower consumer
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
System.out.println("Transfer queue size: " + transferQueue.size());
System.out.println();
}
public static void demonstratePerformanceCharacteristics() throws InterruptedException {
System.out.println("=== Performance Characteristics ===");
int numOperations = 100000;
// Test ConcurrentHashMap vs ConcurrentSkipListMap
System.out.println("Testing map operations (" + numOperations + " operations):");
ConcurrentHashMap<Integer, String> hashMap = new ConcurrentHashMap<>();
ConcurrentSkipListMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();
// Hash map performance
long startTime = System.currentTimeMillis();
for (int i = 0; i < numOperations; i++) {
hashMap.put(i, "Value" + i);
}
long hashMapTime = System.currentTimeMillis() - startTime;
// Skip list map performance
startTime = System.currentTimeMillis();
for (int i = 0; i < numOperations; i++) {
skipListMap.put(i, "Value" + i);
}
long skipListTime = System.currentTimeMillis() - startTime;
System.out.println("ConcurrentHashMap: " + hashMapTime + "ms");
System.out.println("ConcurrentSkipListMap: " + skipListTime + "ms");
System.out.println("Hash map is " + (double) skipListTime / hashMapTime + "x faster for puts");
// Test iteration performance
startTime = System.currentTimeMillis();
for (Map.Entry<Integer, String> entry : hashMap.entrySet()) {
// Just iterate
}
long hashMapIterTime = System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();
for (Map.Entry<Integer, String> entry : skipListMap.entrySet()) {
// Just iterate (will be in sorted order)
}
long skipListIterTime = System.currentTimeMillis() - startTime;
System.out.println("Iteration - HashMap: " + hashMapIterTime + "ms");
System.out.println("Iteration - SkipListMap: " + skipListIterTime + "ms");
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateConcurrentLinkedQueue();
demonstrateCopyOnWriteArrayList();
demonstrateConcurrentSkipListMap();
demonstrateTransferQueue();
demonstratePerformanceCharacteristics();
}
}
Best Practices and Patterns
import java.util.concurrent.*;
import java.util.*;
public class ConcurrentCollectionsBestPractices {
public static void demonstrateChoosingRightCollection() {
System.out.println("=== Choosing the Right Collection ===");
// Use case 1: High-frequency reads, infrequent writes
System.out.println("1. High read, low write scenario:");
CopyOnWriteArrayList<String> readHeavyList = new CopyOnWriteArrayList<>();
System.out.println(" Recommended: CopyOnWriteArrayList");
System.out.println(" - Zero cost for reads");
System.out.println(" - Expensive writes (full array copy)");
System.out.println(" - Perfect for event listeners, observers");
// Use case 2: Need sorted concurrent map
System.out.println("\n2. Sorted concurrent map:");
ConcurrentSkipListMap<String, Integer> sortedMap = new ConcurrentSkipListMap<>();
System.out.println(" Recommended: ConcurrentSkipListMap");
System.out.println(" - Maintains sort order");
System.out.println(" - Log(n) operations");
System.out.println(" - NavigableMap operations");
// Use case 3: Producer-consumer patterns
System.out.println("\n3. Producer-consumer patterns:");
BlockingQueue<String> producerConsumer = new LinkedBlockingQueue<>();
System.out.println(" Recommended: BlockingQueue implementations");
System.out.println(" - ArrayBlockingQueue: bounded, array-based");
System.out.println(" - LinkedBlockingQueue: optionally bounded, linked");
System.out.println(" - PriorityBlockingQueue: priority ordering");
// Use case 4: General concurrent map
System.out.println("\n4. General concurrent key-value storage:");
ConcurrentHashMap<String, Object> generalMap = new ConcurrentHashMap<>();
System.out.println(" Recommended: ConcurrentHashMap");
System.out.println(" - Best general-purpose concurrent map");
System.out.println(" - Excellent performance");
System.out.println(" - Rich atomic operations API");
System.out.println();
}
public static void demonstrateCommonPitfalls() {
System.out.println("=== Common Pitfalls ===");
ConcurrentHashMap<String, List<String>> groupMap = new ConcurrentHashMap<>();
// PITFALL 1: Non-atomic compound operations
System.out.println("Pitfall 1: Non-atomic compound operations");
// BAD: Race condition between check and put
String key = "group1";
if (!groupMap.containsKey(key)) {
groupMap.put(key, new ArrayList<>()); // Another thread might have added it
}
List<String> list = groupMap.get(key);
list.add("item1"); // Might be null!
// GOOD: Use atomic operations
groupMap.computeIfAbsent("group2", k -> new ArrayList<>()).add("item2");
System.out.println("✓ Use computeIfAbsent for atomic check-and-create");
// PITFALL 2: Modifying values returned by concurrent collections
System.out.println("\nPitfall 2: Unsafe modification of returned values");
// BAD: The list itself is not thread-safe
List<String> unsafeList = groupMap.get("group2");
// Multiple threads adding to this list can cause corruption
// GOOD: Use thread-safe lists or synchronize access
groupMap.put("group3", Collections.synchronizedList(new ArrayList<>()));
// OR use CopyOnWriteArrayList for read-heavy scenarios
groupMap.put("group4", new CopyOnWriteArrayList<>());
System.out.println("✓ Ensure nested collections are also thread-safe");
// PITFALL 3: Size-based decisions
System.out.println("\nPitfall 3: Making decisions based on size()");
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// BAD: Size can change between check and action
// if (queue.size() > 0) {
// String item = queue.poll(); // Might return null!
// }
// GOOD: Use the operation's return value
String item = queue.poll();
if (item != null) {
// Process item
System.out.println("✓ Check return values instead of size");
}
System.out.println();
}
public static void demonstratePerformanceTips() throws InterruptedException {
System.out.println("=== Performance Tips ===");
// Tip 1: Choose initial capacity wisely
System.out.println("1. Initial capacity matters:");
long startTime = System.currentTimeMillis();
ConcurrentHashMap<Integer, String> smallMap = new ConcurrentHashMap<>();
for (int i = 0; i < 100000; i++) {
smallMap.put(i, "Value" + i);
}
long smallMapTime = System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();
ConcurrentHashMap<Integer, String> rightSizedMap = new ConcurrentHashMap<>(100000);
for (int i = 0; i < 100000; i++) {
rightSizedMap.put(i, "Value" + i);
}
long rightSizedTime = System.currentTimeMillis() - startTime;
System.out.println(" Default capacity: " + smallMapTime + "ms");
System.out.println(" Right-sized capacity: " + rightSizedTime + "ms");
System.out.println(" Improvement: " + (double) smallMapTime / rightSizedTime + "x faster");
// Tip 2: Batch operations when possible
System.out.println("\n2. Batch operations:");
ConcurrentHashMap<String, Integer> batchMap = new ConcurrentHashMap<>();
// Individual puts
startTime = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
batchMap.put("key" + i, i);
}
long individualTime = System.currentTimeMillis() - startTime;
// Batch with putAll
Map<String, Integer> batchData = new HashMap<>();
for (int i = 10000; i < 20000; i++) {
batchData.put("key" + i, i);
}
startTime = System.currentTimeMillis();
batchMap.putAll(batchData);
long batchTime = System.currentTimeMillis() - startTime;
System.out.println(" Individual puts: " + individualTime + "ms");
System.out.println(" Batch putAll: " + batchTime + "ms");
// Tip 3: Use appropriate queue capacity
System.out.println("\n3. Queue capacity considerations:");
System.out.println(" - ArrayBlockingQueue: Fixed capacity, good for bounded scenarios");
System.out.println(" - LinkedBlockingQueue: Unbounded by default, can cause memory issues");
System.out.println(" - Always consider setting capacity limits");
ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(1000);
LinkedBlockingQueue<String> boundedLinkedQueue = new LinkedBlockingQueue<>(1000);
System.out.println(" ✓ Set appropriate capacity limits");
System.out.println();
}
public static void demonstrateMonitoringAndDebugging() {
System.out.println("=== Monitoring and Debugging ===");
ConcurrentHashMap<String, Integer> monitoredMap = new ConcurrentHashMap<>();
BlockingQueue<String> monitoredQueue = new ArrayBlockingQueue<>(100);
// Monitoring ConcurrentHashMap
System.out.println("ConcurrentHashMap monitoring:");
for (int i = 0; i < 50; i++) {
monitoredMap.put("key" + i, i);
}
System.out.println(" Size: " + monitoredMap.size());
System.out.println(" Is empty: " + monitoredMap.isEmpty());
// Note: ConcurrentHashMap doesn't expose internal metrics like segment info in recent versions
// Monitoring BlockingQueue
System.out.println("\nBlockingQueue monitoring:");
for (int i = 0; i < 30; i++) {
monitoredQueue.offer("item" + i);
}
System.out.println(" Size: " + monitoredQueue.size());
System.out.println(" Remaining capacity: " + monitoredQueue.remainingCapacity());
System.out.println(" Is empty: " + monitoredQueue.isEmpty());
// Debugging tip: Use toString() for snapshots
System.out.println("\nDebugging tip - collection snapshots:");
System.out.println(" Queue contents: " + monitoredQueue);
System.out.println(" ✓ toString() provides useful debugging info");
// Performance monitoring
System.out.println("\nPerformance monitoring considerations:");
System.out.println(" - Monitor queue capacity utilization");
System.out.println(" - Watch for frequent blocking operations");
System.out.println(" - Use JVM monitoring tools (JConsole, VisualVM)");
System.out.println(" - Consider custom metrics for business logic");
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateChoosingRightCollection();
demonstrateCommonPitfalls();
demonstratePerformanceTips();
demonstrateMonitoringAndDebugging();
}
}
Summary
Concurrent collections provide thread-safe, high-performance alternatives to traditional collections:
Key Collections:
- ConcurrentHashMap: High-performance concurrent map with atomic operations
- BlockingQueue: Producer-consumer patterns with blocking operations
- CopyOnWriteArrayList: Read-optimized list with expensive writes
- ConcurrentSkipListMap: Sorted concurrent map with navigation methods
- ConcurrentLinkedQueue: Non-blocking FIFO queue
Key Benefits:
- Thread Safety: Built for concurrent access without external synchronization
- Performance: Superior to synchronized wrappers
- Rich APIs: Specialized methods for concurrent operations
- Fail-Safe Iteration: No ConcurrentModificationException
- Scalability: Performance scales well with thread count
Best Practices:
- Choose Appropriately: Match collection to access patterns
- Use Atomic Operations: Leverage compute(), merge(), compareAndSet()
- Avoid Compound Operations: Don't rely on size() for decisions
- Consider Capacity: Set appropriate initial sizes and limits
- Monitor Performance: Watch for bottlenecks and blocking operations
- Nested Thread Safety: Ensure contained objects are also thread-safe
Common Patterns:
- Producer-Consumer: Use BlockingQueue implementations
- Cache Implementation: ConcurrentHashMap with compute methods
- Event Systems: CopyOnWriteArrayList for listener lists
- Sorted Concurrent Data: ConcurrentSkipListMap for ordered access
Concurrent collections are essential for building scalable, thread-safe applications in Java.