代码编织梦想

目录

一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

(一)环境准备与配置文件创建

(二)创建主题

(三)测试步骤

二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

(一)编写配置脚本

(二)创建 topic

(三)测试过程

三、应用场景示例 

四、总结


        在大数据处理的生态系统中,Flume 和 Kafka 都是非常重要的组件。Flume 擅长收集、聚合和传输大量的日志数据等,而 Kafka 则是一个高性能的分布式消息队列,能够处理海量的实时数据。将 Flume 和 Kafka 进行整合,可以构建强大的数据处理管道,实现数据的高效采集、传输和处理。本文将详细介绍 Flume 和 Kafka 整合的两种常见方式:Kafka 作为 Source 和 Kafka 作为 Sink。

一、Kafka 作为 Source【数据进入到kafka中,抽取出来】

 

(一)环境准备与配置文件创建

        在 Flume 的 conf 文件夹下,创建一个名为 kafka - memory - logger.conf 的脚本文件。这里需要注意,在实际操作中可能会遇到错误,例如 kafka 的每一批次的读取数量大于了 channel 的容量。这种情况下的解决方案是要么降低 kafka 的每一批次读取的容量,要么提高 channel 的容量。

https://flume.liyifeng.org/#kafka-source

kafka-memory-logger.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = bigdata01:9092,bigdata02:9092,bigdata03:9092
a1.sources.r1.kafka.topics = five
a1.sources.r1.kafka.consumer.group.id = qiaodaohu

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = logger
a1.sinks.k1.maxBytesToLog = 128

 

(二)创建主题

        接着创建一个 topic,名字可以叫做 kafka - flume,当然也可以直接使用以前创建好的主题。

kafka-topics.sh --create --topic kafka-flume --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试步骤

首先启动一个消息生产者,向 topic 中发送消息。

kafka-console-producer.sh --topic kafka-flume --bootstrap-server bigdata01:9092

然后启动 Flume,接收消息并查看 log 日志,这样就可以验证数据是否能够从 Kafka 成功抽取到 Flume 中并进行后续处理。

在flume的flumeconf 文件夹下

flume-ng agent -n a1 -c ../conf -f ./kafka-memory-logger.conf -Dflume.root.logger=INFO,console

二、Kafka 作为 Sink数据从别的地方抽取到kafka里面】

 

 

(一)编写配置脚本

编写一个名为 flume - kafka - sink.conf 的脚本,内容如下:

##a1就是flume agent的名称
## source r1
## channel c1
## sink k1
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = bigdata01
a1.sources.r1.port = 44444

# 修改sink为kafka
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = bigdata01:9092
a1.sinks.k1.kafka.topic = five
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

这里的流程是 netcat(模拟数据源)→ memory(内存通道)→ kafka。

 

(二)创建 topic

使用以下命令创建 topic(flume - kafka):

kafka-topics.sh --create --topic flume-kafka --bootstrap-server bigdata01:9092 --partitions 3 --replication-factor 1

 

(三)测试过程

启动 Flume:

flume-ng agent -n a1 -c conf -f $FLUME_HOME/job/flume-kafka-sink.conf -Dflume.root.logger=INFO,console

使用 telnet 命令,向端口发送消息:

yum -y install telnet

telnet bigdata01 44444

在窗口不断地发送文本数据,数据就会被抽取到 Kafka 中。


使用消费者获取 Kafka 数据:

kafka-console-consumer.sh --topic flume-kafka --bootstrap-server bigdata01:9092 --from-beginning

 

三、应用场景示例 

        假定有这样一个场景:Flume 可以抽取不断产生的日志,抽取到的日志数据,发送给 Kafka,Kafka 经过处理,可以展示在页面上,或者进行汇总统计。这样就实现了一定的实时效果,在实际的大数据处理流程中,这种整合方式能够有效地处理海量的实时数据,提高数据处理的效率和可靠性。

四、总结

        通过 Flume 和 Kafka 的整合,我们能够构建更加灵活、高效的数据处理架构,满足不同场景下的大数据处理需求,为后续的数据挖掘、分析等提供坚实的数据基础。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_64726356/article/details/143921417

flume+kafka整合案例实现-爱代码爱编程

  一、为什么要集成Flume和Kafka 我们很多人在在使用Flume和kafka时,都会问一句为什么要将Flume和Kafka集成?那首先就应该明白业务需求,一般使用Flume+Kafka架构都是希望完成实时流式的日志处理,后面再连接上Flink/Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。第一、如果F

玩转flume+kafka原来也就那点事儿_weixin_34189116的博客-爱代码爱编程

好久没有写分享了,继前一个系列进行了Kafka源码分享之后,接下来进行Flume源码分析系列,望大家继续关注,今天先进行开篇文章Flume+kafka的环境配置与使用。 一、FLUME介绍 Flume是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提

flume集群安装部署、kafka集群安装部署以及maxwell安装部署实战-爱代码爱编程

1、Flume集群安装部署 1.1、安装地址 Flume官网地址:http://flume.apache.org/文档查看地址:http://flume.apache.org/FlumeUserGuide.html下载地

flume的配置和kafka_flume 配置kafka多线程-爱代码爱编程

配置两个flume  sink为kafkasink 到flume官方文档(注意版本)  a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type = org.apache.flume.source.taildir.TaildirSource a1.sources.r1.pos

flume-爱代码爱编程

看到hdfs大家应该做什么? 是的你应该去把集群打开, cd /export/servers/hadoop/sbin 启动集群 ./start-all.sh 在虚拟机hadoop02和hadoop03上的conf目录下配置相同的日志采集方案,‘ cd /export/servers/flume/conf 切换完成之后,接下来我们输入下面的

flume日志采集系统的部署,实现flume负载均衡,flume故障恢复-爱代码爱编程

目录 安装包 flume的部署 负载均衡测试 故障恢复 安装包 在这里给大家准备好了flume的安装包 通过网盘分享的文件:apache-flume-1.9.0-bin.tar.gz 链接: https://pan.baidu.com/s/1DXMA4PxdDtUQeMB4J62xoQ 提取码: euz7  --来自百度网盘超

日志抽取工具——flume的安装与使用教程-爱代码爱编程

2、安装 解压,重命名,修改配置文件: tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs/ mv apache-flume-1.9.0-bin/ flume 在企业中安装软件起始有两个地方比较常见: /usr/local/ 也可以安装在 /opt/installs export FLU