代码编织梦想


Flink中的RangePartition
导读:
  RangePartition是Flink批处理中的一个算子,用于数据分区。
  在Flink批处理的优化器中,会专门针对RangePartition算子进行一次优化,主要是通过采样算法对数据进行估计,并修改原job生成的OptimizedPlan。本文通过一个示例,对这个过程进行相关介绍。

示例如下:其主要功能是先进行RangePartition,然后进行WordCount。

public class MapPartition {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        if(args.length != 3){
            System.out.println("the use is   input  output parallelism");
            return;
        }
        //输入
        String input = args[0];
        //输出
        String output = args[1];
        //并行度
        int parallelism = Integer.parseInt(args[2]);

        //设置并行度
        env.setParallelism(parallelism);
        DataSet<String> text = env.readTextFile(input);

        DataSet<Tuple2<String,Integer>> words = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] strings = value.split("\\W+");
                for(String s:strings){
                    out.collect(new Tuple2<String, Integer>(s,1));
                }
            }
        });
        //进行Range-Partition
        DataSet<Tuple2<String,Integer>> RangeWords = words.partitionByRange(0);
        DataSet<Tuple2<String,Integer>> counts = RangeWords.groupBy(0).sum(1);
        //写出结果
        counts.writeAsText(output);
        env.execute("this is a range partition job!!!");
    }
}


  上述代码正常来讲会形成如下的plan:
    Source–>FlatMap–>Partition–>GroupReduce–>DataSink
  但从WebUI上得到的结果是:

WebUI中的作业图

 

  可以发现多了一些算子,这些算子就是用于采样的,接下来就详细跟踪一下这个过程。

  其逻辑入口位于Optimizer类中:

    #Optimizer.java line 519
    plan.accept(new RangePartitionRewriter(plan));

  RangePartitionRewriter实现了Visitor接口,通过accept()方法,其会对原Job生成的Plan中的各算子进行遍历,从而在RangePartition算子处改写相关逻辑。具体是在调用PostVisit()的时候进行判断是否是采用的RangePartition传输策略,代码如下:

//RangePartitionRewriter.java类中的postVisit()方法
    public void postVisit(PlanNode node) {

        if(node instanceof IterationPlanNode) {
            IterationPlanNode iNode = (IterationPlanNode)node;
            if(!visitedIterationNodes.contains(iNode)) {
                visitedIterationNodes.add(iNode);
                iNode.acceptForStepFunction(this);
            }
        }
        //提取当前计划节点所有的的输入通道
        final Iterable<Channel> inputChannels = node.getInputs();
        for (Channel channel : inputChannels) {//遍历输入通道
            ShipStrategyType shipStrategy = channel.getShipStrategy();
            // Make sure we only optimize the DAG for range partition, and do not optimize multi times.
            if (shipStrategy == ShipStrategyType.PARTITION_RANGE) {// 确保优化的通道的数据传输策略为范围分区

                if(channel.getDataDistribution() == null) {
                    if (node.isOnDynamicPath()) {
                        throw new InvalidProgramException("Range Partitioning not supported within iterations if users do not supply the data distribution.");
                    }
                    //对该通道的范围分区进行“重写”,并将当前通道从源计划节点的通道中删除,然后加入新的通道集合
                    PlanNode channelSource = channel.getSource();
                    List<Channel> newSourceOutputChannels = rewriteRangePartitionChannel(channel);
                    //移除原来的OutgoingChannels
                    channelSource.getOutgoingChannels().remove(channel);
                    //添加新的OutgoingChannels
                    channelSource.getOutgoingChannels().addAll(newSourceOutputChannels);
                }
            }
        }
    }


  当判断传输策略为Range Partition后,会调用rewriteRangePartitionChannel()方法对范围分区进行“重写”。

