在基于成本的优化器中,成本估算非常重要,它直接影响着候选计划的生成。在Flink中成本估算依赖于每个不同的运算符所提供的自己的“预算”,本篇我们将分析什么是成本、运算符如何提供自己的预算以及如何基于预算估算成本。
Flink以类Costs来定义成本,它封装了一些成本估算的因素同时提供了一些针对成本对象的计算方法(加、减、乘、除)以及对这些因素未知值的认定与校验。
“cost”一词也有译作:开销、代价,将其视为同义即可。
Flink当前将成本估算的因素划分为两大类:
可量化的成本估算因素:指代通过跟踪一个可量化的测量指标可以计算出的成本估算因素(比如网络或I/O的字节数);启发式的成本估算因素:指代那些不可定量计算的成本估算因素,因此只能给出一些定性的经验值;当前被纳入成本估算的因素如下:
网络成本;磁盘I/O成本;CPU成本;启发式网络成本;启发式磁盘成本;启发式CPU成本;可量化的成本估算因素可能经常会被设置为未知的(UNKNOWN,在Costs中以字面常量值-1表示)。当可量化的成本估算因素被置为未知时,所有操作的成本都将变成未知的,因此这将导致在进行优化裁剪期间,无法决策出哪个偏向的操作。在这种情况下,启发式的成本估算因素必须能发挥作用,它应该包含一个值来确保以不同策略执行的运算符是可比较的(甚至在无法估算的情况下)。
成本的估算借助于成本估算器(CostEstimator),CostEstimator定义了一系列增加成本的方法,这些方法有待具体的估算器实现,它们大致分为三大类:
增加传输策略的成本;增加本地策略的成本;增加屏障的成本;CostEstimator借助于以上这几类方法,可完成对一个运算符总成本的计算,具体的计算逻辑封装在方法costOperator中,该方法接收一个计划节点(PlanNode)参数,然后按照传输策略和本地策略分别进行枚举与计算。完整的方法如下:
public void costOperator(PlanNode n) { //构建一个成本对象用来存储总成本 final Costs totalCosts = new Costs(); //获得该节点的最少可用内存 final long availableMemory = n.getGuaranteedAvailableMemory(); //----------------------------- // 增加传输策略产生的成本 //----------------------------- //遍历该节点的所有输入端通道 for (Channel channel : n.getInputs()) { final Costs costs = new Costs(); //匹配当前通道的传输策略 switch (channel.getShipStrategy()) { case NONE: throw new CompilerException( "Cannot determine costs: Shipping strategy has not been set for an input."); case FORWARD: break; //随机重分区 case PARTITION_RANDOM: addRandomPartitioningCost(channel, costs); break; //哈希分区与自定义分区增加成本的方式相同 case PARTITION_HASH: case PARTITION_CUSTOM: addHashPartitioningCost(channel, costs); break; //范围分区 case PARTITION_RANGE: addRangePartitionCost(channel, costs); break; //广播 case BROADCAST: addBroadcastCost(channel, channel.getReplicationFactor(), costs); break; //强制重平衡分区 case PARTITION_FORCED_REBALANCE: addRandomPartitioningCost(channel, costs); break; default: throw new CompilerException("Unknown shipping strategy for input: " + channel.getShipStrategy()); } //匹配当前通道的本地策略 switch (channel.getLocalStrategy()) { case NONE: break; //排序与合并排序都增加本地的排序成本 case SORT: case COMBININGSORT: addLocalSortCost(channel, costs); break; default: throw new CompilerException("Unsupported local strategy for input: " + channel.getLocalStrategy()); } //增加屏障成本 if (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE) { addArtificialDamCost(channel, 0, costs); } //如果通道在动态路径上,则需要调整成本计算的权重 if (channel.isOnDynamicPath()) { costs.multiplyWith(channel.getCostWeight()); } //将当前通道的成本加入总成本 totalCosts.addCosts(costs); } Channel firstInput = null; Channel secondInput = null; Costs driverCosts = new Costs(); int costWeight = 1; //如果节点在动态路径上,则重新调整成本权重 if (n.isOnDynamicPath()) { costWeight = n.getCostWeight(); } //获得当前节点的所有输入端通道 { Iterator<Channel> channels = n.getInputs().iterator(); if (channels.hasNext()) { firstInput = channels.next(); } if (channels.hasNext()) { secondInput = channels.next(); } } //根据计划节点的执行策略来计算本地成本 switch (n.getDriverStrategy()) { //以下这些执行策略不计算本地成本 case NONE: case UNARY_NO_OP: case BINARY_NO_OP: case MAP: case MAP_PARTITION: case FLAT_MAP: case ALL_GROUP_REDUCE: case ALL_REDUCE: case CO_GROUP: case CO_GROUP_RAW: case SORTED_GROUP_REDUCE: case SORTED_REDUCE: case SORTED_GROUP_COMBINE: case ALL_GROUP_COMBINE: case UNION: break; //各种形式的合并成本 case INNER_MERGE: case FULL_OUTER_MERGE: case LEFT_OUTER_MERGE: case RIGHT_OUTER_MERGE: addLocalMergeCost(firstInput, secondInput, driverCosts, costWeight); break; //混合哈希join的成本(第一个输入边是构建边,第二个输入边是扫描边) case HYBRIDHASH_BUILD_FIRST: case RIGHT_HYBRIDHASH_BUILD_FIRST: case LEFT_HYBRIDHASH_BUILD_FIRST: case FULL_OUTER_HYBRIDHASH_BUILD_FIRST: addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight); break; //混合哈希join的成本(第二个输入边是构建边,第一个输入边是扫描边) case HYBRIDHASH_BUILD_SECOND: case LEFT_HYBRIDHASH_BUILD_SECOND: case RIGHT_HYBRIDHASH_BUILD_SECOND: case FULL_OUTER_HYBRIDHASH_BUILD_SECOND: addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight); break; //各种其他的执行策略 case HYBRIDHASH_BUILD_FIRST_CACHED: addCachedHybridHashCosts(firstInput, secondInput, driverCosts, costWeight); break; case HYBRIDHASH_BUILD_SECOND_CACHED: addCachedHybridHashCosts(secondInput, firstInput, driverCosts, costWeight); break; case NESTEDLOOP_BLOCKED_OUTER_FIRST: addBlockNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight); break; case NESTEDLOOP_BLOCKED_OUTER_SECOND: addBlockNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight); break; case NESTEDLOOP_STREAMED_OUTER_FIRST: addStreamedNestedLoopsCosts(firstInput, secondInput, availableMemory, driverCosts, costWeight); break; case NESTEDLOOP_STREAMED_OUTER_SECOND: addStreamedNestedLoopsCosts(secondInput, firstInput, availableMemory, driverCosts, costWeight); break; default: throw new CompilerException("Unknown local strategy: " + n.getDriverStrategy().name()); } //将驱动器的执行成本加入到总成本,将得到的总成本作为当前节点的成本 totalCosts.addCosts(driverCosts); n.setCosts(totalCosts); }DefaultCostEstimator继承自CostEstimator,作为默认的(也是唯一的)成本估算器。它实现了上面计算成本逻辑中调用的一系列增加成本的addXXX方法。这些方法中的绝大部分,又依赖于预算提供器(EstimateProvider)所提供的预算数据,然后根据不同的增加成本的算法逻辑,利用这些预算数据做计算。比如我们以新增广播成本的addBroadcastCost方法为示例,其实广播传输方式说白了就是将数据复制到当前运算符的所有输出通道中,因此这里对成本的计算取决于复制因子,代码如下:
public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) { //检查复制因子的合法性 if (replicationFactor <= 0) { throw new IllegalArgumentException("The replication factor of must be larger than zero."); } if (replicationFactor > 0) { //所估算的需要输出数据的大小 final long estOutShipSize = estimates.getEstimatedOutputSize(); //如果数据大小小于等于零,则标记网络成本为“未知” if (estOutShipSize <= 0) { costs.setNetworkCost(Costs.UNKNOWN); } //否则网络成本拿数据大小乘以复制因子 else { costs.addNetworkCost(replicationFactor * estOutShipSize); } //增加启发式网络成本,通过启发式成本基数乘以复制因子后再扩大十倍 costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 10 * replicationFactor); } else { costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 1000); } }前面我们谈论了如何通过CostEstimator来估算成本,但其实CostEstimator是在已获得预算数据的基础上应用相关的算法来算出成本的,而用来估算成本的预算数据其实是来自预算提供者(EstimateProvider)。Flink批处理中所有的运算符都有一个基于优化器的内部表示,我们可以称它们为优化器运算符,这些运算符创建于优化操作之前,且它们都必须实现EstimateProvider接口。各个优化器运算符根据自己的实现以及语义将成本估算相关的信息暴露给外部查询。目前被纳入预算的信息有:
输出的数据流大小:由接口方法getEstimatedOutputSize提供;输出的记录数:由接口方法getEstimatedNumRecords提供;单个输出记录的平均字节数:由接口方法getEstimatedAvgWidthPerOutputRecord提供;在dag包下,EstimateProvider接口的继承关系图如下:
其中,OptimizerNode是所有被优化的运算符继承的基类,因此所有优化器运算符都是预算提供者。OptimizerNode为绝大部分的优化器运算符提供了统一的预算计算方法computeOutputEstimates。
为什么说是绝大部分运算符呢?因为有些运算符是特殊的,比如双输入端union运算符BinaryUnionNode以及迭代相关的运算符。
所有的运算符都会在优化时被遍历,Flink提供了一个编号及预算遍历器(IdAndEstimatesVisitor)来对所有运算符进行逐个遍历并计算预算,这一点体现在Optimizer的compile方法的下面这行代码中:
rootNode.accept(new IdAndEstimatesVisitor(this.statistics));而在IdAndEstimatesVisitor的postVisit方法中即调用computeOutputEstimates方法来计算预算。下面,我们来分析一下预算是如何计算得出的,总得来说computeOutputEstimates的逻辑被分为两部分:
各个具体的运算符计算它们特定的预算;根据编译提示(CompilerHints)覆盖原有的预算计算;OptimizerNode将特定运算符的预算计算定义成名为computeOperatorSpecificDefaultEstimates的抽象方法开放给派生类根据自身的特定逻辑实现。然后,如果该运算符如果设置有CompilerHints的话,将会根据CompilerHints覆盖原有的预算结果。
所谓CompilerHints,它是封装了描述用户函数行为的编译提示,它可用于改进优化器对计划的选择。如果给某个运算符设置编译提示的话,那么在计算预算时,将会用它来覆盖运算符自身给出的中间结果的预算。当前,CompilerHints在优化器中没有得到太大的机会发挥。
因为CompilerHints没有被广泛应用,所以预算的计算还是依赖各个运算符具体提供,所以我们关注一下computeOperatorSpecificDefaultEstimates方法。该方法完全是按照具体运算符的语义特征来实现的,我们选择看其中的几个实现:
二元union运算符的预算就是累加其两个输入端:
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { long card1 = getFirstPredecessorNode().getEstimatedNumRecords(); long card2 = getSecondPredecessorNode().getEstimatedNumRecords(); this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 + card2; long size1 = getFirstPredecessorNode().getEstimatedOutputSize(); long size2 = getSecondPredecessorNode().getEstimatedOutputSize(); this.estimatedOutputSize = (size1 < 0 || size2 < 0) ? -1 : size1 + size2; }Cross运算符的处理方式是:
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { long card1 = getFirstPredecessorNode().getEstimatedNumRecords(); long card2 = getSecondPredecessorNode().getEstimatedNumRecords(); //输出的总记录数为第一个输入节点和第二个输入节点的记录数的乘积; this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : card1 * card2; //如果记录数大于等于零,则会计算输出数据的大小 if (this.estimatedNumRecords >= 0) { //获得第一个、第二个输入节点的单条记录大小,两者相加则是cross运算符单条输出记录的大小 float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord(); float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord(); float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2; if (width > 0) { this.estimatedOutputSize = (long) (width * this.estimatedNumRecords); } } }从上面的两个运算符对预算的计算可见,它们大都依赖上游运算符的输出预算。而最初的预算肯定由source运算符决定,因为只有source才能知道数据的具体规模。
所以,我们来看一下DataSourceNode,很明显它作为数据的输入源,是最有可能了解初始数据集大小的运算符,为此Flink定义了一个专门用于统计的对象BaseStatistics,它用于统计对接外部的数据源的预算信息。但并非每个数据源的信息都能被统计到,而Flink当前也只实现了以文件为输入的FileInputFormat的预算统计FileBaseStatistics。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)