重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

如何保障消息中间件不丢失

这篇文章主要介绍“如何保障消息中间件不丢失”,在日常操作中,相信很多人在如何保障消息中间件不丢失问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何保障消息中间件不丢失”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

创新互联,是成都地区的互联网解决方案提供商,用心服务为企业提供网站建设、app软件开发微信小程序开发、系统按需网站建设和微信代运营服务。经过数10年的沉淀与积累,沉淀的是技术和服务,让客户少走弯路,踏实做事,诚实做人,用情服务,致力做一个负责任、受尊敬的企业。对客户负责,就是对自己负责,对企业负责。

前言

RabbitMQ,RocketMQ,Kafka 等。引入中间件的好处可以起到抗高并发,削峰,业务解耦的作用。

如何保障消息中间件不丢失

如上图

  1. 订单服务投递消息给MQ中间件

  2. 库存服务监听MQ中间件消息,从而进行消费

之前公司在业务搭建的时候用的就是这种MQ解耦机制,那么如何保障将订单服务的消息成功投递给MQ中间件,保证消息的可靠性?

问题

可能会有人疑问,订单服务发起消息,返回成功不就OK了吗?下面有一个Demo代码

如何保障消息中间件不丢失

一般的发送消息都是这么写的,但是有一个场景必须在业务搭建之初就要考虑。如果MQ服务器突然宕机了呢?我们发送的消息是不是就没有了呢?

是的!一般MQ中间件为了保证提供系统的吞吐量会把消息保存在内存中,如果不作其他处理,MQ一旦宕机,消息就会全部丢失。这个在业务中是绝对不允许的,造成的影响是非常大的!

那么如何解决这个问题?

消息持久化

MQ中发消息的时候会有一个durable参数可以设置,设置为true,就会持久化!

如何保障消息中间件不丢失

这样的话MQ服务器及时宕机,重启后磁盘文件中有消息的存储,这样就不会丢失了,但是这种方式也仅仅只是有概率的消息不丢失。

如果消息刚刚保存到MQ内存中,还没来得及更新到磁盘中,突然宕机了(一般高并发情况下发生几率会很高),尤其是大量消息投递的过程中。

如何才能做到一定持久化到磁盘中呢?

confirm机制

上面的问题主要在于,没有人告诉我们持久化是否成功。好在MQ有回调通知特性,confirm机制来通知我们是否持久化成功。

如何保障消息中间件不丢失

confirm机制原理

  • 消息生产者把消息发送给MQ,如果接收成功,MQ会返回一个ack消息- 给生产者。

  • 如果消息不成功,MQ会返回一个nack消息给生产者。

如何保障消息中间件不丢失

上面的demo代码中有两个确认机制,一个ACK回调,一个NACK回调。

这样是不是就可以100%确保消息不丢失了呢?

吞吐量问题严重

试想一下,如果我们生产者每发一条消息,都要 MQ 持久化到磁盘中,然后再发起 ack 或 nack 的回调。这样的话是不是我们 MQ 的吞吐量很不高,因为每次都要把消息持久化到磁盘中。写入磁盘这个动作是很慢的。这个在高并发场景下是不能够接受的,吞吐量太低了。

所以 MQ 持久化磁盘真实的实现,是通过异步调用处理的,他是有一定的机制,如:等到有几千条消息的时候,会一次性的刷盘到磁盘上面。而不是每来一条消息,就刷盘一次。

所以 comfirm 机制其实是一个异步监听的机制,是为了保证系统的高吞吐量,这样就导致了还是不能够 100%保障消息不丢失,因为即使加上了 confirm 机制,消息在 MQ 内存中还没有刷盘到磁盘就宕机了,还是没法处理。

消息提前持久化 + 定时任务

其实本质的原因是无法确定是否持久化。

ps:图画的有点辣鸡~~~~

如何保障消息中间件不丢失

流程操作

  1. 订单服务生产者再投递消息之前,先把消息持久化到 redis 或 DB 中,建议 redis,高性能。消息的状态为发送中

  2. confirm 机制监听消息是否发送成功?如 ack 成功消息,删除 redis 中此消息。

  3. 如果 nack 不成功的消息,这个可以根据自身的业务选择是否重发此消息。也可以删除此消息,由自己的业务决定。

  4. 这边加了个定时任务,来拉取隔一定时间了,消息状态还是为发送中的,这个状态就表明,订单服务是没有收到 ack 成功消息。

  5. 定时任务会作补偿性的投递消息。这个时候如果 MQ 回调 ack 成功接收了,再把 redis 中此消息删除。