//RangePartitionRewriter.java类中的rewriteRangePartitionChannel()方法
    private List<Channel> rewriteRangePartitionChannel(Channel channel) {
        //RangePartition前驱Operator的输出?
        final List<Channel> sourceNewOutputChannels = new ArrayList<>();
        //sourceNode是RangePartition的前驱Operator
        final PlanNode sourceNode = channel.getSource();
        //targetNode就是RangePartition该Operator
        final PlanNode targetNode = channel.getTarget();
        //得到并行度
        final int sourceParallelism = sourceNode.getParallelism();
        final int targetParallelism = targetNode.getParallelism();
        final Costs defaultZeroCosts = new Costs(0, 0, 0);
        final TypeComparatorFactory<?> comparator = Utils.getShipComparator(channel, this.plan.getOriginalPlan().getExecutionConfig());

        // 1. Fixed size sample in each partitions.
        final int sampleSize = SAMPLES_PER_PARTITION * targetParallelism;
        //SampleInPartition继承自RichMapPartitionFunction,用于各分区的采样,SampleInPartition在Flink-core中,RichMapPartitionFunction则在flink-runtime中
        final SampleInPartition sampleInPartition = new SampleInPartition(false, sampleSize, SEED);
        //得到RangePartition的前驱Operator的输出格式,即RangePartition的接受格式
        final TypeInformation<?> sourceOutputType = sourceNode.getOptimizerNode().getOperator().getOperatorInfo().getOutputType();
        //IntermediateSampleData中存储了元素值和权重weight
        final TypeInformation<IntermediateSampleData> isdTypeInformation = TypeExtractor.getForClass(IntermediateSampleData.class);
        final UnaryOperatorInformation sipOperatorInformation = new UnaryOperatorInformation(sourceOutputType, isdTypeInformation);
        //MapPartitionOperatorBase是flink-core中的类
        final MapPartitionOperatorBase sipOperatorBase = new MapPartitionOperatorBase(sampleInPartition, sipOperatorInformation, SIP_NAME);
        //采样的Map节点,MapPartitionNode是flink-optimizer中dag包的类,其中只有DagConnection
        final MapPartitionNode sipNode = new MapPartitionNode(sipOperatorBase);
        //新建一个Channel,该Channel的source是RangePartition的前驱Operator
        final Channel sipChannel = new Channel(sourceNode, TempMode.NONE);
        sipChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
        //SingleInputPlanNode也是flink-optimizer中plan包的类,其中多了Channel input 的信息,应该是用在OptimizedPlan中的
        final SingleInputPlanNode sipPlanNode = new SingleInputPlanNode(sipNode, SIP_NAME, sipChannel, DriverStrategy.MAP_PARTITION);
        sipNode.setParallelism(sourceParallelism);
        sipPlanNode.setParallelism(sourceParallelism);
        sipPlanNode.initProperties(new GlobalProperties(), new LocalProperties());
        sipPlanNode.setCosts(defaultZeroCosts);
        //设置新加的Channel的target节点为新创建的采样MapPartitionNode
        sipChannel.setTarget(sipPlanNode);
        this.plan.getAllNodes().add(sipPlanNode);
        sourceNewOutputChannels.add(sipChannel);

        // 2. Fixed size sample in a single coordinator.
        //SampleInCoordinator实现了GroupReduceFunction,用于将各分区的采样混合?
        final SampleInCoordinator sampleInCoordinator = new SampleInCoordinator(false, sampleSize, SEED);
        final UnaryOperatorInformation sicOperatorInformation = new UnaryOperatorInformation(isdTypeInformation, sourceOutputType);
        //GroupReduceOperatorBase是flink-core中的
        final GroupReduceOperatorBase sicOperatorBase = new GroupReduceOperatorBase(sampleInCoordinator, sicOperatorInformation, SIC_NAME);
        //Reduce节点,GroupReduceNode-optimizer中dag的类,其中有DagConnection
        final GroupReduceNode sicNode = new GroupReduceNode(sicOperatorBase);
        //初始化一个新的Channel,其Source是1.中的Map算子
        final Channel sicChannel = new Channel(sipPlanNode, TempMode.NONE);
        sicChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
        //同理,根据sicNode和Channel信息构建PlanNode,是flink-optimizer中plan中的类
        final SingleInputPlanNode sicPlanNode = new SingleInputPlanNode(sicNode, SIC_NAME, sicChannel, DriverStrategy.ALL_GROUP_REDUCE);
        //注意这里的并行度需要设置为1,因为要将各节点的样本根据权值进行聚合到一起,相当于中心Coordinate的作用
        sicNode.setParallelism(1);
        sicPlanNode.setParallelism(1);
        sicPlanNode.initProperties(new GlobalProperties(), new LocalProperties());
        sicPlanNode.setCosts(defaultZeroCosts);
        //设置Channel的Target
        sicChannel.setTarget(sicPlanNode);
        sipPlanNode.addOutgoingChannel(sicChannel);
        this.plan.getAllNodes().add(sicPlanNode);

        // 3. Use sampled data to build range boundaries.
        //RangeBoundaryBuilder实现了RichMapPartitionFunction,用于计算各个分段的界
        final RangeBoundaryBuilder rangeBoundaryBuilder = new RangeBoundaryBuilder(comparator, targetParallelism);
        final TypeInformation<CommonRangeBoundaries> rbTypeInformation = TypeExtractor.getForClass(CommonRangeBoundaries.class);
        final UnaryOperatorInformation rbOperatorInformation = new UnaryOperatorInformation(sourceOutputType, rbTypeInformation);
        //MapPartitionOperatorBase是flink-core中的类
        final MapPartitionOperatorBase rbOperatorBase = new MapPartitionOperatorBase(rangeBoundaryBuilder, rbOperatorInformation, RB_NAME);
        //Map节点,MapPartitionNode是flink-optimizer中dag的类,其中只有DagConnection
        final MapPartitionNode rbNode = new MapPartitionNode(rbOperatorBase);
        //创建以2.中reduce的节点为Source的Channel
        final Channel rbChannel = new Channel(sicPlanNode, TempMode.NONE);
        rbChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
        //创建PlanNode,flink-optimizer中plan中的类
        final SingleInputPlanNode rbPlanNode = new SingleInputPlanNode(rbNode, RB_NAME, rbChannel, DriverStrategy.MAP_PARTITION);
        rbNode.setParallelism(1);
        rbPlanNode.setParallelism(1);
        rbPlanNode.initProperties(new GlobalProperties(), new LocalProperties());
        rbPlanNode.setCosts(defaultZeroCosts);
        rbChannel.setTarget(rbPlanNode);
        sicPlanNode.addOutgoingChannel(rbChannel);
        this.plan.getAllNodes().add(rbPlanNode);

        // 4. Take range boundaries as broadcast input and take the tuple of partition id and record as output.
        //AssignRangeIndex实现了RichMapPartitionFunction,利用了boundary来得到每个记录的输出
        final AssignRangeIndex assignRangeIndex = new AssignRangeIndex(comparator);
        final TypeInformation<Tuple2> ariOutputTypeInformation = new TupleTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, sourceOutputType);
        final UnaryOperatorInformation ariOperatorInformation = new UnaryOperatorInformation(sourceOutputType, ariOutputTypeInformation);
        //MapPartitionOperatorBase是flink-core中的类
        final MapPartitionOperatorBase ariOperatorBase = new MapPartitionOperatorBase(assignRangeIndex, ariOperatorInformation, ARI_NAME);
        //Map节点,MapPartitionNode是flink-optimizer中dag的类,其中只有DagConnection
        final MapPartitionNode ariNode = new MapPartitionNode(ariOperatorBase);
        //创建以RangePartition的前驱Operator为Source的Channel
        final Channel ariChannel = new Channel(sourceNode, TempMode.NONE);
        // To avoid deadlock, set the DataExchangeMode of channel between source node and this to Batch.
        //为了防止死锁,将Channel的DataExchangeMode设置为Batch
        ariChannel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.BATCH);
        //创建PlanNode,flink-optimizer中plan中的类
        final SingleInputPlanNode ariPlanNode = new SingleInputPlanNode(ariNode, ARI_NAME, ariChannel, DriverStrategy.MAP_PARTITION);
        ariNode.setParallelism(sourceParallelism);
        ariPlanNode.setParallelism(sourceParallelism);
        ariPlanNode.initProperties(new GlobalProperties(), new LocalProperties());
        ariPlanNode.setCosts(defaultZeroCosts);
        //将创建的Channel的Target指向新创建的Map Operator
        ariChannel.setTarget(ariPlanNode);
        this.plan.getAllNodes().add(ariPlanNode);
        //将新创建的Channel添加到RangePartition的前一个Operator的sourceNewOutputChannels中
        sourceNewOutputChannels.add(ariChannel);

        //计算得到的boundaries会被输出到广播通道,rbPlanNode即Channel的sourceNode,为step3创建的map节点,用于计算各个分段的界的
        final NamedChannel broadcastChannel = new NamedChannel("RangeBoundaries", rbPlanNode);
        broadcastChannel.setShipStrategy(ShipStrategyType.BROADCAST, DataExchangeMode.PIPELINED);
        //将Channel的Target指向step4创建的Map Operator,用于得到每个记录属于哪个分区的
        broadcastChannel.setTarget(ariPlanNode);
        List<NamedChannel> broadcastChannels = new ArrayList<>(1);
        broadcastChannels.add(broadcastChannel);
        //broadcastChannels的source是计算各个分段界的rbPlanNode,target是根据界将各个记录分离的ariPlanNode
        //这里将broadcastChannels添加给ariPlanNode
        ariPlanNode.setBroadcastInputs(broadcastChannels);

        // 5. Remove the partition id.
        //创建Channel,source为将各个记录分离的ariPlanNode
        final Channel partChannel = new Channel(ariPlanNode, TempMode.NONE);
        final FieldList keys = new FieldList(0);
        partChannel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, keys, idPartitioner, DataExchangeMode.PIPELINED);
        ariPlanNode.addOutgoingChannel(partChannel);

        //RemoveRangeIndex继承自MapFunction,从泛型<Tuple2<Integer,T>,T>中可知,其通过将Tuple2<Integer,T>map为T将分区信息进行删除
        //因为找到记录的分区之后,分区编号就没有存在的意义了,因此为流中的记录移除分区编号
        final RemoveRangeIndex partitionIDRemoveWrapper = new RemoveRangeIndex();
        final UnaryOperatorInformation prOperatorInformation = new UnaryOperatorInformation(ariOutputTypeInformation, sourceOutputType);
        final MapOperatorBase prOperatorBase = new MapOperatorBase(partitionIDRemoveWrapper, prOperatorInformation, PR_NAME);
        //Map节点,MapNode是flink-optimizer中dag的类,其中只有DagConnection信息
        final MapNode prRemoverNode = new MapNode(prOperatorBase);
        //创建PlanNode,flink-optimizer中plan中的类
        final SingleInputPlanNode prPlanNode = new SingleInputPlanNode(prRemoverNode, PR_NAME, partChannel, DriverStrategy.MAP);
        //设置Channel的target,target为新创建的Map Operator,该map消除了数据中的分区信息,只保留原始记录
        partChannel.setTarget(prPlanNode);
        //将新创建的节点并行度设置为RangePartition节点的后继节点的并行度
        prRemoverNode.setParallelism(targetParallelism);
        prPlanNode.setParallelism(targetParallelism);
        GlobalProperties globalProperties = new GlobalProperties();
        globalProperties.setRangePartitioned(new Ordering(0, null, Order.ASCENDING));
        prPlanNode.initProperties(globalProperties, new LocalProperties());
        prPlanNode.setCosts(defaultZeroCosts);
        this.plan.getAllNodes().add(prPlanNode);

        // 6. Connect to target node.
        //将原来的RangePartition算子的输入Channel的source设置为prPlanNode
        channel.setSource(prPlanNode);
        channel.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
        prPlanNode.addOutgoingChannel(channel);

        return sourceNewOutputChannels;
    }


  经过以上步骤,则将OptimizedPlan进行了改写,从而完成了分区的“重写”。上述代码可以结合下图来看(可以放大图片看):

