1. java
  2. /concurrency
  3. /completable-future

Master Java CompletableFuture and Asynchronous Programming

Java CompletableFuture

CompletableFuture is Java's implementation of a composable, asynchronous programming model introduced in Java 8. It represents a future result of an asynchronous computation and provides a rich API for building complex asynchronous workflows. Unlike traditional Future, CompletableFuture can be completed manually and supports composition, combination, and error handling.

Why CompletableFuture?

Traditional approaches to asynchronous programming have limitations that CompletableFuture addresses:

Problems with Traditional Approaches:

  • Callback Hell: Nested callbacks become hard to read and maintain
  • Limited Composition: Difficult to combine multiple async operations
  • Error Handling: Complex exception propagation across async calls
  • Blocking Operations: Future.get() blocks the calling thread
  • Manual Thread Management: Explicit thread coordination

CompletableFuture Benefits:

  • Non-blocking: Operations don't block the calling thread
  • Composable: Chain and combine operations easily
  • Functional Style: Supports lambda expressions and method references
  • Error Handling: Built-in exception handling and recovery
  • Flexible Completion: Can be completed manually or automatically
import java.util.concurrent.*;
import java.util.function.*;

public class CompletableFutureBasics {
    
    // Traditional blocking approach
    public static void traditionalApproach() {
        System.out.println("=== Traditional Blocking Approach ===");
        
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        try {
            // Submit tasks and block waiting for results
            Future<String> future1 = executor.submit(() -> {
                Thread.sleep(1000);
                return "Result 1";
            });
            
            Future<String> future2 = executor.submit(() -> {
                Thread.sleep(1500);
                return "Result 2";
            });
            
            // Blocking calls - main thread waits
            System.out.println("Waiting for results...");
            String result1 = future1.get(); // Blocks
            String result2 = future2.get(); // Blocks
            
            System.out.println("Combined: " + result1 + " + " + result2);
            
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
        
        System.out.println();
    }
    
    // CompletableFuture approach
    public static void completableFutureApproach() {
        System.out.println("=== CompletableFuture Non-blocking Approach ===");
        
        // Create async operations
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Result 1";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Result 2";
        });
        
        // Combine results asynchronously
        CompletableFuture<String> combined = future1.thenCombine(future2, (r1, r2) -> {
            System.out.println("Combining results asynchronously");
            return "Combined: " + r1 + " + " + r2;
        });
        
        // Non-blocking result handling
        combined.thenAccept(result -> {
            System.out.println("Final result: " + result);
        });
        
        System.out.println("Main thread continues immediately");
        
        // Wait for completion (only for demo purposes)
        try {
            combined.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
        
        System.out.println();
    }
    
    public static void main(String[] args) {
        traditionalApproach();
        completableFutureApproach();
    }
}

Creating CompletableFuture

Multiple ways to create and complete CompletableFuture instances:

import java.util.concurrent.*;
import java.util.Random;

public class CreatingCompletableFuture {
    
    private static final Random random = new Random();
    
