当前位置:恩施知识网 > 情感人生 > 正文

车祸现场线上突然宕机一条订单消息丢失了

一、写在前面之前写过一篇文章《RabbitMQ是如何收发消息的?(通俗易懂)》,我们用一个简单易懂的电商场景给大家引入说明了一个消息中间件的使用场景。
同时,我们还基于RabbitMQ的HelloWorld级别的代码,给出了订单服务和仓储服务如何基于MQ中间件收发消息的示例。二、业务场景回顾这篇文章,我们来稍微深入探讨一些MQ中间件使用中的基础技术问题。
首先回顾一下上篇文章做出来的一个架构图,看看订单服务和消息服务是如何基于MQ来收发消息的。
我们稍微把这个图细化一点,简单
一、写在前面

之前写过一篇文章《RabbitMQ是如何收发消息的?(通俗易懂)》,我们用一个简单易懂的电商场景给大家引入说明了一个消息中间件的使用场景。

同时,我们还基于RabbitMQ的HelloWorld级别的代码,给出了订单服务和仓储服务如何基于MQ中间件收发消息的示例。

二、业务场景回顾

这篇文章,我们来稍微深入探讨一些MQ中间件使用中的基础技术问题。

首先回顾一下上篇文章做出来的一个架构图,看看订单服务和消息服务是如何基于MQ来收发消息的。

我们稍微把这个图细化一点,简单来说就是多个订单服务实例给queue推送消息,多个仓储服务每个消费一部分消息。如下图所示:

三、意外宕机,问题凸现

假如你线上对MQ技术的使用就到此为止了,那么基本可以跟offer说拜拜了。。。

因为如果是我的话,作为一个面试官就没法继续往下问了。你这个MQ的使用以及理解的深度仅此而已的话,那基本就是刚刚对MQ技术入门的程度。

如果面试官要继续问,完全可以问下面的问题:

那你说说如果仓储服务作为消费者服务,刚收到了一个订单消息,但是在完成消息的处理之前,也就是还没对订单完成仓储调度发货,结果这个仓储服务突然就宕机了,这个时候会发生什么事情?

所以说,大家还是要对这个技术了解得稍微深入一点点,否则随便被问几个问题就完蛋了。

大伙儿先来看看下面的图,感受一下车祸现场。

RabbitMQ这个中间件默认的一个行为,就是只要仓储服务收到一个订单消息,RabbitMQ就会立马把这条订单消息给标记为删除,这个行为叫做自动ack,也就是投递完成一条消息就自动确认这个消息处理完毕了。

但是接着如果此时仓储服务收到了一个订单消息,但是还没来得及对仓库系统完成商品的调度发货,结果直接就宕机了。

此时,明显这个订单消息就丢失了啊,因为RabbitMQ那里已经没有了。。。

这会导致什么样的尴尬体验呢?就是一个用户支付了8999元,对一个iphone8下了订单,结果呢,死等活等了好几天,就是不见网站上显示他的iphone8在发货。

搞了半天,原因就是他的那个iphone8的订单在仓储服务那里,还没来得及调度发货直接就宕机了,导致这个订单消息就一直丢失了,始终没有给这个用户通知仓库系统进行发货。

这个问题,是不是很尴尬?所以说,技术问题是会严重影响企业的核心业务流程的!

各位小伙伴,还记得上一讲咱们的仓储服务消费消息的代码中,有一行关键的代码:

这行代码对channel.basicConsume()方法,传入的第二个参数:true,其实就是一个关键的参数。

这个true就代表了一个核心的含义,他的意思是,RabbitMQ只要把一个消息投递到仓储服务手上,立马就标记这个消息删除了。

但是在这个默认的配置之下,要是仓储服务收到一个订单消息,结果还没来得及完成耗时几十秒的仓储调度发货的业务逻辑,结果突然宕机了,那么这个订单消息就永久性丢失了!

找了半天,原来问题的症结在这里啊!大家是不是明白了,上一篇文章最后为什么我会说,这个代码目前为止还有很多的问题。

