代码编织梦想

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。

一、源算子(Source)

 

Flink代码中通用的添加source的方式,是调用执行环境的addSource()方法:

DataStream<String> stream = env.addSource(...);

方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的source function,通常情况下足以应对我们的实际需求。

二、准备工作

为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个三元组(用户名,用户访问的url,用户访问url的时间戳),所以在这里,我们可以创建一个类Event,将用户行为包装成它的一个对象。Event类中包含了以下一些字段:

 

具体代码如下:

import java.sql.Timestamp;

public class Event {

public String user;

public String url;

public Long timestamp;

public Event() {

}

public Event(String user, String url, Long timestamp) {

this.user = user;

this.url = url;

this.timestamp = timestamp;

}

@Override

public String toString() {

return "Event{" +

"user='" + user + '\'' +

", url='" + url + '\'' +

", timestamp=" + new Timestamp(timestamp) +

'}';

}

}

这里需要注意,我们定义的Event,有这样几个特点:

  • 类是公有(public)的
  • 有一个无参的构造方法
  • 所有属性都是公有(public)的
  • 所有属性的类型都是可以序列化的

Flink会把这样的类作为一种特殊的POJO(Plain Ordinary Java Object简单的Java对象,实际就是普通JavaBeans)数据类型来对待,方便数据的解析和序列化。另外我们在类中还重写了toString方法,主要是为了测试输出显示更清晰。

我们这里自定义的Event POJO类会在后面的代码中频繁使用,所以在后面的代码中碰到Event,把这里的POJO类导入就好了。

三、从集合中读取数据

最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ArrayList<Event> clicks = new ArrayList<>();

clicks.add(new Event("Mike","./home",1000L));

clicks.add(new Event("Joy","./cart",2000L));

DataStream<Event> stream = env.fromCollection(clicks);

stream.print();

env.execute();

}

四、从文件读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

DataStream<String> stream = env.readTextFile("clicks.csv");

说明:

  • 参数可以是目录,也可以是文件;还可以从HDFS目录下读取,使用路径hdfs://...;
  • 路径可以是相对路径,也可以是绝对路径;
  • 相对路径是从系统属性user.dir获取路径:idea下是project的根目录,standalone模式下是集群节点根目录;

 

五、从Socket读取数据

不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。

我们之前用到的读取socket文本流,就是流处理场景。但是这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStream<String> stream = env.socketTextStream("localhost", 7777);

六、从Kafka读取数据

Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。

所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

方式一:Flink1.14标记为过时

public class SourceKafka {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();

properties.setProperty("bootstrap.servers", "hadoop102:9092");

properties.setProperty("group.id", "consumer-group");

properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.setProperty("auto.offset.reset", "latest");

DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(

"clicks",

new SimpleStringSchema(),

properties

));

stream.print("Kafka");

env.execute();

}

}

创建FlinkKafkaConsumer时需要传入三个参数:

  • 第一个参数topic,定义了从哪些主题中读取数据。可以是一个topic,也可以是topic列表,还可以是匹配所有想要读取的topic的正则表达式。当从多个topic中读取数据时,Kafka连接器将会处理所有topic的分区,将这些分区的数据放到一条流中去。
  • 第二个参数是一个DeserializationSchema或者KeyedDeserializationSchema。Kafka消息被存储为原始的字节数据,所以需要反序列化成Java或者Scala对象。上面代码中使用的SimpleStringSchema,是一个内置的DeserializationSchema,它只是将字节数组简单地反序列化成字符串。DeserializationSchema和KeyedDeserializationSchema是公共接口,所以我们也可以自定义反序列化逻辑。
  • 第三个参数是一个Properties对象,设置了Kafka客户端的一些属性。

方式二:Flink1.13开始官方推荐

public class SourceKafka {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> kafkaSource = KafkaSource.<String>builder()

.setBootstrapServers("hadoop102:9092")

.setTopics("topic_1")

.setGroupId("bigdata")

.setStartingOffsets(OffsetsInitializer.latest())

.setValueOnlyDeserializer(new SimpleStringSchema())

.build();

DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

stream.print("Kafka");

env.execute();

}

}

七、自定义Source

接下来我们创建一个自定义的数据源,实现SourceFunction接口。主要重写两个关键方法:run()和cancel()。

  • run()方法:通过运行时上下文对象SourceContext循环生成数据,并发送到流中。
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

1)自定义数据源生成代码

public class ClickSource implements SourceFunction<Event> {

private Boolean running = true;

@Override

public void run(SourceContext<Event> sourceContext) throws Exception {

Random random = new Random();

String[] users = {"Mike", "Joy"};

String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

while (running) {

sourceContext.collect(new Event(

users[random.nextInt(users.length)],

urls[random.nextInt(urls.length)],

Calendar.getInstance().getTimeInMillis()

));

}

}

@Override

public void cancel() {

running = false;

}

}

2)调用自定义数据源代码

public class SourceCustom {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 有了自定义的source function,调用addSource方法

DataStreamSource<Event> stream = env.addSource(new ClickSource());

stream.print("SourceCustom");

env.execute();

}

}

总结

通过本文,我们可以掌握Flink Source从集合中、从文件、从Socket、从Kafka和自定义Source读取数据,如此强大的Flink,大家Get到了吗。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zjjcchina/article/details/130840904