代码编织梦想

Kafka 部署

Kafka 部署说明

kafka 版本选择 kafka 基于scala语言实现,所以使用kafka需要指定scala的相应的版本.kafka 为多个版本的Scala构建。这仅在使用 Scala 时才重要,并且希望为使用的相同 Scala 版本构建一个版本。否则,任何版本都可以

kafka下载链接:Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=O83Ahttp://kafka.apache.org/downloads

kafka版本格式

kafka_<scala 版本>_<kafka 版本>
#示例:kafka_2.13-2.7.0.tgz 

 scala 语言官网: https://www.scala-lang.org/
scale 与 java关系: https://baike.baidu.com/item/Scala/2462287?fr=aladdin
Kafka 支持单机和集群部署,生产通常为集群模式
官方文档

Apache Kafka

单机部署

Download the latest Kafka release and extract it

$ apt update && apt -y install openjdk-8-jdk 
$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0 

NOTE: Your local environment must have Java 8+ installed.
Apache Kafka can be started using ZooKeeper or KRaft. To get started with either configuration follow one the sections below but not both.
Kafka with ZooKeeper
Run the following commands in order to start all services in the correct order: 

# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Open another terminal session and run:  

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

 

集群部署
环境准备 ZooKeeper

当前版本 Kafka 依赖 Zookeeper 服务,但以后将不再依赖

http://kafka.apache.org/quickstart
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.

环境说明

#在三个Ubuntu18.04节点提前部署zookeeper和kafka三个节点复用
node1:10.0.0.101 
node2:10.0.0.102 
node3:10.0.0.103
#注意:生产中zookeeper和kafka一般是分开独立部署的,kafka安装前需要安装java环境

 确保三个节点的zookeeper启动

[root@node1 ~]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
[root@node2 ~]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
[root@node3 ~]#zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
各节点部署 Kafka
Kafka 节点配置

配置文件说明

#配置文件 ./conf/server.properties内容说明
############################# Server Basics###############################
# broker的id,值为整数,且必须唯一,在一个集群中不能重复
broker.id=1
############################# Socket ServerSettings ######################
# kafka监听端口,默认9092
listeners=PLAINTEXT://10.0.0.101:9092
# 处理网络请求的线程数量,默认为3个
num.network.threads=3
# 执行磁盘IO操作的线程数量,默认为8个
num.io.threads=8
# socket服务发送数据的缓冲区大小,默认100KB
socket.send.buffer.bytes=102400
# socket服务接受数据的缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400
# socket服务所能接受的一个请求的最大大小,默认为100M
socket.request.max.bytes=104857600
############################# Log Basics###################################
# kafka存储消息数据的目录
log.dirs=../data
# 每个topic默认的partition
num.partitions=1
# 设置副本数量为3,当Leader的Replication故障,会进行故障自动转移。
default.replication.factor=3
# 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
num.recovery.threads.per.data.dir=1
############################# Log FlushPolicy #############################
# 消息刷新到磁盘中的消息条数阈值
log.flush.interval.messages=10000
# 消息刷新到磁盘中的最大时间间隔,1s
log.flush.interval.ms=1000
############################# Log RetentionPolicy #########################
# 日志保留小时数,超时会自动删除,默认为7天
log.retention.hours=168
# 日志保留大小,超出大小会自动删除,默认为1G
#log.retention.bytes=1073741824
# 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
log.segment.bytes=1073741824
# 每隔多长时间检测数据是否达到删除条件,300s
log.retention.check.interval.ms=300000
############################# Zookeeper ####################################
# Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181
# 连接zookeeper的超时时间,6s
zookeeper.connection.timeout.ms=6000
# 是否允许删除topic,默认为false,topic只会标记为marked for deletion
delete.topic.enable=true

范例:

