• 作者:老汪软件技巧
  • 发表时间:2024-09-16 21:02
  • 浏览量:

Java9 对于CompletableFuture做了新的增强,除了万众期盼的超时功能(orTimeout),还有面向继承、安全发布的相关方法。本文中,我们将详细分析各个新增方法,同时说明其安全发布的重要性,最后提出相关的实践原则。

1. newIncompleteFuture

public  CompletableFuture newIncompleteFuture() {
    return new CompletableFuture();
}

这是一个面向继承的方法,Java支持方法返回值协变,子类可以返回自己的实现。如果你不需要继承 CompletableFuture,对于使用来说,这个方法没有新增新的功能。

2. 默认执行器

public Executor defaultExecutor() {
    return ASYNC_POOL;
}

子类可以指定默认执行器,不过需要注意调用不显示指定执行器的async相关方法,其执行回调所在的线程有两种可能:当前调用线程或者子类指定的线程。如果使用 CompletableFuture,推荐显式指定执行器;如果使用 CFFU,无需每次都显式指定执行器。

3. 复制方法

public CompletableFuture copy() {
    return uniCopyStage(this);
}

此方法等同与调用 cf.thenApply(x -> x), 目的是实现保护性复制,避免后续操作对于原示例的修改。

4. 只读形式与安全发布

如何安全发布 CompletableFuture ?Java9新增方法鉴赏_如何安全发布 CompletableFuture ?Java9新增方法鉴赏_

// 返回结果仅支持 CompletionStage 操作
public CompletionStage minimalCompletionStage() {
    return uniAsMinimalStage();
}
private MinimalStage uniAsMinimalStage() {
    Object r;
  	// 性能优化1:若已有结果,当前线程下直接返回stage
    if ((r = result) != null)
        return new MinimalStage(encodeRelay(r));
  	// 性能优化2:若还未有结果,注册回调
    MinimalStage d = new MinimalStage();
    unipush(new UniRelay(d, this));
    return d;
}
// 实际实现,CompletableFuture 独有方法均不支持
static final class MinimalStage extends CompletableFuture {
        MinimalStage() { }
        MinimalStage(Object r) { super(r); }
        @Override public  CompletableFuture newIncompleteFuture() {
            return new MinimalStage(); }
        @Override public T get() {
            throw new UnsupportedOperationException(); }
        @Override public T get(long timeout, TimeUnit unit) {
            throw new UnsupportedOperationException(); }
        @Override public T getNow(T valueIfAbsent) {
            throw new UnsupportedOperationException(); }
        @Override public T join() {
            throw new UnsupportedOperationException(); }
        @Override public T resultNow() {
            throw new UnsupportedOperationException(); }
        @Override public Throwable exceptionNow() {
            throw new UnsupportedOperationException(); }
        @Override public boolean complete(T value) {
            throw new UnsupportedOperationException(); }
        @Override public boolean completeExceptionally(Throwable ex) {
            throw new UnsupportedOperationException(); }
        @Override public boolean cancel(boolean mayInterruptIfRunning) {
            throw new UnsupportedOperationException(); }
        @Override public void obtrudeValue(T value) {
            throw new UnsupportedOperationException(); }
        @Override public void obtrudeException(Throwable ex) {
            throw new UnsupportedOperationException(); }
        @Override public boolean isDone() {
            throw new UnsupportedOperationException(); }
        @Override public boolean isCancelled() {
            throw new UnsupportedOperationException(); }
        @Override public boolean isCompletedExceptionally() {
            throw new UnsupportedOperationException(); }
        @Override public State state() {
            throw new UnsupportedOperationException(); }
        @Override public int getNumberOfDependents() {
            throw new UnsupportedOperationException(); }
        @Override public CompletableFuture completeAsync
            (Supplier supplier, Executor executor) {
            throw new UnsupportedOperationException(); }
        @Override public CompletableFuture completeAsync
            (Supplier supplier) {
            throw new UnsupportedOperationException(); }
        @Override public CompletableFuture orTimeout
            (long timeout, TimeUnit unit) {
            throw new UnsupportedOperationException(); }
        @Override public CompletableFuture completeOnTimeout
            (T value, long timeout, TimeUnit unit) {
            throw new UnsupportedOperationException(); }
  			// 返回新的CompletableFuture, 等效于thenApply(x -> x)
        @Override public CompletableFuture toCompletableFuture() {
            Object r;
            if ((r = result) != null)
                return new CompletableFuture(encodeRelay(r));
            else {
                CompletableFuture d = new CompletableFuture<>();
                unipush(new UniRelay(d, this));
                return d;
            }
        }
    }

此方法返回只读的 CompletionStage,可以实现安全发布。可以类比于一个方法返回了ImmutableList,优点是可以防止后续的错误操作,比如其他线程强制写结果(对应List::add方法)。

有这样一个安全发布的保证至关重要,特别是面临复杂场景时,可以减轻编程负担,放心地使用 CompletableFuture 返回的结果。

我们知道CompletionStage接口定义了链式异步回调相关方法, CompletableFuture 相关读写操作(如 join, complete, obtrudeValue 等),不在CompletionStage的定义下,可以通过方法toCompletableFuture进行中转。

class InventoryServiceDemo {
    final Executor executor = Executors.newCachedThreadPool();
    // 获取库存信息
  	// 方法可以返回 CompletionStage,相比于CompletableFuture更安全
    CompletionStage getInventoryAsync(String productId) {
        return CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(Duration.ofSeconds(2));
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return 42;
                }, executor)
                .minimalCompletionStage();
    }
}

笔者曾在《深入理解 Future, CompletableFuture, ListenableFuture,回调机制》一文中质疑 CompletionStage#toCompletableFuture 方法破坏了接口设计的相关原则,CompletableFuture#minimalCompletionStage 不如直接使用接口。事实上,我当时还是没有理解Doug Lea(道哥)的设计思路。

CompletionStage#toCompletableFuture 表明 CompletableFuture 可以作为 CompletionStage 的默认实现,实现两者类型的快速转换。很多情况下,两者的区别不大。CompletableFuture#minimalCompletionStage 底层思想是提供只读功能。5. 异步写操作

public CompletableFuture completeAsync(Supplier supplier) {
    return completeAsync(supplier, defaultExecutor());
}
public CompletableFuture completeAsync(Supplier supplier,
                                          Executor executor) {
    if (supplier == null || executor == null)
        throw new NullPointerException();
    executor.execute(new AsyncSupply(this, supplier));
    return this;
}

这两个方法很容易理解,不多作介绍。

6. CFFU的优化拓展

开源项目CFFU(功夫未来)对于CompletableFuture 进行了拓展,以下仅列举和Java9新增功能相关的拓展点:

可以在对象创建时指定执行器,后续操作无需重复指定Java8 版本下可以使用后续版本的功能提供了相关安全发布功能,比如安全的超时功能内部的便利方法很多应用了保护性复制方法CffuFactoryBuilder 支持以参数形式配置是否允许强制重写结果7. 实践原则异步方法可以返回 CompletionStage,其相比于CompletableFuture更为安全,同时可以提示用户不要使用阻塞相关方法就像推荐使用不可变对象一样,推荐默认使用 CompletableFuture#minimalCompletionStage 方法无论是对于方法入参还是返回结果,使用接口一般都优于实体类理想的 Future 设计应该是读写分离的,不可变的。虽然 CompletableFuture 并没有完全遵循以上原则,但是我们在使用时应当注意遵循。