代码编织梦想

之前公司因为 aws 的 kafka 服务上的副本数配置不正确,所以在 aws 例行重启时会导致 producer hang,连锁导致消费断连,当时总结了一篇简单的文章:

aws 上 kafka 服务更新导致断连一例[1]

然而在将队列的副本数都修正之后,发现 producer hang 从高概率必现变成了低概率必现。。这就让人头痛了。

虽然我们也保留了问题的现场,把各种日志多种姿势 Google 检索,但始终没有找到任何线索,本来还想偷个懒,看看能不能直接照抄解决方案,看来没得参考了。

fe9067c6dd607dcab09fafa04edc65b0.png

好吧,自己动手丰衣足食,不读代码是不行了,之前 sarama 的 producer 内部逻辑用了比较多的 channel,看着比较烦一直没有认真读,现在只能自己搞了。

我们先和公司内的队列运维同学以及 aws 的 msk 售后经过了多轮沟通,确定了 producer hang 的一些场景特征:

  • producer hang 只会发生在 aws 上,在自运维的集群中复现不出来

  • aws 上并不是必现,需要在安全更新期间

  • producer 被 hang 住之后,没有任何错误返回

  • aws 的 broker 重启和 leader 选举要比我们自己的集群慢,大概要 1-2s

出问题的时候,producer 的日志长这样:

send msg succ 584
send msg succ 585
send msg succ 586
send msg succ 587
2022/10/21 06:20:54 client/metadata fetching metadata for [t5] from broker b-3.testmsk.rwb418.c13.kafka.us-west-2.amazonaws.com:9092
2022/10/21 06:20:54 producer/broker/1 state change to [retrying] on t5/11 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:54 Retrying batch for t5-11 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:54 client/metadata fetching metadata for [t5] from broker b-3.testmsk.rwb418.c13.kafka.us-west-2.amazonaws.com:9092
2022/10/21 06:20:55 producer/broker/1 state change to [retrying] on t5/11 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 Retrying batch for t5-11 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 client/metadata fetching metadata for [t5] from broker b-3.testmsk.rwb418.c13.kafka.us-west-2.amazonaws.com:9092
2022/10/21 06:20:55 producer/broker/1 state change to [retrying] on t5/11 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 Retrying batch for t5-11 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 client/metadata fetching metadata for [t5] from broker b-3.testmsk.rwb418.c13.kafka.us-west-2.amazonaws.com:9092
2022/10/21 06:20:55 producer/broker/1 state change to [retrying] on t5/11 because kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 Retrying batch for t5-11 because of kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date.
2022/10/21 06:20:55 producer/txnmanager rolling over epoch due to publish failure on t5/11

之后日志便不再滚动,注意这里最后一行的日志 `producer/txnmanager rolling over epoch due to publish failure on t5/11` 因为比较特殊,直接用文本去代码里搜索,可以找到代码的位置在:

e994c8b59562027896179351f181f327.png

这里 msg.hasSequence 是在 producer 设置了 idempotent 时才会为 true 的:

6796ed7c0d29b38c957e2f7a516eabdb.png

这说明我们部门目前的 producer 设置了 idempotent,和之前读代码时的认知相符。

既然知道了 idempotent 的特征,就需要看看 idempotent 的流程和非 idempotent 的流程有什么不同,经过一番确认和筛选,最终看到区别在 retry 流程。当 produce 发生错误时,有一类是 sdk 认为可以通过重试自动恢复的错误:

case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition, ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:

从日志中可以得知,我们触发的 producer error 就是 ErrNotLeaderForPartition  和 ErrLeaderNotAvailable,都是可以通过重试恢复的错误。

重试时,若用户设置了 produce 为 idempotent,则会进入 retryBatch 逻辑:

fb6bba92fed1b3a5f7323ceb5cf8315d.png

79ce184a0aaa0938bd6c6e41f5f89574.png

这里的 retryBatch 判断批量重试过程中,某条消息如果重试次数超标了,那么就会直接从函数中返回,而我们在阅读这段代码的时候会发现,紧跟着超 retry.max 的逻辑,当获取 partition 的 leader 逻辑出错时,每一条消息都返回了错误,这已经能够给我们足够的提示了:这里不应该在一次 returnError 时直接 return,而应该将批量消息中的每一条消息都返回错误。

可能大多数读者对 sarama 的 producer 逻辑不太了解,我们这里简单画一个图:

7e5b42c0d6aaab9c590ba76a9d13d775.png

简单来说,用户调用 producer 的 SendMessages 接口,sarama 的 sdk 会给每一条 message 生成一个 ProducerMessage 对象,且对象内部会自带一个 channel,成功时,该 channel 需要返回 nil,失败时,需要返回 error 信息,每一条消息都必须有明确的成功 or 失败,因为 SendMessages 函数中会等待每一条消息的 expectation channel 有内容返回才能正常向下执行:

26962ba7fbd0be490074e110d21f65bf.png

在 producer 的 sdk 中,这条 ProducerMessage 虽然传递链路较长,会从 asyncProducer.input -> topicProducer.input -> partitionProducer.input -> brokerProducer.input 一路传递,但最终的 response 都是从 msg 的 expectation channel 中返回的。

如果某条消息的 expectation channel 没返回,那么就会导致用户的 syncProducer 无限 hang 下去。

阅读代码和流程分析到这里,我们已经基本可以知道原因了,这个问题的触发流程是这样的:

  1. aws 执行安全更新,broker 滚动重启

  2. broker 下线期间,某些 topic 的 leader election 较慢,经过了 1-2s 才把新的 leader 选出来

  3. 我们部门的 kafka producer 使用了 idempotent = true 和 sendmessages 批量发送接口和默认的 producer.retry = 3,producer.backoff = 100ms

  4. 当 broker 下线且 leader 未选举出时,经过 3 次后,leader 依然未恢复,这时由于 sarama 的 bug,导致某些消息的 expectation channel 一直没有resp/err 返回

  5. 之后 producer 就永远 hang 在 SendMessages 函数上了