#在所有节点上执行安装java
[root@node1 ~]#apt install openjdk-8-jdk -y
#在所有节点上执行下载,官方下载
[root@node1 ~]#wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
[root@node1 ~]#wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
#解压缩
[root@node1 ~]#tar xf kafka_2.13-2.7.0.tgz  -C /usr/local/
[root@node1 ~]#ln -s /usr/local/kafka_2.13-2.7.0/ /usr/local/kafka
#国内镜像下载
[root@node1 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.3.1/kafka_2.13-3.3.1.tgz
[root@node1 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz
[root@node1 ~]#wget https://mirror.bit.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz
#配置PATH变量
[root@node1 ~]#echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node1 ~]#. /etc/profile.d/kafka.sh
#修改配置文件
[root@node1 ~]#vim /usr/local/kafka/config/server.properties
broker.id=1 #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的meta.properties文件
listeners=PLAINTEXT://10.0.0.101:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
log.dirs=/usr/local/kafka/data #kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
num.partitions=1 #设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
default.replication.factor=3 #指定默认的副本数为3,可以实现故障的自动转移
log.retention.hours=168 #设置kafka中消息保留时间,默认为168小时即7天
zookeeper.connect=10.0.0.101:2181,10.0.0.102:2181,10.0.0.103:2181 #指定连接的zk的地址,zk中存储了broker的元数据信息
zookeeper.connection.timeout.ms=6000 #设置连接zookeeper的超时时间,单位为ms,默认6秒钟
#准备数据目录
[root@node1 ~]#mkdir /usr/local/kafka/data
[root@node1 ~]#scp /usr/local/kafka/config/server.properties 10.0.0.102:/usr/local/kafka/config
[root@node1 ~]#scp /usr/local/kafka/config/server.properties 10.0.0.103:/usr/local/kafka/config
#修改第2个节点配置
[root@node2 ~]#vim /usr/local/kafka/config/server.properties
broker.id=2 #每个broker 在集群中的唯一标识,正整数。
listeners=PLAINTEXT://10.0.0.102:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
#修改第3个节点配置
[root@node3 ~]#vim /usr/local/kafka/config/server.properties
broker.id=3  #每个broker 在集群中的唯一标识,正整数。
listeners=PLAINTEXT://10.0.0.103:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
#可以调整内存
[root@node1 ~]#vim /usr/local/kafka/bin/kafka-server-start.sh
......
if[ " x$KAFKA_HEAP_OPTS"="x"] ; then
export KAFKA_HEAP_OPTS=" -Xmx1G -Xms1G"  
fi
 ......

 

启动服务

在所有kafka节点执行下面操作

[root@node1 ~]#kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 

确保服务启动状态
[root@node1 ~]#ss -ntl|grep 9092
 LISTEN  0        50[::ffff:10.0.0.101]:9092                  *:*  
[root@node1 ~]#tail /usr/local/kafka/logs/server.log 
[2021-02-16 12:10:01,276] INFO [ExpirationReaper-1-AlterAcls]: Starting 
(kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2021-02-16 12:10:01,311] INFO [/config/changes-event-process-thread]: Starting 
(kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2021-02-16 12:10:01,332] INFO [SocketServer brokerId=1] Starting socket server acceptors and processors 
(kafka.network.SocketServer)
[2021-02-16 12:10:01,339] INFO [SocketServer brokerId=1] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
[2021-02-16 12:10:01,340] INFO [SocketServer brokerId=1] Started socket server acceptors and processors 
(kafka.network.SocketServer)
[2021-02-16 12:10:01,344] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-16 12:10:01,344] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-16 12:10:01,344] INFO Kafka startTimeMs: 1613448601340 (org.apache.kafka.common.utils.AppInfoParser)
[2021-02-16 12:10:01,346] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
[2021-02-16 12:10:01,391] INFO [broker-1-to-controller-send-thread]: Recorded new controller, from now on will 
use broker 1 (kafka.server.BrokerToControllerRequestThread)
#如果使用id,还需要修改/usr/local/kafka/data/meta.properties
#打开zooinspector可以看到三个id

 

  • Broker 依赖于 Zookeeper,每个Broker 的id 和 Topic、Partition这些元数据信息都会写入Zookeeper 的 ZNode 节点中

  • consumer 依赖于Zookeeper,Consumer 在消费消息时,每消费完一条消息,会将产生的offset 保存到 Zookeeper 中,下次消费在当前offset往后继续消费.kafka0.9 之前Consumer 的offset 存储在 Zookeeper 中,kafka0.9 以后offset存储在本地。

  • Partition 依赖于 Zookeeper,Partition 完成Replication 备份后,选举出一个Leader,这个是依托于 Zookeeper 的选举机制实现的

准备Kafka的service文件

[root@node1 ~]#cat /lib/systemd/system/kafka.service
[unit]
Description=Apache kafka
After=network.target
[service]
Type=simple
#Environment=JAVA_HOME=/data/server/java
PIDFile=/usr/local/kafka/kafka.pid
Execstart=/usr/local/kafka/bin/kafka-server-start.sh  /usr/local/kafka/config/server. properties
Execstop=/bin/kill  -TERM ${MAINPID}
Restart=always
RestartSec=20
[Install]
wantedBy=multi-user.target
[root@node1 ~]#systemctl daemon-load
[root@node1 ~]#systemctl restart kafka.service
一键部署kafka集群
#!/bin/bash

