个人笔记-爱代码爱编程
创建依赖包
1、创建maven工程FlinkTutorial
引入依赖包
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<!-- scala版本-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>
2、创建读取文件:
resource目录创建txt文件输入文本
hello world
hello flink
hello spark
hello scala
how are you
fine thank you
and you
3、创建WordCount.java
psvm快速键创建main方法
//批处理
public class WorldCount {
public static void main(String[] args) throws Exception {
//创建执行方法 方法.var可以快速创建变量名
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//文件中读取数据
String inputPath = "D:\\workspace\\bigDataWorkspace\\FlinkTutorial\\src\\main\\resources\\hello.txt";
//DataSource Operator 最终是DataSet 也可以
// DataSource<String> stringDataSource = env.readTextFile(inputPath);
DataSet<String> inputDataSet = env.readTextFile(inputPath);
//对数据集处理,按照空格分词展开,转换成(word,1)二元组进行统计 flatMap打散数据,
// 需要定义一个mapper操作,FlatMapFunction需要实现flatMap方法
DataSet<Tuple2<String,Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
.groupBy(0) //改分组可传0,代表第零个位置word进行分组操作,有keySelector,可以传int位置 String字段名等
.sum(1) //代表将第二个位置上的数据求和
;
resultSet.print();
}
//自定义类,实现FlatMapFunction flink自己实现的二元组Tuple2!!!注意不要使用scala的依赖太多,需要flink的Tuple2并定义返回的泛型
public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String,Integer>> {
//实现方法
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] words = s.split(" "); //空格统计
for (String word : words) { //遍历所有word,包成二元组输出 ,空格区分每个分组
collector.collect(new Tuple2<String,Integer>(word,1));
}
}
}
}
datastream处理
public class StreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);
String inputPath = "D:\\workspace\\bigDataWorkspace\\FlinkTutorial\\src\\main\\resources\\hello.txt";
//DataStreamSource和DataStream同样可以操作
// DataStreamSource<String> inputStream = env.readTextFile(inputPath);
DataStream<String> inputStream = env.readTextFile(inputPath);
//基于数据流进行转换处理,直接引用上个WorldCount的MyFlatMapper || SingleOutputStreamOperator和DataStream都没问题,因为SingleOutputStreamOperator继承DataStream
DataStream<Tuple2<String, Integer>> resultStream = inputStream.flatMap(new WorldCount.MyFlatMapper())
.keyBy(0) //groupBy则非流,每次流处理并没有全到达
.sum(1);
//resultStream.print(); 并非直接打印数据,必须配合execute() 因为流是不会等待
resultStream.print();
//真正的启动,之前的操作都是定义,未启动,来一个数据就读取一个数据,输出多次,有状态的运算
//结果返回4> 3> 1> 1> 代表的是多线程编号 默认并行度4 可以配置env.setParallelism(num)设置并行数
env.execute();
}
}
nc命令发送socket文本流 nc -lk 7777 (7777为端口)
系统没有nc命令则安装nc工具 yum install -y nc
启动nc后再启动main方法,再命令行输入文本:eg:hello world
-
设置提取配置项:parameter tool工具从程序启动参数中提取配置项
-
开发环境需要加入port和host参数步骤:Run->edit Configurations -> programs arguments -> **–host 192.168.56.10 --port 7777 **
Flink部署
-
3.1 Standalone模式
安装:解压 flink1.10.1-bin-scala_2.12.tgz,进入conf目录中 官网 下载连接:flink1.10.1
需要下载额外的hadoop组件 Additional Components 下载文件hadoop-uber文件到lib目录,默认没有
解压:flink文件,进入conf目录中,
修改flink/conf/flink.conf.yaml文件:
主要配置:
jobmanager.rpc.address: localhost #配置地址 # The RPC port where the JobManager is reachable. jobmanager.rpc.port: 6123 #配置端口 # The heap size for the JobManager JVM jobmanager.heap.size: 1024m # The total process memory size for the TaskManager. # Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead. taskmanager.memory.process.size: 1728m taskmanager.numberOfTaskSlots: 1 #多线程槽位,1个节点 # The parallelism used for programs that did not specify and other parallelism. parallelism.default: 1 #默认并行度,多少线程
查看当前节点 cat master结果:localhost:8081
查看slaves 查看cat slaves结果: localhost
修改 /conf/slaves 文件:
分发给另外两台机子:
启动:./bin/start-cluster.sh
查看进程 ./bin/jps
默认网页启动端口:8081 url
添加jar包 submit job
show plan预览
submit发现job并没有运行成功,overview查看,查看job可以取消 cancel job
重新配置slot
停止flink ,设置taskmanager.numberofTaskSlots:4 #当前最佳cpu核心数,重启 ./start-cluster.sh
调节任务数后job 变成绿色running
命令行启动jobs
./flink run -c com.atguigu.wc.StreamWordCount –p 2 FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777
./bin/flink list
如果需要取消jobs
./bin/flink cancel 取消job 可以看到某个job状态是running还是cancel
3.2 Yarn模式
Yarn模式部署Flink任务时,要求Flink是有Hadoop支持的版本,Hadoop环境必须保证版本再2.2以上,并且集群中安装有HDFS服务
Flink 提供了两种在 yarn 上运行的模式,分别为 Session-Cluster 和 Per-Job-Cluster
模式
013flink理论和部署
3.2.2 Session Cluster
- 启动 hadoop 集群(略)
启动 yarn-session
./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
其中:
-n(–container):TaskManager 的数量。
-s(–slots): 每个 TaskManager 的 slot 数量,默认一个 slot 一个 core,默认每个
taskmanager 的 slot 的个数为 1,有时可以多一些 taskmanager,做冗余。
-jm:JobManager 的内存(单位 MB)。
-tm:每个 taskmanager 的内存(单位 MB)。
-nm:yarn 的 appName(现在 yarn 的 ui 上的名字)。
-d:后台执行。执行任务
FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777
- 去 yarn 控制台查看任务状态
- 取消 yarn-session
yarn application --kill application_1577588252906_0001
第四章 Flink运行架构
Flink流处理API
5.1.1 getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则
此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法
返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方
式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();```
如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1。
5.1.2 createLocalEnvironment
返回本地执行环境,需要在调用时指定默认的并行度。
LocalStreamEnvironment env =StreamExecutionEnvironment.createLocalEnvironment(1);
5.1.3 createRemoteEnvironment
返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager
的 IP 和端口号,并指定要在集群中运行的 Jar 包
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//WordCount.jar")
5.2soure处理
5.2.1 集合读取数据
public class SourceTest1_Collection {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 1.Source: 从集合读取数据
DataStream<SensorReading> sensorDataStream = env.fromCollection(
Arrays.asList(
new SensorReading("sensor_1", 1547718199L, 35.8),
new SensorReading("sensor_6", 1547718201L, 15.4),
new SensorReading("sensor_7", 1547718202L, 6.7),
new SensorReading("sensor_10", 1547718205L, 38.1)
)
);
// 2. 打印
sensorDataStream.print();
// 3. 执行
env.execute();
}
}
5.2.2 文件读取数据
DataStream<String> dataStream = env.readTextFile("YOUR_FILE_PATH ");
5.2.3 kafka消息队列作为数据来源
<dependency>
<groupId>org.apache.flink</ groupId>
<artifactId>flink-connector-kafka-0.11_2.12</ artifactId>
<version>1.10.1</ version>
</dependency>
具体代码:
// kafka 配置项
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
// 从 kafka 读取数据
DataStream<String> dataStream = env.addSource( new
FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
5.2.4 自定义数据源
除了以上的 source 数据来源,我们还可以自定义 source。需要做的,只是传入
一个 SourceFunction 就可以。具体调用如下:
DataStream<SensorReading> dataStream = env.addSource( new MySensor());
示例代码
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
dataStream.print();
env.execute();
}
public static class MySensorSource implements SourceFunction<SensorReading> {
//定义一个标志位
private Boolean runing = true;
@Override
public void run(SourceContext<SensorReading> ctx) throws Exception {
Random random = new Random();
//设置10个传感器的初始温度
HashMap<String,Double> sensorTempMap = new HashMap<>();
for(int i = 0;i<10;i++){
sensorTempMap.put("senor_"+(i+1),60+random.nextGaussian()*20); //高斯随机数
}
while (runing){
for(String sensorId:sensorTempMap.keySet()){
//当前温度基础上随机波动
Double newTemp = sensorTempMap.get(sensorId)+random.nextGaussian();
sensorTempMap.put(sensorId,newTemp);
ctx.collect(new SensorReading(sensorId,System.currentTimeMillis(),newTemp));
}
Thread.sleep(1000);
}
}
@Override
public void cancel() {
runing = false;
}
}
5.3 Transform
转换算子
5.3.1 map
5.3.2 flatMap
5.3.3 filter
示例代码
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//文件读取
DataStream<String> inputStream = env.readTextFile("D:\\workspace\\bigDataWorkspace\\FlinkTutorial\\src\\main\\resources\\sensor.txt");
// 1.map 把String转换长度输出
DataStream<Integer> mapStream = inputStream.map(new MapFunction<String,Integer>(){
@Override
public Integer map(String s) throws Exception {
return s.length();
}
});
//flatMap操作操作
DataStream<Object> flatMapStream = inputStream.flatMap(new FlatMapFunction<String, Object>() {
@Override
public void flatMap(String s, Collector<Object> collector) throws Exception {
String[] fields = s.split(",");
for (String field : fields) {
collector.collect(field);
}
}
});
//filter 赛选sensor_1开头对应的数据
DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith("sensor_1");
}
});
mapStream.print("map");
flatMapStream.print("flatMapStream");
filterStream.print("filter");
env.execute();
}
ter 赛选sensor_1开头对应的数据
DataStream filterStream = inputStream.filter(new FilterFunction() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith(“sensor_1”);
}
});
mapStream.print("map");
flatMapStream.print("flatMapStream");
filterStream.print("filter");
env.execute();
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接: https://blog.csdn.net/markyingshou/article/details/111089857