- 作者:老汪软件技巧
- 发表时间:2024-10-07 10:02
- 浏览量:
本文结合源码,讲解了调度平台的一个调度请求,在执行器中如何被处理?
当调度平台向某个执行器发送run指令时,会被EmbedServer的内部类EmbedHttpServerHandler,使用线程池异步处理。那么本次调度就成功触发了。run请求的参数如下
{
"jobId": 1,
"executorHandler": "demoJobHandler",
"executorParams": "",
"executorBlockStrategy": "SERIAL_EXECUTION",
"executorTimeout": 0,
"logId": 2,
"logDateTime": 1728027839906,
"glueType": "BEAN",
"glueSource": "",
"glueUpdatetime": 1541254891000,
"broadcastIndex": 0,
"broadcastTotal": 1
}
二 向JobThread提交任务
接着会调用ExecutorBizImpl#run。
首先根据入参中jobId,从jobThreadRepository查找JobThread:每个任务体在执行器中,由固定的某个线程来运行。
// key是jobId
private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap();
2.1 JobThread为null时2.2 JobThread不为null时
JobThread不为null时,会增加对入参中executorBlockStrategy的处理。
当JobThread为null时,本次执行时会新建JobThread实例,也就不可能有积压的任务;也就不用做阻塞处理。
因为本次任务只是提交到JobThread的队列中,如果队列中有积压的任务,或者JobThread正在执行某个任务,本次请求并不会被立即执行,即被阻塞。
任务阻塞时,处理策略有三个:
使用Discard Later时,本次调度将失败。
使用Cover Early时,本次调度成功;队列中等待的任务将不会被执行,但是正在执行中的任务体,如果不能响应interrupt,还是会被完整的执行掉。
三 JobThread类3.1 创建实例
JobThread继承了Thread类,增加了以下属性:
执行器给每个任务,都会创建一个JobThread实例。
即同一执行器上的不同任务间,是线程隔离的。不会因为任务A阻塞或异常,而影响任务B的执行。
3.2 线程任务
JobThread重写了run方法,主要流程如下:
注意这几点:
注意,当一个任务的执行周期大于90秒时,每次执行,都得重新创建JobThread。
因为任务周期很长时,如1天执行一次;那一天之内,JobThread一直在空转,资源浪费严重。不如及时终止它,下次需要时再创建。
四 结果回调4.1 结果入队
当JobThread处理完一个任务后,会将结果封装为HandleCallbackParam实例,调用TriggerCallbackThread.pushCallBack,添加到TriggerCallbackThread的callBackQueue中。
4.2 结果回调
TriggerCallbackThread是在XxlJobExecutor.start()中被创建并启动。
有两个守护线程:首次回调线程、失败重试线程
// 队列
private LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue();
// 消费队列的线程
private Thread triggerCallbackThread;
// 回调失败重试线程
private Thread triggerRetryCallbackThread;
private volatile boolean toStop = false;
五 总结