KAFKA_VERSION=3.9.0
#KAFKA_VERSION=3.4.0
#KAFKA_VERSION=3.3.2
#KAFKA_VERSION=3.2.0
SCALA_VERSION=2.13
#KAFKA_VERSION=-3.0.0
KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
#KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.13-2.8.1.tgz"
#KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz"
ZK_VERSOIN=3.8.1
#ZK_VERSOIN=3.7.1
#ZK_VERSOIN=3.6.3
ZK_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/apache-zookeeper-${ZK_VERSOIN}-bin.tar.gz"

ZK_INSTALL_DIR=/usr/local/zookeeper
KAFKA_INSTALL_DIR=/usr/local/kafka

NODE1=10.0.0.187
NODE2=10.0.0.188
NODE3=10.0.0.189

HOST=`hostname -I|awk '{print $1}'`
.  /etc/os-release


color () {
    RES_COL=60
    MOVE_TO_COL="echo -en \\033[${RES_COL}G"
    SETCOLOR_SUCCESS="echo -en \\033[1;32m"
    SETCOLOR_FAILURE="echo -en \\033[1;31m"
    SETCOLOR_WARNING="echo -en \\033[1;33m"
    SETCOLOR_NORMAL="echo -en \E[0m"
    echo -n "$1" && $MOVE_TO_COL
    echo -n "["
    if [ $2 = "success" -o $2 = "0" ] ;then
        ${SETCOLOR_SUCCESS}
        echo -n $"  OK  "    
    elif [ $2 = "failure" -o $2 = "1"  ] ;then 
        ${SETCOLOR_FAILURE}
        echo -n $"FAILED"
    else
        ${SETCOLOR_WARNING}
        echo -n $"WARNING"
    fi
    ${SETCOLOR_NORMAL}
    echo -n "]"
    echo 
}


install_jdk() {
    if [ $ID = 'centos' -o  $ID = 'rocky' ];then
        yum -y install java-1.8.0-openjdk-devel || { color "安装JDK失败!" 1; exit 1; }
    else
        apt update
        apt install openjdk-8-jdk -y || { color "安装JDK失败!" 1; exit 1; } 
    fi
    java -version
}

zk_myid () {
    read -p "请输入node编号(默认为 1): " MYID

    if [ -z "$MYID" ] ;then
        MYID=1
    elif [[ ! "$MYID" =~ ^[0-9]+$ ]];then
        color  "请输入正确的node编号!" 1
        exit
    else
        true
    fi
}

