• 作者:老汪软件技巧
  • 发表时间:2024-09-27 04:00
  • 浏览量:

本文主要记录了在springboot中通过事件发布机制+延迟队列实现定时任务的需求,主要内容有:

实现的思维导图如下:

事件发布机制

事件发布机制需要实现三个角色,分别为

事件发布机制的通信单元

在Spring框架中、提供了对事件的封装,实体类只需要继承ApplicationEvent抽象类即可。代码如下:

@Getter
@Setter
public class CustomSpringEvent extends ApplicationEvent {
    private String message;
    public CustomSpringEvent(Object source, String message) {
        super(source);
        this.message = message;
    }
}

可以把CustomSpringEvent理解为这个事件传输的基本单位、封装了我们需要传递的信息。

事件发布者机制的生产者

事件发布者:主要职责就是发送任务的,相当于告诉监听者、该起床干活了。具体代码如下:

@Component
public class CustomEventPublisher implements ApplicationEventPublisherAware {
    private ApplicationEventPublisher publisher;
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }
    // 发布事件
    public void publish(String message) {
        CustomSpringEvent event = new CustomSpringEvent(this, message);
        publisher.publishEvent(event);
    }
}

当然也可以直接自动注入private ApplicationEventPublisher publisher;通过调用接口的publishEvent()方法进行事件发布。

事件发布机制的消费者

监听者的实现如下:

@Slf4j
@Component
public class CustomEventListener implements ApplicationListener {
    @Override
    public void onApplicationEvent(CustomSpringEvent event) {
        log.info("收到事件:{}", event.getMessage());
    }
}

在测试包下写下如下代码进行代码测试:

@SpringBootTest
public class EventTest {
​
    @Autowired
    private CustomEventPublisher eventPublisher;
    @Test
    public void testEvent() {
        eventPublisher.publish("hello world");
    }
}

可以看到运行结果符合预期。事件成功发布和消费。

延时队列怎么实现_延迟队列最好方案_

延迟任务

延迟任务、在JUC包下提供了Delayed接口、该接口实现类Comparable接口、因此需要实现如下两个方法:

除此之外、为了保存多个定时任务、我们需要一个容器即延迟队列DeleayQueue、延迟队列DelayQueue实现了BlockingQueue。关于BlockQueue的介绍请参看。延迟队列的会根据事件发生的先后顺序进行排列,时间到了就可以调用take()方法取出消费,否则就阻塞在此。

因此需要改造一下之前的事件类。在原代码的基础上修改代码如下:

    @Getter
    @Setter
    public class CustomSpringEvent extends ApplicationEvent  implements Delayed {
    ​
        private String message;
        public CustomSpringEvent(Object source, String message,long millis) {
            // 用于计算任务距离当前的时间差
            // millis:距离当前系统时间多少毫秒之后执行。
            super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofMillis(millis)));
            this.message = message;
        }
    ​
        @Override
        public long getDelay(TimeUnit unit) {
            long millis = this.getTimestamp();
            long currentTime = System.currentTimeMillis();
            long duration = millis - currentTime;
            return unit.convert(duration, unit);
        }
    ​
        @Override
        public int compareTo(Delayed o) {
            long delta = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
            return (delta < 0 ? -1 : (delta > 0 ? 1 : 0));
        }
    }

此时需要重新捋一下事件的发布过程了、不再是简单的发布消费了。

因此修改发布代码如下:

@Component
@Slf4j
public class CustomEventPublisher implements ApplicationEventPublisherAware {
​
    private ApplicationEventPublisher publisher;
​
    // 延迟任务队列容器
    private DelayQueue delayQueue = new DelayQueue<>();
​
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }
​
    // 延迟任务入队
    public void publish(CustomSpringEvent event) {
        boolean offer = this.delayQueue.offer(event);
        if (offer) {
            log.info("延迟任务 [{}] 已加入队列",event.getMessage());
        }
    }
​
    // 延迟任务出队并发布
    private void publishDelayTask() {
        while (true) {
            try {
                CustomSpringEvent event = this.delayQueue.take();
                this.publisher.publishEvent(event);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
​
    @PostConstruct
    public void start() {
        new Thread(() -> publishDelayTask()).start();
    }
}

编写测试代码如下:

@Test
public void testEvent() {
​
    CustomSpringEvent task1 = new CustomSpringEvent(this, "5s后执行", 5000);
    CustomSpringEvent task2 = new CustomSpringEvent(this, "10s后执行", 10000);
    CustomSpringEvent task3 = new CustomSpringEvent(this, "20s后执行", 20000);
​
    eventPublisher.publish(task1);
    eventPublisher.publish(task2);
    eventPublisher.publish(task3);
​
    try {
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
​
}

运行结果如下:

可以看到结果符合预期、没有任何毛病!

优化点通过线程池来来执行任务。配置线程池:

@Configuration
@EnableAsync
public class ThreadPoolConfiguration {
​
    @Bean("listenerExecutor")
    public ThreadPoolExecutor poolExecutor() {
​
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
                10, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
​
        return threadPoolExecutor;
    }
​
    @Bean("publisherExecutor")
    public ThreadPoolExecutor publisherExecutor() {
​
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
                10, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
​
        return threadPoolExecutor;
    }
}

关于监听器的实现可以通过注解来实现

@Slf4j
@Component
@EnableAsync
public class CustomEventListener {
    @EventListener
    @Async("listenerExecutor")
    public void onCustomEvent(CustomSpringEvent event) {
        log.info("Received spring custom event - {}", event.getMessage());
    }
}

发布者也可以不实现接口的方式来实现

@Component
@Slf4j
public class EventPublisher {
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;;
    public void publish(CustomSpringEvent event)
    {
        log.info("publish event:{}",event);
        applicationEventPublisher.publishEvent(event);
    }
}

关于任务的发布这一个也可以交给线程池实现

@Autowired
@Qualifier("publisherExecutor")
private ThreadPoolExecutor publisherThreadPool;
​
@PostConstruct
public void start() {
  //new Thread(() -> publishDelayTask()).start();
  publisherThreadPool.execute(() -> publishDelayTask());
}