- 作者:老汪软件技巧
- 发表时间:2024-09-27 04:00
- 浏览量:
本文主要记录了在springboot中通过事件发布机制+延迟队列实现定时任务的需求,主要内容有:
实现的思维导图如下:
事件发布机制
事件发布机制需要实现三个角色,分别为
事件发布机制的通信单元
在Spring框架中、提供了对事件的封装,实体类只需要继承ApplicationEvent抽象类即可。代码如下:
@Getter
@Setter
public class CustomSpringEvent extends ApplicationEvent {
private String message;
public CustomSpringEvent(Object source, String message) {
super(source);
this.message = message;
}
}
可以把CustomSpringEvent理解为这个事件传输的基本单位、封装了我们需要传递的信息。
事件发布者机制的生产者
事件发布者:主要职责就是发送任务的,相当于告诉监听者、该起床干活了。具体代码如下:
@Component
public class CustomEventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
// 发布事件
public void publish(String message) {
CustomSpringEvent event = new CustomSpringEvent(this, message);
publisher.publishEvent(event);
}
}
当然也可以直接自动注入private ApplicationEventPublisher publisher;通过调用接口的publishEvent()方法进行事件发布。
事件发布机制的消费者
监听者的实现如下:
@Slf4j
@Component
public class CustomEventListener implements ApplicationListener {
@Override
public void onApplicationEvent(CustomSpringEvent event) {
log.info("收到事件:{}", event.getMessage());
}
}
在测试包下写下如下代码进行代码测试:
@SpringBootTest
public class EventTest {
@Autowired
private CustomEventPublisher eventPublisher;
@Test
public void testEvent() {
eventPublisher.publish("hello world");
}
}
可以看到运行结果符合预期。事件成功发布和消费。
延迟任务
延迟任务、在JUC包下提供了Delayed接口、该接口实现类Comparable接口、因此需要实现如下两个方法:
除此之外、为了保存多个定时任务、我们需要一个容器即延迟队列DeleayQueue、延迟队列DelayQueue实现了BlockingQueue。关于BlockQueue的介绍请参看。延迟队列的会根据事件发生的先后顺序进行排列,时间到了就可以调用take()方法取出消费,否则就阻塞在此。
因此需要改造一下之前的事件类。在原代码的基础上修改代码如下:
@Getter
@Setter
public class CustomSpringEvent extends ApplicationEvent implements Delayed {
private String message;
public CustomSpringEvent(Object source, String message,long millis) {
// 用于计算任务距离当前的时间差
// millis:距离当前系统时间多少毫秒之后执行。
super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofMillis(millis)));
this.message = message;
}
@Override
public long getDelay(TimeUnit unit) {
long millis = this.getTimestamp();
long currentTime = System.currentTimeMillis();
long duration = millis - currentTime;
return unit.convert(duration, unit);
}
@Override
public int compareTo(Delayed o) {
long delta = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return (delta < 0 ? -1 : (delta > 0 ? 1 : 0));
}
}
此时需要重新捋一下事件的发布过程了、不再是简单的发布消费了。
因此修改发布代码如下:
@Component
@Slf4j
public class CustomEventPublisher implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
// 延迟任务队列容器
private DelayQueue delayQueue = new DelayQueue<>();
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
// 延迟任务入队
public void publish(CustomSpringEvent event) {
boolean offer = this.delayQueue.offer(event);
if (offer) {
log.info("延迟任务 [{}] 已加入队列",event.getMessage());
}
}
// 延迟任务出队并发布
private void publishDelayTask() {
while (true) {
try {
CustomSpringEvent event = this.delayQueue.take();
this.publisher.publishEvent(event);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@PostConstruct
public void start() {
new Thread(() -> publishDelayTask()).start();
}
}
编写测试代码如下:
@Test
public void testEvent() {
CustomSpringEvent task1 = new CustomSpringEvent(this, "5s后执行", 5000);
CustomSpringEvent task2 = new CustomSpringEvent(this, "10s后执行", 10000);
CustomSpringEvent task3 = new CustomSpringEvent(this, "20s后执行", 20000);
eventPublisher.publish(task1);
eventPublisher.publish(task2);
eventPublisher.publish(task3);
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
运行结果如下:
可以看到结果符合预期、没有任何毛病!
优化点通过线程池来来执行任务。配置线程池:
@Configuration
@EnableAsync
public class ThreadPoolConfiguration {
@Bean("listenerExecutor")
public ThreadPoolExecutor poolExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
return threadPoolExecutor;
}
@Bean("publisherExecutor")
public ThreadPoolExecutor publisherExecutor() {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
return threadPoolExecutor;
}
}
关于监听器的实现可以通过注解来实现
@Slf4j
@Component
@EnableAsync
public class CustomEventListener {
@EventListener
@Async("listenerExecutor")
public void onCustomEvent(CustomSpringEvent event) {
log.info("Received spring custom event - {}", event.getMessage());
}
}
发布者也可以不实现接口的方式来实现
@Component
@Slf4j
public class EventPublisher {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;;
public void publish(CustomSpringEvent event)
{
log.info("publish event:{}",event);
applicationEventPublisher.publishEvent(event);
}
}
关于任务的发布这一个也可以交给线程池实现
@Autowired
@Qualifier("publisherExecutor")
private ThreadPoolExecutor publisherThreadPool;
@PostConstruct
public void start() {
//new Thread(() -> publishDelayTask()).start();
publisherThreadPool.execute(() -> publishDelayTask());
}