    public static void demonstrateCreationMethods() {
        System.out.println("=== CompletableFuture Creation Methods ===");
        
        // 1. Already completed future
        CompletableFuture<String> completed = CompletableFuture.completedFuture("Already done");
        completed.thenAccept(result -> System.out.println("Completed: " + result));
        
        // 2. Manual completion
        CompletableFuture<String> manual = new CompletableFuture<>();
        // Complete it later
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(500);
                manual.complete("Manually completed");
            } catch (InterruptedException e) {
                manual.completeExceptionally(e);
            }
        });
        manual.thenAccept(result -> System.out.println("Manual: " + result));
        
        // 3. SupplyAsync - returns a value
        CompletableFuture<Integer> supplied = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return random.nextInt(100);
        });
        supplied.thenAccept(result -> System.out.println("Supplied: " + result));
        
        // 4. RunAsync - no return value
        CompletableFuture<Void> ran = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(800);
                System.out.println("RunAsync completed on " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 5. With custom executor
        ExecutorService customExecutor = Executors.newFixedThreadPool(2);
        CompletableFuture<String> withExecutor = CompletableFuture.supplyAsync(() -> {
            System.out.println("Custom executor: " + Thread.currentThread().getName());
            return "Custom executor result";
        }, customExecutor);
        withExecutor.thenAccept(result -> System.out.println("Custom: " + result));
        
        // Wait for all to complete
        CompletableFuture.allOf(completed, manual, supplied, ran, withExecutor)
                        .join(); // Block until all complete
        
        customExecutor.shutdown();
        System.out.println();
    }
    
    public static void demonstrateCompletion() {
        System.out.println("=== Manual Completion ===");
        
        CompletableFuture<String> future = new CompletableFuture<>();
        
        // Complete with different outcomes
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
                
                if (random.nextBoolean()) {
                    future.complete("Success!");
                } else {
                    future.completeExceptionally(new RuntimeException("Failed!"));
                }
            } catch (InterruptedException e) {
                future.completeExceptionally(e);
            }
        });
        
        // Handle both success and failure
        future.whenComplete((result, throwable) -> {
            if (throwable != null) {
                System.out.println("Completed exceptionally: " + throwable.getMessage());
            } else {
                System.out.println("Completed successfully: " + result);
            }
        });
        
        // Alternative completion
        CompletableFuture<String> alternative = new CompletableFuture<>();
        
        // Complete with timeout
        CompletableFuture.delayedExecutor(500, TimeUnit.MILLISECONDS).execute(() -> {
            alternative.complete("Timeout completion");
        });
        
        alternative.thenAccept(result -> System.out.println("Alternative: " + result));
        
        // Wait for completion
        try {
            future.get(2, TimeUnit.SECONDS);
            alternative.get(1, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.err.println("Completion error: " + e.getMessage());
        }
        
        System.out.println();
    }
    
    public static void main(String[] args) {
        demonstrateCreationMethods();
        demonstrateCompletion();
    }
}

Transforming and Chaining Operations

CompletableFuture supports powerful transformation and chaining operations:

import java.util.concurrent.*;
import java.util.function.*;

public class TransformationAndChaining {
    
    public static void demonstrateTransformations() {
        System.out.println("=== Transformations ===");
        
        // thenApply - transforms the result
        CompletableFuture<String> original = CompletableFuture.supplyAsync(() -> {
            System.out.println("Original computation on " + Thread.currentThread().getName());
            return "hello";
        });
        
        CompletableFuture<String> transformed = original.thenApply(s -> {
            System.out.println("Transform 1 on " + Thread.currentThread().getName());
            return s.toUpperCase();
        }).thenApply(s -> {
            System.out.println("Transform 2 on " + Thread.currentThread().getName());
            return s + " WORLD";
        });
        
        transformed.thenAccept(result -> System.out.println("Final result: " + result));
        
        // thenCompose - flattens nested CompletableFutures
        CompletableFuture<String> composed = original.thenCompose(s -> {
            return CompletableFuture.supplyAsync(() -> {
                System.out.println("Composed operation on " + Thread.currentThread().getName());
                return s + " composed";
            });
        });
        
        composed.thenAccept(result -> System.out.println("Composed result: " + result));
        
        // Wait for completion
        CompletableFuture.allOf(transformed, composed).join();
        System.out.println();
    }
    
