Master Java Concurrent Collections and Thread-Safe Data Structures

Java Concurrent Collections

Concurrent collections are thread-safe data structures designed for multi-threaded environments. Whereas synchronized collections guard every operation with a single lock, concurrent collections rely on techniques such as compare-and-swap (CAS) operations, lock-free algorithms, and fine-grained locking (per segment in ConcurrentHashMap before Java 8, per bin since) to scale better under contention.
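
To make the compare-and-swap idea concrete, here is a minimal sketch of a CAS retry loop built on AtomicInteger. The class name CasCounterSketch and its methods are hypothetical and chosen for illustration; this is not how any particular JDK collection is implemented, only the general pattern such code relies on.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class: illustrates a CAS retry loop, not JDK internals.
class CasCounterSketch {
    private final AtomicInteger value = new AtomicInteger();

    // Lock-free increment: read, compute, then publish only if no other
    // thread changed the value in the meantime.
    int increment() {
        while (true) {
            int current = value.get();
            int next = current + 1;
            if (value.compareAndSet(current, next)) {
                return next;
            }
            // CAS failed because another thread won the race; retry with a fresh read.
        }
    }

    int get() {
        return value.get();
    }

    // Starts the given number of threads, each incrementing perThread times,
    // and returns the final counter value.
    static int runConcurrently(int threads, int perThread) {
        CasCounterSketch counter = new CasCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 10_000)); // prints 40000
    }
}
```

No thread ever blocks here: a thread that loses the race simply retries, which is why CAS-based structures tend to hold up well when many threads touch the same data.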

Why Concurrent Collections?

Traditional collections are not thread-safe, and naive synchronization approaches have significant limitations:

Problems with Traditional Approaches:

  • Not Thread-Safe: ArrayList, HashMap, etc. can become corrupted under concurrent access
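  • Synchronized Wrappers: Collections.synchronizedMap() provides safety but serializes every operation on one lock, so throughput drops under contention
  • Coarse-Grained Locking: Entire collection locked during any operation
  • Iteration Issues: Fail-fast iterators throw ConcurrentModificationException when the collection is modified during iteration, and synchronized wrappers require the caller to hold the lock manually while iterating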
  • Blocking Operations: No built-in support for producer-consumer patterns

Concurrent Collections Benefits:

  • Thread-Safe: Designed for concurrent access from multiple threads
  • High Performance: Fine-grained locking or lock-free algorithms
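  • Scalable: Threads working on different parts of the collection rarely block one another, so throughput holds up as thread count grows
  • No ConcurrentModificationException: Iterators are weakly consistent (ConcurrentHashMap, ConcurrentLinkedQueue, ConcurrentSkipListMap) or work on a snapshot (CopyOnWriteArrayList), so iteration can safely overlap with modification
  • Rich APIs: Specialized methods for concurrent operations

The following program contrasts an unsynchronized HashMap, a synchronized wrapper, and a ConcurrentHashMap under concurrent writes:
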
import java.util.*;
import java.util.concurrent.*;

public class ConcurrentCollectionsBasics {
    
    public static void demonstrateProblems() throws InterruptedException {
        System.out.println("=== Problems with Non-Concurrent Collections ===");
        
        // Unsafe HashMap
        Map<String, Integer> unsafeMap = new HashMap<>();
        
        // Multiple threads modifying HashMap can corrupt it
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    // Unsynchronized puts can lose entries or corrupt the table
                    // (before Java 8 a racy resize could even loop forever)
                    unsafeMap.put("key" + threadId + "-" + j, j);
                }
            });
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("Unsafe map size: " + unsafeMap.size() + " (expected: 5000)");
        
        // Synchronized wrapper - safe but slow
        Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
        
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    syncMap.put("sync-key" + threadId + "-" + j, j);
                }
            });
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        long syncTime = System.currentTimeMillis() - startTime;
        
        System.out.println("Synchronized map size: " + syncMap.size() + " (time: " + syncTime + "ms)");
        System.out.println();
    }
    
    public static void demonstrateConcurrentSolution() throws InterruptedException {
        System.out.println("=== Concurrent Collections Solution ===");
        
        ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
        
        Thread[] threads = new Thread[5];
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    concurrentMap.put("concurrent-key" + threadId + "-" + j, j);
                }
            });
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        long concurrentTime = System.currentTimeMillis() - startTime;
        
        System.out.println("Concurrent map size: " + concurrentMap.size() + " (time: " + concurrentTime + "ms)");
        System.out.println("All 5000 entries are present; compare the timing with the synchronized map above (a rough measurement, not a benchmark)");
        System.out.println();
    }
    
    public static void main(String[] args) throws InterruptedException {
        demonstrateProblems();
        demonstrateConcurrentSolution();
    }
}
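
The iteration behavior mentioned in the lists above can be checked deterministically on a single thread. The sketch below (class and method names are hypothetical) modifies each collection while iterating over it: HashMap's fail-fast iterator throws, ConcurrentHashMap's weakly consistent iterator keeps going, and CopyOnWriteArrayList's iterator walks the snapshot taken when iteration began.

```java
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical class: compares iterator behavior under modification.
class IterationSemanticsSketch {

    // Adds a new key on the first step of iteration and reports whether
    // the iterator reacted with ConcurrentModificationException.
    static boolean failsFast(Map<String, Integer> map) {
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);
        try {
            boolean added = false;
            for (String key : map.keySet()) {
                if (!added) {
                    map.put("d", 4); // structural modification during iteration
                    added = true;
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Appends to the list on every step; the iterator still visits only
    // the three elements that existed when iteration started.
    static int snapshotCount() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        int seen = 0;
        for (String item : list) {
            list.add(item + "-copy"); // goes into a fresh array copy
            seen++;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("HashMap fails fast: " + failsFast(new HashMap<>()));                     // true
        System.out.println("ConcurrentHashMap fails fast: " + failsFast(new ConcurrentHashMap<>())); // false
        System.out.println("Elements seen by snapshot iterator: " + snapshotCount());                // 3
    }
}
```

Weakly consistent means the ConcurrentHashMap iterator may or may not reflect entries added after it was created; it never throws, but it is not a frozen snapshot the way the CopyOnWriteArrayList iterator is.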

ConcurrentHashMap

ConcurrentHashMap is the most commonly used concurrent collection, offering thread-safe key-value storage with excellent performance:

import java.util.concurrent.*;
import java.util.*;

public class ConcurrentHashMapExample {
    
    public static void demonstrateBasicOperations() {
        System.out.println("=== ConcurrentHashMap Basic Operations ===");
        
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        
        // Basic operations are thread-safe
        map.put("apple", 10);
        map.put("banana", 20);
        map.put("cherry", 30);
        
        System.out.println("Initial map: " + map);
        
        // Atomic operations
        Integer oldValue = map.replace("apple", 15);
        System.out.println("Replaced apple: old=" + oldValue + ", new=" + map.get("apple"));
        
        // Conditional operations
        boolean replaced = map.replace("banana", 20, 25);
        System.out.println("Conditional replace banana: " + replaced);
        
        Integer absent = map.putIfAbsent("date", 40);
        System.out.println("Put if absent 'date': " + absent + ", map=" + map);
        
        Integer present = map.putIfAbsent("apple", 100);
        System.out.println("Put if absent 'apple': " + present + " (should be 15)");
        
        // Remove operations
        boolean removed = map.remove("cherry", 30);
        System.out.println("Conditional remove cherry: " + removed);
        
        System.out.println("Final map: " + map);
        System.out.println();
    }
    
    public static void demonstrateAtomicOperations() throws InterruptedException {
        System.out.println("=== Atomic Operations ===");
        
        ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<>();
        counters.put("counter1", 0);
        counters.put("counter2", 0);
        
        // Multiple threads incrementing counters
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    // Atomic increment using compute methods
                    counters.compute("counter1", (key, val) -> val == null ? 1 : val + 1);
                    
                    // Alternative using merge
                    counters.merge("counter2", 1, Integer::sum);
                }
            });
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("Counter1 (compute): " + counters.get("counter1"));
        System.out.println("Counter2 (merge): " + counters.get("counter2"));
        System.out.println("Expected: 10000 each");
        System.out.println();
    }
    
    public static void demonstrateComputeMethods() {
        System.out.println("=== Compute Methods ===");
        
        ConcurrentHashMap<String, List<String>> groupedData = new ConcurrentHashMap<>();
        
        // compute() - always computes new value
        groupedData.compute("fruits", (key, list) -> {
            List<String> newList = list == null ? new ArrayList<>() : new ArrayList<>(list);
            newList.add("apple");
            return newList;
        });
        
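        // computeIfAbsent() - only compute if key is absent
        // Note: the add() calls below run after computeIfAbsent() returns, outside the map's
        // atomic section; with a plain ArrayList that is only safe single-threaded, as here
        groupedData.computeIfAbsent("vegetables", k -> new ArrayList<>()).add("carrot");
        groupedData.computeIfAbsent("fruits", k -> new ArrayList<>()).add("banana");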
        
        // computeIfPresent() - only compute if key is present
        groupedData.computeIfPresent("fruits", (key, list) -> {
            list.add("cherry");
            return list;
        });
        
        groupedData.computeIfPresent("meat", (key, list) -> {
            list.add("beef"); // Won't execute because "meat" key doesn't exist
            return list;
        });
        
        System.out.println("Grouped data: " + groupedData);
        
        // Practical example: word frequency counting
        String text = "apple banana apple cherry banana apple";
        ConcurrentHashMap<String, Integer> wordCount = new ConcurrentHashMap<>();
        
        Arrays.stream(text.split(" "))
              .parallel() // Can be safely processed in parallel
              .forEach(word -> wordCount.merge(word, 1, Integer::sum));
        
        System.out.println("Word frequencies: " + wordCount);
        System.out.println();
    }
    
    public static void demonstrateIterationSafety() throws InterruptedException {
        System.out.println("=== Safe Iteration ===");
        
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        
        // Populate map
        for (int i = 0; i < 100; i++) {
            map.put("key" + i, i);
        }
        
        // Start a thread that modifies the map during iteration
        Thread modifier = new Thread(() -> {
            try {
                Thread.sleep(100);
                for (int i = 100; i < 200; i++) {
                    map.put("newkey" + i, i);
                    Thread.sleep(1); // Slow down to overlap with iteration
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        modifier.start();
        
        // Iterate while map is being modified - no ConcurrentModificationException
        System.out.println("Iterating while map is being modified:");
        int count = 0;
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            count++;
            if (count % 20 == 0) {
                System.out.println("Processed " + count + " entries, current: " + entry.getKey());
                try {
                    Thread.sleep(50); // Give modifier time to add entries
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        modifier.join();
        System.out.println("Final map size: " + map.size());
        System.out.println("Iteration completed without exceptions");
        System.out.println();
    }
    
    public static void demonstratePerformanceComparison() throws InterruptedException {
        System.out.println("=== Performance Comparison ===");
        
        int numThreads = 8;
        int operationsPerThread = 10000;
        
        // Test synchronized HashMap
        Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
        long syncTime = testMapPerformance(syncMap, numThreads, operationsPerThread, "Synchronized HashMap");
        
        // Test ConcurrentHashMap
        ConcurrentHashMap<String, Integer> concurrentMap = new ConcurrentHashMap<>();
        long concurrentTime = testMapPerformance(concurrentMap, numThreads, operationsPerThread, "ConcurrentHashMap");
        
        // Rough, unwarmed measurement; Math.max avoids dividing by a 0 ms reading
        System.out.println("Speedup (rough): " + 
                         (double) syncTime / Math.max(1, concurrentTime) + "x");
        System.out.println();
    }
    
    private static long testMapPerformance(Map<String, Integer> map, int numThreads, 
                                         int operationsPerThread, String mapType) throws InterruptedException {
        Thread[] threads = new Thread[numThreads];
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < numThreads; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < operationsPerThread; j++) {
                    String key = "key" + threadId + "-" + j;
                    map.put(key, j);
                    map.get(key);
                    if (j % 2 == 0) {
                        map.remove(key);
                    }
                }
            });
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        
        long endTime = System.currentTimeMillis();
        System.out.println(mapType + " time: " + (endTime - startTime) + "ms");
        return endTime - startTime;
    }
    
    public static void main(String[] args) throws InterruptedException {
        demonstrateBasicOperations();
        demonstrateAtomicOperations();
        demonstrateComputeMethods();
        demonstrateIterationSafety();
        demonstratePerformanceComparison();
    }
}
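
For counters that many threads update very frequently, a variation on the merge() approach shown above is to store a LongAdder per key and increment it in place. The sketch below assumes a single hypothetical key "page"; the class and method names are likewise made up for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical class: per-key counters backed by LongAdder.
class LongAdderCountingSketch {

    // Each thread bumps the same key perThread times; returns the total.
    static long countConcurrently(int threads, int perThread) {
        ConcurrentHashMap<String, LongAdder> hits = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    // computeIfAbsent creates the adder atomically on first use;
                    // increment() is itself thread-safe, so no further locking is needed.
                    hits.computeIfAbsent("page", k -> new LongAdder()).increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        LongAdder adder = hits.get("page");
        return adder == null ? 0 : adder.sum();
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(8, 1_000)); // prints 8000
    }
}
```

Unlike the ArrayList values in the grouping example, the value type here is designed for concurrent mutation, so calling a method on it after computeIfAbsent() returns is safe.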

Blocking Queues

Blocking queues are essential for producer-consumer patterns and thread communication:

import java.util.concurrent.*;
import java.util.*;

public class BlockingQueuesExample {
    
    public static void demonstrateArrayBlockingQueue() throws InterruptedException {
        System.out.println("=== ArrayBlockingQueue ===");
        
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3); // Fixed capacity
        
        // Producer
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    String item = "Item-" + i;
                    System.out.println("Producing: " + item);
                    queue.put(item); // Blocks if queue is full
                    System.out.println("Produced: " + item + " (queue size: " + queue.size() + ")");
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // Consumer
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    Thread.sleep(1000); // Slower than producer
                    String item = queue.take(); // Blocks if queue is empty
                    System.out.println("Consumed: " + item + " (queue size: " + queue.size() + ")");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        producer.start();
        consumer.start();
        
        producer.join();
        consumer.join();
        System.out.println();
    }
    
    public static void demonstrateLinkedBlockingQueue() throws InterruptedException {
        System.out.println("=== LinkedBlockingQueue ===");
        
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); // Unbounded by default
        
        // Multiple producers
        Thread[] producers = new Thread[3];
        for (int i = 0; i < producers.length; i++) {
            final int producerId = i;
            producers[i] = new Thread(() -> {
                try {
                    for (int j = 1; j <= 3; j++) {
                        int item = producerId * 10 + j;
                        queue.put(item);
                        System.out.println("Producer " + producerId + " added: " + item);
                        Thread.sleep(300);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // Single consumer
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 9; i++) { // 3 producers × 3 items each
                    Integer item = queue.take();
                    System.out.println("Consumer got: " + item);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // Start all threads
        for (Thread producer : producers) {
            producer.start();
        }
        consumer.start();
        
        // Wait for completion
        for (Thread producer : producers) {
            producer.join();
        }
        consumer.join();
        System.out.println();
    }
    
    public static void demonstratePriorityBlockingQueue() throws InterruptedException {
        System.out.println("=== PriorityBlockingQueue ===");
        
        // Task class with priority (a local class must be declared before its first use)
        class Task implements Comparable<Task> {
            final String name;
            final int priority;
            
            Task(String name, int priority) {
                this.name = name;
                this.priority = priority;
            }
            
            @Override
            public int compareTo(Task other) {
                return Integer.compare(other.priority, this.priority); // Higher priority first
            }
            
            @Override
            public String toString() {
                return name + "(priority:" + priority + ")";
            }
        }
        
        // Queue that orders tasks by priority
        BlockingQueue<Task> taskQueue = new PriorityBlockingQueue<>();
        
        // Add tasks in random order
        Thread taskAdder = new Thread(() -> {
            try {
                taskQueue.put(new Task("Low priority task", 1));
                Thread.sleep(100);
                taskQueue.put(new Task("High priority task", 5));
                Thread.sleep(100);
                taskQueue.put(new Task("Medium priority task", 3));
                Thread.sleep(100);
                taskQueue.put(new Task("Critical task", 10));
                Thread.sleep(100);
                taskQueue.put(new Task("Another medium task", 3));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // Process tasks in priority order
        Thread taskProcessor = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    Task task = taskQueue.take();
                    System.out.println("Processing: " + task);
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        taskAdder.start();
        Thread.sleep(50); // Only the first task is queued by now; later ones arrive while the processor sleeps
        taskProcessor.start();
        
        taskAdder.join();
        taskProcessor.join();
        System.out.println();
    }
    
    public static void demonstrateDelayQueue() throws InterruptedException {
        System.out.println("=== DelayQueue ===");
        
        // Delayed task implementation (a local class must be declared before its first use)
        class DelayedTask implements Delayed {
            final String name;
            final long executeTime;
            
            DelayedTask(String name, long delayMs) {
                this.name = name;
                this.executeTime = System.currentTimeMillis() + delayMs;
            }
            
            @Override
            public long getDelay(TimeUnit unit) {
                long remaining = executeTime - System.currentTimeMillis();
                return unit.convert(remaining, TimeUnit.MILLISECONDS);
            }
            
            @Override
            public int compareTo(Delayed other) {
                // Assumes the queue holds only DelayedTask instances
                return Long.compare(this.executeTime, ((DelayedTask) other).executeTime);
            }
            
            @Override
            public String toString() {
                return name;
            }
        }
        
        DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
        
        // Schedule tasks with different delays
        Thread scheduler = new Thread(() -> {
            delayQueue.put(new DelayedTask("Task 1 (2s delay)", 2000));
            delayQueue.put(new DelayedTask("Task 2 (1s delay)", 1000));
            delayQueue.put(new DelayedTask("Task 3 (3s delay)", 3000));
            delayQueue.put(new DelayedTask("Task 4 (500ms delay)", 500));
            System.out.println("All tasks scheduled");
        });
        
        // Execute tasks when their delay expires
        Thread executor = new Thread(() -> {
            try {
                for (int i = 0; i < 4; i++) {
                    DelayedTask task = delayQueue.take(); // Blocks until delay expires
                    System.out.println("Executing: " + task + " at " + new Date());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        long startTime = System.currentTimeMillis();
        scheduler.start();
        executor.start();
        
        scheduler.join();
        executor.join();
        
        long totalTime = System.currentTimeMillis() - startTime;
        System.out.println("Total execution time: " + totalTime + "ms");
        System.out.println();
    }
    
    public static void demonstrateNonBlockingOperations() {
        System.out.println("=== Non-blocking Operations ===");
        
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        
        // Fill the queue
        queue.offer("Item1");
        queue.offer("Item2");
        
        System.out.println("Queue after offers: " + queue);
        
        // Try non-blocking operations
        boolean added = queue.offer("Item3"); // Won't block, returns false
        System.out.println("Offer Item3 (queue full): " + added);
        
        String item = queue.poll(); // Won't block, returns item or null
        System.out.println("Poll: " + item);
        
        added = queue.offer("Item3"); // Now should succeed
        System.out.println("Offer Item3 (after poll): " + added);
        
        // Timed operations
        try {
            boolean addedWithTimeout = queue.offer("Item4", 1, TimeUnit.SECONDS);
            System.out.println("Offer with timeout: " + addedWithTimeout);
            
            String itemWithTimeout = queue.poll(1, TimeUnit.SECONDS);
            System.out.println("Poll with timeout: " + itemWithTimeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("Final queue: " + queue);
        System.out.println();
    }
    
    public static void main(String[] args) throws InterruptedException {
        demonstrateArrayBlockingQueue();
        demonstrateLinkedBlockingQueue();
        demonstratePriorityBlockingQueue();
        demonstrateDelayQueue();
        demonstrateNonBlockingOperations();
    }
}
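
The examples above stop each consumer after a fixed number of items. When the item count is not known in advance, one common alternative is a "poison pill": the producer enqueues a sentinel value and the consumer exits when it sees it. The sketch below is a minimal version; the class name, the POISON constant, and the assumption that Integer.MIN_VALUE never occurs as real data are all illustrative choices.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical class: producer-consumer shutdown via a sentinel element.
class PoisonPillSketch {
    // Sentinel; assumes this value is never produced as a real item.
    private static final int POISON = Integer.MIN_VALUE;

    // Produces 1..itemCount, then the sentinel; returns the sum the consumer computed.
    static int produceAndConsume(int itemCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        int[] sum = new int[1]; // written only by the consumer, read after join()

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take(); // blocks while the queue is empty
                    if (item == POISON) {
                        break; // producer signalled that no more items will come
                    }
                    sum[0] += item;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (int i = 1; i <= itemCount; i++) {
                queue.put(i); // blocks while the queue is full
            }
            queue.put(POISON);
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            consumer.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(5)); // prints 15
    }
}
```

With several consumers, the producer would need to enqueue one sentinel per consumer.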

Other Concurrent Collections

Java provides additional concurrent collections for specific use cases:

import java.util.concurrent.*;
import java.util.*;

public class OtherConcurrentCollections {
    
    public static void demonstrateConcurrentLinkedQueue() throws InterruptedException {
        System.out.println("=== ConcurrentLinkedQueue ===");
        
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        
        // Multiple producers adding to queue
        Thread[] producers = new Thread[3];
        for (int i = 0; i < producers.length; i++) {
            final int producerId = i;
            producers[i] = new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    String item = "Producer" + producerId + "-Item" + j;
                    queue.offer(item); // Non-blocking
                    System.out.println("Added: " + item);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
        
        // Consumer polling from queue
        Thread consumer = new Thread(() -> {
            int consumed = 0;
            while (consumed < 15) { // 3 producers × 5 items
                String item = queue.poll(); // Non-blocking
                if (item != null) {
                    System.out.println("Consumed: " + item);
                    consumed++;
                } else {
                    try {
                        Thread.sleep(50); // Wait a bit if queue is empty
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        });
        
        for (Thread producer : producers) {
            producer.start();
        }
        consumer.start();
        
        for (Thread producer : producers) {
            producer.join();
        }
        consumer.join();
        
        System.out.println("Final queue size: " + queue.size());
        System.out.println();
    }
    
    public static void demonstrateCopyOnWriteArrayList() throws InterruptedException {
        System.out.println("=== CopyOnWriteArrayList ===");
        
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        
        // Populate initial data
        list.add("Item1");
        list.add("Item2");
        list.add("Item3");
        
        // Reader threads (frequent reads)
        Thread[] readers = new Thread[3];
        for (int i = 0; i < readers.length; i++) {
            final int readerId = i;
            readers[i] = new Thread(() -> {
                for (int j = 0; j < 10; j++) {
                    System.out.println("Reader " + readerId + " sees: " + list);
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
        
        // Writer thread (infrequent writes)
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(500);
                list.add("NewItem1");
                System.out.println("*** Added NewItem1 ***");
                
                Thread.sleep(1000);
                list.add("NewItem2");
                System.out.println("*** Added NewItem2 ***");
                
                Thread.sleep(1000);
                list.remove("Item1");
                System.out.println("*** Removed Item1 ***");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        for (Thread reader : readers) {
            reader.start();
        }
        writer.start();
        
        for (Thread reader : readers) {
            reader.join();
        }
        writer.join();
        
        System.out.println("Final list: " + list);
        System.out.println();
    }
    
    public static void demonstrateConcurrentSkipListMap() {
        System.out.println("=== ConcurrentSkipListMap ===");
        
        ConcurrentSkipListMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();
        
        // Add elements in random order
        int[] keys = {15, 3, 8, 1, 12, 6, 20, 10, 4, 18};
        for (int key : keys) {
            skipListMap.put(key, "Value" + key);
        }
        
        System.out.println("Skip list map (sorted): " + skipListMap);
        
        // Navigation methods
        System.out.println("First entry: " + skipListMap.firstEntry());
        System.out.println("Last entry: " + skipListMap.lastEntry());
        System.out.println("Lower than 10: " + skipListMap.lowerEntry(10));
        System.out.println("Floor of 11: " + skipListMap.floorEntry(11));
        System.out.println("Ceiling of 11: " + skipListMap.ceilingEntry(11));
        System.out.println("Higher than 10: " + skipListMap.higherEntry(10));
        
        // Range operations
        System.out.println("Submap [5, 15): " + skipListMap.subMap(5, 15));
        System.out.println("Head map (<10): " + skipListMap.headMap(10));
        System.out.println("Tail map (>=12): " + skipListMap.tailMap(12));
        
        // Concurrent modification-safe iteration
        System.out.print("Iterating while modifying: ");
        for (Map.Entry<Integer, String> entry : skipListMap.entrySet()) {
            System.out.print(entry.getKey() + " ");
            if (entry.getKey() == 8) {
                skipListMap.put(25, "Value25"); // Safe to modify during iteration
            }
        }
        System.out.println();
        System.out.println("After iteration: " + skipListMap);
        System.out.println();
    }
    
    public static void demonstrateTransferQueue() throws InterruptedException {
        System.out.println("=== LinkedTransferQueue ===");
        
        LinkedTransferQueue<String> transferQueue = new LinkedTransferQueue<>();
        
        // Producer that transfers directly to waiting consumer
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    String item = "TransferItem" + i;
                    System.out.println("Transferring: " + item);
                    
                    if (i % 2 == 0) {
                        // transfer() blocks until a consumer has received this element
                        transferQueue.transfer(item);
                        System.out.println("Transferred directly: " + item);
                    } else {
                        // put() adds to queue normally
                        transferQueue.put(item);
                        System.out.println("Queued: " + item);
                    }
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // Consumer that sometimes waits
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(1000); // Start consuming after some items are produced
                
                for (int i = 1; i <= 5; i++) {
                    String item = transferQueue.take();
                    System.out.println("Consumed: " + item);
                    Thread.sleep(800); // Slower consumer
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        producer.start();
        consumer.start();
        
        producer.join();
        consumer.join();
        
        System.out.println("Transfer queue size: " + transferQueue.size());
        System.out.println();
    }
    
    public static void demonstratePerformanceCharacteristics() throws InterruptedException {
        System.out.println("=== Performance Characteristics ===");
        
        int numOperations = 100000;
        
        // Test ConcurrentHashMap vs ConcurrentSkipListMap
        System.out.println("Testing map operations (" + numOperations + " operations):");
        
        ConcurrentHashMap<Integer, String> hashMap = new ConcurrentHashMap<>();
        ConcurrentSkipListMap<Integer, String> skipListMap = new ConcurrentSkipListMap<>();
        
        // Hash map performance
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < numOperations; i++) {
            hashMap.put(i, "Value" + i);
        }
        long hashMapTime = System.currentTimeMillis() - startTime;
        
        // Skip list map performance
        startTime = System.currentTimeMillis();
        for (int i = 0; i < numOperations; i++) {
            skipListMap.put(i, "Value" + i);
        }
        long skipListTime = System.currentTimeMillis() - startTime;
        
        System.out.println("ConcurrentHashMap: " + hashMapTime + "ms");
        System.out.println("ConcurrentSkipListMap: " + skipListTime + "ms");
        // Guard against a 0 ms reading, which would otherwise print Infinity or NaN
        System.out.println("Put time ratio (skip list / hash map): "
                + (double) skipListTime / Math.max(1, hashMapTime));
        
        // Test iteration performance
        startTime = System.currentTimeMillis();
        for (Map.Entry<Integer, String> entry : hashMap.entrySet()) {
            // Just iterate
        }
        long hashMapIterTime = System.currentTimeMillis() - startTime;
        
        startTime = System.currentTimeMillis();
        for (Map.Entry<Integer, String> entry : skipListMap.entrySet()) {
            // Just iterate (will be in sorted order)
        }
        long skipListIterTime = System.currentTimeMillis() - startTime;
        
        System.out.println("Iteration - HashMap: " + hashMapIterTime + "ms");
        System.out.println("Iteration - SkipListMap: " + skipListIterTime + "ms");
        System.out.println();
    }
    
    public static void main(String[] args) throws InterruptedException {
        demonstrateConcurrentLinkedQueue();
        demonstrateCopyOnWriteArrayList();
        demonstrateConcurrentSkipListMap();
        demonstrateTransferQueue();
        demonstratePerformanceCharacteristics();
    }
}

Best Practices and Patterns

import java.util.concurrent.*;
import java.util.*;

public class ConcurrentCollectionsBestPractices {
    
    public static void demonstrateChoosingRightCollection() {
        System.out.println("=== Choosing the Right Collection ===");
        
        // Use case 1: High-frequency reads, infrequent writes
        System.out.println("1. High read, low write scenario:");
        CopyOnWriteArrayList<String> readHeavyList = new CopyOnWriteArrayList<>();
        System.out.println("   Recommended: CopyOnWriteArrayList");
        System.out.println("   - Lock-free reads (no synchronization on the read path)");
        System.out.println("   - Expensive writes (full array copy)");
        System.out.println("   - Perfect for event listeners, observers");
        
        // Use case 2: Need sorted concurrent map
        System.out.println("\n2. Sorted concurrent map:");
        ConcurrentSkipListMap<String, Integer> sortedMap = new ConcurrentSkipListMap<>();
        System.out.println("   Recommended: ConcurrentSkipListMap");
        System.out.println("   - Maintains sort order");
        System.out.println("   - Expected O(log n) time for get, put, and remove");
        System.out.println("   - NavigableMap operations");
        
        // Use case 3: Producer-consumer patterns
        System.out.println("\n3. Producer-consumer patterns:");
        BlockingQueue<String> producerConsumer = new LinkedBlockingQueue<>();
        System.out.println("   Recommended: BlockingQueue implementations");
        System.out.println("   - ArrayBlockingQueue: bounded, array-based");
        System.out.println("   - LinkedBlockingQueue: optionally bounded, linked");
        System.out.println("   - PriorityBlockingQueue: priority ordering");
        
        // Use case 4: General concurrent map
        System.out.println("\n4. General concurrent key-value storage:");
        ConcurrentHashMap<String, Object> generalMap = new ConcurrentHashMap<>();
        System.out.println("   Recommended: ConcurrentHashMap");
        System.out.println("   - Best general-purpose concurrent map");
        System.out.println("   - Excellent performance");
        System.out.println("   - Rich atomic operations API");
        
        System.out.println();
    }
    
    public static void demonstrateCommonPitfalls() {
        System.out.println("=== Common Pitfalls ===");
        
        ConcurrentHashMap<String, List<String>> groupMap = new ConcurrentHashMap<>();
        
        // PITFALL 1: Non-atomic compound operations
        System.out.println("Pitfall 1: Non-atomic compound operations");
        
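        // BAD: Race condition between check and put
        String key = "group1";
        if (!groupMap.containsKey(key)) {
            // Another thread may have added a list since the check; this put() replaces it
            groupMap.put(key, new ArrayList<>());
        }
        List<String> list = groupMap.get(key);
        // Items added to a list that gets replaced are lost; the result is null
        // only if another thread removed the key in the meantime
        list.add("item1");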
        
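        // GOOD: Use atomic operations (only the creation is atomic; add() on the
        // returned ArrayList is still unsynchronized, see Pitfall 2)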
        groupMap.computeIfAbsent("group2", k -> new ArrayList<>()).add("item2");
        System.out.println("✓ Use computeIfAbsent for atomic check-and-create");
        
        // PITFALL 2: Modifying values returned by concurrent collections
        System.out.println("\nPitfall 2: Unsafe modification of returned values");
        
        // BAD: The list itself is not thread-safe
        List<String> unsafeList = groupMap.get("group2");
        // Multiple threads adding to this list can cause corruption
        
        // GOOD: Use thread-safe lists or synchronize access
        groupMap.put("group3", Collections.synchronizedList(new ArrayList<>()));
        // OR use CopyOnWriteArrayList for read-heavy scenarios
        groupMap.put("group4", new CopyOnWriteArrayList<>());
        System.out.println("✓ Ensure nested collections are also thread-safe");
        
        // PITFALL 3: Size-based decisions
        System.out.println("\nPitfall 3: Making decisions based on size()");
        
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        
        // BAD: Size can change between check and action
        // if (queue.size() > 0) {
        //     String item = queue.poll(); // Might return null!
        // }
        
        // GOOD: Use the operation's return value
        String item = queue.poll();
        if (item != null) {
            // Process item
            System.out.println("✓ Check return values instead of size");
        }
        
        System.out.println();
    }
    
    public static void demonstratePerformanceTips() throws InterruptedException {
        System.out.println("=== Performance Tips ===");
        
        // Tip 1: Choose initial capacity wisely
        System.out.println("1. Initial capacity matters:");
        
        long startTime = System.currentTimeMillis();
        ConcurrentHashMap<Integer, String> smallMap = new ConcurrentHashMap<>();
        for (int i = 0; i < 100000; i++) {
            smallMap.put(i, "Value" + i);
        }
        long smallMapTime = System.currentTimeMillis() - startTime;
        
        startTime = System.currentTimeMillis();
        ConcurrentHashMap<Integer, String> rightSizedMap = new ConcurrentHashMap<>(100000);
        for (int i = 0; i < 100000; i++) {
            rightSizedMap.put(i, "Value" + i);
        }
        long rightSizedTime = System.currentTimeMillis() - startTime;
        
        System.out.println("   Default capacity: " + smallMapTime + "ms");
        System.out.println("   Right-sized capacity: " + rightSizedTime + "ms");
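        // Guard against a 0 ms reading; the first loop also pays JIT warm-up, so treat the ratio as rough
        System.out.println("   Improvement: " + (double) smallMapTime / Math.max(1, rightSizedTime) + "x faster");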
        
        // Tip 2: Batch operations when possible
        System.out.println("\n2. Batch operations:");
        ConcurrentHashMap<String, Integer> batchMap = new ConcurrentHashMap<>();
        
        // Individual puts
        startTime = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
            batchMap.put("key" + i, i);
        }
        long individualTime = System.currentTimeMillis() - startTime;
        
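        // Batch with putAll (not atomic: other threads may observe a partially applied batch;
        // the keys are built outside the timed region here, so the comparison is only indicative)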
        Map<String, Integer> batchData = new HashMap<>();
        for (int i = 10000; i < 20000; i++) {
            batchData.put("key" + i, i);
        }
        
        startTime = System.currentTimeMillis();
        batchMap.putAll(batchData);
        long batchTime = System.currentTimeMillis() - startTime;
        
        System.out.println("   Individual puts: " + individualTime + "ms");
        System.out.println("   Batch putAll: " + batchTime + "ms");
        
        // Tip 3: Use appropriate queue capacity
        System.out.println("\n3. Queue capacity considerations:");
        System.out.println("   - ArrayBlockingQueue: Fixed capacity, good for bounded scenarios");
        System.out.println("   - LinkedBlockingQueue: Capacity defaults to Integer.MAX_VALUE (effectively unbounded), can cause memory issues");
        System.out.println("   - Always consider setting capacity limits");
        
        ArrayBlockingQueue<String> boundedQueue = new ArrayBlockingQueue<>(1000);
        LinkedBlockingQueue<String> boundedLinkedQueue = new LinkedBlockingQueue<>(1000);
        System.out.println("   ✓ Set appropriate capacity limits");
        
        System.out.println();
    }
    
    public static void demonstrateMonitoringAndDebugging() {
        System.out.println("=== Monitoring and Debugging ===");
        
        ConcurrentHashMap<String, Integer> monitoredMap = new ConcurrentHashMap<>();
        BlockingQueue<String> monitoredQueue = new ArrayBlockingQueue<>(100);
        
        // Monitoring ConcurrentHashMap
        System.out.println("ConcurrentHashMap monitoring:");
        for (int i = 0; i < 50; i++) {
            monitoredMap.put("key" + i, i);
        }
        
        System.out.println("   Size: " + monitoredMap.size());
        System.out.println("   Is empty: " + monitoredMap.isEmpty());
        // Note: size() is only an estimate while other threads are updating the map, and
        // recent versions of ConcurrentHashMap do not expose internal metrics such as segment info
        
        // Monitoring BlockingQueue
        System.out.println("\nBlockingQueue monitoring:");
        for (int i = 0; i < 30; i++) {
            monitoredQueue.offer("item" + i);
        }
        
        System.out.println("   Size: " + monitoredQueue.size());
        System.out.println("   Remaining capacity: " + monitoredQueue.remainingCapacity());
        System.out.println("   Is empty: " + monitoredQueue.isEmpty());
        
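        // Debugging tip: Use toString() for a quick view of the contents
        // (for some collections, such as ConcurrentHashMap, the result is only weakly consistent)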
        System.out.println("\nDebugging tip - collection snapshots:");
        System.out.println("   Queue contents: " + monitoredQueue);
        System.out.println("   ✓ toString() provides useful debugging info");
        
        // Performance monitoring
        System.out.println("\nPerformance monitoring considerations:");
        System.out.println("   - Monitor queue capacity utilization");
        System.out.println("   - Watch for frequent blocking operations");
        System.out.println("   - Use JVM monitoring tools (JConsole, VisualVM)");
        System.out.println("   - Consider custom metrics for business logic");
        
        System.out.println();
    }
    
    public static void main(String[] args) throws InterruptedException {
        demonstrateChoosingRightCollection();
        demonstrateCommonPitfalls();
        demonstratePerformanceTips();
        demonstrateMonitoringAndDebugging();
    }
}
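
The queue capacity tip above can be made concrete with a small sketch. It assumes a tiny ArrayBlockingQueue of capacity 2 so the full-queue behaviour is easy to see; the class name BoundedQueueSketch and the helper countRejected are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedQueueSketch {

    // Hypothetical helper: tries to enqueue every item without blocking
    // and returns how many items the queue rejected.
    public static int countRejected(BlockingQueue<String> queue, String... items) {
        int rejected = 0;
        for (String item : items) {
            // offer() returns false immediately when a bounded queue is full
            if (!queue.offer(item)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity of 2 is an assumption chosen to trigger rejection quickly
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);

        int rejected = countRejected(queue, "a", "b", "c");
        System.out.println("Rejected: " + rejected);
        System.out.println("Remaining capacity: " + queue.remainingCapacity());

        // The timed offer waits up to the timeout for space, then gives up
        boolean accepted = queue.offer("d", 100, TimeUnit.MILLISECONDS);
        System.out.println("Timed offer accepted: " + accepted);
    }
}
```

With a bounded queue the producer learns about overload through the return value of offer() and can drop, retry, or slow down, rather than letting the backlog grow until memory runs out.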

Summary

Concurrent collections provide thread-safe, high-performance alternatives to traditional collections:

Key Collections:

  • ConcurrentHashMap: High-performance concurrent map with atomic operations
  • BlockingQueue: Producer-consumer patterns with blocking operations
  • CopyOnWriteArrayList: Read-optimized list with expensive writes
  • ConcurrentSkipListMap: Sorted concurrent map with navigation methods
  • ConcurrentLinkedQueue: Non-blocking FIFO queue

Key Benefits:

  • Thread Safety: Built for concurrent access without external synchronization
  • Performance: Typically better than synchronized wrappers, especially under contention
  • Rich APIs: Specialized methods for concurrent operations
  • Safe Iteration: Weakly consistent or snapshot iterators that never throw ConcurrentModificationException
  • Scalability: Performance scales well with thread count
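
The iteration behaviour listed above can be illustrated with a minimal single-threaded sketch. It assumes two small collections built just for the demonstration; the class name IterationSemanticsSketch and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class IterationSemanticsSketch {

    // Returns the elements a CopyOnWriteArrayList iterator yields
    // when the list is modified after the iterator was created.
    public static List<String> snapshotIteration() {
        CopyOnWriteArrayList<String> list =
                new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        Iterator<String> it = list.iterator(); // bound to the current backing array
        list.add("c");                         // the write goes to a fresh copy
        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen; // the iterator never observes "c"
    }

    // Removes the even keys from a ConcurrentHashMap while iterating over it
    // and returns how many entries remain.
    public static int removeWhileIterating() {
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
        for (int i = 0; i < 10; i++) {
            map.put(i, "v" + i);
        }
        for (Integer key : map.keySet()) {
            if (key % 2 == 0) {
                map.remove(key); // a plain HashMap would typically throw ConcurrentModificationException here
            }
        }
        return map.size();
    }

    public static void main(String[] args) {
        System.out.println("Snapshot iterator saw: " + snapshotIteration());
        System.out.println("Entries left: " + removeWhileIterating());
    }
}
```

The CopyOnWriteArrayList iterator works on a snapshot, while the ConcurrentHashMap iterator is weakly consistent: it tolerates modification during traversal but may or may not reflect changes made by other threads after it was created.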

Best Practices:

  • Choose Appropriately: Match collection to access patterns
  • Use Atomic Operations: Leverage computeIfAbsent(), compute(), merge(), putIfAbsent()
  • Avoid Check-Then-Act Sequences: Don't base decisions on size() or containsKey(); use the operation's return value
  • Consider Capacity: Set appropriate initial sizes and limits
  • Monitor Performance: Watch for bottlenecks and blocking operations
  • Nested Thread Safety: Ensure contained objects are also thread-safe
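
As a small illustration of the atomic-operations practice, the following sketch counts events from several threads with ConcurrentHashMap.merge(). The class name AtomicUpdateSketch, the method countWithMerge, and the key "hits" are hypothetical choices made for this example.

```java
import java.util.concurrent.ConcurrentHashMap;

public class AtomicUpdateSketch {

    // Starts `threads` workers that each increment the same key `perThread` times
    // and returns the final count once all workers have finished.
    public static int countWithMerge(int threads, int perThread) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];

        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // merge() performs the read-modify-write atomically per key,
                    // so no increment is lost even under contention
                    counts.merge("hits", 1, Integer::sum);
                }
            });
            workers[t].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return counts.getOrDefault("hits", 0);
    }

    public static void main(String[] args) {
        System.out.println("Total: " + countWithMerge(4, 10_000));
    }
}
```

Replacing the merge() call with a separate get() followed by put() would reintroduce the check-then-act race, and the total would usually come out lower than threads × perThread.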

Common Patterns:

  • Producer-Consumer: Use BlockingQueue implementations
  • Cache Implementation: ConcurrentHashMap with compute methods
  • Event Systems: CopyOnWriteArrayList for listener lists
  • Sorted Concurrent Data: ConcurrentSkipListMap for ordered access
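
The producer-consumer pattern above can be sketched as follows. This is one common way to shut the consumer down, using a sentinel ("poison pill") value; the class name ProducerConsumerSketch, the method sumThroughQueue, the sentinel POISON_PILL, and the queue capacity of 8 are all assumptions made for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerConsumerSketch {

    // Hypothetical sentinel that tells the consumer to stop; it must never be a real item
    private static final Integer POISON_PILL = Integer.MIN_VALUE;

    // Sends 1..itemCount through a bounded queue and returns the sum the consumer computed.
    public static int sumThroughQueue(int itemCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(8);
        AtomicInteger sum = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
                queue.put(POISON_PILL); // signal end of input
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Integer item = queue.take(); // blocks while the queue is empty
                    if (item.equals(POISON_PILL)) {
                        break;
                    }
                    sum.addAndGet(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for threads", e);
        }
        return sum.get();
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + sumThroughQueue(100));
    }
}
```

The bounded queue provides backpressure: a fast producer blocks in put() until the consumer catches up, so neither thread needs to poll size() or sleep.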

Concurrent collections are essential for building scalable, thread-safe applications in Java.