install_zookeeper() {
    wget -P /usr/local/src/ $ZK_URL || { color  "下载失败!" 1 ;exit ; }
    tar xf /usr/local/src/${ZK_URL##*/} -C `dirname ${ZK_INSTALL_DIR}`
    ln -s /usr/local/apache-zookeeper-*-bin/ ${ZK_INSTALL_DIR}
    echo "PATH=${ZK_INSTALL_DIR}/bin:$PATH" >  /etc/profile.d/zookeeper.sh
    .  /etc/profile.d/zookeeper.sh
    mkdir -p ${ZK_INSTALL_DIR}/data 
    echo $MYID > ${ZK_INSTALL_DIR}/data/myid
    cat > ${ZK_INSTALL_DIR}/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=${ZK_INSTALL_DIR}/data
clientPort=2181
maxClientCnxns=128
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=${NODE1}:2888:3888
server.2=${NODE2}:2888:3888
server.3=${NODE3}:2888:3888
EOF
   
	cat > /lib/systemd/system/zookeeper.service <<EOF
[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=forking
#Environment=${ZK_INSTALL_DIR}
ExecStart=${ZK_INSTALL_DIR}/bin/zkServer.sh start
ExecStop=${ZK_INSTALL_DIR}/bin/zkServer.sh stop
ExecReload=${ZK_INSTALL_DIR}/bin/zkServer.sh restart

[Install]
WantedBy=multi-user.target
EOF
    systemctl daemon-reload
    systemctl enable --now  zookeeper.service
    systemctl is-active zookeeper.service
    if [ $? -eq 0 ] ;then 
        color "zookeeper 安装成功!" 0  
    else 
        color "zookeeper 安装失败!" 1
        exit 1
    fi  
}


install_kafka(){
    if [ ! -f kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz ];then
        wget -P /usr/local/src  $KAFKA_URL  || { color  "下载失败!" 1 ;exit ; }
    else
        cp kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz /usr/local/src/
    fi
    tar xf /usr/local/src/${KAFKA_URL##*/}  -C /usr/local/
    ln -s ${KAFKA_INSTALL_DIR}_*/ ${KAFKA_INSTALL_DIR}
    echo PATH=${KAFKA_INSTALL_DIR}/bin:'$PATH' > /etc/profile.d/kafka.sh
    . /etc/profile.d/kafka.sh
   
    cat > ${KAFKA_INSTALL_DIR}/config/server.properties <<EOF
broker.id=$MYID
listeners=PLAINTEXT://${HOST}:9092
log.dirs=${KAFKA_INSTALL_DIR}/data
num.partitions=1
log.retention.hours=168
zookeeper.connect=${NODE1}:2181,${NODE2}:2181,${NODE3}:2181
zookeeper.connection.timeout.ms=6000
EOF
    mkdir ${KAFKA_INSTALL_DIR}/data
	 
    cat > /lib/systemd/system/kafka.service <<EOF
[Unit]                                                                          
Description=Apache kafka
After=network.target

[Service]
Type=simple
#Environment=JAVA_HOME=/data/server/java
#PIDFile=${KAFKA_INSTALL_DIR}/kafka.pid
ExecStart=${KAFKA_INSTALL_DIR}/bin/kafka-server-start.sh  ${KAFKA_INSTALL_DIR}/config/server.properties
ExecStop=/bin/kill  -TERM \${MAINPID}
Restart=always
RestartSec=20

[Install]
WantedBy=multi-user.target

EOF
	systemctl daemon-reload
	systemctl enable --now kafka.service
	#kafka-server-start.sh -daemon ${KAFKA_INSTALL_DIR}/config/server.properties 
	systemctl is-active kafka.service
	if [ $? -eq 0 ] ;then 
        color "kafka 安装成功!" 0  
    else 
        color "kafka 安装失败!" 1
        exit 1
    fi    
}



zk_myid

#install_jdk

#install_zookeeper

install_kafka

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_74814027/article/details/144070826

架构师图谱·微服务&消息队列篇_java烟雨的博客-爱代码爱编程

1. 概述 “架构师图谱”是一个很宏大的命题,特别是优秀的架构师自身也是“由点到面再到图”,一点点成长积累起来,尝试写这系列文章的目的更多的是结合自身的一些经验和理解,来解读工程师和架构师所应具备的技能模型,这里会更偏向于后端技能,依赖于开源技术、云原生或者其他第三方服务。重点介绍一些技术栈、设计理念和适应场景,这些可以作为我们选型时的依据。所谓“架构即

消息队列kafka使用技巧和常见问题_kafka消息队列难点-爱代码爱编程

目录 【消息队列概述】 【kafka】 消息丢失问题 消息重复问题 消费顺序问题 消息积压问题 kafka集群部署 【消息队列概述】 消息队列主要解决应用耦合、异步消息、流量削锋等问题,是大型分布式系统不可缺少的中间件。消息生产者 只管把消息发布到 MQ 中而不用管谁来取,消息消费者 只管从 MQ 中取消息而不管是谁生产的,这样生

国际环境和背景下的云计算领域-爱代码爱编程

前言      在当前国际环境和背景下,云计算领域呈现出复杂多变的局面,其发展深受技术创新、地缘政治、全球经济以及监管政策的影响。以下从技术趋势、市场竞争、地缘政治和监管环境四个方面详细解析云计算领域的现状。 一、技术趋势:多云与边缘计算的崛起   1. **多云战略成为主流**        全球企业逐渐采用多云策略,将不同的工作负载分配到多个云

远程控制软件:探究云计算和人工智能的融合-爱代码爱编程

在数字化时代,远程控制工具已成为我们工作与生活的重要部分。用户能够通过网络远程操作和管理另一台计算机,极大地提升了工作效率和便捷性。随着人工智能(AI)和云计算技术的飞速发展,远程控制工具也迎来了新的发展机遇,实现了更智能、更高效的操作体验。 RayLink是一款新型远程控制工具,在结合人工智能与云计算方面展现出了显著的优势: 低延迟技术:RayL

等保测评在云计算方面的应用讲解-爱代码爱编程

等保测评(信息安全等级保护测评)在云计算方面的应用主要聚焦于如何满足等级保护相关要求,并确保云计算平台及其上运行的业务系统的安全性。以下是主要内容的讲解: 1. 云计算中的等保测评概述 等保测评是在我国网络安全等级保护制度(简称“等保”)的框架下,对信息系统的安全保护能力和安全状况进行评估。 云计算环境的复杂性和多租户架构使等保测评更加重要,其涉及:

flume 与 kafka 整合实战_flume消费kafka的消息-爱代码爱编程

目录 一、Kafka 作为 Source【数据进入到kafka中,抽取出来】 (一)环境准备与配置文件创建 (二)创建主题 (三)测试步骤 二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】 (一)编写配置脚本 (二)创建 topic (三)测试过程 三、应用场景示例  四、总结         在大数据处理的