Contents

JUC:异步编程陷阱:CompletableFuture回调地狱解决方案

一、回调地狱的典型症状

当使用CompletableFuture进行多级异步操作时,若采用传统的嵌套回调模式,代码会迅速失控:

CompletableFuture.supplyAsync(() -> fetchOrder())
    .thenAccept(order -> {
        CompletableFuture.supplyAsync(() -> checkInventory(order))
            .thenAccept(inventory -> {
                CompletableFuture.supplyAsync(() -> calculatePrice(inventory))
                    .thenAccept(price -> {
                        CompletableFuture.runAsync(() -> sendNotification(price));
                    });
            });
    });

问题诊断

  • 缩进失控:每层回调增加一级缩进,形成"箭头代码"
  • 异常处理分散:每个回调需单独处理异常
  • 上下文丢失:多层嵌套后原始参数难以追溯

二、结构化解决方案

通过链式组合操作替代嵌套回调,实现扁平化处理。

1. 链式组合(thenCompose)

CompletableFuture.supplyAsync(() -> fetchOrder())
    .thenCompose(order ->
            CompletableFuture.supplyAsync(() -> checkInventory(order)))
    .thenCompose(inventory ->
            CompletableFuture.supplyAsync(() -> calculatePrice(inventory)))
    .thenAccept(price ->
            CompletableFuture.runAsync(() -> sendNotification(price)));

优势

  • 保持代码线性结构
  • 自动传递上下文参数

2. 合并分支(thenCombine)

处理需要合并多个异步结果的场景:

CompletableFuture<Inventory> inventoryFuture = getInventory();
CompletableFuture<PriceRule> priceRuleFuture = getPriceRule();

inventoryFuture.thenCombine(priceRuleFuture, (inventory, rule) -> 
    calculateFinalPrice(inventory, rule))
    .thenAccept(this::sendNotification);

三、异常处理统一化

通过统一异常处理管道避免分散的try-catch

1. 全局异常处理(handle)

CompletableFuture.supplyAsync(() -> fetchOrder())
    .handle((order, ex) -> {
        if (ex != null) {
            log.error("Fetch order failed", ex);
            return defaultOrder();
        }
        return order;
    })
    .thenApplyAsync(this::checkInventory)
    .exceptionally(ex -> {
        log.error("Check inventory failed", ex);
        return fallbackInventory();
    });

2. 异常恢复(exceptionally)

CompletableFuture.supplyAsync(() -> fetchOrder())
    .thenApplyAsync(this::checkInventory)
    .exceptionally(ex -> {
        return retryCheckInventory(); // 重试逻辑
    });

四、上下文传递优化

解决多层回调中的参数传递问题。

封装领域对象

class OrderContext {
    Order order;
    Inventory inventory;
    Price price;
}

CompletableFuture.supplyAsync(() -> {
        OrderContext ctx = new OrderContext();
        ctx.order = fetchOrder();
        return ctx;
    })
    .thenApplyAsync(ctx -> {
        ctx.inventory = checkInventory(ctx.order);
        return ctx;
    })
    .thenApplyAsync(ctx -> {
        ctx.price = calculatePrice(ctx.inventory);
        return ctx;
    });

五、响应式编程融合

结合响应式编程思想提升可维护性。

1. 构建异步流水线

CompletionStage<Void> pipeline = CompletableFuture.supplyAsync(this::fetchOrder)
    .thenApplyAsync(this::enrichOrder)
    .thenApplyAsync(this::validateOrder)
    .thenAcceptAsync(this::persistOrder);

pipeline.toCompletableFuture().join();

2. 组合操作符

CompletableFuture<Order> future = CompletableFuture.supplyAsync(this::fetchOrder)
    .thenApplyAsync(order -> 
        CompletableFuture.supplyAsync(() -> checkInventory(order))
            .thenCombine(
                CompletableFuture.supplyAsync(() -> getUserProfile()),
                (inventory, profile) -> applyDiscount(inventory, profile)
            )
    ).thenCompose(Function.identity());

六、工具类封装实践

通过工具类消除样板代码。

AsyncPipeline工具类示例

public class AsyncPipeline {
    private CompletableFuture<?> future;

    public AsyncPipeline start(Supplier<?> supplier) {
        this.future = CompletableFuture.supplyAsync(supplier);
        return this;
    }

    public AsyncPipeline then(Function<?, ?> fn) {
        future = future.thenApplyAsync(fn);
        return this;
    }

    public <U> CompletableFuture<U> build() {
        return (CompletableFuture<U>) future;
    }
}

// 使用示例
AsyncPipeline pipeline = new AsyncPipeline()
    .start(() -> fetchOrder())
    .then(order -> checkInventory(order))
    .then(inventory -> calculatePrice(inventory));

七、调试与监控

1. 线程追踪

future.whenComplete((res, ex) -> {
    System.out.println("Thread: " + Thread.currentThread().getName());
});

2. 可视化工具

  • JDK Flight Recorder:监控CompletableFuture执行链路
  • Micrometer:统计异步操作耗时

八、总结:最佳实践清单

  1. 拒绝嵌套:使用thenCompose保持链式结构
  2. 集中处理异常:统一使用handle/exceptionally
  3. 明确上下文:通过DTO对象传递参数
  4. 合理命名:每个处理阶段使用有意义的Lambda名称
  5. 控制并发度:使用自定义线程池避免资源耗尽

最终优化示例

public CompletableFuture<Void> processOrder() {
    return CompletableFuture.supplyAsync(this::fetchOrder, ioPool)
        .thenApplyAsync(this::validateOrder, cpuPool)
        .thenComposeAsync(this::checkInventory, ioPool)
        .thenApplyAsync(this::calculatePrice, cpuPool)
        .thenAcceptAsync(this::sendNotification, ioPool)
        .exceptionally(ex -> {
            log.error("Process failed", ex);
            return null;
        });
}