    public static void demonstrateAsyncVariants() {
        System.out.println("=== Async Variants ===");
        
        CompletableFuture<String> source = CompletableFuture.supplyAsync(() -> {
            System.out.println("Source on " + Thread.currentThread().getName());
            return "data";
        });
        
        // thenApply vs thenApplyAsync
        CompletableFuture<String> sync = source.thenApply(s -> {
            System.out.println("Sync transform on " + Thread.currentThread().getName());
            return s + "-sync";
        });
        
        CompletableFuture<String> async = source.thenApplyAsync(s -> {
            System.out.println("Async transform on " + Thread.currentThread().getName());
            return s + "-async";
        });
        
        // Custom executor for async operations
        ExecutorService customExecutor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "CustomThread");
            t.setDaemon(true);
            return t;
        });
        
        CompletableFuture<String> customAsync = source.thenApplyAsync(s -> {
            System.out.println("Custom async on " + Thread.currentThread().getName());
            return s + "-custom";
        }, customExecutor);
        
        // Collect all results
        CompletableFuture.allOf(sync, async, customAsync)
                        .thenRun(() -> {
                            System.out.println("All transformations completed");
                        }).join();
        
        customExecutor.shutdown();
        System.out.println();
    }
    
    public static void demonstrateChaining() {
        System.out.println("=== Complex Chaining ===");
        
        // Simulate a multi-step process: fetch user -> fetch preferences -> apply settings
        CompletableFuture<String> process = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Step 1: Fetching user ID");
                sleep(500);
                return "user123";
            })
            .thenCompose(userId -> {
                System.out.println("Step 2: Fetching user data for " + userId);
                return CompletableFuture.supplyAsync(() -> {
                    sleep(700);
                    return new User(userId, "John Doe", "[email protected]");
                });
            })
            .thenCompose(user -> {
                System.out.println("Step 3: Fetching preferences for " + user.name);
                return CompletableFuture.supplyAsync(() -> {
                    sleep(400);
                    return new Preferences("dark", "english", true);
                }).thenApply(prefs -> new UserWithPreferences(user, prefs));
            })
            .thenApply(userWithPrefs -> {
                System.out.println("Step 4: Applying settings");
                sleep(200);
                return "Settings applied for " + userWithPrefs.user.name + 
                       " with theme: " + userWithPrefs.preferences.theme;
            });
        
        process.thenAccept(result -> System.out.println("Process completed: " + result));
        
        // Handle the result
        try {
            String result = process.get(3, TimeUnit.SECONDS);
            System.out.println("Final outcome: " + result);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.err.println("Process failed: " + e.getMessage());
        }
        
        System.out.println();
    }
    
    // Helper classes
    static class User {
        final String id, name, email;
        User(String id, String name, String email) {
            this.id = id; this.name = name; this.email = email;
        }
    }
    
    static class Preferences {
        final String theme, language;
        final boolean notifications;
        Preferences(String theme, String language, boolean notifications) {
            this.theme = theme; this.language = language; this.notifications = notifications;
        }
    }
    
    static class UserWithPreferences {
        final User user;
        final Preferences preferences;
        UserWithPreferences(User user, Preferences preferences) {
            this.user = user; this.preferences = preferences;
        }
    }
    
    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public static void main(String[] args) {
        demonstrateTransformations();
        demonstrateAsyncVariants();
        demonstrateChaining();
    }
}

Combining Multiple Futures

CompletableFuture provides powerful methods to combine multiple async operations:

import java.util.concurrent.*;
import java.util.*;
import java.util.stream.Collectors;

public class CombiningFutures {
    