所以这个时候,我们如果希望不要因为仓储服务的突然宕机导致一条订单消息丢失,就需要改造一下仓储服务消费消息的代码了。

首先,我们需要把那个参数从true改为false,如下代码所示:

只要修改为false之后,RabbitMQ就不会盲目的投递消息到仓储服务,立马就删除消息了,说白了就是关闭autoAck的行为,不要自作主张的认为消息处理成功了。

接着,我们需要改造一下处理订单消息的代码,如下代码所示。

这段代码,说白了,就是在对订单完成了调度发货之后,在finally代码块中手动执行了ack操作,说我自己已经完成了耗时几十秒的业务逻辑的处理,现在可以手动ack通知RabbitMQ,这个消息处理完毕了。

此时整个架构运行流程大致看起来跟下面的图那样子。

架构流程改成上面那样后,就意味着只有完成了仓储调度发货的代码业务逻辑,确保仓库系统收到通知之后,仓储服务才会在代码中手动发送ack消息给RabbitMQ。

此时,RabbitMQ收到了这个ack消息,才会标记对应的订单消息被删除了。

如果说在仓储服务收到了订单消息,但是还没来得及完成仓储调度发货的业务逻辑,那也就绝对不会执行这条订单消息的ack操作,然后RabbitMQ也就不会收到这条订单消息的ack通知。

一旦RabbitMQ发现代表消费者的某个仓储服务实例突然宕机了,而这个仓储服务收到的一些订单消息还没来得及处理,没给自己发送那些消息的ack通知。

此时,RabbitMQ会自动对这条订单消息重发推送给其他在运行中的仓储服务实例,让其他的仓储服务实例去处理这条订单消息。

这样的话,就可以保证这条订单消息不会因为某个仓储服务实例的宕机而丢失,他会确保必须由某个仓储服务实例完成这条订单消息的调度发货处理,然后才会删除那条订单消息。

四、总结 tips

最后再来一张图,大家直观的感受一下:

好了,各位同学,这篇文章是不是相对稍微深入一点点,让大家了解到了一些使用MQ技术时候要考虑的一些问题?

实际上无论是RocketMQ、Kafka还是RabbitMQ,都有类似的autoAck或者是手动ack的机制。

线上生产环境中运行时,你必须要考虑到消费者服务可能宕机的问题。

如果消费者服务没处理完消息就自己宕机了,那么一定会导致部分消息的丢失,进而影响核心业务流程的运转。

因此大家在线上使用MQ时,一定要充分考虑这些潜在问题,同时结合具体的MQ提供的一些API、参数来进行合理设置,确保消息不要随意丢失。

------------- END -------------

扫码免费获取600 页石杉老师原创精品文章汇总PDF

原创技术文章汇总

车祸现场线上突然宕机一条订单消息丢失了

面试官杠上重复消费、消息堆积、消息丢失、顺序消息?

消息队列在互联网技术存储方面使用如此广泛,几乎所有的后端技术面试官都要在消息队列的使用和原理方面对小伙伴们进行360 的刁难。

什么,这么多问题啊!别慌,现在就来找找解决方案。

一、 重复消费

现在消息队列一般都能保证at least once的,也就是消息至少一次投递。 在这种情况为什么会出现重复消费的问题呢?通常都是由于网络原因造成的 ,原因如下:通常消息被成功消费后消费者都会发送一个成功标志给MQ,MQ收到这个标志就表示消息已经成功消费了,就不会再发送给其他消费者了。但是如果因为网络这个标志没有送到MQ就丢失了,MQ就认为这个消息没有被成功消费,就会再次发送给其他消费者消费,就造成重复了。

这时我们看这个问题就变成了我们怎么保证消费端的幂等性。

幂等性 是指一个操作其执行任意多次所产生的影响均与一次执行的影响相同,大白话就是你同样的参数调用我这个接口,调用多少次结果都相同。

怎么保证消息队列消费的幂等性

