• 作者:老汪软件技巧
  • 发表时间:2024-09-20 00:01
  • 浏览量:

前言

在前面的文章中我们介绍了RocketMQ的广播消息,作为一款合格的消息队列,不支持延迟消息那肯定是不行的。在前面我们了解的RabbitMQ中,它的延迟消息是通过死信队列来实现的。

然鹅,RocketMQ的实现方式与其不同,今天我们一起来了解下!

延迟消息的概念

延迟消息是指在消息发送的时候指定一个延迟时间,消息不会立即被消费,而是会在达到预定的时间点之后才被投递给消费者。这个思想经常应用在订单超时处理、任务调度很多种业务场景中。比如我们在pdd买东西未付款提醒这些场景。

延迟消息的实现原理

RocketMQ通过好多种延迟级别来实现延迟消息的功能。在发送延迟消息时,生产者需要指定一个延迟级别,注意不是具体的延迟时间。目前,RocketMQ支持18个固定的延迟级别,每个级别对应了一定的延迟时间,从几秒到几小时不等。比如:

如果这样看着不太清晰的话,部署完RocketMQ集群后按下面步骤操作即可看到:

当生产者发送延迟消息时,会通过setDelayTimeLevel(int level)方法设置延迟级别。消息发送后并不会立即到达消费者的队列中,而是会被暂时存放在一个特殊的队列中。Broker端会周期性地检查这些延迟队列,一旦消息的延迟时间到达,就会将其重新投递到原来的目标主题中,供消费者消费。

延迟消息的使用方法

Java代码中使用RocketMQ发送延迟消息,可以通过下面的步骤实现:

消息延迟发送设置_消息延迟发送怎么设置_

首先是创建生产者,使用生产者集群发送消息,详细代码注释已经标好:

package com.xiaowei.rocketmq.Scheduled;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class ScheduledProducer {
    public static void main(String[] args) throws Exception {
        // 创建DefaultMQProducer实例,生产者组名"ExampleProducerGroup"
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // 启动生产者
        producer.start();
        // 需要发送的消息总数
        int totalMessagesToSend = 100;
        // 循环发送消息
        for (int i = 0; i < totalMessagesToSend; i++) {
            // 创建消息对象,设置主题"TestTopic",消息体为"Hello scheduled message "加上当前循环次数
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // 设置消息的延迟等级为3。消息在一段时间之后才被消费,这个见下文。
            message.setDelayTimeLevel(3);
            // 发送消息到RocketMQ
            producer.send(message);
        }
        // 关闭生产者实例,释放资源
        producer.shutdown();
    }
}

先启动消费者后再启动生产者:

消费者代码和注释如下:

package com.xiaowei.rocketmq.Scheduled;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import org.apache.rocketmq.client.exception.MQClientException;
public class ScheduledConsumer {
    public static void main(String[] args) throws MQClientException {
        // DefaultMQPushConsumer实例,消费者组名"ExampleConsumer"
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // 订阅"TestTopic"主题,过滤表达式为"*",接收这个主题下的所有消息
        consumer.subscribe("TestTopic", "*");
        // 消息监听器,用于处理接收到的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    // 输出消息的ID和消息从存储到消费的时间差(毫秒)
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                            + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                }
                // 返回消费状态,消息已经成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
    }
}

可以看到过了十秒之后消费者这边接收到了消息:

延迟消息的应用场景

这个八股是面试中的重点,在这里也记录下吧,已帮助到更多在面试的小伙伴儿们:

注意事项

在使用RocketMQ的延迟消息,同时也需要注意几点:

本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!