简要过程

详细过程
对具体的采样算法感兴趣,可以参考:
https://blog.csdn.net/yanghua_kobe/article/details/69603795
https://blog.csdn.net/yanghua_kobe/article/details/69358347
 

 

 

flink源码阅读:如何使用flinkkafkaproducer将数据在kafka的多个partition中均匀分布_raycee的博客-爱代码爱编程

使Flink输出的数据在多个partition中均匀分布 FlinkKafkaProducerBase的子类可以使用默认的KafkaPartitioner FixedPartitioner(只向partition 0中写数

flink dataset api之partition_jiny_li的博客-爱代码爱编程

基本介绍 Rebalance:对数据集进行再平衡,重分区,消除数据倾斜 Hash-Partition:根据指定key的哈希值对数据集进行分区,某一key集中时还是会出现数据倾斜 (partitionByHash()) Range-Partition:根据指定的key对数据集进行范围分区   (.partitionByRange()) 

理解flink中的状态实现_黑头人的博客-爱代码爱编程

state的层次结构keyedState => windowStateOperatorState => kafkaOffsetstateBackendsnapshot/restoreinternalTimerServiceRocksDB操作的初探state ttLstate local recoveryQueryableStateincream

Flink SQL中的窗口函数-爱代码爱编程

1 OVER窗口       OVER窗口(OVER Window)是传统数据库的标准开窗,不同于GROUP BY Window,OVER Window中的每一个元素都对应一个窗口。窗口元素是与当前元素相邻的元素集合,流数据元素分布在多个窗口中。在Flink SQL Window的实现中,每个触发计算的元素所确定的行,都是该元素所在窗口的最优一行。   

FlinkSql中窗口(window)的使用-爱代码爱编程

FlinkSql中窗口(window)的使用 目录 FlinkSql中窗口(window)的使用一、Table API中使用窗口Group WindowsOver Windows二、SQL API中使用窗口Group WindowsOver Windows 时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口然后根据时间段做计算

关于flink中sql的over window底层个人实现及分析-爱代码爱编程

 1、前言         在使用flink中sql的over window函数一直在想底层是如何实现的,查看资料并没有找到相关说明,去翻源码的话没那么多时间去研究,就按照自己思路简单测试了下实现过程,时间过短,语言没有进行整理,仅供初学者参考 2、测试内容         分别对开窗函数中对行和事件时间进行分析底层实现,加强理解 3、实现过程

flink入门总结-爱代码爱编程

spark     Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎     支持迭代式计算,图形计算,Spark框架计算比MR快的原因是:中间结果不落盘。注意Spark的Shuffle也是落盘的。     Spark内置模块         Spark Core:Spark的基本功能,含任务调度、内存管理、错误恢复、与存储系统交互等模块。

Flink算子大全-爱代码爱编程

Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。 所以下面将Flink的算子分为两大类:一类是DataSet,一类是DataStream。 DataSet 一、Source算子 1. fromCollection fromCollection:从本地集合读取数据

FlinkSql系列4之OVER聚合-爱代码爱编程

系列文章目录 前言 本节主要记录学习flinksql中的OVER聚合的使用,我们知道,对于GROUP BY来说,我们只能保留我们分组的字段,其他的字段是无法保留的,而对于OVER聚合来说我们可以实现保留全部的字段,不过在实际应用中,这个并不常用。 一、Over聚合实际测试 1.时间区间聚合 创建源表 CREATE TABLE source_ta

Flink Table API和SQL(中)-爱代码爱编程

传送门: Flink Table API和SQL(上)(基本API介绍+流处理表的特性)Flink Table API和SQL(中)(时间属性及窗口+聚合查询+联结查询)Flink Table API和SQL(下)(函数+SQL客户端+连接到外部系统) 文章目录 一、时间属性和窗口1. 事件时间1.1 在创建表的DDL 中定义1.2 在

【Flink】Table API和Flink SQL-爱代码爱编程

Table API和Flink SQL 简绍基本程序结构TableEnvironmetnt表(Table)输出表更新模式DataStream与表的相互转换查看执行计划时间窗口Group WindowsTumbling WindowsSliding WindowsSession WindowsOver WindowsSQL中的Over Windows

flink中table api和sql 完整使用中 (第十一章)_小坏讲微服务的博客-爱代码爱编程

Flink中Table API和SQL 完整使用下 一、联结(Join)查询1、常规联结查询1. 等值内联结(INNER Equi-JOIN)2. 等值外联结(OUTER Equi-JOIN) 2、间隔联结查询

flinksql中的聚合查询_大大大大肉包的博客-爱代码爱编程

        在 SQL 中,一个很常见的功能就是对某一列的多条数据做一个合并统计,得到一个或多个结果值;比如求和、最大最小值、平均值等等,这种操作叫作聚合(Aggregation)查询。Flink 中的 SQL 是流处理与标准 SQL 结合的产物,所以聚合查询也可以分成两种:流处理中特有的聚合(主要指窗口聚合),以及 SQL 原生的聚合查询方式(分

大数据组件之flink-爱代码爱编程

文章目录 大数据组件之Flink一.Flink简介Flink是什么?Flink的特点Flink框架处理流程Flink发展时间线Flink在企业中的应用Flink的应用场景为什么选择Flink?传统数据处理架构有状态的

spring integration开篇:说明-爱代码爱编程

系列文章目录 …TODO spring integration开篇:说明 …TODO spring integration使用:消息路由 spring integration开篇:说明 系列文章目录前言ent

flink中的table api和sql(三)_flink tableapi 开窗-爱代码爱编程

目录 11.4 时间属性和窗口 11.4.1 事件时间 11.4.2 处理时间 11.4.3 窗口(Window) 11.5 聚合(Aggregation)查询 11.5.1 分组聚合 11.5.2 窗口聚合 11.5.3 开窗(Over)聚合 11.4 时间属性和窗口 基于时间的操作(比如时间窗口),需要定义相关的时间语义和时

flink分区之flink分区策略整理-爱代码爱编程

title: Flink系列 一、Flink Partitioner 分区策略整理 Flink 的分区策略: 批处理的分区策略: Partitioner 流处理的分区策略: StreamPartitione