completable future nice example
I understand your requirement. This is a common pattern for making parallel API calls using CompletableFuture. Here’s a comprehensive example:
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
// Domain classes
/**
 * Account data as returned by the account API client.
 *
 * <p>A plain mutable bean. Every field has both a getter and a setter so that
 * callers can build placeholder instances (for example an account carrying
 * only its id after a failed fetch) as well as read fetched data.
 */
class Account {
    private String accountId;
    private String accountName;
    private RegistrationDetails registrationDetails;
    // other fields...

    /** @return the account identifier, or {@code null} if it was never set */
    public String getAccountId() {
        return accountId;
    }

    /** @param accountId the account identifier to store */
    public void setAccountId(String accountId) {
        this.accountId = accountId;
    }

    /** @return the account name, or {@code null} if it was never set */
    public String getAccountName() {
        return accountName;
    }

    /** @param accountName the account name to store */
    public void setAccountName(String accountName) {
        this.accountName = accountName;
    }

    /** @return the registration details, or {@code null} if none are attached */
    public RegistrationDetails getRegistrationDetails() {
        return registrationDetails;
    }

    /** @param registrationDetails the registration details to attach; may be {@code null} */
    public void setRegistrationDetails(RegistrationDetails registrationDetails) {
        this.registrationDetails = registrationDetails;
    }
}
/**
 * Registration data attached to an {@link Account}.
 *
 * <p>A plain mutable bean. Accessors are provided for every field; without
 * them the private fields could be neither populated nor read by callers.
 */
class RegistrationDetails {
    private String registrationId;
    private Date registrationDate;
    private String status;
    // other fields...

    /** @return the registration identifier, or {@code null} if it was never set */
    public String getRegistrationId() {
        return registrationId;
    }

    /** @param registrationId the registration identifier to store */
    public void setRegistrationId(String registrationId) {
        this.registrationId = registrationId;
    }

    /**
     * @return a copy of the registration date, or {@code null} if it was never set;
     *         a copy is returned because {@link Date} is mutable
     */
    public Date getRegistrationDate() {
        return registrationDate == null ? null : new Date(registrationDate.getTime());
    }

    /**
     * @param registrationDate the registration date to store; copied so that later
     *                         changes to the argument do not affect this object
     */
    public void setRegistrationDate(Date registrationDate) {
        this.registrationDate = registrationDate == null ? null : new Date(registrationDate.getTime());
    }

    /** @return the registration status, or {@code null} if it was never set */
    public String getStatus() {
        return status;
    }

    /** @param status the registration status to store */
    public void setStatus(String status) {
        this.status = status;
    }
}
// Service class
/**
 * Fetches accounts in parallel with {@link CompletableFuture} and processes
 * their registration details once every fetch has finished.
 */
@Service
public class AccountService {

    @Autowired
    private AccountApiClient accountApiClient; // Your API client

    /**
     * Starts one asynchronous fetch per distinct account id that passes
     * {@link #shouldProcessAccount(String)}.
     *
     * <p>Each returned future completes with the fetched account, or with
     * {@code null} when the fetch failed (the failure is logged).
     *
     * @param accountIds ids to fetch; {@code null} is treated as empty, and
     *                   {@code null} elements are skipped because the map cannot hold a null key
     * @return a mutable map from account id to the future of its account
     */
    public Map<String, CompletableFuture<Account>> createAccountFuturesMap(List<String> accountIds) {
        Map<String, CompletableFuture<Account>> accountFuturesMap = new ConcurrentHashMap<>();
        if (accountIds == null) {
            return accountFuturesMap;
        }
        for (String accountId : accountIds) {
            if (accountId == null || !shouldProcessAccount(accountId)) {
                continue;
            }
            // computeIfAbsent starts the fetch only for the first occurrence of an id,
            // so a repeated id does not trigger a second API call whose future is discarded.
            accountFuturesMap.computeIfAbsent(accountId, this::fetchAccountAsync);
        }
        return accountFuturesMap;
    }

    /**
     * Waits for every future in the map and then processes the registration
     * details of each account that was fetched successfully.
     *
     * <p>Blocks the calling thread until all processing has finished. A future
     * that failed is logged and skipped; it does not prevent the others from
     * being processed.
     *
     * @param accountFuturesMap futures keyed by account id; {@code null} or empty is a no-op
     */
    public void processAccounts(Map<String, CompletableFuture<Account>> accountFuturesMap) {
        if (accountFuturesMap == null || accountFuturesMap.isEmpty()) {
            return;
        }
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                accountFuturesMap.values().toArray(new CompletableFuture<?>[0]));

        allFutures
                // allOf fails if any member failed; recover here so that the per-account
                // handling below still runs for the futures that succeeded.
                .exceptionally(failure -> null)
                .thenRun(() -> accountFuturesMap.forEach((accountId, future) -> {
                    try {
                        Account account = future.join(); // does not block: the future is already complete
                        if (account != null) {
                            RegistrationDetails regDetails = account.getRegistrationDetails();
                            processRegistrationDetails(accountId, regDetails);
                        }
                    } catch (Exception e) {
                        log.error("Error processing account: " + accountId, e);
                    }
                }))
                .join(); // wait for all processing to complete
    }

    /**
     * Starts the asynchronous fetch of a single account.
     *
     * <p>NOTE(review): {@code supplyAsync} without an executor runs on the common
     * fork-join pool; a dedicated executor is preferable for blocking API calls.
     */
    private CompletableFuture<Account> fetchAccountAsync(String accountId) {
        return CompletableFuture
                .supplyAsync(() -> accountApiClient.getAccountById(accountId))
                .exceptionally(throwable -> {
                    log.error("Error fetching account for ID: " + accountId, throwable);
                    return null; // or return a default Account object
                });
    }

    /** Decides whether an account id should be fetched; placeholder that accepts every id. */
    private boolean shouldProcessAccount(String accountId) {
        // Your filter logic here
        return true; // example
    }

    /** Handles the registration details of one account; placeholder that only logs. */
    private void processRegistrationDetails(String accountId, RegistrationDetails regDetails) {
        // Your processing logic here
        log.info("Processing registration for account: " + accountId);
    }
}
Here’s an enhanced version with more features:
@Service
public class AccountProcessingService {
private static final int MAX_CONCURRENT_REQUESTS = 10;
private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_CONCURRENT_REQUESTS);
@Autowired
private AccountApiClient accountApiClient;
// Main method that orchestrates the process
public Map<String, RegistrationDetails> fetchAndProcessAccounts(List<String> accountIds) {
// Step 1: Create futures map
Map<String, CompletableFuture<Account>> futuresMap = createAccountFuturesMap(accountIds);
// Step 2: Process and extract registration details
return extractRegistrationDetails(futuresMap);
}
// Create map with CompletableFutures with rate limiting and timeout
private Map<String, CompletableFuture<Account>> createAccountFuturesMap(List<String> accountIds) {
Map<String, CompletableFuture<Account>> futuresMap = new ConcurrentHashMap<>();
// Filter and create futures with custom executor
accountIds.stream()
.filter(this::isValidAccountId)
.forEach(accountId -> {
CompletableFuture<Account> future = CompletableFuture
.supplyAsync(() -> fetchAccountWithRetry(accountId), executorService)
.orTimeout(30, TimeUnit.SECONDS) // Timeout after 30 seconds
.handle((account, throwable) -> {
if (throwable != null) {
log.error("Failed to fetch account: " + accountId, throwable);
// Return empty account or handle as needed
return createEmptyAccount(accountId);
}
return account;
});
futuresMap.put(accountId, future);
});
return futuresMap;
}
// Fetch account with retry logic
private Account fetchAccountWithRetry(String accountId) {
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
return accountApiClient.getAccountById(accountId);
} catch (Exception e) {
retryCount++;
if (retryCount >= maxRetries) {
throw new RuntimeException("Failed to fetch account after " + maxRetries + " attempts", e);
}
try {
Thread.sleep(1000 * retryCount); // Exponential backoff
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted during retry", ie);
}
}
}
return null;
}
// Extract registration details from completed futures
private Map<String, RegistrationDetails> extractRegistrationDetails(
Map<String, CompletableFuture<Account>> futuresMap) {
Map<String, RegistrationDetails> registrationMap = new ConcurrentHashMap<>();
// Create a combined future that completes when all account futures complete
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futuresMap.values().toArray(new CompletableFuture[0])
);
// Wait for all to complete and then process
try {
allFutures.get(60, TimeUnit.SECONDS); // Overall timeout
// Now all futures are complete, extract registration details
futuresMap.forEach((accountId, future) -> {
try {
Account account = future.getNow(null); // Won't block as future is complete
if (account != null && account.getRegistrationDetails() != null) {
registrationMap.put(accountId, account.getRegistrationDetails());
}
} catch (Exception e) {
log.error("Error extracting registration details for account: " + accountId, e);
}
});
} catch (Exception e) {
log.error("Error waiting for all futures to complete", e);
}
return registrationMap;
}
// Alternative approach - process as they complete
public void processAccountsAsTheyComplete(List<String> accountIds) {
Map<String, CompletableFuture<Account>> futuresMap = createAccountFuturesMap(accountIds);
futuresMap.forEach((accountId, future) -> {
future.thenAccept(account -> {
if (account != null) {
// Process immediately when each account is fetched
processAccount(account);
}
});
});
}
// Batch processing with CompletableFuture
public List<RegistrationDetails> batchProcessAccounts(List<String> accountIds) {
List<CompletableFuture<RegistrationDetails>> futures = accountIds.stream()
.map(accountId ->
CompletableFuture
.supplyAsync(() -> accountApiClient.getAccountById(accountId), executorService)
.thenApply(account -> account.getRegistrationDetails())
.exceptionally(ex -> {
log.error("Error processing account: " + accountId, ex);
return null;
})
)
.collect(Collectors.toList());
// Wait for all and collect results
return futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
private boolean isValidAccountId(String accountId) {
return accountId != null && !accountId.trim().isEmpty();
}
private Account createEmptyAccount(String accountId) {
Account account = new Account();
account.setAccountId(accountId);
return account;
}
private void processAccount(Account account) {
// Your processing logic
log.info("Processing account: " + account.getAccountId());
}
@PreDestroy
public void cleanup() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
}
Usage example:
/**
 * REST entry point that hands a list of account ids to
 * {@link AccountProcessingService} and returns the resulting registration details.
 */
@RestController
@RequestMapping("/accounts")
public class AccountController {

    @Autowired
    private AccountProcessingService accountProcessingService;

    /**
     * Handles {@code POST /accounts/process}.
     *
     * @param accountIds the account ids read from the request body
     * @return 200 with registration details keyed by account id, or an empty
     *         500 response if the service call throws
     */
    @PostMapping("/process")
    public ResponseEntity<Map<String, RegistrationDetails>> processAccounts(
            @RequestBody List<String> accountIds) {
        ResponseEntity<Map<String, RegistrationDetails>> response;
        try {
            response = ResponseEntity.ok(accountProcessingService.fetchAndProcessAccounts(accountIds));
        } catch (Exception failure) {
            response = ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
        return response;
    }
}
Key benefits of this approach:
- Parallel Processing: All API calls happen in parallel, significantly reducing total processing time
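- Non-blocking fan-out: Using CompletableFuture, the API calls are submitted without waiting on one another; the calling thread blocks only once, while waiting for the combined result (join()/get()), and processAccountsAsTheyComplete does not block it at all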
- Error Handling: Each future has exception handling to prevent one failure from affecting others
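- Timeout Support: Prevents the caller from hanging on slow API calls (note that a timed-out call is not cancelled; it keeps its pool thread until it returns)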
- Retry Logic: Handles transient failures
- Thread Pool Management: Controls the number of concurrent API calls
This pattern is commonly used in microservices architectures where you need to aggregate data from multiple sources efficiently.