- 作者:老汪软件技巧
- 发表时间:2024-09-18 07:27
- 浏览量:
它是由京东零售开源的项目,作者是天涯泪小武。
为什么会学习这个框架
最近在学习java并发中的CompletableFuture,它除了提供了更为好用和强大的 Future 特性之外,还提供了函数式编程、异步任务编排组合(可以将多个异步任务串联起来,组成一个完整的链式调用)等能力。
在上网查阅资料的过程中发现一款可支持线程编排的并行框架 asyncTool,在Gitee官网找到后发现该项目不仅开源作者还写了文档去一行一行代码进行教学,这让一个对并发编排任务感兴趣但又没有基础的人来了兴趣,因此我结合源码以及作者的介绍以及其他博客的分析去学习了该框架。
框架的作用
这个模块我特别喜欢作者的介绍,先引出问题然后回答,让我一个并没有并发项目经验的人深刻理解这个框架究竟是要做什么,以下是对作者介绍的借鉴和自己的总结:
为什么需要一个这样的带任务编排的框架
1.当用户请求某个功能时,可能需要去调用各种各样的接口最后汇总结果去展示给用户。
2.在一些需求中可能不同块代码之间有依赖关系,后执行的需要依赖先执行的,以及超时、阻塞、异常等情况的处理
用图片来解释就是(图片来自作者):
并发框架的监控
任务编排其实可以通过CompletableFuture的api经过复杂的组装后实现,但对于该类来说,使用者在supplyAsync()后无法知道该异步任务的执行情况,虽然也有一些方法可以获取执行结果以及执行中的异常,但如果该任务没有执行被跳过或者其他情况那么就不知道了,因此并发框架应该能对每一步任务进行监控,无论成功失败都应该有回调。
每个任务之间应该有强弱依赖
例如上面的图片中的一些情况:
所以在写该框架时每个任务都有自己的依赖任务和下级任务,依赖任务按要求执行后才能执行该任务
任务可以使用依赖任务中的结果
既然任务之间有执行顺序,那么一定想使用上级的结果来处理本级的任务吧。
超时时间设置
虽然每一个任务单独的时间无法设置,但可以设置全组任务的timeout,来控制该任务的执行时间
框架高性能低线程
该框架中全程无锁
AB执行后才能执行C这种情况下,C会运行在AB中后执行完的那个线程上,严格贯彻低线程理念
框架的应用串行
串行比较好设计,那就来三个任务依次执行即可
在该框架中任务需要实现IWorker接口和ICallback接口,并重写以下方法:
模拟串行场景:A任务对参数+1,之后B任务对参数+2,之后C任务对参数+3。
A任务:
public class WorkerA implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
@Override
public void begin() {
System.out.println("A - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
}
@Override
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
Integer res = object + 1;
return res;
}
@Override
public void result(boolean success, Integer param, WorkResult workResult ) {
System.out.println("A - param:" + JSON.toJSONString(param));
System.out.println("A - result:" + JSON.toJSONString(workResult));
System.out.println("A - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
}
@Override
public Integer defaultValue() {
System.out.println("A - defaultValue");
return 101;
}
}
B任务:
public class WorkerB implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
@Override
public void begin() {
System.out.println("B - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
}
@Override
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
Integer res = object + 2;
return res;
}
@Override
public void result(boolean success, Integer param, WorkResult workResult ) {
System.out.println("B - param:" + JSON.toJSONString(param));
System.out.println("B - result:" + JSON.toJSONString(workResult));
System.out.println("B - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
}
@Override
public Integer defaultValue() {
System.out.println("B - defaultValue");
return 102;
}
}
任务3:
public class WorkerC implements IWorker<Integer, Integer>, ICallback<Integer, Integer> {
@Override
public void begin() {
System.out.println("C - Thread:" + Thread.currentThread().getName() + "- start --" + SystemClock.now());
}
@Override
public Integer action(Integer object, Map<String, WorkerWrapper> allWrappers) {
Integer res = object + 3;
return res;
}
@Override
public void result(boolean success, Integer param, WorkResult workResult ) {
System.out.println("C - param:" + JSON.toJSONString(param));
System.out.println("C - result:" + JSON.toJSONString(workResult));
System.out.println("C - Thread:" + Thread.currentThread().getName() + "- end --" + SystemClock.now());
}
@Override
public Integer defaultValue() {
System.out.println("C - defaultValue");
return 103;
}
}
编写WorkerWrapper包装类,该类就是任务编排的core类
//A没有depend
WorkerWrapper wrapperA = new WorkerWrapper.Builder()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(1)
.build();
//B的depend是A
WorkerWrapper wrapperB = new WorkerWrapper.Builder()
.id("workerB")
.worker(workerB)
.callback(workerB)
.param(2)
.depend(wrapperA)
.build();
//C的depend是B
WorkerWrapper wrapperC = new WorkerWrapper.Builder()
.id("workerC")
.worker(workerC)
.callback(workerC)
.param(3)
.depend(wrapperB)
.build();
//begin
Async.beginWork(1000, wrapperA);
通过WorkerWrapper中的静态内部类Build去构建一个WorkerWrapper对象,从而实现任务的编排
最后通过Async类提交任务执行
并行
并行只需3个任务一并丢进beginWork中即可
Async.beginWork(1000, wrapperA,wrapperB,wrapperC);
阻塞等待 - 先串行,后并行
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Worker1 worker1 = new Worker1();
Worker2 worker2 = new Worker2();
Worker3 worker3 = new Worker3();
WorkerWrapper wrapper1 = new WorkerWrapper.Builder()
.id("worker1")
.worker(worker1)
.callback(worker1)
.param(1)
.build();
WorkerWrapper wrapper2 = new WorkerWrapper.Builder()
.id("worker2")
.worker(worker2)
.depend(wrapper1)
.callback(worker2)
.param(2)
.build();
WorkerWrapper wrapper3 = new WorkerWrapper.Builder()
.id("worker3")
.worker(worker3)
.depend(wrapper1)
.callback(worker3)
.param(3)
.build();
Async.beginWork(5000,wrapper1);
}
}
让BC都依赖于A然后把A丢进beginWork即可.
阻塞等待 - 先并行,后串行
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Worker1 worker1 = new Worker1();
Worker2 worker2 = new Worker2();
Worker3 worker3 = new Worker3();
WorkerWrapper wrapper1 = new WorkerWrapper.Builder()
.id("worker1")
.worker(worker1)
.callback(worker1)
.param(1)
.build();
WorkerWrapper wrapper2 = new WorkerWrapper.Builder()
.id("worker2")
.worker(worker2)
.callback(worker2)
.param(2)
.build();
WorkerWrapper wrapper3 = new WorkerWrapper.Builder()
.id("worker3")
.worker(worker3)
.depend(wrapper1,wrapper2)
.callback(worker3)
.param(3)
.build();
Async.beginWork(5000,wrapper1,wrapper2);
}
}
C依赖AB,把AB 一起丢入即可
异常/超时回调
这2种场景,可以基于以上场景微调,即可debug调试。
Async.beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper)
1.基于全组设定的timeout,如果超时了,则worker中的返回值使用defaultValue() 2.如果当前Worker任务异常了,则当前任务使用defaultValue(),并且depend当前任务的,也FastFail,返回defaultValue()
框架的实现包结构
图片来自涛声依旧叭
回调包即为实现作者对并发框架每一步任务监控的想法的实现
包装类即为编排任务核心类
执行器即对编排任务根据依赖关系去执行的实现
简单调用流程
Async执行器触发WorkerWrapper包装器类执行,WorkerWrapper中包装了任务类IWorker和回调类ICallback以及任务之间的依赖关系还有异常返回
回调接口:IWorker、ICallback
IWorker
/**
* 每个最小执行单元需要实现该接口
* @author wuweifeng wrote on 2019-11-19.
*/
public interface IWorker<T, V> {
/**
* 在这里做耗时操作,如rpc请求、IO等
*
* @param object
* object
*/
V action(T object, Map allWrappers );
/**
* 超时、异常时,返回的默认值
* @return 默认值
*/
V defaultValue();
}
T入参泛型 V出参泛型
ICallback
/**
* 每个执行单元执行完毕后,会回调该接口
* 需要监听执行结果的,实现该接口即可
* @author wuweifeng wrote on 2019-11-19.
*/
public interface ICallback<T, V> {
void begin();
/**
* 耗时操作执行完毕后,就给value注入值
*
*/
void result(boolean success, T param, WorkResult workResult );
}
/**
* 默认回调类,如果不设置的话,会默认给这个回调
* @author wuweifeng wrote on 2019-11-19.
*/
public class DefaultCallback<T, V> implements ICallback<T, V> {
@Override
public void begin() {
}
@Override
public void result(boolean success, T param, WorkResult workResult) {
}
}
不传回调类默认给一个空的回调
包装类WorkerWrapper
有WorkerWrapper和DependWrapper两个类,因为依赖任务中还有个必要属性must,即判断是否某个任务执行后才能往后执行,还是只要任一执行即能向后执行。
源码
/**
* 对每个worker及callback进行包装,一对一
*
* @author wuweifeng wrote on 2019-11-19.
*/
public class WorkerWrapper {
/**
* 该wrapper的唯一标识
*/
private String id;
/**
* worker将来要处理的param
*/
private T param;
private IWorker worker;
private ICallback callback;
/**
* 在自己后面的wrapper,如果没有,自己就是末尾;如果有一个,就是串行;如果有多个,有几个就需要开几个线程
* -------2
* 1
* -------3
* 如1后面有2、3
*/
private List> nextWrappers;
/**
* 依赖的wrappers,有2种情况,1:必须依赖的全部完成后,才能执行自己 2:依赖的任何一个、多个完成了,就可以执行自己
* 通过must字段来控制是否依赖项必须完成
* 1
* -------3
* 2
* 1、2执行完毕后才能执行3
*/
private List dependWrappers;
/**
* 标记该事件是否已经被处理过了,譬如已经超时返回false了,后续rpc又收到返回值了,则不再二次回调
* 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取
*
* 1-finish, 2-error, 3-working
*/
private AtomicInteger state = new AtomicInteger(0);
/**
* 该map存放所有wrapper的id和wrapper映射
*/
private Map<String, WorkerWrapper> forParamUseWrappers;
/**
* 也是个钩子变量,用来存临时的结果
*/
private volatile WorkResult workResult = WorkResult.defaultResult();
/**
* 是否在执行自己前,去校验nextWrapper的执行结果
* 1 4
* -------3
* 2
* 如这种在4执行前,可能3已经执行完毕了(被2执行完后触发的),那么4就没必要执行了。
* 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的
*/
private volatile boolean needCheckNextWrapperResult = true;
private static final int FINISH = 1;
private static final int ERROR = 2;
private static final int WORKING = 3;
private static final int INIT = 0;
}
几个重要属性:
构建
在阅读源码的过程中发现WorkerWrapper类中有一个静态内部类Builder,看了分析文章以及查阅资料后才知道这是建造者模式
建造者模式: 建造者模式将对象的创建过程和表现分离,并且调用方通过指挥者调用方法对对象进行构建,使得调用方不再关心对象构建过程,构建对象的具体过程可以根据传入类型的不同而改变。
//使用建造者模式构建WorkerWrapper
WorkerWrapper wrapperA = new WorkerWrapper.Builder()
.id("workerA")
.worker(workerA)
.callback(workerA)
.param(null)
.depend(wrapperB, wrapperC)
.build();
public static class Builder<W, C> {
/**
* 该wrapper的唯一标识
*/
private String id = UUID.randomUUID().toString();
/**
* worker将来要处理的param
*/
private W param;
private IWorker<W, C> worker;
private ICallback<W, C> callback;
/**
* 自己后面的所有
*/
private List<WorkerWrapper, ?>> nextWrappers;
/**
* 自己依赖的所有
*/
private List<DependWrapper> dependWrappers;
/**
* 存储强依赖于自己的wrapper集合
*/
private Set<WorkerWrapper, ?>> selfIsMustSet;
private boolean needCheckNextWrapperResult = true;
}
内部类Builder中的属性和WorkerWrapper中的一样,主要是通过build()方法去构建WorkerWrapper对象。
AsyncTool执行器工作过程
不传入线程池默认
public static final ThreadPoolExecutor COMMON_POOL =
new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,
1024,
15L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
(ThreadFactory) Thread::new);
第一个参数:表示线程池的核心线程数是系统可用处理器数量的两倍。这样做可以确保在大多数情况下,有足够的线程来并行处理任务,充分利用多核CPU的计算能力。
第二个参数:最大线程数
第三/四个参数:存活时间,当线程数大于核心线程数时,多余的空闲线程在终止前等待新任务的最长时间。
第五个参数:LinkedBlockingQueue实例,这是一个基于链表结构的阻塞队列。它的大小没有限制(或者说是Integer.MAX_VALUE)
第六个参数:线程工厂
核心方法
public static boolean beginWork(long timeout, ThreadPoolExecutor pool, List workerWrappers ) throws ExecutionException, InterruptedException {
//没有任务直接返回false
if(workerWrappers == null || workerWrappers.size() == 0) {
return false;
}
//定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result
Map forParamUseWrappers = new ConcurrentHashMap<>();
CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
for (int i = 0; i < workerWrappers.size(); i++) {
WorkerWrapper wrapper = workerWrappers.get(i);
//用CompletableFuture异步执行任务
futures[i] = CompletableFuture.runAsync(() -> wrapper.work(pool, timeout, forParamUseWrappers), pool);
}
try {
//用CompletableFuture异步执行任务全部执行完后获取结果
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
return true;
} catch (TimeoutException e) {
Set set = new HashSet<>();
totalWorkers(workerWrappers, set);
for (WorkerWrapper wrapper : set) {
wrapper.stopNow();
}
return false;
}
}
WorkerWrapper处理任务核心work()
/**
* 开始工作
* fromWrapper代表这次work是由哪个上游wrapper发起的
*/
private void work(ThreadPoolExecutor poolExecutor, WorkerWrapper fromWrapper, long remainTime, Map forParamUseWrappers) {
this.forParamUseWrappers = forParamUseWrappers;
//将自己放到所有wrapper的集合里去
forParamUseWrappers.put(id, this);
long now = SystemClock.now();
//总的已经超时了,就快速失败,进行下一个
if (remainTime <= 0) {
fastFail(INIT, null);
beginNext(poolExecutor, now, remainTime);
return;
}
//如果自己已经执行过了。
//可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
if (getState() == FINISH || getState() == ERROR) {
beginNext(poolExecutor, now, remainTime);
return;
}
//如果在执行前需要校验nextWrapper的状态
if (needCheckNextWrapperResult) {
//如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了
if (!checkNextWrapperResult()) {
fastFail(INIT, new SkippedException());
beginNext(poolExecutor, now, remainTime);
return;
}
}
//如果没有任何依赖,说明自己就是第一批要执行的
if (dependWrappers == null || dependWrappers.size() == 0) {
fire();
beginNext(poolExecutor, now, remainTime);
return;
}
/*如果有前方依赖,存在两种情况
一种是前面只有一个wrapper。即 A -> B
一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。
所以需要B来做判断,必须A、C、D都完成,自己才能执行 */
//只有一个依赖
if (dependWrappers.size() == 1) {
doDependsOneJob(fromWrapper);
beginNext(poolExecutor, now, remainTime);
} else {
//有多个依赖时
doDependsJobs(poolExecutor, dependWrappers, fromWrapper, now, remainTime);
}
}
作者的注释十分清晰
工作步骤是:
只有一个依赖的话,执行完就可以执行自己了
多个依赖的话,需要判断must依赖是否已经全执行才可以执行自己
执行fire()
/**
* 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
*/
private void fire() {
//阻塞取结果
workResult = workerDoJob();
}
/**
* 具体的单个worker执行任务
*/
private WorkResult workerDoJob() {
//1.Check重复执行
if (!checkIsNullResult()) {
return workResult;
}
try {
//2.设置Wrapper状态为Working
//如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行
if (!compareAndSetState(INIT, WORKING)) {
return workResult;
}
//3.回调begin
callback.begin();
//4.执行耗时操作action
V resultValue = worker.action(param, forParamUseWrappers);
//5.设置Wrapper状态为FINISH
//如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) {
return workResult;
}
workResult.setResultState(ResultState.SUCCESS);
workResult.setResult(resultValue);
//6.回调result
callback.result(true, param, workResult);
return workResult;
} catch (Exception e) {
//7.异常处理:设置状态ERROR\EXCEPTION,结果设置为默认值
fastFail(WORKING, e);
return workResult;
}
}
beginNext()
/**
* 进行下一个任务
*/
private void beginNext(ExecutorService executorService, long now, long remainTime) {
//花费的时间
long costTime = SystemClock.now() - now;
//1.后续没有任务了
if (nextWrappers == null) {
return;
}
//2.后续只有1个任务,使用当前任务的线程执行next任务
if (nextWrappers.size() == 1) {
nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
return;
}
//3.后续有多个任务,使用CompletableFuture[]包装,有几个任务就起几个线程执行
CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
for (int i = 0; i < nextWrappers.size(); i++) {
int finalI = i;
futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
.work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
}
//4.阻塞获取Future结果,注意这里没有超时时间,超时时间由全局统一控制。
try {
CompletableFuture.allOf(futures).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
开启下一个任务,如果没有就结束程序,有一个就启动一个线程,有多个就启动多个。
doDependsOneJob()
private void doDependsOneJob(WorkerWrapper dependWrapper) {
//1.依赖超时?
if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultResult();
fastFail(INIT, null);
}
//2.依赖异常?
else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
fastFail(INIT, null);
}
//3.依赖正常
else {
//前面任务正常完毕了,该自己了
fire();
}
}
如果判断到只有一个依赖的时候就要进入这个方法,看自己依赖的方法执行情况,如果依赖的方法超时那自己也超时
doDependsJobs()
先判断依赖的任务是否是must,如果一个都不是那就执行自己。
如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干,等到必须完成的完成了的时候就会让该任务进行的
如果fromWrapper是必须的,就判断是否所有的必须都完成了,并检查他们的情况,有超时即自己也超时
如何确保多依赖任务不被重复执行
//可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
if (getState() == FINISH || getState() == ERROR) {
beginNext(poolExecutor, now, remainTime);
return;
}
框架的特点SystemClock
在项目中获取当前时间没有使用System.currentTimeMillis()而是自己写了一个SystemClock
项目中通过单例模式创建了一个时钟类SystemClock
/**
* 用于解决高并发下System.currentTimeMillis卡顿
* @author lry
*/
public class SystemClock {
private final int period;
private final AtomicLong now;
/**
* 单例模式
*/
private static class InstanceHolder {
private static final SystemClock INSTANCE = new SystemClock(1);
}
private SystemClock(int period) {
this.period = period;
this.now = new AtomicLong(System.currentTimeMillis());
scheduleClockUpdating();
}
private static SystemClock instance() {
return InstanceHolder.INSTANCE;
}
/**
* 守护线程
* 每1ms写一次now系统当前时间。
*/
private void scheduleClockUpdating() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable, "System Clock");
//守护线程,必须在线程启动前设置
thread.setDaemon(true);
return thread;
});
//定时周期执行任务
scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
}
private long currentTimeMillis() {
return now.get();
}
/**
* 用来替换原来的System.currentTimeMillis()
*/
public static long now() {
return instance().currentTimeMillis();
}
}
创建了一个守护线程,每1ms对 AtomicLong now进行更新 System.currentTimeMillis(),因为守护线程的执行周期是每1ms执行一次,这里是有1ms的延迟。
ScheduledExecutorService 是 ExecutorService 的子类,它基于 ExecutorService 功能实现周期执行的任务。
参考
手写中间件之——并发框架_天涯泪小武的博客-CSDN博客
并行编排AsyncTool框架源码解析_涛声依旧叭的博客-CSDN博客