其实还是得结合业务来思考,我在这里给出几个解决方案:

1. 分布式锁 。生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后就是这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

2.唯一键防重 。基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

3.先查后写。 要写数据库前,先根据主键查一下,如果这数据都有了,你就别插入了,update一下好了。

4. 关闭重试机制 。如果把重试机制关掉的话不显示,虽然解决了重复消费的问题,但是可能会造成丢失消息,不建议这么做。

不同的业务可以选择不同的方案,如果服务的并发量不高,可以考虑唯一键防重或者先查后写的方案;如果并发量较高,追求性能,沐子推荐采用分布式锁实现幂等性(本公司目前采用的方案)

二、 消息堆积

1. 消息堆积的产生原因

消息堆积的原因主要在于两方面,其一为消费的太慢或消费方出现异常,其二为生产方生产的太快,总的来说就是 消息的速度赶不上生产的速度,生产和消费速度不匹配造成的 。

2. 消息堆积的解决方案

1)生产端:一般当生产端发生积压(Broker正常的情况下)就要查看你的业务逻辑是否有异常的耗时步骤导致的,是否需要改并行化操作等。

Broker端:当Broker端发生积压我们首先要查看,消息队列内存使用情况, 如果有分区的的话还得看每个分区积压的消息数量差异。当每个分区的消息积压数据量相对均匀的话,我们大致可以认为是流量激增。需要在消费端做优化,或者同时需要增加Broker节点(相当于存储扩容),如果分区加压消息数量差异很大的话(有的队列满了,有的队列可能还是空闲状态),我们这时候就要检查我们的路由转发规则是否合理。

2) 增加消费者 ,多部署几台消费者机器(横向扩展),提升消费者的消费能力。

3)此种情况可以将这些 消费不成功的消息转发到其它队列里去(类似死信队列) ,后面再慢慢分析死信队列里的消息处理问题。

4) mq 中的消息过期失效了。可以采取一个方案,就是批量重导 ,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。

总之,上面说到消息积压的问题,我们需要查看是否有无限重发的消息或者有进入死锁的程序等等,当确定是流量激增的话,我们需要评估是否需要增加资源还是通过限流的方式解决,当短时间大量消息需要处理时,在资源允许的情况下,我们可以新启一批消费者与消息队列,将原来的消费者中的消息直接作为生产者转发到临时应急队列中,这样大概率的能够快速解决消息积压。与其事后处理不如我们在设计之初就要把积压考虑进来,对于数据量非常大,但是实时性要求不高的场景,可以设计出批量消息发送,当队列积累到一定阀值再做批量消费消费,这里需要注意的就是重复消费带来的影响,设计不好就是一场灾难。

三、 消息丢失

一般来讲消息丢失的途径有三个: 生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据 。

1. 生产者弄丢数据

a、丢失的原因:因为网络传输的不稳定性,当生产者在向MQ发送消息的过程中,MQ没有成功接收到消息,但是生产者却以为MQ成功接收到了消息,不会再次重复发送该消息,从而导致消息的丢失。

b、解决办法:有两个解决办法,第一个方法: 向broker发送消息时,如果由于网络抖动等原因导致消息发送失败,可以设置 失败重试次数让消息重发 。
第二个方法: 事务机制和confirm机制,最常用的是confirm机制 ;

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 MQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

2. MQ弄丢数据

a、丢失的原因:MQ接收到生产者发送过来的消息,是存在内存中的,如果没有被消费完,此时MQ宕机了,那么再次启动的时候,原来内存中的那些消息都丢失了。

b、解决办法: 开启MQ的持久化 。结合上面的说到的confirm机制,只有当消息成功持久化磁盘之后,才会回调生产者的接口返回ack消息,否则都算失败,生产者会重新发送。存入磁盘的消息不会丢失,就算MQ挂掉了,重启之后,他会读取磁盘中的消息,不会导致消息的丢失。

