• 作者:老汪软件技巧
  • 发表时间:2024-10-07 10:02
  • 浏览量:

本文结合源码,讲解了调度平台的一个调度请求,在执行器中如何被处理?

接收run请求

当调度平台向某个执行器发送run指令时,会被EmbedServer的内部类EmbedHttpServerHandler,使用线程池异步处理。那么本次调度就成功触发了。run请求的参数如下

{
    "jobId": 1,
    "executorHandler": "demoJobHandler",
    "executorParams": "",
    "executorBlockStrategy": "SERIAL_EXECUTION",
    "executorTimeout": 0,
    "logId": 2,
    "logDateTime": 1728027839906,
    "glueType": "BEAN",
    "glueSource": "",
    "glueUpdatetime": 1541254891000,
    "broadcastIndex": 0,
    "broadcastTotal": 1
}

二 向JobThread提交任务

接着会调用ExecutorBizImpl#run。

首先根据入参中jobId,从jobThreadRepository查找JobThread:每个任务体在执行器中,由固定的某个线程来运行。

// key是jobId
private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap();

2.1 JobThread为null时2.2 JobThread不为null时

JobThread不为null时,会增加对入参中executorBlockStrategy的处理。

当JobThread为null时,本次执行时会新建JobThread实例,也就不可能有积压的任务;也就不用做阻塞处理。

因为本次任务只是提交到JobThread的队列中,如果队列中有积压的任务,或者JobThread正在执行某个任务,本次请求并不会被立即执行,即被阻塞。

任务阻塞时,处理策略有三个:

使用Discard Later时,本次调度将失败。

使用Cover Early时,本次调度成功;队列中等待的任务将不会被执行,但是正在执行中的任务体,如果不能响应interrupt,还是会被完整的执行掉。

三 JobThread类3.1 创建实例

JobThread继承了Thread类,增加了以下属性:

执行器给每个任务,都会创建一个JobThread实例。

即同一执行器上的不同任务间,是线程隔离的。不会因为任务A阻塞或异常,而影响任务B的执行。

3.2 线程任务

JobThread重写了run方法,主要流程如下:

注意这几点:

注意,当一个任务的执行周期大于90秒时,每次执行,都得重新创建JobThread。

因为任务周期很长时,如1天执行一次;那一天之内,JobThread一直在空转,资源浪费严重。不如及时终止它,下次需要时再创建。

四 结果回调4.1 结果入队

当JobThread处理完一个任务后,会将结果封装为HandleCallbackParam实例,调用TriggerCallbackThread.pushCallBack,添加到TriggerCallbackThread的callBackQueue中。

4.2 结果回调

TriggerCallbackThread是在XxlJobExecutor.start()中被创建并启动。

有两个守护线程:首次回调线程、失败重试线程

// 队列
private LinkedBlockingQueue callBackQueue = new LinkedBlockingQueue();
// 消费队列的线程
private Thread triggerCallbackThread;
// 回调失败重试线程
private Thread triggerRetryCallbackThread;
private volatile boolean toStop = false;

五 总结