代码编织梦想

今天讨论的主题是如何配置Kafka无消息丢失,主要包含,在Kafka在什么情况下能够保证消息不丢失,以及具体的应对方法。

关于Kafka在什么情况下才能保证消息不丢失,一句话概括,Kafka只对“已提交”的消息做有限度的持久化保证。

  • 已提交的消息:当Kafka的若干哥Broker成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交,此时,这条消息在Kafka看来就正式变为“已提交”消息了。

这个地方的若干,不是一个也不是所有Broker,是取决于你对“已提交”的定义。你可以选择只要有一个Broker成功保存该消息就算是已提交,也可以是令所有Broker都成功保存该消息才算是已提交。不论哪种情况,Kafka只对已提交的消息做持久化保证这件事情是不变的。

  • 有限度的持久化保证:Kafka也不能保证任何情况下都做到不丢失消息,极端情况下,如地球不存在了Kafka也不能保证保证消息不丢失。Kafka保证消息不丢失是有前提的,假如你的消息保存在N个Broker上,那么这个前提就是N个Broker中至少有一个存活。只要这个条件成立,Kafka就能保证你的消息不丢失。

Kafka只要满足一定的条件且是已提交的消息就可以保证不丢失消息,但是我们还是会出现消息丢失的情况,下边具体分析:

案例1:生产者程序丢失数据

Producer 程序丢失消息,这应该算是被抱怨最多的数据丢失场景了。 目前Kafka Producer是异步发送消息,也就是调用producer.send(msg)这个API,立即返回,但是此时并不能认为消息已经发送成功。如果出现消息丢失,我们是无法知道的。

使用这种方式,可能导致消息没有发送成功的原因有很多,比如常见的网络抖动,导致消息压根没有发送到Broker端,或者消息本身不合格导致Broker拒绝接收等。

解决此问题的方法非常简单:Producer 永远要使用带有回调通知的发送 API,也就是说不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。它能准确地告诉你消息是否真的提交成功了。一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

案例2:消费者程序丢失数据

Consumer端丢失数据主要体现在Counsumer端要消费的消息不见了。Consumer 程序有个“位移”的概念,表示的是这个 Consumer 当前消费到的 Topic 分区的位置。

先消费消息,再更新消费位移。保证这样的先后顺序能最大限度地保证消息不丢失,这种情况下可能导致重复消费(先消费消息,然后挂了,offset未更新),消费端需要做幂等处理。

还有一种丢失消息的常见就是Consumer从Kafka获取到消息后开启了多个线程异步处理消息,而Consumer程序自动地向前更新位移,如果其中某个线程运行失败了,它处理的消息没有被成功处理,但是位移更新了,因此这条消息对于Consumer来说就是丢失了。

解决这个问题的方案就是,如果是多线程异步处理消费消息,Consumer 程序不要开启自动提交位移,而是要应用程序手动提交位移。

最佳实践

关于Kafka无消息丢失的配置

  1. 不要使用producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
  2. 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  3. 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
  4. 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
  5. 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
  8. 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

学习来源:极客时间 《Kafka核心技术与实战》 学习笔记 Day16

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_38862257/article/details/128821501

kafka-消息防丢失和消息去重_猿与禅的博客-爱代码爱编程_kafka 去重

如何防止数据丢失 生产者:同步发送消息,且消息配置为-1或all,leader分区和所有follwer都写到磁盘里。 异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态

kafka无消息丢失实现-爱代码爱编程

一、Kafka 无消息丢失核心概念 Kafka 只对“已提交”的消息做有限度的持久化保证。 二、生产者程序丢失数据 目前Kafka Producer 是异步发送消息,也就是说如果你调用的是 producer.send(

Kafka消息丢失、重复消费的解决方案-爱代码爱编程

文章目录 生产者问题 消费者问题 问题总结 解决方案 生产者问题 Producer发送消息到队列,分区Leader收到消息后返回ACK给Producer,表示接收成功,此时可以继续发送下一笔消息。 Kafka提供了3种不同级别的ACK机制: 0:Leader收到消息后立刻返回给Produ

微服务 消息中间件kafka消息丢失问题-爱代码爱编程

微服务 消息中间件kafka消息丢失问题 1. kafka消息丢失概述1.1 kafka概述1.2 kafka架构1.3 kafka问题2. kafka消息传递语义3. kafka消息丢失问题分析4. Producer端消息丢失分析4.1 Producer消息发送流程4.2 Producer 端消息丢失场景4.3 Producer消息确认机制4.4

Kafka 无消息丢失配置如何实现?-爱代码爱编程

Kafka 无消息丢失配置如何实现? 1.如何保证kafka消息不丢失?1.1“已提交”的消息 是什么?1.2有限度的持久化保证 是什么?2.kafka“消息丢失”案例2.1 生产者程序丢失数据2.1.1 问题描述:“发射后不管”2.1.2 问题描述:“数据过大”2.2 解决方案 使用自定义kafka回调类Callback2.3 消费者程序丢失数据

Kafka的消息可靠性(防止消息丢失)-爱代码爱编程

当思考消息队列的消息丢失的时候,通常可以从三方面来思考: Producer生产者端丢失。Broker服务器端丢失。Consumer消费者端丢失。以上三点思考方式是通用的,例如RocketMQ的消息可靠性(防止消息丢失)。 1 Producer端 Producer调用send方法发送消息之后会直接返回,消息可能因为网络问题并没有发送过去。 因此我们可

如何保证kafka无消息丢失-爱代码爱编程

如何保证kafka无消息丢失 kafka无消息丢失的理解:“已提交”有限度的保证消息丢失的情况:生产者丢失数据。消费者程序丢失数据总结: kafka无消息丢失的理解:   Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。 “已提交”   什么叫“已提交”的信息?当kafka的若干个broker

如何配置 kafka 无消息丢失_大唐雨夜的博客-爱代码爱编程

如图kafka在三个阶段可能出现消息丢失,分别是生产消息、消费消息、页缓存操作后异步刷盘。 生产消息 生产消息丢失原因有两个: kafka生产端异步发送消息,不管broker是否响应,立即返回,例如producer.

kafka 业务架构及消息丢失处理方案_bug追踪的博客-爱代码爱编程

一、Kafka简介 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式消息中间件,它可以处理消费者在网站中的所有动作流数据。 二、Kafka业务架构