注意,哪怕是你给 MQ 开启了持久化机制,也有一种可能,就是这个消息写到了MQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时MQ挂了,就会导致内存里的一点点数据丢失。

所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,MQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。

3. 消费者弄丢数据

a、丢失的原因:如果MQ成功的把消息发送给了消费者,那么MQ的ack机制会自动的返回成功,表明发送消息成功,下次就不会发送这个消息。但如果就在此时,消费者还没处理完该消息,然后宕机了,那么这个消息就丢失了。

b、解决的办法:1)简单来说,就是必须关闭 MQ 的自动提交,把 自动提交改为手动提交 ,也就是说当我消费成功后才会进行提交。

2)消费者端已经正常接收到消息但是在执行后续消息处理时发生了异常,最终返回处理失败。 重试-进行重新消费问题,如果一直这样重复消费都持续失败到一定次数,可以投递到 DLQ 死信队列,应用可以监控死信队列来做人工干预 。

四、 顺序消费

比如一个电商的下单操作,下单后先减库存然后生成订单,这个操作就需要顺序执行的。队列本身是有顺序的,但是为什么还要保证顺序消费呢,主要是因为生产环境服务实例一般都是集群,当消费者是多个实例时,队列中的消息会分发到所有实例进行消费(同一个消息只能发给一个消费者实例),这样就不能保证消息顺序的消费,因为你不能确保哪台机器执行消费端业务代码的速度快。

保证每次只有单个消费实例消费

所以对于需要保证顺序消费的业务,我们可以只部署一个消费者实例,然后设置MQ 每次只推送一个消息,再开启手动 ack 即可。这样MQ 每次只会从队列推送一个消息过来,处理完成之后我们 ack 回应,再消费下一个,就能确保消息顺序性。

这样MQ 每次只会从队列推送一个消息过来,处理完成之后我们 ack 回应,再消费下一个,就能确保消息顺序性。

但是这样的操作也会降低消费者的性能, 一个消费者消费消息时,其他消费者会阻塞,所以很多场景下可能并不会采用这样的方案。

所以一般会根据场景,制定一定的策略来解决消费顺序问题。

多线程并发抢占出现消费乱序问题

当MQ采用简单队列模式的时候,如果消费者采用多线程的方式来加速消息的处理,此时也会出现消息乱序的问题。

多线程并发抢占出现消费乱序问题,将消息ID进行hash计算,将相同值放入同一个内存队列,让指定线程执行,即可解决顺序消费问题。

在多个分区中保证消息顺序和消息处理效率

首先使用多个分区,消息可以被发送端发送至多个分区,保证消息发送的效率。然后在消费端在拉消息时使用ConutdownLunch来记录一组有序消息的个数。如果达到个数,说明已拉取到完整的一组有序消息。然后在消费端根据消息序号进行排序,消费端将排好序的消息发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。即可最大程度上既保证顺序又保证效率!

RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,支持顺序消息,所以如果有这种场景或者要使用MQ,我建议你直接使用RocketMQ即可。

我们说了一些处理与分析问题的方法,这里有一个最重要的点就是我们需要有一套实用的监控发现工具或者方式,在问题第一时间发现才是王道,不然我们上面所说的都空谈,当问题发现的时候损失已经无法挽回。所以我们要在设计系统之初需要要为监控系统或者程序提供完备或者必须的日志,接口,数据等,这要才是一个合理的设计。当没有监控系统的情况下我们必须自己设计一套简单分析接口。

最后, 如果我的文章对你有所帮助或者有所启发, 欢迎关注公众号(微信搜索公众号:首席架构师专栏),里面有许多技术干货,也有我对技术的思考和感悟,还有作为架构师的验验分享;关注后回复 【面试题】,有我准备的面试题、架构师大型项目实战视频等福利 , 我会带着你一起学习、成长,让我们一起加油!!!

车祸现场线上突然宕机一条订单消息丢失了

RabbitMQ,RocketMQ,Kafka 事务性,消息丢失和重复发送处理策略

我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用,同时网络环境也是不稳定的,造成了我们多个机器之间的数据同步问题,这就是典型的分布式事务问题。

在分布式事务中事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是要保证不同节点之间的数据一致性。

1、2PC(二阶段提交)方案 - 强一致性

2、3PC(三阶段提交)方案

3、TCC (Try-Confirm-Cancel)事务 - 最终一致性

4、Saga事务 - 最终一致性

5、本地消息表 - 最终一致性

6、MQ事务 - 最终一致性

消息的生产方,除了维护自己的业务逻辑之外,同时需要维护一个消息表。这个消息表里面记录的就是需要同步到别的服务的信息,当然这个消息表,每个消息都有一个状态值,来标识这个消息有没有被成功处理。

发送放的业务逻辑以及消息表中数据的插入将在一个事务中完成,这样避免了业务处理成功 + 事务消息发送失败,或业务处理失败 + 事务消息发送成功,这个问题。

举个栗子:

我们假定目前有两个服务,订单服务,购物车服务,用户在购物车中对几个商品进行合并下单,之后需要情况购物车中刚刚已经下单的商品信息。

1、消息的生产方也就是订单服务,完成了自己的逻辑(对商品进行下单操作)然后把这个消息通过 mq 发送到需要进行数据同步的其他服务中,也就是我们栗子中的购物车服务。

2、其他服务(购物车服务)会监听这个队列;

1、如果收到这个消息,并且数据同步执行成功了,当然这也是一个本地事务,就通过 mq 回复消息的生产方(订单服务)消息已经处理了,然后生产方就能标识本次事务已经结束。如果是一个业务上的错误,就回复消息的生产方,需要进行数据回滚了。

2、很久没收到这个消息,这种情况是不会发生的,消息的发送方会有一个定时的任务,会定时重试发送消息表中还没有处理的消息;

3、消息的生产方(订单服务)如果收到消息回执;

1、成功的话就修改本次消息已经处理完,也就是本次分布式事务的同步已经完成;

2、如果消息的结果是执行失败,同时在本地回滚本次事务,标识消息已经处理完成;

3、如果消息丢失,也就是回执消息没有收到,这种情况也不太会发生,消息的发送方(订单服务)会有一个定时的任务,定时重试发送消息表中还没有处理的消息,下游的服务需要做幂等,可能会收到多次重复的消息,如果一个回复消息生产方中的某个回执信息丢失了,后面持续收到生产方的 mq 消息,然后再次回复消息的生产方回执信息,这样总能保证发送者能成功收到回执,消息的生产方在接收回执消息的时候也要做到幂等性。

这里有两个很重要的操作:

1、服务器处理消息需要是幂等的,消息的生产方和接收方都需要做到幂等性;

2、发送放需要添加一个定时器来遍历重推未处理的消息,避免消息丢失,造成的事务执行断裂。

该方案的优缺点

优点:

1、在设计层面上实现了消息数据的可靠性,不依赖消息中间件,弱化了对 mq 特性的依赖。

2、简单,易于实现。

缺点:

主要是需要和业务数据绑定到一起,耦合性比较高,使用相同的数据库,会占用业务数据库的一些资源。

下面分析下几种消息队列对事务的支持

RocketMQ 中的事务,它解决的问题是,确保执行本地事务和发消息这两个操作,要么都成功,要么都失败。并且,RocketMQ 增加了一个事务反查的机制,来尽量提高事务执行的成功率和数据一致性。

主要是两个方面,正常的事务提交和事务消息补偿

正常的事务提交

1、发送消息(half消息),这个 half 消息和普通消息的区别,在事务提交 之前,对于消费者来说,这个消息是不可见的。

2、MQ SERVER写入信息,并且返回响应的结果;

3、根据MQ SERVER响应的结果,决定是否执行本地事务,如果MQ SERVER写入信息成功执行本地事务,否则不执行;

如果MQ SERVER没有收到 Commit 或者 Rollback 的消息,这种情况就需要进行补偿流程了

补偿流程

1、MQ SERVER如果没有收到来自消息发送方的 Commit 或者 Rollback 消息,就会向消息发送端也就是我们的服务器发起一次查询,查询当前消息的状态;

2、消息发送方收到对应的查询请求,查询事务的状态,然后把状态重新推送给MQ SERVER,MQ SERVER就能之后后续的流程了。

相比于本地消息表来处理分布式事务,MQ 事务是把原本应该在本地消息表中处理的逻辑放到了 MQ 中来完成。

Kafka 中的事务解决问题,确保在一个事务中发送的多条信息,要么都成功,要么都失败。也就是保证对多个分区写入操作的原子性。

通过配合 Kafka 的幂等机制来实现 Kafka 的 Exactly Once,满足了读取-处理-写入这种模式的应用程序。当然 Kafka 中的事务主要也是来处理这种模式的。

什么是读取-处理-写入模式呢?

栗如:在流计算中,用 Kafka 作为数据源,并且将计算结果保存到 Kafka 这种场景下,数据从 Kafka 的某个主题中消费,在计算集群中计算,再把计算结果保存在 Kafka 的其他主题中。这个过程中,要保证每条消息只被处理一次,这样才能保证最终结果的成功。Kafka 事务的原子性就保证了,读取和写入的原子性,两者要不一起成功,要不就一起失败回滚。

这里来分析下 Kafka 的事务是如何实现的

它的实现原理和 RocketMQ 的事务是差不多的,都是基于两阶段提交来实现的,在实现上可能更麻烦

先来介绍下事务协调者,为了解决分布式事务问题,Kafka 引入了事务协调者这个角色,负责在服务端协调整个事务。这个协调者并不是一个独立的进程,而是 Broker 进程的一部分,协调者和分区一样通过选举来保证自身的可用性。

Kafka 集群中也有一个特殊的用于记录事务日志的主题,里面记录的都是事务的日志。同时会有多个协调者的存在,每个协调者负责管理和使用事务日志中的几个分区。这样能够并行的执行事务,提高性能。

下面看下具体的流程

事务的提交

1、协调者设置事务的状态为PrepareCommit,写入到事务日志中;

2、协调者在每个分区中写入事务结束的标识,然后客户端就能把之前过滤的未提交的事务消息放行给消费端进行消费了;

事务的回滚

1、协调者设置事务的状态为PrepareAbort,写入到事务日志中;

2、协调者在每个分区中写入事务回滚的标识,然后之前未提交的事务消息就能被丢弃了;

这里引用一下【消息队列高手课中的图片】

RabbitMQ 中事务解决的问题是确保生产者的消息到达MQ SERVER,这和其他 MQ 事务还是有点差别的,这里也不展开讨论了。

先来分析下一条消息在 MQ 中流转所经历的阶段。

生产阶段 :生产者产生消息,通过网络发送到 Broker 端。

存储阶段 :Broker 拿到消息,需要进行落盘,如果是集群版的 MQ 还需要同步数据到其他节点。

消费阶段 :消费者在 Broker 端拉数据,通过网络传输到达消费者端。

发生网络丢包、网络故障等这些会导致消息的丢失

在生产者发送消息之前,通过channel.txSelect开启一个事务,接着发送消息, 如果消息投递 server 失败,进行事务回滚channel.txRollback,然后重新发送, 如果 server 收到消息,就提交事务channel.txCommit

不过使用事务性能不好,这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。

使用确认机制,生产者将信道设置成 confirm 确认模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 deliveryTag 和 multiple 参数),这就使得生产者知晓消息已经正确到达了目的地了。

multiple 为 true 表示的是批量的消息确认,为 true 的时候,表示小于等于返回的 deliveryTag 的消息 id 都已经确认了,为 false 表示的是消息 id 为返回的 deliveryTag 的消息,已经确认了。

确认机制有三种类型

1、同步确认

2、批量确认

3、异步确认

