- 作者:老汪软件技巧
- 发表时间:2024-11-15 11:02
- 浏览量:
在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。
发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。
(消费逻辑副本,这里的实现方式,可以是订阅者在消费的时候,由消费者或者 Broker 本身记录好消费的 OFFSET ,当然也有别的实现方式)
队列模式和发布 - 订阅模式是并存的,有些消息队列同时支持这两种消息模型,比如 ActiveMQ。
这两种模型,生产者=发布者,消费者=订阅者,队列=主题,没有本质的区别。
最大的区别在于:一份消息数据能不能被消费多次的问题。
在发布 - 订阅模型中,如果只有一个订阅者,那和队列模型就基本一样。所以发布 - 订阅模型在功能层面可以兼容队列模型。
现代的消息队列产品使用的消息模型大多是这种发布 - 订阅模型。
RocketMQ 和 Kafka 的消息模型
RocketMQ,Kafka 是一样的消费模型,只是有些部分名字不一样,这里以 RocketMQ 为例,看一下它的发布-订阅模型的样子:
为什么它的发布订阅模型中,在 Topic 里面还有队列(Queue)的概念?队列在 RocketMQ 中的作用是什么呢?为了并行消费,提升消费效率(这其实牺牲了消息有序性)。
这个也与“消费-确认”机制有关,“消费-确认”机制虽然确保消息不会在传递过程中由于网络或服务器故障丢失。但是也带来一个问题:为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则。
这样的话,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,RocketMQ 在主题下面增加了队列的概念。
如此以来,每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。但是,需要注意的是,这样做,消息就只能在队列上保证消息的有序性,主题层面无法保证消息的严格顺序。
而 Kafka 的消息模型一样,只是在主题中,Queue 的名字不叫队列,而是叫做 Partition,功能和含义,是没有任何区别的。
至此,Topic,Broker,Producer,Consumer,Publisher,Subscriber......等这些名词,也就都有了眉目。
消息可靠性
一般说消息可靠性,通常说的都是 2 个点:
服务如何不中断?消息如何不丢失?服务如何不中断
首先明确一点,大规模停电,那没办法。
其次,现代的分布式系统中,保证服务不中断的方法几乎是相通的,用道家思想讲就是:大道殊途同归。那就是:冗余。
比如: Redis 有哨兵集群,MySQL 有主从复制,包括国内常见的分布式系统,叫得出名字的厂,没有谁家的服务是单机的。消息队列也一样,也有 Broker 的副本主从机制,比如 Kafka 集群的 Controller 切换。
这些主从机制,通常有延伸到选举算法,常见的比如:Paxos,Raft 等等,根据算法,又会衍生出很多产品,比如 zookeeper。
消息尽量不丢失
一般采取就是消息确认机制。
处理分布式事务
在实际应用中,比较常见的分布式事务实现有 2PC(Two-phase Commit,也叫二阶段提交)、TCC(Try-Confirm-Cancel) 和事务消息。
消息队列中,通常要判断这个产品是否支持事务消息。
但是要注意一点,事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。 比如:
在创建订单后,如果出现短暂的几秒,购物车里的商品没有被及时清空,也不是完全不可接受的,只要最终购物车的数据和订单数据保持一致。统计数据延迟。但是最终数据要一致。
事务消息需要消息队列提供相应的功能才能实现,Kafka 和 RocketMQ 都提供了事务相关功能。当然,不同的消息队列有不同的实现细节,通常,都是使用“半消息”这个思路,不过半消息,实际和两阶段提交,理论上没啥区别。
如上图,如果在第 4 步骤,出现问题,那么这个分布式事务的解决,那就不可靠了。RocketMQ 好一点,有事务反查机制,但是 Kafka,可就直接报异常了,用户自己处理。
当然,不同的厂,有不同的解决方案。
幂等消费
在 MQTT 协议(一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议)中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
现在常用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 。也就是说,消息队列很难保证消息不重复。
Amazon SQS 也同样, 标准队列提供至少一次传送,因此每条消息至少会传送一次。 FIFO 队列提供一次性处理,因此每条消息仅传送一次,并且在使用器处理并删除它之前始终可用。 队列中不会引入重复消息。
所以,幂等,一定是在使用消息队列时要时刻注意的事情。消费者,一般都要实现幂等消费,实现幂等的方法:
消息积压
在使用消息队列遇到的问题中,消息积压是最常遇到的问题,并且,这个问题不太好解决。
因为不同类型的消息队列,其业务模型不同,实现细节不同,处理方式也就不同。
所以,这样的问题,通常需要有合理的思路。
消息积压的直接原因,一定是系统中的某个部分出现了性能问题,来不及处理上游发送的消息,才会导致消息积压。而这“某个部分”,通常就是消费端的逻辑。
消费端的性能优化
业务逻辑优化:同步改异步扩容
但是通过水平扩容,增加消费端的并发数来提升总体的消费性能。需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。
这是因为,大多数的消息队列,通常都是一个消费者和队列,是有强映射关系的。比如 Kafka 的 Partition,如果只是扩容消费者的话,不能解决当前有积压问题的 Broker 的,此时还需要做 Partition 和消费者的 Rebalance,但是这样的 Rebalance 会影响到线上的消费情况。
所以,对于 Kafka 的消费积压,更多的做法是,新开一个 Topic,设置更多的 Partition 和更大的消费组。然后将积压的消息,在消费者端直接将消息传到新的 Topic 上,让更多的新消费者来处理积压的消息。
不过,在一些厂的自研消息队列中,会用自己的方式解决 Rebalance 带来的问题。比如 QMQ,在 Partition 和 Consumer Group 之间,加了一个中间层,做自动映射,这样就可以无伤扩容,达到效果。但是这样也会带来别的问题,比如中间层的维护问题。
AWS SQS 和 SNS 和上面这些消息队列有什么异同?SQS消息模型
SQS 是队列模型,和下图几乎是一致
中间的这个队列 Queue 就是 SQS 用来维护消息的 broker。
队列类型
当然,SQS 提供了两种队列类型:
消息可靠性
Q:服务如何不中断?
A:AWS 自动做好数据“冗余”的工作。如果真崩了,赔钱。
Q:消息怎么不丢失?
A:消息确认。尤其是消费的时候需要确认,通常在编码时候,有规定对应的返回,如果返回了异常消息 ID,那么队列会自动重复此消息,如果没有任何返回,那么队列默认消费确认成功,将进行消息删除。消息确认的前提就是,在 SQS 中,是会进行消息持久化的。
module.exports.handler = async (event) => {
const failedMessageIds = [];
await Bluebird.map(event.Records, async (record) => {
const { messageId, receiptHandle } = record;
const body = JSON.parse(record.body);
try {
// 业务逻辑
);
} catch (e) {
logger.error({ body, type: 'sqs:error', e });
failedMessageIds.push(messageId);
}
});
return {
batchItemFailures: failedMessageIds.map((id) => ({
itemIdentifier: id
}))
};
};
事务消息
目前从文档还没有看到有这部分的支持。不过知道这种“半消息”的原理,我们是可以使用 SQS 自己做一个这样的事务。
比如:用一个 SQS 用来保存半消息,标记为 A,用一个 SQS 保存消息,标记为 B。生产时,投递半消息到 SQS(A),A 的接受者要去进行事务确认,事务成功,则将消息发送的 SQS(B),下游消费 SQS(B)。
幂等消费
SQS 提供了两种队列标准:
标准队列FIFO 队列
无限制吞吐量 – 标准队列支持每个 API 操作(SendMessage、ReceiveMessage 或 DeleteMessage)每秒几乎无限次的 API 调用。至少一次传递 – 消息至少传送一次,但偶尔会传送消息的多个副本。最大努力排序 – 消息偶尔可能按不同于其发送时的顺序传送。
高吞吐量 - 如果您使用批处理,则 FIFO 队列支持每个 API 方法(SendMessageBatch、ReceiveMessage 或 DeleteMessageBatch)每秒最多 3000 条消息。每秒 3000 个事务代表 300 次 API 调用,每次调用带有包含 10 条消息的一个批处理。要申请提高配额,请。在不使用批处理的情况下,FIFO 队列的每个 API 方法(SendMessage、ReceiveMessage 或 DeleteMessage)每秒最多支持 300 个 API 调用。仅处理一次 – 消息传递一次并在使用者处理并删除它之前保持可用。不会将重复项引入到队列中。先进先出传递 – 严格保持消息的发送和接收顺序。
在标准队列中,消息服务质量为:至少传递一次。那就意味着,需要用户自己做消息幂等的处理。FIFO 队列中,消息服务质量为:仅处理一次。从文档上得到的信息是可以不用做消息幂等。消息积压问题原因
从官方文档可以得到的信息是,在以下两种情况下会有消息积压:
解决办法
标准和 FIFO SQS 队列
FIFO SQS 队列
扩展消息组
属于同一消息组的消息按照相对于消息组的顺序逐个处理。当接收具有多个消息组 ID 的消息时,Amazon SQS 会首先尝试返回具有相同消息组 ID 的尽可能多消息。这可让其他消费者处理具有不同消息组 ID 的消息。当属于特定消息组 ID 的消息不可见时,任何其他消费者都不能处理具有相同消息组 ID 的消息。但是,消费者可以处理来自其他消息组的消息。尝试增加顺序不重要的消息组中的数量。
批量消费的配置 Example
SQS 中,提供批量消费,每次消费者消费的时候,可以配置每次消费获取多少消息。
queue:
handler: src/handlers/queue.handler
environment:
TZ: XXXXX
events:
- sqs:
batchSize: 10
functionResponseType: ReportBatchItemFailures
arn:
Fn::GetAtt:
- QueueName
- Arn
上述片段中,batchSize:10就是每次消费条目数.
注意事项
SQS 一个队列,只能对应一个消费者。不过,生产者可以随意,谁都可以往队列里面扔东西。
所以在使用SQS的时候:
每一个SQS,最好只对应一个业务。也就是一类业务消息,放到一个队列里面。往往在SQS里面发生的消息积压,都是生产者太繁多了。消费者端,最好是批量获取消息,5-10个都可以。因为每触发一个消息,是一个lambda,lambda本身是有数量瓶颈的,所以每一个lambda,尽可能的,物尽其用。做好异常处理。将有异常的消息,返回给SQS,这样才能进行消息重试。每一个SQS队列,最好都搭配放置一个死信队列。这样至少确保消息都成功消费了,即使一直失败,也能够追溯到是哪些消息失败了。SNS
SNS 是属于标准的发布-订阅模型。它牛的一点在于,它不仅可以 Application to Application,还可以 Application to Person。
这里借用官方文档的图:
消息模型
SNS 的消息模型,和下图的标准“发布 - 订阅模型”一样。只不过,订阅者可以有更多的选择。
主题类型消息可靠性
Q:服务如何不中断?
A:AWS 自动做好数据“冗余”的工作。发布的消息存储在多个地理位置分散的服务器和数据中心。如果真崩了,赔钱。
Q:消息怎么不丢失?
A:
事务消息
因为不存在持久化,所以对于事务消息这个功能来讲,也就不具备此功能。
消息积压
没有持久化,也就不存在消息积压。
对 SNS 这种设计的思考功能简单。它就是个传话的,然而传的话保存不保存,由消费者自己决定。维护容易。不持久化,那就说明它本身是一个“无状态”的节点。对于无状态的 broker,维护起来相对容易,将来扩展起来灵活度也是足够的。数据的存储和计算分离。有点类似新兴的消息队列:Pulsar,数据存储用类似 HDFS 思想的东西去存储,数据计算分发由 Broker 决定。SNS 和 SQS 为什么会配合起来使用?
很大一个原因是:SNS 本身不进行数据的持久化,使用 SQS 进行消息的持久化,以及消费确认,让消息更有迹可循。
当然也要分场景,一个警报将会被触发,你想要向 10 个不同的电子邮件地址发送消息,并向一些手机发送短信。持久性、批处理和重试并不重要,此时,当然就不需要 SQS。
但是,如果业务场景,需要追溯到消息的消费情况,那么此时,SQS 作为 SNS 的消费者当然是更合适的,后续 SQS 自己的消费者进行业务处理。
比如,如果是一个论坛系统,发出去一个帖子,然后对于发布的每个帖子,可能需要采取多项操作的地方,此时 SNS 搭配 SQS 更合适。
而第二种场景中,SNS 就和 RabbitMQ 中的 Exchange 模块很类似。如下图:
参考工作经验总结极客时间《消息队列高手课》RocketMQ 官方文档Kafka 官方文档RabbitMQ 文档AWS SQS SNS 相关文档内容CSDN [ 云计算 | AWS ] 对比分析:Amazon SNS 与 SQS 消息服务的异同与选择_aws sqs sns-CSDN博客