• 作者:老汪软件技巧
  • 发表时间:2024-10-04 17:00
  • 浏览量:

之前文章《使用 CompletableFuture 最常见的错误(附实战代码)》中,我们提到:对于并发任务编排,CompletableFuture 原生提供的两种策略 anyOf 和 allOf 并不能满足实际开发需要,最常使用的策略是 allFastFail:当任务异常时,返回结果为异常。本文将讨论快速失败实现的方法,内容涉及CFFU实现源码以及其他实现方式,希望对你理解并发编程有所帮助。

CFFU 实现

快速失败实际上是一个 any 逻辑, 其短路终止逻辑为

当任一个任务执行失败或者所有任务执行成功

为了可以复用 anyOf 逻辑,可以将不触发终止逻辑的 CompletableFuture 转换为 未完成状态的 CompletableFuture。第一种情况需要把执行成功的任务转换成未完成,会触发终止逻辑;第二种情况需要所有任务成功的逻辑,否则,处理成未完成任务。

CFFU 使用了这样的处理,为便于理解,保留了原始注释,代码如下:

@Contract(pure = true)
@SafeVarargs
public static  CompletableFuture> allResultsFastFailOf(CompletionStage... cfs) {
    return allResultsFastFailOf0(requireCfsAndEleNonNull(cfs));
}
// 具体实现
private static  CompletableFuture> allResultsFastFailOf0(CompletionStage[] cfs) {
    final int len = cfs.length;
    if (len == 0) return completedFuture(arrayList());
    // convert input cf to non-minimal-stage CF instance for SINGLE input in order to
    // ensure that the returned cf is not minimal-stage instance(UnsupportedOperationException)
  
  	// 这里 toNonMinCf0 安全地将 CompletionStage 转换成了 CompletableFuture
    if (len == 1) return toNonMinCf0(cfs[0]).thenApply(CompletableFutureUtils::arrayList);
  
    final CompletableFuture[] successOrBeIncomplete = new CompletableFuture[len];
    // NOTE: fill ONE MORE element of failedOrBeIncomplete LATER
    final CompletableFuture[] failedOrBeIncomplete = new CompletableFuture[len + 1];
    fill0(cfs, successOrBeIncomplete, failedOrBeIncomplete);
    // NOTE: fill the ONE MORE element of failedOrBeIncomplete HERE:
    //       a cf that is successful when all given cfs success, otherwise be incomplete
  	// 第二种情况
    failedOrBeIncomplete[len] = allResultsOf0(successOrBeIncomplete);
		
  	// 复用 anyOf 逻辑
  	// 这里anyOf执行完直接强转类型即可,因为如果成功,结果类型必为 List
    return f_cast(CompletableFuture.anyOf(failedOrBeIncomplete));
}
// 这个方法提取出来是为了代码复用
private static  void fill0(CompletionStage[] stages,
                              CompletableFuture[] successOrBeIncomplete,
                              CompletableFuture[] failedOrBeIncomplete) {
    for (int i = 0; i < stages.length; i++) {
        final CompletableFuture f = f_toCf0(stages[i]);
      	// 注意这里必须转换:异常结束 -> 未完成状态的 CompletableFuture,否则,会有竞争存在,结果会错误
        successOrBeIncomplete[i] = exceptionallyCompose(f, ex -> new CompletableFuture<>());
      	// 第一种情况
        failedOrBeIncomplete[i] = f.thenCompose(v -> new CompletableFuture<>());
    }
}

注意 fill0 这个方法独立出来是为了复用,在 allSuccessOf 和 anySuccessOf中会复用相关逻辑。

以 anySuccessOf 为例,其终止逻辑为:

任意一个任务执行成功或者所有任务执行失败

这里的逻辑和快速失败是相反的,不再赘述相关代码。

手动实现

你也可以使用 CompletableFuture 的写方法,通过 complete 和 completeExceptionally 实现原子写和单次写,写入的条件也就是以上的终止逻辑。

所有任务执行成功的逻辑可以使用 CountDownLatch 等同步工具类实现。在笔者之前写的《深入理解 Future, CompletableFuture, ListenableFuture,回调机制》文中没有考虑这种逻辑,使用了超时时间进行兜底。

// 使用 JDK21
public static CompletableFuture> fastFail(Iterable> cfs) {
    int size = Iterables.size(cfs);
    if (size == 0) return CompletableFuture.completedFuture(List.of());
    CompletableFuture> result = new CompletableFuture<>();
    LongAdder success = new LongAdder();
    CountDownLatch latch = new CountDownLatch(size);
    cfs.forEach(cf -> cf.whenComplete((v, ex) -> {
            if (ex != null) {
                result.completeExceptionally(ex);
            }
            success.increment();
            latch.countDown();
        }
    ));
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
        executor.submit(() -> {
            try {
                latch.wait();
            } catch (InterruptedException ignored) {}
            if (success.intValue() == size) {
                List list = Streams.stream(cfs)
                    .map(CompletableFuture::join)
                    .toList();
                result.complete(list);
            }
        });
    }
    return result;
}

此外,如果你关心取消功能的话,对于其他未完成的任务,获取结果后可以进行批量取消。由于 CompletableFuture 并不能取消提交到线程池中的任务,对于已提交到线程池的任务,其实际意义可能不大。对于未提交的任务,可以置为取消状态(CancellationException)。如果你想实现相关功能的话,可以使用 Guava 的 ListenableFuture。