05消息投递和消息延迟
消息投递
丢失消息的可能
存在以下三种方式可能丢失的情况:
- 消息从生产者写入到消息队列的过程;
- 消息在消息队列中的存储场景;
- 消息被消费者消费的过程。
在消息生产的过程中丢失消息
存在网络抖动,一旦发生抖动,消息就有可能因为网络的错误而丢失。
解决方案:消息重传2~3次。
在消息队列中丢失消息
消息队列是采用异步刷盘,即当达到某一时间间隔或者累积一定的消息数量的时候再刷盘。如果发生机器掉电或者机器异常重启,Page Cache中还没有来得及刷盘的消息就会丢失了。
解决方案:把刷盘的间隔设置很短或者设置累积一条消息就就刷盘,但这样频繁刷盘会对性能有比较大的影响,而且从经验来看,出现机器宕机或者掉电的几率也不高
更好的解决方案:以kafka为例,可以考虑以集群方式部署Kafka服务,通过部署多个副本备份数据保证消息尽量不丢失。
在消费的过程中消息丢失
一个消费者消费消息的进度是记录在消息队列集群中的,而消费的过程分为三步:接收消息、处理消息、更新消费进度。
这里面接收消息和处理消息的过程都可能会发生异常或者失败,比如消息接收时网络发生抖动,导致消息并没有被正确地接收到;处理消息时可能发生一些业务的异常导致处理流程未执行完成,这时如果更新消费进度,这条失败的消息就永远不会被处理了,也可以认为是丢失了。
注意:一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后还会重复地消费这条消息。
消息幂等性(防止重复消费)
生产端
在消息生产过程中,在Kafka0.11版本和Pulsar中都支持“producer idempotency”的特性,翻译过来就是生产过程的幂等性,这种特性保证消息虽然可能在生产端产生重复,但是最终在消息队列存储时只会存储一份。
它的做法是给每一个生产者一个唯一的ID,并且为生产的每一条消息赋予一个唯一ID,消息队列的服务端会存储<生产者ID,最后一条消息ID>的映射。当某一个生产者产生新的消息时,消息队列服务端会比对消息ID是否与存储的最后一条ID一致,如果一致就认为是重复的消息,服务端会自动丢弃。
消费端
从通用层和业务层两个层面来考虑。
在通用层面,你可以在消息被生产的时候使用发号器给它生成一个全局唯一的消息ID,消息被处理之后把这个ID存储在数据库中,在处理下一条消息之前先从数据库里面查询这个全局ID是否被消费过,如果被消费过就放弃消费。
业务层面,增加乐观锁的方式。具体的操作方式是这样的:你给每个人的账号数据中增加一个版本号的字段,在生产消息时先查询这个账户的版本号,并且将版本号连同消息一起发送给消息队列。消费端在拿到消息和版本号后,在执行更新账户金额SQL的时候带上版本号,类似于执行:
update user set amount = amount + 20, version=version+1 where userId=1 and version=1;
降低消息的延迟
监控消息延迟
监控消息的延迟有两种方式:
- 使用消息队列提供的工具,通过监控消息的堆积来完成;
- 通过生成监控消息的方式来监控消息的延迟情况。
减少消息延迟
消费端和消息队列两个层面来完成
- 优化消费代码提升性能;
- 增加消费者的数量(这个方式比较简单)。