补偿机制方案

这样的机制其实就是一个补偿机制,我不管 MQ 有没有真正的接收到,只要我的 redis 中的消息状态也是为==发送中==,就表示此消息没有正确成功投递。再启动定时任务去监控,发起补偿投递。

机制的优化

当然定时任务那边我们还可以加上一个补偿的次数,如果大于 3 次,还是没有收到 ack 消息,那就直接把消息的状态设置为【失败】,由人工去排查到底是为什么?

这样的话方案就比较完美了,保障了 100%的消息不丢失、磁盘要是坏了,那就没法保障了,就要考虑集群方案。

方案问题

不过这样的方案,就会有可能发送多次相同的消息,很有可能 MQ 已经收到了消息,就是 ack 消息回调时出现网络故障,没有让生产者收到。那就要要求消费者一定在消费的时候保障幂等性

幂等含义

我们先了解一下什么叫幂等?在分布式应用中,幂等是非常重要的,也就是相同条件下对一个业务的操作,不管操作多少次,结果都是一样。

分布式幂等

为什么要有幂等这种场景?因为在大的系统中,都是分布式部署,如:订单业务库存业务有可能都是独立部署的,都是单独的服务。用户下订单,会调用到订单服务和库存服务。

分布式异常问题

因为分布式部署,很有可能在调用库存服务时,因为网络等原因,订单服务调用失败,但其实库存服务已经处理完成。只是返回给订单服务处理结果时出现了异常。这个时候一般系统会作补偿方案,也就是订单服务再此放起库存服务的调用,库存减 1。

update m_goods set count = count - 1 where g_id=10

这样就出现了问题,其实上一次调用已经减了 1,只是订单服务没有收到处理结果。现在又调用一次,又要减 1,这样就不符合业务了,多扣了。

幂等这个概念就是,不管库存服务在相同条件下调用几次,处理结果都一样。这样才能保证补偿方案的可行性。

乐观锁方案

借鉴网上的乐观锁方案,例如:

update m_goods set count = count -1 , version = version + 1 where g_id=2 and version = 1

根据 version 版本,也就是在操作库存前先获取当前商品的 version 版本号,然后操作的时候带上此 version 号。我们梳理下,我们第一次操作库存时,得到 version 结果如下:

  1. 调用库存服务 version 变成了 2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传如的 version 还是 1,再执行上面的 sql 语句时,就不会执行;因为 version 已经变为 2 了,where 条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。

唯一 ID + 指纹码

此方案是网上找到的一篇博客所写的,感觉不错。

原理就是利用数据库主键去重,业务完成后插入主键标识~

select count(1) from t_check where ID = 唯一ID + 指纹码
  • 唯一 ID 就是业务表的唯一的主键,如商品 ID

  • 指纹码就是为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间戳+业务编号的方式。

上面的 sql 语句:

  • 返回如果为 0 表示没有操作过,那业务操作后就可以 insert into t_check(唯一 ID+指纹码)

  • 返回如果大于 0 表示操作过,就直接返回

好处:实现简单

坏处:高并发下数据库瓶颈

解决方案:根据 ID 进行分库分表进行算法路由

redis 原子操作(推荐使用)

利用 redis 的原子操作,做个操作完成的标记。这个性能就比较好。但会遇到一些问题。

问题:我们是否需要把业务结果进行数据落库,如果落库,关键解决的问题时数据库和 redis 操作如何做到原子性?

这个意思就是库存减 1 了,但 redis 进行操作完成标记时,失败了怎么办?也就是一定要保证落库和 redis 要么一起成功,要么一起失败

第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步策略?

这个意思就是库存减 1,不落库,直接先操作 redis 操作完成标记,然后由另外的同步服务进行库存落库,这个就是增加了系统复杂性,而且同步策略的设置。

到此,关于“如何保障消息中间件不丢失”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


网页名称:如何保障消息中间件不丢失
当前路径:http://cqcxhl.com/article/gicgeo.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP