代码编织梦想

1、消息整体处理过程

这里我们将消息的整体处理阶段分为3个阶段进行分析:

  • Producer发送消息阶段。

  • Broker处理消息阶段。

  • Consumer消费消息阶段。

Producer发送消息阶段

发送消息阶段涉及到Producer到broker的网络通信,因此丢失消息的几率一定会有,那RocketMQ在此阶段用了哪些手段保证消息不丢失了(或者说降低丢失的可能性)。

手段一:提供SYNC的发送消息方式,等待broker处理结果。

RocketMQ提供了3种发送消息方式,分别是:

  • 同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。

  • 异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

  • Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer只负责把请求发出去,而不处理响应结果。

我们在调用producer.send方法时,不指定回调方法,则默认采用同步发送消息的方式,这也是丢失几率最小的一种发送方式。

手段二:发送消息如果失败或者超时,则重新发送。

发送重试源码如下,本质其实就是一个for循环,当发送消息发生异常的时候重新循环发送。默认重试3次,重试次数可以通过producer指定。

手段三:broker提供多master模式,即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。

如果broker只有一个节点,则broker宕机了,即使producer有重试机制,也没用,因此利用多主模式,当某台broker宕机了,换一台broker进行投递。

总结

producer消息发送方式虽然有3种,但为了减小丢失消息的可能性尽量采用同步的发送方式,同步等待发送结果,利用同步发送+重试机制+多个master节点,尽可能减小消息丢失的可能性。

Broker处理消息阶段

手段四:提供同步刷盘的策略

public enum FlushDiskType {
SYNC_FLUSH, //同步刷盘
ASYNC_FLUSH//异步刷盘(默认)
}

我们知道,当消息投递到broker之后,会先存到page cache,然后根据broker设置的刷盘策略是否立即刷盘,也就是如果刷盘策略为异步,broker并不会等待消息落盘就会返回producer成功,也就是说当broker所在的服务器突然宕机,则会丢失部分页的消息。

手段五:提供主从模式,同时主从支持同步双写

即使broker设置了同步刷盘,如果主broker磁盘损坏,也是会导致消息丢失。

因此可以给broker指定slave,同时设置master为SYNC_MASTER,然后将slave设置为同步刷盘策略。

此模式下,producer每发送一条消息,都会等消息投递到master和slave都落盘成功了,broker才会当作消息投递成功,保证休息不丢失。

总结

在broker端,消息丢失的可能性主要在于刷盘策略和同步机制。

RocketMQ默认broker的刷盘策略为异步刷盘,如果有主从,同步策略也默认的是异步同步,这样子可以提高broker处理消息的效率,但是会有丢失的可能性。因此可以通过同步刷盘策略+同步slave策略+主从的方式解决丢失消息的可能。

Consumer消费消息阶段

手段六:consumer默认提供的是At least Once机制

从producer投递消息到broker,即使前面这些过程保证了消息正常持久化,但如果consumer消费消息没有消费到也不能理解为消息绝对的可靠。因此RockerMQ默认提供了At least Once机制保证消息可靠消费。

何为At least Once?

Consumer先pull 消息到本地,消费完成后,才向服务器返回ack。

通常消费消息的ack机制一般分为两种思路:

  • 先提交后消费;

  • 先消费,消费成功后再提交;

思路一可以解决重复消费的问题但是会丢失消息,因此Rocketmq默认实现的是思路二,由各自consumer业务方保证幂等来解决重复消费问题。

手段七:消费消息重试机制

当消费消息失败了,如果不提供重试消息的能力,则也不能算完全的可靠消费,因此RocketMQ本身提供了重新消费消息的能力。

总结

consumer端要保证消费消息的可靠性,主要通过At least Once+消费重试机制保证。

2、如何保证消息不被重复消费

回答这个问题,首先你别听到重复消息这个事儿,就一无所知吧,你先大概说一说可能会有哪些重复消费的问题。

首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是 MQ 自己保证的,是由我们开发来保证的。挑一个 Kafka 来举个例子,说说怎么重复消费吧。

Kafka 实际上有个 offset 的概念,就是每个消息写进去,都有一个 offset,代表消息的序号,然后 consumer 消费了数据之后,每隔一段时间(定时定期),会把自己消费过的消息的 offset 提交一下,表示“我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的 offset 来继续消费吧”。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。

有这么个场景。数据 1/2/3 依次进入 kafka,kafka 会给这三条数据每条分配一个 offset,代表这条数据的序号,我们就假设分配的 offset 依次是 152/153/154。消费者从 kafka 去消费的时候,也是按照这个顺序去消费。假如当消费者消费了 offset=153 的这条数据,刚准备去提交 offset 到 zookeeper,此时消费者进程被重启了。

那么此时消费过的数据 1/2 的 offset 并没有提交,kafka 也就不知道你已经消费了 offset=153 这条数据。那么重启之后,消费者会找 kafka 说,嘿,哥儿们,你给我接着把上次我消费到的那个地方后面的数据继续给我传递过来。由于之前的 offset 没有提交成功,那么数据 1/2 会再次传过来,如果此时消费者没有去重的话,那么就会导致重复消费。

如果消费者干的事儿是拿一条数据就往数据库里写一条,会导致说,你可能就把数据 1/2 在数据库里插入了 2 次,那么数据就错啦。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

举个例子吧。假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性。

幂等性,通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

所以第二个问题来了,怎么保证消息队列消费的幂等性?

其实还是得结合业务来思考,我这里给几个思路:

比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。

比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。

比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

 如果本文对你有帮助,别忘记给我个3连 ,点赞,转发,评论,
学习更多JAVA知识与技巧,关注博主学习JAVA 课件,源码,安装包,还有最新大厂面试资料等等等
咱们下期见。
收藏 等于白嫖,点赞才是真情。

 

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/m0_68850571/article/details/126310697

ribbitmq总结-爱代码爱编程

总结:两种类型(点对点模型、订阅发布) 注意手动ack和 自动ack的区别。 点对点模型:基本消息模型、work模型。 work模型:能这多劳 。配置:channal.basicQos(1); 解释:就是一个消息只能被一个消费者消费。 订阅发布:dirct、fanout、topic。 解释:广播模式。 ribbitmq常见问题。 一、顺序消费:把有

Rocketmq如何保证消息不丢失,如何保证消息不被重复消费-爱代码爱编程