    public static void demonstrateThenCombine() {
        System.out.println("=== Combining Two Futures ===");
        
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "Hello";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            sleep(1500);
            return "World";
        });
        
        // Combine when both complete
        CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> {
            System.out.println("Combining: " + s1 + " + " + s2);
            return s1 + " " + s2;
        });
        
        combined.thenAccept(result -> System.out.println("Combined result: " + result));
        
        // Alternative: combine with side effects
        CompletableFuture<Void> bothComplete = future1.thenAcceptBoth(future2, (s1, s2) -> {
            System.out.println("Both completed: " + s1 + " and " + s2);
        });
        
        CompletableFuture.allOf(combined, bothComplete).join();
        System.out.println();
    }
    
    public static void demonstrateAnyOf() {
        System.out.println("=== First to Complete ===");
        
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            sleep(2000);
            return "Slow result";
        });
        
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> {
            sleep(500);
            return "Fast result";
        });
        
        CompletableFuture<String> medium = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "Medium result";
        });
        
        // Get result from first completed
        CompletableFuture<Object> firstCompleted = CompletableFuture.anyOf(slow, fast, medium);
        
        firstCompleted.thenAccept(result -> {
            System.out.println("First completed: " + result);
        });
        
        // Wait for first completion, but let others continue
        try {
            Object result = firstCompleted.get(1, TimeUnit.SECONDS);
            System.out.println("Winner: " + result);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.err.println("No completion within timeout");
        }
        
        System.out.println();
    }
    
    public static void demonstrateAllOf() {
        System.out.println("=== Wait for All ===");
        
        // Create multiple independent tasks
        List<CompletableFuture<String>> futures = Arrays.asList(
            CompletableFuture.supplyAsync(() -> { sleep(800); return "Task 1"; }),
            CompletableFuture.supplyAsync(() -> { sleep(1200); return "Task 2"; }),
            CompletableFuture.supplyAsync(() -> { sleep(600); return "Task 3"; }),
            CompletableFuture.supplyAsync(() -> { sleep(1000); return "Task 4"; })
        );
        
        // Wait for all to complete and collect results
        CompletableFuture<List<String>> allResults = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                                 .map(CompletableFuture::join)
                                 .collect(Collectors.toList()));
        
        allResults.thenAccept(results -> {
            System.out.println("All completed:");
            results.forEach(result -> System.out.println("  " + result));
        });
        
        // Alternative: process results as they complete
        System.out.println("Processing as they complete:");
        futures.forEach(future -> {
            future.thenAccept(result -> {
                System.out.println("  Completed: " + result + " at " + System.currentTimeMillis());
            });
        });
        
        allResults.join(); // Wait for all
        System.out.println();
    }
    
    public static void demonstrateComplexCombination() {
        System.out.println("=== Complex Combination Pattern ===");
        
        // Simulate microservices calls
        CompletableFuture<String> userService = CompletableFuture.supplyAsync(() -> {
            sleep(500);
            return "UserData";
        });
        
        CompletableFuture<String> authService = CompletableFuture.supplyAsync(() -> {
            sleep(700);
            return "AuthToken";
        });
        
        CompletableFuture<String> settingsService = CompletableFuture.supplyAsync(() -> {
            sleep(400);
            return "Settings";
        });
        
        // Combine user and auth first, then add settings
        CompletableFuture<String> userAuth = userService.thenCombine(authService, (user, auth) -> {
            System.out.println("User authenticated");
            return user + "+" + auth;
        });
        
        CompletableFuture<String> complete = userAuth.thenCombine(settingsService, (userAuth2, settings) -> {
            System.out.println("All services combined");
            return userAuth2 + "+" + settings;
        });
        
        // Add timeout handling
        CompletableFuture<String> withTimeout = complete.orTimeout(2, TimeUnit.SECONDS);
        
        withTimeout.thenAccept(result -> {
            System.out.println("Services combined: " + result);
        }).exceptionally(throwable -> {
            System.err.println("Service combination failed: " + throwable.getMessage());
            return null;
        });
        
        try {
            withTimeout.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.err.println("Overall timeout: " + e.getMessage());
        }
        
        System.out.println();
    }
    
    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public static void main(String[] args) {
        demonstrateThenCombine();
        demonstrateAnyOf();
        demonstrateAllOf();
        demonstrateComplexCombination();
    }
}

Error Handling and Recovery

Robust error handling is crucial for asynchronous applications:

import java.util.concurrent.*;
import java.util.function.Function;

public class ErrorHandlingAndRecovery {
    
    public static void demonstrateBasicErrorHandling() {
        System.out.println("=== Basic Error Handling ===");
        
        // Future that might fail
        CompletableFuture<String> risky = CompletableFuture.supplyAsync(() -> {
            if (Math.random() < 0.5) {
                throw new RuntimeException("Random failure!");
            }
            return "Success!";
        });
        
        // Handle success and failure
        risky.whenComplete((result, throwable) -> {
            if (throwable != null) {
                System.out.println("Operation failed: " + throwable.getMessage());
            } else {
                System.out.println("Operation succeeded: " + result);
            }
        });
        
        // Alternative: separate handlers
        CompletableFuture<String> handled = risky
            .thenApply(result -> {
                System.out.println("Processing success: " + result);
                return result.toUpperCase();
            })
            .exceptionally(throwable -> {
                System.out.println("Handling exception: " + throwable.getMessage());
                return "RECOVERED";
            });
        
        try {
            String result = handled.get(2, TimeUnit.SECONDS);
            System.out.println("Final result: " + result);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.err.println("Unexpected error: " + e.getMessage());
        }
        
        System.out.println();
    }
    