这个问题后来也提给了 sarama 官方,不过外企很 wlb,至今依然没有回复:

https://github.com/Shopify/sarama/issues/2377

https://github.com/Shopify/sarama/pull/2378

我反思了一下,为什么这个问题其它公司没怎么遇到过,Google 又搜不出来呢?

  • 因为 idempotent 这个特性使用的人很少

  • 同时开启 idempotent 又使用批量发送接口的人就更少了

  • 同时开启了 idempotent 又使用批量发送,还用 aws 同时又遇到这种 leader election 特别慢的人少之又少

所以这个坑只有我们踩到了,没有办法。

[1]

aws 上 kafka 服务更新导致断连一例: https://xargin.com/aws-produce-hang-case/

资料下载

点击下方卡片关注公众号,发送特定关键字获取对应精品资料!

  • 回复「电子书」,获取入门、进阶 Go 语言必看书籍。

  • 回复「视频」,获取价值 5000 大洋的视频资料,内含实战项目(不外传)!

  • 回复「路线」,获取最新版 Go 知识图谱及学习、成长路线图。

  • 回复「面试题」,获取四哥精编的 Go 语言面试题,含解析。

  • 回复「后台」,获取后台开发必看 10 本书籍。

对了,看完文章,记得点击下方的卡片。关注我哦~ 👇👇👇

如果您的朋友也在学习 Go 语言,相信这篇文章对 TA 有帮助,欢迎转发分享给 TA,非常感谢!61d440b07434b3854b412df9e6b3a0b1.png

kafka入门教程 golang实现kafka消息发送、接收_熊猫卡洛斯的博客-爱代码爱编程_kafka golang

一:核心概念 kafka是消息中间件的一种,是一种分布式流平台,是用于构建实时数据管道和流应用程序。具有横向扩展,容错,wicked fast(变态快)等优点。 kafka中涉及的名词: 消息记录(record): 由一个key,一个value和一个时间戳构成,消息最终存储在主题下的分区中, 记录在生产者中称为生产者记录(ProducerRecord

kafka producer 异常记录-爱代码爱编程

2019独角兽企业重金招聘Python工程师标准>>> Failed to send producer request with correlation id 11 to broker 0 with data for partitions [my-topic,1] 解决办法:修改kafka service配

zabbix监控kafka_Zabbix的Kafka监控-爱代码爱编程

zabbix监控kafka 介绍 (Introduction) Apache Kafka is modern, powerful and fancy service provides storing and managing messages for real-time data processing.

k8s kafka集群 连接不上_在K8S上运行Kafka合适吗?会遇到哪些陷阱?-爱代码爱编程

原标题:在K8S上运行Kafka合适吗?会遇到哪些陷阱? Kubernetes设计的初衷是运行无状态工作负载。这些通常采用微服务架构的工作负载,是轻量级,可水平扩展,遵循十二要素应用程序,可以处理环形断路和随机Monkey测试。 另一方面,Kafka本质上是一个分布式数据库。这意味着你必须处理状态,它比微服务更重量级。Kubernetes支持有状态

Pulsar 和 Kafka 架构对比-爱代码爱编程

本文作者是 David Kjerrumgaard,目前任职于 Splunk,Apache Pulsar 和 Apache NiFi 项目贡献者。译者为 Sijia@StreamNative。原文链接:https://searchdatamanagement.techtarget.com/post/Apache-Pulsar-vs-Kafka-a

kafka Producer APIs (version:2.4.1)-爱代码爱编程

SKIP NAVIGATION LINKS OVERVIEWPACKAGECLASSTREEDEPRECATEDINDEXHELPPREV CLASSNEXT CLASSFRAMESNO FRAMESSUMMARY: NESTED | FIELD | CONSTR | METHODDETAIL: FIELD | CONSTR | METHOD or

Kafka Producer全流程分析和思考-爱代码爱编程

本号旨在一篇文章说清楚一个问题,凭借个人十多年的工作经验,觉得不存在一篇文章就能让你懂xxx,熟悉或者精通一门语言,一个框架,需要应用场景、时间、精力去研究和深挖。 Kafka Producer大家也许都比较熟悉,但是如果深究的话,估计有些细节还有模棱两可的地方,本文将结合工作上遇到的问题和源码分析来尽量说清楚Kafka Producer,以便大家

Kafka producer写入优化-爱代码爱编程

目录 提出需求环境信息测试系统,寻找问题Kafka producer 写入原理Producer 参数ACKS 参数Producer send 方法调用结果Note参考文献 提出需求 由于线上的接口是暴露给外部用户使用,外部用户付费接入,产品经理提出了需求,需要在公司内部做重新数据清洗的时候尽量减少对外部用户数据更新的影响,当前更新数据需要耗费

Kafka消息的压缩机制-爱代码爱编程

大纲 Kafka支持的消息压缩类型什么是 Kafka 的消息压缩消息压缩类型何时需要压缩如何开启压缩在 Broker 端开启压缩compression.type 属性broker 和 topic 两个级别broker 级别topic 级别在 Producer 端压缩compression.type 属性开启压缩的方式压缩和解压的位置何处会压缩pro

kafka session_老邋遢的博客-爱代码爱编程

Kafka Session 文章目录 Kafka Session1. What's Kafka2. Why Kafka3. Tech Points3.1 说一说什么是Kafka中的 ISR、OSR?3.2 说一说什么是Kafka中的LSO、LEO、HW?3.3 Kafka Producer的常见参数3.4 有没有对Kafka Producer参数