1、消息整体处理过程 这里我们将消息的整体处理阶段分为3个阶段进行分析: Producer发送消息阶段。 Broker处理消息阶段。 Consumer消费消息阶段。 Producer发送消息阶段 发送消息阶段涉及到Producer到broker的网络通信,因此丢失消息的几率一定会有,那RocketMQ在此阶段用了哪些手段保证消息不丢失了(或

MQ如何保证消息不丢失-爱代码爱编程

一、Rabbit MQ 基础知识 1.生产者(Producer): 投递消息的一方 生产者创建消息,然后对消息体进行序列化+消息的标签发送给RabbitMQ。 2.消费者(Consumer): 接收消息的一方 消费者连接到RabbitMQ 服务器,并订阅队列中消息。消费者接收到消息后,先进行反序列化,然后消费进行处理。 3.Broker: 消息中

rocketMQ如何避免消息重复消费-爱代码爱编程

前言 在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,造成重复消费。这个重复简单可以概括为以下情况: produce发送到Broker时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者producer宕机,导致服务端Broker 对 producer应答失败。 如果此时生产者

RocketMQ 重复消费,消费顺序控制,消息丢失问题处理方案-爱代码爱编程

1.消费顺序控制 假设有三条消息,创建订单M1,订单付款M2,订单完成M3三个消息,在MQ集群下,假设M1发送到MQ1,M2发送到MQ2,那么将不能保证消息的执行顺序。 最简单的处理方式就是,将消息发布到同一个MQ上,先进先出原则,这样就能保证M1消息先于M2消息发送。 但是还存在一个问题就是,假设M1发送成功了,然后发送了M2,但是M1在消费的时候产生了

rocketmq如何保证消息的不丢失-爱代码爱编程

不知道大家对于这个问题遇到过没有,或者大家听到这个问题的第一反应是什么,应该如何做,如何避免消息丢失,一起来看看 首先我们知道rocketmq的一个消息从生产到最终的消费过程需要经历总共三个阶段,或者说会经过三个地方,分别是producer的发送端、broker的持久化机制、以及consumer的消费端 从生产者producer的角度:消息生产之后

RocketMQ如何保证消息不丢失+消息顺序+处理积压消息+消息轨迹-爱代码爱编程

使用RocketMQ如何保证消息不丢失?  1、哪些环节会有丢消息的可能?  其中,1,2,4三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。 然后关于3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来 得及写入

MQ是如何保证消息不丢失的,你这样回答面试官一定说I Want U-爱代码爱编程

目录 序言 RocketMQ RabbitMQ Kafka 序言   又到了金三银四的找工作阶段,你一定被问过MQ是如何保证消息可靠性的或者MQ是如何保证消息不丢失的。我们都知道MQ发送消息一般分为三个阶段分别是生产者发送消息到MQ、MQ存储消息到内存或者硬盘,消费者消费消息。但是这三个过程都有可能因为种种原因导致消息丢失。例如在生产

rocketMQ使用 重复消费、消息堆积、顺序消费、消息丢失等问题-爱代码爱编程

一:顺序发送 通过一定算法,将一组顺序消息发送到同一个broker下面的同一个队列,消费者进行顺序监听即可。 例如:一条信息的唯一标识 通过一定算法 路由到 同一个 broker 下到 某一个队列下。 通过业务层面处理。 // RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上 // RocketMQ默

9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息-爱代码爱编程

大家好,我是君哥。 引入消息队列可以方便地实现系统解耦、削峰填谷等作用。但是消息队列使用不当,可能会引起消息丢失,在一些消息敏感的业务场景下,这是不允许的。今天我们来聊一聊 RocketMQ 怎么做能确保消息不丢失。 1 RocketMQ 简介 RocketMQ 是阿里巴巴开源的分布式消息中间件,整体架构如下图: RocketMQ 主要包括 P

MQ消息丢失,消息一致性,重复消费解决方案-爱代码爱编程

大家好,我是Leo。 这是开端的第三次循环了。当前正在正处于RocketMQ基础原理。 3万字聊聊什么是RocketMQ(一) 4万字聊聊阿里二面,保证你看不完 聊聊Redis面试题 2万字聊聊什么是秒杀系统(中) 3万字聊聊什么是Redis(完结篇) 3万字聊聊什么是MySQL(初篇) 本章概括 分布式事务 由何而来 我们在使用MQ

rocketMq 中关于消息不丢失,重复消费,顺序消费问题-爱代码爱编程

消息不丢失 Producer-----【1】—>broker----【2】—>consumer 可能会丢失数据的阶段: 生产者发送消息到brokerbroker收到后放在内存中,还没来得及刷磁盘挂了消费者消费数据失败针对不同的情况的解决方式 发送到broker失败引起丢失:** 采用同步重试发送,等待broker响应ack,发送失败可以重

聊聊MQ,如何避免消息丢失?如何避免重复消费?-爱代码爱编程

前言 我在工作中,使用到消息中间件MQ的业务还是挺多的,我从事在一家交通行业的公司,业务中经常会涉及处理一些违法数据的场景,项目中经常会使用到RabbitMQ,今天想跟大家聊聊怎样避免消息丢失和重复消费是问题。 在我们发生消息的时候,如果MQ服务器突然宕机了会出现什么情况?是不是我们发过去的消息全部没有了吗? 是的,一般MQ中间件为

MQ如何保证消息不丢失?-爱代码爱编程

1.mq原则 MQ传输过程中,消息数据不能多,也不能少,不能多是说消息不能重复消费,这个我们下一章解决;不能少,就是说不能丢失数据。如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的,本章详细介绍不能少的问题。 2.丢失数据场景 丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rabbitmq和

如何防止rocketmq重复消费_成为一枚软男的博客-爱代码爱编程

什么样的情况下会出现消息重复消费? 在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,在一些比较敏感的场景下,重复消费会造成比较严重的后果,例如重复转账,重复支付。 这个重复简单可以概括为以下情况: 发送消息时重复。 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪

rocketmq 消息丢失总结_rocketmq消息丢失_我是真的菜(ㄒoㄒ)的博客-爱代码爱编程

总结rocketmq消息丢失四种情况 1、 生产者网络波动 消息丢失 解决方法:利用rocketmq自带的事务机制处理,首先发送half消息于rocketmq服务器,此时消息于消费者不可见,等生产者业务处理完成则发送,否