- 作者:老汪软件技巧
- 发表时间:2024-01-01 16:00
- 浏览量:
目录
基本介绍
什么是死信交换机
在定义业务队列的时候,要考虑指定一个死信交换机,死信交换机可以和任何一个普通的队列进行绑定,然后在业务队列出现死信的时候就会将数据发送到死信队列。
什么是死信队列
死信队列实际上就是一个普通的队列,只是这个队列跟死信交换机进行了绑定,用来存放死信而已
中有一种交换器叫 DLX,全称为 Dead--,可以称之为死信交换器。当消息在一个队列中变成死信(dead )之后,它会被重新发送到另外一个交换器中,这个交换器就是 DLX,绑定在 DLX 上的队列就称之为死信队列。
要注意的是,DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何队列上被指定,实际上就是设置某个队列的属性。当这个队列存在死信时, 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。
消息进入到死信队列的情况 消息过期
MessageProperties messageProperties=new MessageProperties();
//设置此条消息的过期时间为10秒
messageProperties.setExpiration("10000");
队列过期
Map arguments =new HashMap<>();
//指定死信交换机,通过x-dead-letter-exchange 来设置
arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);
//设置死信路由key,value 为死信交换机和死信队列绑定的key
arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);
//队列的过期时间
arguments.put("x-message-ttl",10000);
return new Queue(QUEUE_NORMAL,true,false,false,arguments);
TTL: Time to Live的简称,过期时间
队列达到最大长度(先入队的消息会被发送到DLX)
Map arguments = new HashMap();
//设置队列的最大长度 ,对头的消息会被挤出变成死信
arguments.put("x-max-length", 5);
消费者拒绝消息不进行重新投递
从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。
.yml 启动手动确认
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
/**
* 监听正常的那个队列的名字,不是监听那个死信队列
* 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
*
* channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
*/
@RabbitListener(queues = {RabbitConfig.QUEUE})
public void process(Message message, Channel channel) {
System.out.println("接收到的消息:" + message);
//对消息不确认, ack单词是 确认 的意思
try {
System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
//要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException e) {
e.printStackTrace();
}
}
void (long , , )
消费者拒绝消息
开启手动确认模式,并拒绝消息,不重新投递,则进入死信队列
/**
* 监听正常的那个队列的名字,不是监听那个死信队列
* 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
*
* channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
*/
@RabbitListener(queues = {RabbitConfig.QUEUE})
public void process(Message message, Channel channel) {
System.out.println("接收到的消息:" + message);
try {
System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
//要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
.(.().(), true);
代码实战 实战架构
如上图,消息到达正常的交换机.nomal.a,通过与正常的队列queue.noaml.a绑定,消息会到达正常队列,如果消息变为死消息以后则会转发到与正常队列绑定的死信交换机中,死信交换机会转发到与其绑定的死信队列queue.deal.a。
工程概述
工程采用架构,主要用到的依赖为:
org.springframework.boot
spring-boot-starter-amqp
org.projectlombok
lombok
.yml配置文件如下:
server:
port: 8080
spring:
rabbitmq:
host: 123.249.70.148
port: 5673
username: admin
password: 123456
virtual-host: /
配置类:创建队列及交换机并进行绑定
@Configuration
public class RabbitConfigDeal {
}
创建正常交换机
@Bean
public DirectExchange normalExchange(){
return ExchangeBuilder.directExchange("exchange.normal.a").build();
}
创建死信交换机
@Bean
public DirectExchange deadExchange(){
return ExchangeBuilder.directExchange("exchange.dead.a").build();
}
创建死信队列
@Bean
public Queue deadQueue(){
return QueueBuilder.durable("queue.dead.a").build();
}
创建正常队列,设置他的绑定死信交换机,以及对应绑定的路由key为order
@Bean
public Queue normalQueue(){
Map arguments =new HashMap<>();
arguments.put("x-message-ttl",20000);
arguments.put("x-dead-letter-exchange","exchange.dead.a");
arguments.put("x-dead-letter-routing-key","order");
return QueueBuilder.durable("queue.normal.a")
.withArguments(arguments).build();
}
绑定正常交换机和正常队列
@Bean
public Binding bindingNormal(DirectExchange normalExchange,Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
}
绑定死信交换机和死信队列
@Bean
public Binding bindingDeal(DirectExchange deadExchange,Queue deadQueue){
return BindingBuilder.bind(deadQueue).to(deadExchange).with("order");
}
业务类:发送消息及接收消息
@Component
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
}
发送消息方法
public void sendMsg(){
//添加消息属性
Message message = MessageBuilder.withBody("hello word!".getBytes(StandardCharsets.UTF_8))
.build();
rabbitTemplate.convertAndSend("exchange.normal.a","order",message);
log.info("发送消息时间:{}",new Date());
}
这里用的路由key为info
接受消息
@RabbitListener(queues = {"queue.dead.a"})
public void receiveMsg(Message message){
byte[] body = message.getBody();
String queue = message.getMessageProperties().getConsumerQueue();
String msg=new String(body);
log.info("{}接收到消息时间:{},消息为{}",queue,new Date(),msg);
}
在消息传递的过程中,实际上传递的对象为 org..amqp.core. ,它主要由两部分组成:
// 消息属性
byte[] body // 消息内容
@
使用 @ 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理
主启动类n:实现接口
/**
* @author 风轻云淡
*/
@SpringBootApplication
public class RabbitMq01Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(RabbitMq01Application.class, args);
}
@Resource
private MessageService messageService;
/**
* 程序一启动就会调用该方法
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
messageService.sendMsg();
}
}
在中,提供了一个接口:。 该接口中,只有一个run方法,他执行的时机是:容器启动完成之后,就会紧接着执行这个接口实现类的run方法。
由于该方法是在容器启动完成之后,才执行的,所以,这里可以从容器中拿到其他已经注入的bean。
启动主启动类后查看控制台:
2023-09-28 10:46:17.772 INFO 71700 --- [ main]
c.e.rabbitmq01.service.MessageService :
发送消息时间:Thu Sep 28 10:46:17 CST 2023
2023-09-28 10:46:37.824 INFO 71700 --- [ntContainer#0-1]
c.e.rabbitmq01.service.MessageService :
queue.dead.a接收到消息时间:Thu Sep 28 10:46:37 CST 2023,消息为hello word!
我们在这里可以看见17s的时候发送了消息,在经过了20s,即到37s的时候我们在死信队列queue.dead.a接受到了消息。