order service 9
Story 9: Add Asynchronous Tasks with CompletableFuture
// AsyncOrderProcessor.java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Map;
import java.util.HashMap;
public class AsyncOrderProcessor {
private final ConcurrentHashMap<String, AtomicInteger> inventory;
private final ExecutorService asyncExecutor;
private final PaymentService paymentService;
private final NotificationService notificationService;
private final ShippingService shippingService;
public AsyncOrderProcessor() {
inventory = new ConcurrentHashMap<>();
inventory.put("Laptop", new AtomicInteger(100));
inventory.put("Mouse", new AtomicInteger(200));
inventory.put("Keyboard", new AtomicInteger(150));
inventory.put("Monitor", new AtomicInteger(75));
inventory.put("iPhone", new AtomicInteger(100));
inventory.put("AirPods", new AtomicInteger(120));
// Dedicated thread pool for async tasks
asyncExecutor = Executors.newFixedThreadPool(10);
paymentService = new PaymentService();
notificationService = new NotificationService();
shippingService = new ShippingService();
}
public CompletableFuture<OrderResult> processOrderAsync(Order order) {
System.out.println("Starting async processing for order: " + order.getOrderId());
// Step 1: Validate order asynchronously
return CompletableFuture
.supplyAsync(() -> validateOrder(order), asyncExecutor)
.thenCompose(validationResult -> {
if (!validationResult.isSuccess()) {
return CompletableFuture.completedFuture(
new OrderResult(order.getOrderId(), false, "Validation failed"));
}
// Step 2: Check and update inventory
return CompletableFuture.supplyAsync(() ->
checkAndUpdateInventory(order), asyncExecutor);
})
.thenCompose(inventoryResult -> {
if (!inventoryResult.isSuccess()) {
return CompletableFuture.completedFuture(inventoryResult);
}
// Step 3: Process payment asynchronously
return paymentService.processPaymentAsync(order, asyncExecutor)
.thenApply(paymentResult -> {
if (!paymentResult) {
// Rollback inventory
rollbackInventory(order);
return new OrderResult(order.getOrderId(), false, "Payment failed");
}
return new OrderResult(order.getOrderId(), true, "Payment successful");
});
})
.thenCompose(paymentResult -> {
if (!paymentResult.isSuccess()) {
return CompletableFuture.completedFuture(paymentResult);
}
// Step 4: Parallel execution of notification and shipping prep
CompletableFuture<Void> notificationFuture =
notificationService.sendNotificationAsync(order, asyncExecutor);
CompletableFuture<String> shippingFuture =
shippingService.prepareShippingAsync(order, asyncExecutor);
// Combine both futures
return CompletableFuture.allOf(notificationFuture, shippingFuture)
.thenApply(v -> {
String trackingId = shippingFuture.join();
return new OrderResult(order.getOrderId(), true,
"Order completed. Tracking: " + trackingId);
});
})
.exceptionally(throwable -> {
System.err.println("Error processing order " + order.getOrderId() +
": " + throwable.getMessage());
return new OrderResult(order.getOrderId(), false,
"Error: " + throwable.getMessage());
});
}
private OrderResult validateOrder(Order order) {
System.out.println("Validating order " + order.getOrderId() +
" on thread " + Thread.currentThread().getName());
if (order.getTotalAmount() <= 0 || order.getItems().isEmpty()) {
return new OrderResult(order.getOrderId(), false, "Invalid order");
}
return new OrderResult(order.getOrderId(), true, "Validation passed");
}
private OrderResult checkAndUpdateInventory(Order order) {
System.out.println("Checking inventory for order " + order.getOrderId() +
" on thread " + Thread.currentThread().getName());
Map<String, Integer> itemsNeeded = new HashMap<>();
for (String item : order.getItems()) {
itemsNeeded.merge(item, 1, Integer::sum);
}
// Check availability
for (Map.Entry<String, Integer> entry : itemsNeeded.entrySet()) {
String item = entry.getKey();
int needed = entry.getValue();
AtomicInteger stock = inventory.get(item);
if (stock == null || stock.get() < needed) {
return new OrderResult(order.getOrderId(), false,
"Insufficient inventory for " + item);
}
}
// Update inventory
for (Map.Entry<String, Integer> entry : itemsNeeded.entrySet()) {
inventory.get(entry.getKey()).addAndGet(-entry.getValue());
}
return new OrderResult(order.getOrderId(), true, "Inventory updated");
}
private void rollbackInventory(Order order) {
for (String item : order.getItems()) {
AtomicInteger stock = inventory.get(item);
if (stock != null) {
stock.incrementAndGet();
}
}
}
public void shutdown() {
asyncExecutor.shutdown();
try {
if (!asyncExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
asyncExecutor.shutdownNow();
}
} catch (InterruptedException e) {
asyncExecutor.shutdownNow();
}
}
}
// OrderResult.java
public class OrderResult {
private final long orderId;
private final boolean success;
private final String message;
private final long processingTime;
public OrderResult(long orderId, boolean success, String message) {
this.orderId = orderId;
this.success = success;
this.message = message;
this.processingTime = System.currentTimeMillis();
}
public boolean isSuccess() { return success; }
public String getMessage() { return message; }
public long getOrderId() { return orderId; }
@Override
public String toString() {
return String.format("OrderResult{orderId=%d, success=%b, message='%s'}",
orderId, success, message);
}
}
// PaymentService.java
import java.util.Random;
import java.util.concurrent.*;
public class PaymentService {
private final Random random = new Random();
public CompletableFuture<Boolean> processPaymentAsync(Order order, ExecutorService executor) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Processing payment for order " + order.getOrderId() +
" on thread " + Thread.currentThread().getName());
try {
// Simulate payment processing
Thread.sleep(100 + random.nextInt(200));
// 95% success rate
boolean success = random.nextDouble() < 0.95;
if (success) {
System.out.println("Payment successful for order " + order.getOrderId());
} else {
System.out.println("Payment failed for order " + order.getOrderId());
}
return success;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}, executor);
}
}
// NotificationService.java
import java.util.concurrent.*;
public class NotificationService {
public CompletableFuture<Void> sendNotificationAsync(Order order, ExecutorService executor) {
return CompletableFuture.runAsync(() -> {
System.out.println("Sending notification for order " + order.getOrderId() +
" on thread " + Thread.currentThread().getName());
try {
// Simulate notification sending
Thread.sleep(50);
if (order.isPremiumUser()) {
System.out.println("📧 Premium notification sent for order " + order.getOrderId());
} else {
System.out.println("📧 Standard notification sent for order " + order.getOrderId());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, executor);
}
}
// ShippingService.java
import java.util.Random;
import java.util.concurrent.*;
public class ShippingService {
private final Random random = new Random();
public CompletableFuture<String> prepareShippingAsync(Order order, ExecutorService executor) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Preparing shipping for order " + order.getOrderId() +
" on thread " + Thread.currentThread().getName());
try {
// Simulate shipping preparation
Thread.sleep(150);
String trackingId = "TRACK-" + order.getOrderId() + "-" +
random.nextInt(10000);
System.out.println("📦 Shipping prepared for order " + order.getOrderId() +
" with tracking: " + trackingId);
return trackingId;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "ERROR";
}
}, executor);
}
}
// AsyncMain.java - Main class for Story 9
import java.util.concurrent.*;
import java.util.*;
public class AsyncMain {
public static void main(String[] args) throws Exception {
AsyncOrderProcessor processor = new AsyncOrderProcessor();
// Create sample orders
List<Order> orders = new ArrayList<>();
for (int i = 0; i < 20; i++) {
List<String> items = Arrays.asList(
i % 2 == 0 ? "Laptop" : "iPhone",
i % 3 == 0 ? "Mouse" : "Keyboard"
);
orders.add(new Order(5000 + i, 500 + i, items, i % 3 == 0, 999.99));
}
// Process orders asynchronously and collect futures
List<CompletableFuture<OrderResult>> futures = new ArrayList<>();
long startTime = System.currentTimeMillis();
for (Order order : orders) {
CompletableFuture<OrderResult> future = processor.processOrderAsync(order);
futures.add(future);
// Add callback for each order
future.thenAccept(result -> {
System.out.println("✅ Completed: " + result);
});
}
// Wait for all orders to complete
CompletableFuture<Void> allOrders = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
allOrders.get(); // Wait for completion
long endTime = System.currentTimeMillis();
// Print statistics
System.out.println("\n=== Processing Statistics ===");
System.out.println("Total orders: " + orders.size());
System.out.println("Total time: " + (endTime - startTime) + " ms");
System.out.println("Average time per order: " +
(endTime - startTime) / orders.size() + " ms");
long successCount = futures.stream()
.map(CompletableFuture::join)
.filter(OrderResult::isSuccess)
.count();
System.out.println("Success rate: " + (successCount * 100.0 / orders.size()) + "%");
processor.shutdown();
}
}
Key Concepts in Story 9:
- CompletableFuture – Represents async computation that can be chained
- thenCompose() – Chains dependent async operations (flatMap for Futures)
- thenApply() – Transforms result (map for Futures)
- allOf() – Combines multiple futures to wait for all
- exceptionally() – Handles errors in the async pipeline
- supplyAsync() vs runAsync() – Returns value vs void
What this achieves:
- Non-blocking async processing pipeline
- Parallel execution of independent tasks (notification & shipping)
- Automatic error propagation through the chain
- Better resource utilization (threads don't block waiting)
- Improved throughput for I/O-bound operations
Sample Output:
Starting async processing for order: 5000
Validating order 5000 on thread pool-1-thread-1
Checking inventory for order 5000 on thread pool-1-thread-2
Processing payment for order 5000 on thread pool-1-thread-3
Sending notification for order 5000 on thread pool-1-thread-4
Preparing shipping for order 5000 on thread pool-1-thread-5
✅ Completed: OrderResult{orderId=5000, success=true, message='Order completed. Tracking: TRACK-5000-4521'}
Benefits:
- Much faster overall processing (parallel vs sequential)
- Better responsiveness (returns future immediately)
- Composable async operations
- Clean error handling across async boundaries
Would you like to proceed with Story 10 next?