• 作者:老汪软件技巧
  • 发表时间:2024-09-18 21:56
  • 浏览量:

前言

在前面的文章中,记录了RocketMQ消息广播,延迟消息等机制,今天介绍一下一个特别特别重要的功能---消息过滤。顾名思义,它可以让消费者根据特定条件选择性地接收消息,从而优化资源利用,提高系统效率。

废话不多说,接下来就一起学习一下消息过滤这块的知识点吧。

消息过滤的概念

早在之前的RabbitMQ系列文章中,就介绍过关于消息过滤相关的概念。和RocketMQ不太一样,这里重新记录下!

RocketMQ中的消息过滤是指在消费者端对接收到的消息进行条件过滤,只有符合特定条件的消息才会被消费者处理,不符合条件的消息被忽略。发明消息过滤的人真是天才!这一机制在生产者端设置消息属性和在消费者端设置消息过滤器来实现,很有效地减少了不必要的数据传输,提高了数据传输的效率!

消息过滤的实现方式

RocketMQ提供了两种主要的消息过滤方式:Tag过滤和SQL92过滤。

Tag过滤

Tag过滤是RocketMQ中最常用的消息过滤方式。在其他的消息队列中也比较常用。在发送消息时,生产者可以为每条消息指定一个或多个Tag,这些Tag可以用来描述消息的类型或属性。消费者在订阅消息时,当然也可以指定一个或多个Tag作为过滤条件,只接收包含这些Tag的消息。

Tag过滤的实现机制是什么呢,请看VCR:

生产者发送消息时,为每条消息设置Tag。消费者订阅消息时,指定需要接收的Tag列表。RocketMQ服务端在接收到订阅请求后,会根据订阅的Tag列表和消息中的Tag进行匹配,只将符合条件的消息发送给消费者。SQL92过滤

除了Tag过滤之外,RocketMQ还支持SQL92过滤。SQL92就比较厉害了,它支持消费者使用SQL表达式来定义更复杂的过滤条件,可以实现更精细化的消息过滤。

接下来介绍一下SQL92过滤的实现机制:

生产者发送消息时,可以为消息设置多个用户属性。消费者在订阅消息时,确定一个SQL表达式作为过滤条件。这个表达式可以基于消息的用户属性进行匹配。RocketMQ服务端在接收到订阅请求后,会解析SQL表达式,并且根据表达式中的条件对消息进行过滤,只将符合条件的消息发送给消费者。

SQL92过滤默认是关闭的,需要在Broker的配置文件中设置enablePropertyFilter=true才能启用。因为SQL过滤需要在服务端进行复杂的表达式解析和属性匹配,所以它的性能相比Tag过滤肯定会稍低一些。

消息过滤案例

接下来我们通过代码进行实践,与上篇文章一样,创建生产者和消费者。

生产者生成消息,详细注释如下:

package c.x.r.filter;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TagProducer {
    public static void main(String[] args) throws Exception {
        // 创建DefaultMQProducer实例,生产者组名"please_rename_unique_group_name"
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 启动生产者实例
        producer.start();
        // 定义字符串数组,包含三个不同的标签
        String[] tags = new String[] {"TagA", "TagB", "TagC"};
        for (int i = 0; i < 15; i++) {
            // 创建新的消息对象,主题为"TagFilterTest",标签为tags数组中的一个元素
            // 消息体为"Hello world",编码为字节数组
            Message msg = new Message("TagFilterTest",
                    tags[i % tags.length],
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送消息,并获取发送结果
            SendResult sendResult = producer.send(msg);
            // 打印发送结果的信息
            System.out.printf("%s%n", sendResult);
        }
        // 关闭生产者实例,释放资源
        producer.shutdown();
    }
}

上面通过循环取余的算法,标签A,B,C各五条消息,如下图:

消费者代码如下,包含注释:

package c.x.r.filter;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import org.apache.rocketmq.client.exception.MQClientException;
public class TagConsumer {
    public static void main(String[] args) throws MQClientException {
        // 创建DefaultMQPushConsumer实例,消费者组名"please_rename_unique_group_name"
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // 订阅主题"TagFilterTest"的消息,过滤表达式为"TagA || TagC",
        // 仅接收标签为TagA或TagC的消息
        consumer.subscribe("TagFilterTest", "TagA || TagC");
        // 消息监听器,处理接收到的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                // 输出接收到的新消息列表
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 返回消费状态,消息已成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        // 输出消费者启动成功的提示信息
        System.out.printf("Consumer Started.%n");
    }
}

消费者只会消费到来自生产者标签为A和C的消息。

本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!