    public static void demonstrateRecoveryStrategies() {
        System.out.println("=== Recovery Strategies ===");
        
        // Strategy 1: Provide default value
        CompletableFuture<String> withDefault = CompletableFuture
            .supplyAsync(() -> {
                throw new RuntimeException("Service unavailable");
            })
            .exceptionally(throwable -> {
                System.out.println("Using default value due to: " + throwable.getMessage());
                return "DEFAULT_VALUE";
            });
        
        // Strategy 2: Retry operation
        CompletableFuture<String> withRetry = createRetryableFuture()
            .exceptionally(throwable -> {
                System.out.println("Retry failed, using fallback");
                return "FALLBACK";
            });
        
        // Strategy 3: Alternative computation
        CompletableFuture<String> withAlternative = CompletableFuture
            .supplyAsync(() -> {
                throw new RuntimeException("Primary failed");
            })
            .handle((result, throwable) -> {
                if (throwable != null) {
                    System.out.println("Primary failed, trying alternative");
                    return CompletableFuture.supplyAsync(() -> {
                        sleep(500);
                        return "ALTERNATIVE_RESULT";
                    });
                } else {
                    return CompletableFuture.completedFuture(result);
                }
            })
            .thenCompose(Function.identity());
        
        // Collect all results
        CompletableFuture.allOf(withDefault, withRetry, withAlternative)
            .thenRun(() -> {
                try {
                    System.out.println("Default strategy: " + withDefault.get());
                    System.out.println("Retry strategy: " + withRetry.get());
                    System.out.println("Alternative strategy: " + withAlternative.get());
                } catch (InterruptedException | ExecutionException e) {
                    System.err.println("Error collecting results: " + e.getMessage());
                }
            }).join();
        
        System.out.println();
    }
    
    private static CompletableFuture<String> createRetryableFuture() {
        return retryOperation(() -> {
            if (Math.random() < 0.8) { // 80% chance of failure
                throw new RuntimeException("Operation failed");
            }
            return "SUCCESS";
        }, 3);
    }
    
    private static CompletableFuture<String> retryOperation(Supplier<String> operation, int maxRetries) {
        CompletableFuture<String> future = new CompletableFuture<>();
        attemptOperation(operation, maxRetries, 0, future);
        return future;
    }
    
    private static void attemptOperation(Supplier<String> operation, int maxRetries, int attempt, 
                                       CompletableFuture<String> future) {
        CompletableFuture.supplyAsync(operation)
            .whenComplete((result, throwable) -> {
                if (throwable == null) {
                    future.complete(result);
                } else if (attempt < maxRetries - 1) {
                    System.out.println("Attempt " + (attempt + 1) + " failed, retrying...");
                    // Exponential backoff
                    CompletableFuture.delayedExecutor(100 * (1L << attempt), TimeUnit.MILLISECONDS)
                        .execute(() -> attemptOperation(operation, maxRetries, attempt + 1, future));
                } else {
                    System.out.println("All " + maxRetries + " attempts failed");
                    future.completeExceptionally(throwable);
                }
            });
    }
    
    public static void demonstrateChainedErrorHandling() {
        System.out.println("=== Chained Error Handling ===");
        
        CompletableFuture<String> chain = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Step 1: Initial computation");
                return "INITIAL";
            })
            .thenApply(s -> {
                System.out.println("Step 2: First transformation");
                if (s.length() > 5) {
                    throw new RuntimeException("String too long!");
                }
                return s.toLowerCase();
            })
            .thenApply(s -> {
                System.out.println("Step 3: Second transformation");
                return s + "_processed";
            })
            .exceptionally(throwable -> {
                System.out.println("Error in chain: " + throwable.getMessage());
                return "error_recovered";
            })
            .thenApply(s -> {
                System.out.println("Step 4: Final transformation");
                return s.toUpperCase();
            });
        
        chain.thenAccept(result -> System.out.println("Chain result: " + result));
        
        // Handle specific exception types
        CompletableFuture<String> typed = CompletableFuture
            .supplyAsync(() -> {
                throw new IllegalArgumentException("Invalid input");
            })
            .handle((result, throwable) -> {
                if (throwable instanceof CompletionException) {
                    Throwable cause = throwable.getCause();
                    if (cause instanceof IllegalArgumentException) {
                        System.out.println("Handling IllegalArgumentException");
                        return "INVALID_INPUT_HANDLED";
                    }
                }
                if (throwable != null) {
                    System.out.println("Handling other exception: " + throwable.getClass().getSimpleName());
                    return "UNKNOWN_ERROR_HANDLED";
                }
                return result;
            });
        
        typed.thenAccept(result -> System.out.println("Typed handling result: " + result));
        
        CompletableFuture.allOf(chain, typed).join();
        System.out.println();
    }
    
    private static void sleep(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public static void main(String[] args) {
        demonstrateBasicErrorHandling();
        demonstrateRecoveryStrategies();
        demonstrateChainedErrorHandling();
    }
}

