Master Java ExecutorService and Thread Pool Management
Java ExecutorService
ExecutorService, part of the java.util.concurrent package, is a higher-level alternative to creating and managing threads directly. It provides a framework for asynchronous task execution backed by thread pools, which makes concurrent code easier to write and reason about. Instead of creating threads by hand, you submit tasks and the executor handles thread lifecycle, reuse, and resource management.
Why Use ExecutorService?
Direct thread management has several drawbacks that ExecutorService addresses:
Problems with Manual Thread Management:
- Resource Overhead: Creating a new thread for every task is expensive
- Thread Lifecycle: Thread creation, execution, and cleanup must all be managed by hand
- Resource Limits: Uncontrolled thread creation can exhaust system resources
- Task Management: No built-in support for task queuing and scheduling
- Error Handling: Exceptions thrown inside a thread are hard to propagate back to the code that started it
ExecutorService Benefits:
- Thread Reuse: Pools threads to reduce creation overhead
- Resource Control: Limits the number of concurrent threads
- Task Queuing: Built-in work queue management
- Future Results: Future objects give access to each task's result or exception
- Lifecycle Management: Controlled startup and shutdown
import java.util.concurrent.*;
import java.util.*;
public class ExecutorServiceBasics {
// Manual thread management (problematic)
public static void manualThreadExample() {
System.out.println("=== Manual Thread Management ===");
List<Thread> threads = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int taskId = i;
Thread thread = new Thread(() -> {
System.out.println("Task " + taskId + " executing on " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
threads.add(thread);
thread.start();
}
// Wait for all threads to complete
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("All manual threads completed\n");
}
// ExecutorService approach (better)
public static void executorServiceExample() {
System.out.println("=== ExecutorService Management ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
try {
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " executing on " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
System.out.println("All executor tasks completed\n");
}
public static void main(String[] args) {
manualThreadExample();
executorServiceExample();
}
}
Types of Executors
The Executors factory class provides several pre-configured executor types for common use cases:
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ExecutorTypes {
private static final AtomicInteger taskCounter = new AtomicInteger(0);
private static Runnable createTask(String taskName) {
return () -> {
int taskId = taskCounter.incrementAndGet();
System.out.println(taskName + " Task-" + taskId + " started on " +
Thread.currentThread().getName());
try {
Thread.sleep(1000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(taskName + " Task-" + taskId + " completed");
};
}
public static void demonstrateFixedThreadPool() {
System.out.println("=== Fixed Thread Pool ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
// Submit 5 tasks to a pool of 2 threads
for (int i = 1; i <= 5; i++) {
executor.submit(createTask("Fixed"));
}
} finally {
shutdownExecutor(executor);
}
System.out.println();
}
public static void demonstrateCachedThreadPool() {
System.out.println("=== Cached Thread Pool ===");
ExecutorService executor = Executors.newCachedThreadPool();
try {
// Submit tasks with delays
for (int i = 1; i <= 3; i++) {
executor.submit(createTask("Cached"));
try {
Thread.sleep(500); // Stagger submissions
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} finally {
shutdownExecutor(executor);
}
System.out.println();
}
public static void demonstrateSingleThreadExecutor() {
System.out.println("=== Single Thread Executor ===");
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
// All tasks execute sequentially on single thread
for (int i = 1; i <= 3; i++) {
executor.submit(createTask("Single"));
}
} finally {
shutdownExecutor(executor);
}
System.out.println();
}
public static void demonstrateScheduledExecutor() {
System.out.println("=== Scheduled Thread Pool ===");
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
try {
// Schedule task with delay
scheduler.schedule(createTask("Delayed"), 2, TimeUnit.SECONDS);
// Schedule recurring task
ScheduledFuture<?> recurringTask = scheduler.scheduleAtFixedRate(
createTask("Recurring"), 1, 3, TimeUnit.SECONDS);
// Let it run for a while
Thread.sleep(10000);
// Cancel recurring task
recurringTask.cancel(false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
shutdownExecutor(scheduler);
}
System.out.println();
}
public static void demonstrateWorkStealingPool() {
System.out.println("=== Work Stealing Pool ===");
ExecutorService executor = Executors.newWorkStealingPool();
try {
// Submit tasks of varying duration
for (int i = 1; i <= 6; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Work-stealing Task-" + taskId + " on " +
Thread.currentThread().getName());
try {
Thread.sleep(taskId * 200); // Variable duration
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// Work-stealing pools run tasks on daemon threads and in no guaranteed order,
// so keep the calling thread alive long enough for the tasks to finish
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
shutdownExecutor(executor);
}
System.out.println();
}
private static void shutdownExecutor(ExecutorService executor) {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Executor did not terminate");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
taskCounter.set(0);
demonstrateFixedThreadPool();
taskCounter.set(0);
demonstrateCachedThreadPool();
taskCounter.set(0);
demonstrateSingleThreadExecutor();
taskCounter.set(0);
demonstrateScheduledExecutor();
taskCounter.set(0);
demonstrateWorkStealingPool();
}
}
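The factory methods used above are thin wrappers around pool constructors. As a rough sketch of what two of them configure (this mirrors how OpenJDK implements them, but treat the exact parameters as an assumption for other runtimes; the names FactoryEquivalents, fixedLike, cachedLike, and describe are hypothetical), the same pools can be built with ThreadPoolExecutor directly, which makes the queueing behavior explicit:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FactoryEquivalents {
    // Approximates Executors.newFixedThreadPool(n): n threads, unbounded queue
    static ThreadPoolExecutor fixedLike(int n) {
        return new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
    }

    // Approximates Executors.newCachedThreadPool(): no core threads, direct
    // hand-off to a thread, idle threads reclaimed after 60 seconds
    static ThreadPoolExecutor cachedLike() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
                new SynchronousQueue<>());
    }

    // Summarizes the settings that distinguish the two pools
    static String describe(ThreadPoolExecutor pool) {
        return "core=" + pool.getCorePoolSize()
                + ", max=" + pool.getMaximumPoolSize()
                + ", queue=" + pool.getQueue().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor fixed = fixedLike(2);
        ThreadPoolExecutor cached = cachedLike();
        try {
            System.out.println("fixed:  " + describe(fixed));
            System.out.println("cached: " + describe(cached));
        } finally {
            fixed.shutdown();
            cached.shutdown();
        }
    }
}
```

Seen this way, the trade-off is clearer: the fixed pool bounds the thread count but not the queue, while the cached pool has no queue to speak of and no bound on the thread count.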
Submitting Tasks and Handling Results
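ExecutorService provides multiple ways to submit tasks and handle their results: submit() accepts a Runnable or a Callable and returns a Future, while invokeAll() runs a batch of Callables and returns once all of them have finished: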
import java.util.concurrent.*;
import java.util.*;
public class TaskSubmission {
// Runnable tasks (no return value)
public static void demonstrateRunnableTasks() throws InterruptedException {
System.out.println("=== Runnable Tasks ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
// Submit runnable task
Future<?> future1 = executor.submit(() -> {
System.out.println("Runnable task executing");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Submit a Runnable together with a fixed result that get() returns once the task succeeds
Future<String> future2 = executor.submit(() -> {
System.out.println("Runnable with result executing");
}, "Task completed successfully");
// Wait for completion
future1.get(); // Returns null for Runnable
String result = future2.get();
System.out.println("Result: " + result);
} catch (ExecutionException e) {
System.err.println("Task execution failed: " + e.getCause());
} finally {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
System.out.println();
}
// Callable tasks (return values)
public static void demonstrateCallableTasks() throws InterruptedException {
System.out.println("=== Callable Tasks ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
try {
// Submit individual callable
Future<Integer> future1 = executor.submit(() -> {
Thread.sleep(1000);
return 42;
});
// Submit multiple callables
List<Callable<String>> tasks = Arrays.asList(
() -> {
Thread.sleep(800);
return "Task 1 result";
},
() -> {
Thread.sleep(1200);
return "Task 2 result";
},
() -> {
Thread.sleep(600);
return "Task 3 result";
}
);
// Get result from single task
Integer result1 = future1.get();
System.out.println("Single task result: " + result1);
// Execute all tasks and wait for completion
List<Future<String>> futures = executor.invokeAll(tasks);
System.out.println("All tasks completed:");
for (int i = 0; i < futures.size(); i++) {
String result = futures.get(i).get();
System.out.println("Task " + (i + 1) + ": " + result);
}
} catch (ExecutionException e) {
System.err.println("Task execution failed: " + e.getCause());
} finally {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
System.out.println();
}
// Handling timeouts and cancellation
public static void demonstrateTimeoutsAndCancellation() {
System.out.println("=== Timeouts and Cancellation ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
// Long-running task
Future<String> longTask = executor.submit(() -> {
try {
Thread.sleep(5000); // 5 seconds
return "Long task completed";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "Long task interrupted";
}
});
// Try to get result with timeout
try {
String result = longTask.get(2, TimeUnit.SECONDS);
System.out.println("Result: " + result);
} catch (TimeoutException e) {
System.out.println("Task timed out - cancelling...");
boolean cancelled = longTask.cancel(true); // Interrupt if running
System.out.println("Cancellation successful: " + cancelled);
}
// Task that throws exception
Future<Integer> faultyTask = executor.submit(() -> {
Thread.sleep(500);
throw new RuntimeException("Simulated error");
});
try {
Integer result = faultyTask.get();
System.out.println("Faulty task result: " + result);
} catch (ExecutionException e) {
System.err.println("Task failed with exception: " + e.getCause().getMessage());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
System.out.println();
}
public static void main(String[] args) throws InterruptedException {
demonstrateRunnableTasks();
demonstrateCallableTasks();
demonstrateTimeoutsAndCancellation();
}
}
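invokeAll() hands back its futures in submission order, and only after every task has finished. When results should be handled as soon as each one is ready, one option is ExecutorCompletionService from java.util.concurrent. The following is a minimal sketch under assumed task names and durations; the class CompletionOrderDemo and the helper collectInCompletionOrder are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionOrderDemo {
    // Runs one task per map entry (name -> simulated duration in ms) and
    // returns the task names in the order in which the tasks finished.
    static List<String> collectInCompletionOrder(ExecutorService executor,
                                                 Map<String, Long> durationsMs)
            throws InterruptedException, ExecutionException {
        CompletionService<String> completionService =
                new ExecutorCompletionService<>(executor);
        for (Map.Entry<String, Long> entry : durationsMs.entrySet()) {
            completionService.submit(() -> {
                Thread.sleep(entry.getValue()); // Simulate work
                return entry.getKey();
            });
        }
        List<String> finished = new ArrayList<>();
        for (int i = 0; i < durationsMs.size(); i++) {
            // take() blocks until the next task completes, whichever it is
            finished.add(completionService.take().get());
        }
        return finished;
    }

    public static void main(String[] args) throws Exception {
        Map<String, Long> durationsMs = new LinkedHashMap<>();
        durationsMs.put("Task 1", 800L);
        durationsMs.put("Task 2", 1200L);
        durationsMs.put("Task 3", 600L);
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            System.out.println(collectInCompletionOrder(executor, durationsMs));
        } finally {
            executor.shutdown();
        }
    }
}
```

With three threads and the durations shown, the tasks would be expected to finish in the order Task 3, Task 1, Task 2, although exact timing depends on the scheduler.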
Custom Thread Pools
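When the factory defaults do not fit, you can construct a ThreadPoolExecutor directly and choose the core and maximum pool sizes, keep-alive time, work queue, thread factory, and rejection handler yourself: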
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CustomThreadPools {
public static void demonstrateThreadPoolExecutor() {
System.out.println("=== Custom ThreadPoolExecutor ===");
// Custom thread factory
ThreadFactory customThreadFactory = new ThreadFactory() {
private final AtomicInteger threadCount = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "CustomWorker-" + threadCount.getAndIncrement());
thread.setDaemon(false);
return thread;
}
};
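// Custom rejection handler: logs the rejected task and then drops it. Because it does not throw,
// submit() returns normally and the Future of a dropped task never completes.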
RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("Task rejected: " + r.toString());
System.err.println("Executor state: active=" + executor.getActiveCount() +
", pool=" + executor.getPoolSize() +
", queue=" + executor.getQueue().size());
}
};
// Create custom thread pool
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // Core pool size
5, // Maximum pool size
60L, // Keep alive time
TimeUnit.SECONDS, // Time unit
new ArrayBlockingQueue<>(3), // Work queue (limited capacity)
customThreadFactory, // Thread factory
rejectionHandler // Rejection handler
);
try {
// Submit tasks faster than the core threads can finish them, so the queue fills
// and the pool grows toward its maximum size
for (int i = 1; i <= 10; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("Task " + taskId + " executing on " +
Thread.currentThread().getName());
try {
Thread.sleep(2000); // Simulate work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task " + taskId + " completed");
});
// Print executor state
System.out.println("Submitted task " + taskId +
" - Active: " + executor.getActiveCount() +
", Pool: " + executor.getPoolSize() +
", Queue: " + executor.getQueue().size());
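} catch (RejectedExecutionException e) {
// Reached only when the handler throws (as the default AbortPolicy does);
// the logging handler above never does
System.err.println("Task " + taskId + " rejected!");
}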
Thread.sleep(300); // Stagger submissions
}
// Monitor executor statistics
Thread.sleep(1000);
printExecutorStats(executor);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
System.out.println();
}
public static void demonstrateScheduledThreadPool() {
System.out.println("=== Custom ScheduledThreadPoolExecutor ===");
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(
2, // Core pool size
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "Scheduler-" + count.getAndIncrement());
}
}
);
// Configure behavior
scheduler.setRemoveOnCancelPolicy(true);
scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
try {
// Schedule one-time task
scheduler.schedule(() -> {
System.out.println("One-time task executed at " + System.currentTimeMillis());
}, 1, TimeUnit.SECONDS);
// Schedule periodic task at a fixed rate: the 2-second period is measured from start to start
ScheduledFuture<?> periodicTask = scheduler.scheduleAtFixedRate(() -> {
System.out.println("Periodic task executed at " + System.currentTimeMillis() +
" on " + Thread.currentThread().getName());
}, 2, 2, TimeUnit.SECONDS);
// Schedule task with fixed delay: the 3-second delay runs from the end of one execution to the start of the next
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("Fixed delay task executed");
try {
Thread.sleep(1000); // Simulate one second of work
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 1, 3, TimeUnit.SECONDS);
// Let tasks run for a while
Thread.sleep(8000);
// Cancel periodic task
periodicTask.cancel(false);
System.out.println("Periodic task cancelled");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
System.out.println();
}
private static void printExecutorStats(ThreadPoolExecutor executor) {
System.out.println("\n=== Executor Statistics ===");
System.out.println("Core pool size: " + executor.getCorePoolSize());
System.out.println("Maximum pool size: " + executor.getMaximumPoolSize());
System.out.println("Current pool size: " + executor.getPoolSize());
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Largest pool size: " + executor.getLargestPoolSize());
System.out.println("Task count: " + executor.getTaskCount());
System.out.println("Completed tasks: " + executor.getCompletedTaskCount());
System.out.println("Queue size: " + executor.getQueue().size());
System.out.println("Is shutdown: " + executor.isShutdown());
System.out.println("Is terminated: " + executor.isTerminated());
System.out.println();
}
public static void main(String[] args) {
demonstrateThreadPoolExecutor();
demonstrateScheduledThreadPool();
}
}
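The logging handler above simply drops rejected tasks. One built-in alternative is ThreadPoolExecutor.CallerRunsPolicy, which runs a rejected task on the submitting thread and so slows the producer down. Below is a minimal sketch, assuming a deliberately tiny pool (one thread, queue capacity of one) so that rejection is easy to trigger; the class name CallerRunsDemo and the helper runWithCallerRuns are hypothetical:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsDemo {
    // Submits taskCount short tasks and returns {completed, ranOnCallerThread}.
    static int[] runWithCallerRuns(int taskCount) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger ranOnCaller = new AtomicInteger();
        Thread caller = Thread.currentThread();
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.execute(() -> {
                    // Count the tasks that the policy pushed back to the submitter
                    if (Thread.currentThread() == caller) {
                        ranOnCaller.incrementAndGet();
                    }
                    try {
                        Thread.sleep(100); // Simulate work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    completed.incrementAndGet();
                });
            }
        } finally {
            executor.shutdown();
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        }
        return new int[] {completed.get(), ranOnCaller.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = runWithCallerRuns(6);
        System.out.println("Completed: " + counts[0] + ", run by caller: " + counts[1]);
    }
}
```

No task is lost with this policy, at the cost of blocking the submitting thread while it runs the overflow work. Whether that is acceptable depends on what the submitting thread is otherwise responsible for.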
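Best Practices and Common Patterns
The following example collects patterns for orderly shutdown, exception handling, resource management, and comparing pool sizes: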
import java.util.concurrent.*;
import java.util.*;
public class ExecutorBestPractices {
// Proper shutdown pattern
public static void demonstrateProperShutdown() {
System.out.println("=== Proper Shutdown Pattern ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
// Submit tasks
for (int i = 1; i <= 3; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " starting");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
System.out.println("Task " + taskId + " interrupted");
Thread.currentThread().interrupt();
return;
}
System.out.println("Task " + taskId + " completed");
});
}
} finally {
shutdownExecutorProperly(executor);
}
System.out.println();
}
private static void shutdownExecutorProperly(ExecutorService executor) {
// Step 1: Disable new tasks from being submitted
executor.shutdown();
try {
// Step 2: Wait for existing tasks to complete
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
// Step 3: Cancel currently executing tasks
System.out.println("Forcing shutdown...");
executor.shutdownNow();
// Step 4: Wait a while for tasks to respond to cancellation
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
System.err.println("Executor did not terminate");
}
}
} catch (InterruptedException e) {
// Re-cancel if current thread also interrupted
executor.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
// Exception handling in tasks
public static void demonstrateExceptionHandling() {
System.out.println("=== Exception Handling ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
try {
// Task that throws unchecked exception
Future<String> future1 = executor.submit(() -> {
Thread.sleep(500);
throw new RuntimeException("Task failed!");
});
// Task that completes successfully
Future<String> future2 = executor.submit(() -> {
Thread.sleep(1000);
return "Task succeeded";
});
// Handle results and exceptions
handleFuture(future1, "Task 1");
handleFuture(future2, "Task 2");
} finally {
shutdownExecutorProperly(executor);
}
System.out.println();
}
private static void handleFuture(Future<String> future, String taskName) {
try {
String result = future.get();
System.out.println(taskName + " result: " + result);
} catch (ExecutionException e) {
System.err.println(taskName + " failed: " + e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println(taskName + " interrupted");
}
}
// Resource management pattern
public static void demonstrateResourceManagement() {
System.out.println("=== Resource Management Pattern ===");
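// Use try-with-resources for automatic cleanup. The wrapper is needed before Java 19;
// from Java 19 onward ExecutorService itself implements AutoCloseable.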
try (ExecutorServiceWrapper wrapper = new ExecutorServiceWrapper(
Executors.newFixedThreadPool(2))) {
ExecutorService executor = wrapper.getExecutor();
// Submit tasks
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int value = i;
futures.add(executor.submit(() -> {
Thread.sleep(500);
return value * value;
}));
}
// Collect results
for (int i = 0; i < futures.size(); i++) {
try {
Integer result = futures.get(i).get();
System.out.println("Result " + (i + 1) + ": " + result);
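} catch (ExecutionException e) {
// The task's own exception is the cause, not the ExecutionException wrapper
System.err.println("Task " + (i + 1) + " failed: " + e.getCause());
} catch (InterruptedException e) {
// Preserve interrupt status and stop waiting for further results
Thread.currentThread().interrupt();
break;
}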
}
} // Executor automatically shutdown here
System.out.println();
}
// Wrapper class for automatic resource management
static class ExecutorServiceWrapper implements AutoCloseable {
private final ExecutorService executor;
public ExecutorServiceWrapper(ExecutorService executor) {
this.executor = executor;
}
public ExecutorService getExecutor() {
return executor;
}
@Override
public void close() {
System.out.println("Auto-closing executor...");
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// Performance considerations
public static void demonstratePerformanceConsiderations() {
System.out.println("=== Performance Considerations ===");
// Test different pool sizes
int numTasks = 20;
testPoolPerformance("Small Pool (2 threads)", 2, numTasks);
testPoolPerformance("Medium Pool (5 threads)", 5, numTasks);
testPoolPerformance("Large Pool (10 threads)", 10, numTasks);
testPoolPerformance("Cached Pool", -1, numTasks); // -1 for cached pool
System.out.println();
}
private static void testPoolPerformance(String poolType, int poolSize, int numTasks) {
ExecutorService executor = (poolSize == -1) ?
Executors.newCachedThreadPool() :
Executors.newFixedThreadPool(poolSize);
long startTime = System.currentTimeMillis();
try {
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < numTasks; i++) {
futures.add(executor.submit(() -> {
// Simulate I/O-bound task
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
}));
}
// Wait for all tasks to complete
for (Future<Void> future : futures) {
try {
future.get();
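} catch (ExecutionException e) {
// Task failures are irrelevant to this timing test
} catch (InterruptedException e) {
// Preserve interrupt status and stop waiting
Thread.currentThread().interrupt();
break;
}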
}
} finally {
shutdownExecutorProperly(executor);
}
long endTime = System.currentTimeMillis();
System.out.println(poolType + ": " + (endTime - startTime) + " ms");
}
public static void main(String[] args) {
demonstrateProperShutdown();
demonstrateExceptionHandling();
demonstrateResourceManagement();
demonstratePerformanceConsiderations();
}
}
Summary
ExecutorService provides powerful thread pool management:
Key Benefits:
- Thread Reuse: Reduces overhead of thread creation
- Resource Control: Limits concurrent thread count
- Task Management: Built-in queuing and scheduling
- Future Results: Easy asynchronous result handling
- Lifecycle Management: Controlled shutdown procedures
Executor Types:
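- FixedThreadPool: Fixed number of threads fed from an unbounded queue, good for CPU-bound tasks and for capping concurrency
- CachedThreadPool: Creates threads as needed and reuses idle ones, good for many short-lived or I/O-bound tasks, but the thread count is unbounded
- SingleThreadExecutor: One worker thread that runs tasks sequentially, good for preserving submission order
- ScheduledThreadPool: Supports delayed and periodic execution
- WorkStealingPool: ForkJoinPool-backed pool sized to the available processors by default, good for many small or recursive tasks, with no ordering guarantee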
Best Practices:
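- Always Shutdown: Call shutdown(), wait with awaitTermination(), and fall back to shutdownNow()
- Handle Exceptions: Call get() on each Future; task exceptions surface there wrapped in ExecutionException
- Size Appropriately: Match pool size to workload characteristics (CPU-bound versus I/O-bound)
- Monitor Performance: Track executor statistics such as pool size, active count, and queue length
- Use Try-With-Resources: Wrap the executor in an AutoCloseable for automatic cleanup (ExecutorService is itself AutoCloseable from Java 19)
- Avoid Blocking: Don't block pool threads waiting for other tasks in the same pool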
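For sizing, a commonly cited starting heuristic (from the book Java Concurrency in Practice) is threads = cores × target utilization × (1 + wait time / compute time). The sketch below applies it; the class name PoolSizing, the method suggestedPoolSize, and the sample wait and compute times are illustrative assumptions, and real values should come from measuring the actual workload:

```java
public class PoolSizing {
    // threads = cores * targetUtilization * (1 + waitTime / computeTime)
    static int suggestedPoolSize(int cores, double targetUtilization,
                                 double waitTimeMs, double computeTimeMs) {
        if (cores < 1 || targetUtilization <= 0 || targetUtilization > 1
                || waitTimeMs < 0 || computeTimeMs <= 0) {
            throw new IllegalArgumentException("Invalid sizing parameters");
        }
        double size = cores * targetUtilization * (1 + waitTimeMs / computeTimeMs);
        // Never suggest fewer than one thread
        return Math.max(1, (int) Math.round(size));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // CPU-bound: no waiting, so roughly one thread per core
        System.out.println("CPU-bound: " + suggestedPoolSize(cores, 1.0, 0, 10));
        // I/O-bound (assumed figures): 90 ms waiting per 10 ms of computation
        System.out.println("I/O-bound: " + suggestedPoolSize(cores, 1.0, 90, 10));
    }
}
```

The result is only a starting point. Memory limits, connection pool sizes, and other shared resources often cap the useful thread count well below what the formula suggests.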
Common Pitfalls:
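- Forgetting Shutdown: Idle non-daemon worker threads can keep the JVM from exiting
- Inappropriate Pool Size: Too small (underutilization) or too large (resource waste)
- Ignoring Exceptions: A task passed to submit() fails silently unless get() is called on its Future
- Blocking Tasks: Tasks that wait on other tasks in the same pool can deadlock when no thread is free to run them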
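The blocking pitfall is easiest to see when a task waits on another task submitted to the same pool. The following sketch is one way to reproduce it safely: it uses a bounded get() so that starvation shows up as a timeout instead of a hang. The class name StarvationDemo, the helper runNested, and the 500 ms timeout are hypothetical choices for illustration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StarvationDemo {
    // The outer task submits an inner task to the SAME executor and waits for it.
    static String runNested(ExecutorService executor) throws Exception {
        Future<String> outer = executor.submit(() -> {
            Future<String> inner = executor.submit(() -> "inner done");
            try {
                // A bounded wait turns a potential deadlock into a detectable timeout
                return inner.get(500, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                inner.cancel(false);
                return "starved: no free thread for the inner task";
            }
        });
        return outer.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService single = Executors.newSingleThreadExecutor();
        ExecutorService pair = Executors.newFixedThreadPool(2);
        try {
            // One thread: the outer task occupies it, so the inner task never starts
            System.out.println("1 thread:  " + runNested(single));
            // Two threads: the inner task runs on the second thread
            System.out.println("2 threads: " + runNested(pair));
        } finally {
            single.shutdown();
            pair.shutdown();
        }
    }
}
```

With an unbounded get() in place of the timed one, the single-thread case would block forever. A larger pool only postpones the problem, since enough concurrent outer tasks can still occupy every thread.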
ExecutorService is essential for building scalable, efficient concurrent applications in Java.