• 作者:老汪软件技巧
  • 发表时间:2024-08-26 21:02
  • 浏览量:

最后,我们希望在 UI 线程中进行处理。

通过描述对数据的最终处理(在 UI 中显示)和对错误的处理(显示在 popup 中)来触发(subscribe)。

如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个timeout的操作符即可。

Reactor 中增加超时控制的例子

userService.getFavorites(userId) // 1
           .timeout(Duration.ofMillis(800))  // 2
           .onErrorResume(cacheService.cachedFavoritesFor(userId))  // 3
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

如果流在超时时限没有发出(emit)任何值,则发出错误(error)。

一旦收到错误,交由cacheService处理。

处理链后边的内容与上例类似。

Futures 比回调要好一点,但即使在 Java 8 引入了CompletableFuture,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future还有一个问题:当对Future对象最终调用get()方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理。

考虑另外一个例子,我们首先得到 ID 的列表,然后通过它进一步获取到“对应的 name 和 statistics” 为元素的列表,整个过程用异步方式来实现。

CompletableFuture处理组合的例子

CompletableFuture> ids = ifhIds();  // 1
CompletableFuture> result = ids.thenComposeAsync(l -> { // 2
        Stream> zip =
                        l.stream().map(i -> {  // 3
                                                 CompletableFuture nameTask = ifhName(i);  // 4
                                                 CompletableFuture statTask = ifhStat(i);  // 5
                                                 return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);  // 6
                                         });
        List> combinationList = zip.collect(Collectors.toList());   // 7
        CompletableFuture[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
        CompletableFuture allDone = CompletableFuture.allOf(combinationArray);  // 8
        return allDone.thenApply(v -> combinationList.stream()
                                                                                                 .map(CompletableFuture::join)  // 9
                                                                                                 .collect(Collectors.toList()));
});
List results = result.join();  // 10
assertThat(results).contains(
                                "Name NameJoe has stats 103",
                                "Name NameBart has stats 104",
                                "Name NameHenry has stats 105",
                                "Name NameNicole has stats 106",
                                "Name NameABSLAJNFOAJNFOANFANSF has stats 121");

以一个 Future 开始,其中封装了后续将获取和处理的 ID 的 list。

获取到 list 后边进一步对其启动异步处理任务。

对于 list 中的每一个元素:

异步地得到相应的 name。

异步地得到相应的 statistics。

将两个结果一一组合。

我们现在有了一个 list,元素是 Future(表示组合的任务,类型是CompletableFuture),为了执行这些任务, 我们需要将这个 list(元素构成的流) 转换为数组(List)。

将这个数组传递给CompletableFuture.allOf,返回一个Future,当所以任务都完成了,那么这个Future也就完成了。

有点麻烦的地方在于allOf返回的是CompletableFuture,所以我们遍历这个 Future 的List, ,然后使用join()来手机它们的结果(不会导致阻塞,因为AllOf确保这些 Future 全部完成)

10

一旦整个异步流水线被触发,我们等它完成处理,然后返回结果列表。

由于 Reactor 内置许多组合操作,因此以上例子可以简单地实现:

Reactor 实现与 Future 同样功能的代码

Flux ids = ifhrIds();  // 1
Flux combinations =
                ids.flatMap(id -> {  // 2
                        Mono nameTask = ifhrName(id);  // 3
                        Mono statTask = ifhrStat(id); // 4
                        return nameTask.zipWith(statTask,  // 5
                                        (name, stat) -> "Name " + name + " has stats " + stat);
                });
Mono> result = combinations.collectList();  // 6
List results = result.block();  // 7
assertThat(results).containsExactly(   // 8
                "Name NameJoe has stats 103",
                "Name NameBart has stats 104",
                "Name NameHenry has stats 105",
                "Name NameNicole has stats 106",
                "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

这一次,我们从一个异步方式提供的ids序列(Flux)开始。

对于序列中的每一个元素,我们异步地处理它(flatMap方法内)两次。

获取相应的 name。

获取相应的 statistic.

异步地组合两个值。

随着序列中的元素值“到位”,它们收集一个List中。

在生成流的环节,我们可以继续异步地操作Flux流,对其进行组合和订阅(subscribe)。 最终我们很可能得到一个Mono。由于是测试,我们阻塞住(block()),等待流处理过程结束, 然后直接返回集合。

响应式编程原理_响应式编程函数式编程_

Assert 结果。

回调或 Future 遇到的窘境是类似的,这也是响应式编程要通过Publisher-Suscriber方式来解决的。

3.3. 从命令式编程到响应式编程

类似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:

3.3.1. 可编排性与可读性

可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。

这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提高,编写和阅读代码都变得越来越困难。就像我们刚才看到的,回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。

Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。

3.3.2. 就像装配流水线

你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说Subscriber)。

原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。

3.3.3. 操作符(Operators)

在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。每一个操作符 对Publisher进行相应的处理,然后将Publisher包装为一个新的Publisher。就像一个链条, 数据源自第一个Publisher,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。

理解了操作符会创建新的Publisher实例这一点,能够帮助你避免一个常见的问题, 这种问题会让你觉得处理链上的某个操作符没有起作用。

虽然响应式流规范(Reactive Streams specification)没有规定任何操作符, 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作, 到过滤操作,甚至复杂的编排和错误处理操作。

3.3.4.subscribe()之前什么都不会发生

在 Reactor 中,当你创建了一条Publisher处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。

当真正“订阅(subscrib)”的时候,你需要将Publisher关联到一个Subscriber上,然后 才会触发整个链的流动。这时候,Subscriber会向上游发送一个request信号,一直到达源头 的Publisher。

3.3.5. 背压 Backpressure

向上游传递信号这一点也被用于实现背压,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。

在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用request机制来告知源头它一次最多能够处理n个元素。

中间环节的操作也可以影响request。想象一个能够将每10个元素分批打包的缓存(buffer)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成10个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。

这样能够将“推送”模式转换为“推送+拉取”混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。

3.3.6. Hot vs Cold

在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:

一些示例图onBackpressureBuffer

flatMap

subscribe

原文见:

更多图片见:projectreactor.io/docs/core/r…