Best Practices and Patterns

import java.util.concurrent.*;
import java.util.function.Supplier;

public class CompletableFutureBestPractices {
    
    // Use custom executor for CPU-intensive tasks
    private static final ExecutorService CPU_EXECUTOR = 
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
    // Use different executor for I/O tasks
    private static final ExecutorService IO_EXECUTOR = 
        Executors.newCachedThreadPool();
    
    public static void demonstrateExecutorUsage() {
        System.out.println("=== Proper Executor Usage ===");
        
        // CPU-intensive task
        CompletableFuture<Long> cpuTask = CompletableFuture.supplyAsync(() -> {
            System.out.println("CPU task on: " + Thread.currentThread().getName());
            long sum = 0;
            for (int i = 0; i < 1_000_000; i++) {
                sum += i;
            }
            return sum;
        }, CPU_EXECUTOR);
        
        // I/O task
        CompletableFuture<String> ioTask = CompletableFuture.supplyAsync(() -> {
            System.out.println("I/O task on: " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000); // Simulate I/O
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "I/O completed";
        }, IO_EXECUTOR);
        
        CompletableFuture.allOf(cpuTask, ioTask)
            .thenRun(() -> System.out.println("Both tasks completed"))
            .join();
        
        System.out.println();
    }
    
    public static void demonstrateTimeoutHandling() {
        System.out.println("=== Timeout Handling ===");
        
        // CompletableFuture with timeout (Java 9+)
        CompletableFuture<String> withTimeout = CompletableFuture
            .supplyAsync(() -> {
                try {
                    Thread.sleep(2000); // Longer than timeout
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Completed";
            })
            .orTimeout(1, TimeUnit.SECONDS)
            .exceptionally(throwable -> {
                if (throwable instanceof TimeoutException) {
                    return "TIMEOUT";
                }
                return "ERROR: " + throwable.getMessage();
            });
        
        withTimeout.thenAccept(result -> System.out.println("Timeout result: " + result));
        
        // Custom timeout implementation
        CompletableFuture<String> customTimeout = createWithTimeout(
            () -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Long operation";
            },
            1500, TimeUnit.MILLISECONDS
        );
        
        customTimeout.thenAccept(result -> System.out.println("Custom timeout: " + result));
        
        CompletableFuture.allOf(withTimeout, customTimeout).join();
        System.out.println();
    }
    
    private static <T> CompletableFuture<T> createWithTimeout(Supplier<T> supplier, 
                                                             long timeout, TimeUnit unit) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(supplier);
        
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.schedule(() -> {
            future.completeExceptionally(new TimeoutException("Operation timed out"));
        }, timeout, unit);
        
        future.whenComplete((result, throwable) -> scheduler.shutdown());
        