同步模式的效率很低,因为每一条消息度都需要等待确认好之后,才能处理下一条;

批量确认模式相比同步模式效率是很高,不过有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送;

异步模式就是个很好的选择了,不会有同步模式的阻塞问题,同时效率也很高,是个不错的选择。

Kafaka 中引入了一个 broker。 broker 会对生产者和消费者进行消息的确认,生产者发送消息到 broker,如果没有收到 broker 的确认就可以选择继续发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

只要正确处理 Broker 的确认响应,就可以避免消息的丢失。

RocketMQ 提供了3种发送消息方式,分别是:

同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。

异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

防止在存储阶段消息额丢失,可以做持久化,防止异常情况(重启,关闭,宕机)。。。

RabbitMQ 持久化中有三部分:

消息的持久化,在投递时指定 delivery_mode=2(1是非持久化),消息的持久化,需要配合队列的持久,只设置消息的持久化,重启之后队列消失,继而消息也会丢失。所以如果只设置消息持久化而不设置队列的持久化意义不大。

对于持久化,如果所有的消息都设置持久化,会影响写入的性能,所以可以选择对可靠性要求比较高的消息进行持久化处理。

不过消息持久化并不能百分之百避免消息的丢失

比如数据在落盘的过程中宕机了,消息还没及时同步到内存中,这也是会丢数据的,这种问题可以通过引入镜像队列来解决。

镜像队列的作用:引入镜像队列,可已将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能够自动切换到镜像中的另一个节点上来保证服务的可用性。(更细节的这里不展开讨论了)

操作系统本身有一层缓存,叫做 Page Cache,当往磁盘文件写入的时候,系统会先将数据流写入缓存中。

Kafka 收到消息后也会先存储在也缓存中(Page Cache)中,之后由操作系统根据自己的策略进行刷盘或者通过 fsync 命令强制刷盘。如果系统挂掉,在 PageCache 中的数据就会丢失。也就是对应的 Broker 中的数据就会丢失了。

处理思路

1、控制竞选分区 leader 的 Broker。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。

2、控制消息能够被写入到多个副本中才能提交,这样避免上面的问题1。

1、将刷盘方式改成同步刷盘;

2、对于多个节点的 Broker,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段就很简单了,如果在网络传输中丢失,这个消息之后还会持续的推送给消费者,在消费阶段我们只需要控制在业务逻辑处理完成之后再去进行消费确认就行了。

总结:对于消息的丢失,也可以借助于本地消息表的思路,消息产生的时候进行消息的落盘,长时间未处理的消息,使用定时重推到队列中。

消息在 MQ 中的传递,大致可以归类为下面三种:

1、At most once: 至多一次。消息在传递时,最多会被送达一次。是不安全的,可能会丢数据。

2、At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。

3、Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

大部分消息队列满足的都是At least once,也就是可以允许重复的消息出现。

我们消费者需要满足幂等性,通常有下面几种处理方案

1、利用数据库的唯一性

根据业务情况,选定业务中能够判定唯一的值作为数据库的唯一键,新建一个流水表,然后执行业务操作和流水表数据的插入放在同一事务中,如果流水表数据已经存在,那么就执行失败,借此保证幂等性。也可先查询流水表的数据,没有数据然后执行业务,插入流水表数据。不过需要注意,数据库读写延迟的情况。

2、数据库的更新增加前置条件

3、给消息带上唯一ID

每条消息加上唯一ID,利用方法1中通过增加流水表,借助数据库的唯一性来处理重复消息的消费。

免责申明:以上内容属作者个人观点,版权归原作者所有,不代表恩施知识网立场!登载此文只为提供信息参考,并不用于任何商业目的。如有侵权或内容不符,请联系我们处理,谢谢合作!
当前文章地址:https://www.esly.wang/qinggang/49113.html 感谢你把文章分享给有需要的朋友!
上一篇:电池的发展历程是怎样的,电池发展史完整版 下一篇:缴纳电话费没有发票怎么处理「通讯费发票」

文章评论