- 作者:老汪软件技巧
- 发表时间:2024-09-08 00:03
- 浏览量:
RabbitMQrabbitmq 工作模式
RPC实现原理
如上图所示,同一个回调队列,服务器将响应消息发送到回调队列,通过correlation id关联请求和响应。
通道 channels
Channel 是一种逻辑上的连接,它存在于实际的 TCP 连接之上。一个 TCP 连接可以支持多个 Channel. 可以将channels视为“共享单个 TCP 连接的轻量级连接”与连接相似,通道也应是长期存在的。也就是说,没有必要在每次操作时都打开一个通道,这样做效率很低,因为打开通道需要一次网络往返。
可以在服务端、客户端设置可以打开的通道数、每个连接允许打开的通道数,使用两者中较低的值。
# 设置每个连接能打开的最大channel数
channel_max = 100
# 设置每个节点允许打开的最大channel数
channel_max_per_node
如果是RabbitMQ Java 客户端, 通过ConnectionFactory#setRequestedChannelMax控制
交换机 exchange
所有消息都会先发到exchange交换机,再由交换机根据路由策略转发到相应的队列。交换机类型有如下几种:
Dead Lettering Exchanges(死信交换机)
以下四种情况,消息会被发到死信交换机
消费者使用basic.reject或basic.nack并且requeue参数设置为false消息设置TTL过期队列超出长度限制,消息被丢弃消息返回仲裁队列的次数超过了投递限制的次数
如果整个队列过期,队列中的消息不会被置为死信。
队列(Queues)
RabbitMQ 中的队列是有序的消息集合,以“amq.”开头的队列为rabbitmq代理内部使用,尝试声明名称违反此规则的队列将导致通道级异常。
在使用队列之前,必须先声明它。如果队列尚不存在,则声明队列将导致创建它。如果队列已存在且其属性与声明中的属性相同,则声明将无效。当现有队列属性与声明中的属性不同时,将PRECONDITION_FAILED引发通道级异常。
队列属性
Quorum Queues(仲裁队列)
RabbitMQ 仲裁队列是一种现代队列类型,它基于Raft 共识算法实现了持久的、复制的 FIFO 队列,提供强一致性保障。
自从RabbitMQ3.10起,Quorum queues支持Queue TTL和message TTL.使用message TTL时,每条消息内存开销会增加2个字节。
Quorum queue不支持Global Qos,只能对每个Consumer单独设置prefetch count。
仲裁队列支持消费者优先级,但不支持消息优先级。
仲裁队列适用于需要高一致性、高可用性和在分布式环境中运行的关键任务应用。不适合低延迟、高吞吐、无强一致性需求、存储资源有限、场景简单等情况。
每个quorum queue都有个主副本称为queue leader。所有队列操作都是通过领导者然后复制到followers。默认会选择声明队列的客户端所连接的节点作为初始领导者(client-local),可以通过queue-leader-locator策略设置balanced,如果队列总数少于1000个,选择托管仲裁队列领导者数量最少的节点。如果队列总数超过 1000 个,则选择一个随机节点。
Classic Queues(经典队列)
队列中存储的数据不会被复制,如果对数据安全要求严格的建议使用quorum queue或stream.
4.0之前Classic queue支持Durable(持久性)和Transient(非持久),4.0版本开始,transient被弃用并移除。
classic queue支持队列独占、队列和消息TTL、队列长度限制、消息优先级、消费者优先级、死信队列。不支持处理 poison message
3.12版本之前,在Lazy模式下运行的queue,会将消息直接写入磁盘(可以减少内存使用),而不会将消息保留在内存中。3.12之后,忽略此配置,所有队列自动以lazy模式运行
优先级队列
优先级队列(Priority Queue)允许消息根据优先级进行排序,高优先级的消息会先于低优先级的消息被消费使用优先级队列会产生额外的内存、磁盘和cpu成本。优先级实际最大可以设置到255,但建议设置为1-5,最高不超过10。
延时队列
RabbitMQ有两种方式实现延时队列
流(Streams)
Streams是RabbitMQ 3.9引入的一个新特性(类似Kafka),它旨在提供一种高吞吐量、低延迟的消息传递机制,同时支持消息的持久化和复制。这个特性是为了满足需要处理大量消息的场景,比如日志聚合、事件源、实时分析等。
任何消费者可以从日志中的任何点开始读取数据,由x-stream-offset参数控制
流数据安全性
只有数据被复制到流副本的法定人数后(quorum),才会向发布者发出确认(publisher confirms)。流不会明确将数据从page cache刷新到磁盘,而是依靠操作系统自身的刷新行为。这意味着如果服务器某个节点不受控制的关闭,可能会导致该节点上托管的副本数据丢失,单个节点上的数据丢失通常只会从系统中的其他节点重新复制。
如果对数据安全性有更高的要求,优先考虑使用仲裁队列(Quorum Queues),因为仲裁队列需要至少一定数量(quorum)的节点将 数据写入并刷新到磁盘后,才会向发布者发出确认。
对于没有使用发布者确认机制的,RabbitMQ不提供任何保证。
stream数据保留策略
与kafka类似,stream数据清理受两个属性控制:
心跳检测
除非已知环境在每个主机(RabbitMQ 节点和应用程序)上使用TCP keepalive,否则不建议停用心跳
消费者消费者确认(Consumer Acknowledgment)
可以自动确认或手动确认, 支持批量确认。
消费者拒绝消息
basic.reject 拒绝单个消息basic.nack 拒绝一个或多个消息
requeue策略: 当消息重新排队时,如果可能,它将被放置到队列中的原始位置。如果不行(由于多个消费者共享一个队列时,其他消费者同时交付并确认),该消息将被重新排队到更靠近队列头的位置。 为了避免循环重复处理无法ack的消息,消费者应该限制requeue的次数或延迟入队
QoS
消息以异步方式传递(发送)给客户端,并且任何给定时刻,一个通道上可能存在多条“正在传输”的消息。客户端的手动确认本质上也是异步的,但流动方向相反。消费者可以通过basic.qos设置预取计数值,值定义了允许在通道上发送的最大未确认消息数。当该数量达到配置的计数时,RabbitMQ 将停止在通道上发送更多消息,直到至少一条未完成的消息得到确认,如果值为0,表示不限制。
发布者发布者确认(Publisher Confirms)
生产者发布消息后,RabbitMQ 不会立即返回确认,而是在消息成功被写入队列(并持久化,如果是持久化消息)后,向生产者发送确认。
使用场景
SpringBoot启用Publisher Confirms需要开启配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
Java Spring Publisher Confirm代码示例
@Configuration
@Slf4j
public class MqConfig {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
//消息未送达队列触发回调
rabbitTemplate.setReturnsCallback((returnCallback) -> {
// 消息发送失败
log.error("消息发送失败,未送达队列。message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
returnCallback.getMessage(), returnCallback.getReplyCode(), returnCallback.getReplyText()
, returnCallback.getExchange(), returnCallback.getRoutingKey());
});
//消息消费后ack状态
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("ack提交后回调:{};消息:{}", ack, correlationData);
if (!ack) {
// 发送失败
MqConfig.log.error("ack失败:{}", correlationData);
}
});
}
}
事务
不建议使用,过于繁重,吞吐量降低250倍
生产/消费代码示例
发布者 sender.go
package main
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
"log"
url "net/url"
"time"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
url := "amqps://" + url.QueryEscape("username") + ":" + url.QueryEscape("password") + "@127.0.0.1:5671/"
conn, err := amqp.Dial(url)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"hello",
"fanout",
true,
false,
false,
false,
nil)
failOnError(err, "Failed to declare an exchange")
//声明原始队列
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
amqp.Table{
"x-dead-letter-exchange": "deadHelloExchange", //绑定死信交换机
"x-dead-letter-routing-key": "deadHelloRoutingKey", //死信routing key
//"x-message-ttl": 5000,
},
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"hello", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
//声明死信交换机
err = ch.ExchangeDeclare(
"deadHelloExchange",
"direct",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a exchange")
//声明死信队列
dlx, err := ch.QueueDeclare(
"deadHelloQueue", // 死信队列名称
false,
true,
false,
false,
nil,
)
failOnError(err, "Failed to declare a dead letter queue")
//死信队列绑定到死信交换器
err = ch.QueueBind(
dlx.Name,
"deadHelloRoutingKey",
"deadHelloExchange",
false,
nil,
)
failOnError(err, "Failed to bind dead letter queue to exchange")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := "Hello World!"
err = ch.PublishWithContext(ctx,
"hello",
"",
true, //如果设置为true,必须确保消息至少被一个队列接收,如果消息不能被路由到任何队列,且mandatory=true,则RabbitMQ会将消息返回给发布者
false, //如果为true,RabbitMQ 会确保消息只会被已经准备好消费的消费者接收。也就是说,如果队列中没有等待消费的消息(即没有活跃的消费者),消息将不会被投递,而是返回给发布者。
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Expiration: "50000", //设置消息TTL
})
}
消费者 receive.go
package main
import (
amqp "github.com/rabbitmq/amqp091-go"
"log"
"net/url"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
url := "amqps://" + url.QueryEscape("username") + ":" + url.QueryEscape("password") + "@127.0.0.1:5671/"
conn, err := amqp.Dial(url)
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//声明队列,不存在创建
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
//绑定队列和交换机
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"hello", // exchange
false,
nil,
)
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil)
failOnError(err, "Failed to register a consumer")
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}