        return future;
    }
    
    public static void demonstrateResourceManagement() {
        System.out.println("=== Resource Management ===");
        
        // Using try-with-resources pattern
        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
            
            CompletableFuture<String> task = CompletableFuture
                .supplyAsync(() -> {
                    return "Resource managed task";
                }, executor)
                .thenApply(result -> {
                    System.out.println("Processing: " + result);
                    return result.toUpperCase();
                });
            
            String result = task.get(2, TimeUnit.SECONDS);
            System.out.println("Result: " + result);
            
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.err.println("Task failed: " + e.getMessage());
        }
        
        System.out.println();
    }
    
    public static void demonstrateCommonPitfalls() {
        System.out.println("=== Common Pitfalls and Solutions ===");
        
        // Pitfall 1: Blocking in completion callbacks
        System.out.println("Pitfall 1: Blocking in callbacks");
        CompletableFuture<String> badExample = CompletableFuture.supplyAsync(() -> "data");
        
        // BAD: Blocking in callback
        badExample.thenAccept(result -> {
            System.out.println("BAD: This blocks the completion thread");
            try {
                Thread.sleep(1000); // DON'T DO THIS
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // GOOD: Use async variant for blocking operations
        badExample.thenAcceptAsync(result -> {
            System.out.println("GOOD: This uses a separate thread");
            try {
                Thread.sleep(1000); // OK in async variant
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // Pitfall 2: Exception swallowing
        System.out.println("Pitfall 2: Exception handling");
        
        CompletableFuture<String> exceptionExample = CompletableFuture
            .supplyAsync(() -> {
                throw new RuntimeException("Something went wrong");
            })
            .thenApply(result -> {
                // This won't be called due to exception
                return result.toUpperCase();
            })
            .exceptionally(throwable -> {
                System.out.println("GOOD: Exception handled properly");
                return "RECOVERED";
            });
        
        exceptionExample.thenAccept(result -> System.out.println("Final: " + result));
        
        // Pitfall 3: Not waiting for completion
        System.out.println("Pitfall 3: Completion waiting");
        
        // BAD: Might exit before completion
        CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Background task";
        }).thenAccept(result -> System.out.println("Background: " + result));
        
        // GOOD: Properly wait for completion
        CompletableFuture<Void> properWait = CompletableFuture
            .supplyAsync(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "Waited task";
            })
            .thenAccept(result -> System.out.println("Waited: " + result));
        
        properWait.join(); // Ensure completion
        
        System.out.println();
    }
    
    public static void cleanup() {
        CPU_EXECUTOR.shutdown();
        IO_EXECUTOR.shutdown();
        try {
            if (!CPU_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
                CPU_EXECUTOR.shutdownNow();
            }
            if (!IO_EXECUTOR.awaitTermination(5, TimeUnit.SECONDS)) {
                IO_EXECUTOR.shutdownNow();
            }
        } catch (InterruptedException e) {
            CPU_EXECUTOR.shutdownNow();
            IO_EXECUTOR.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    public static void main(String[] args) {
        try {
            demonstrateExecutorUsage();
            demonstrateTimeoutHandling();
            demonstrateResourceManagement();
            demonstrateCommonPitfalls();
        } finally {
            cleanup();
        }
    }
}

Summary

CompletableFuture enables powerful asynchronous programming:

Key Features:

  • Non-blocking: Operations don't block calling threads
  • Composable: Chain and combine operations fluently
  • Error Handling: Built-in exception handling and recovery
  • Flexible Completion: Manual or automatic completion
  • Timeout Support: Built-in timeout capabilities

Core Operations:

  • Creation: supplyAsync(), runAsync(), completedFuture()
  • Transformation: thenApply(), thenCompose(), thenAccept()
  • Combination: thenCombine(), allOf(), anyOf()
  • Error Handling: exceptionally(), handle(), whenComplete()

Best Practices:

  • Use Appropriate Executors: Separate pools for CPU vs I/O tasks
  • Handle Exceptions: Always provide error handling paths
  • Avoid Blocking: Use async variants for blocking operations
  • Manage Resources: Properly shutdown custom executors
  • Use Timeouts: Prevent indefinite waiting
  • Chain Efficiently: Prefer composition over nested callbacks

Common Patterns:

  • Pipeline Processing: Chain transformations with thenApply/thenCompose
  • Parallel Execution: Use allOf() for independent tasks
  • Fallback Strategies: Use exceptionally() for error recovery
  • Timeout Handling: Use orTimeout() or custom timeout logic

CompletableFuture is essential for building responsive, scalable asynchronous applications in modern Java.