性能优化:
广播大变量!
从这往后讲的都是性能调优的小的技术点,可能总共30分钟作业!小的点你用一个减少1分钟!
用户访问session分析模块中的,按时间比例随机抽取
这里随机抽取出来的dateHourExtractMap做成了final,然后每个task根据随机抽取map进行抽取了,是不是用到了我们的
dateHourExtractMap,默认情况下是怎么样的?默认情况下每个task拷贝一份map的副本!
这种默认的,task执行的算子中,使用了外部的变量,每个task都会获取一份变量的副本,什么问题?
比如,map是1M。总共,你前面调优都调的特好,资源给的到位,配合着资源,并行度调节的绝对到位,1000个task。大量task的确都在并行运行。
这些task里面都用到了占用1M内存的map,那么首先,map会拷贝1000份副本,通过网络传输到各个task中去,给task使用。总计有1G的数据,会通过网络传输。网络传输的开销很大!网络传输,也许就会消耗掉你的spark作业运行的总时间的一小部分。
map副本,传输到了各个task上之后,是要占用内存的。1个map的确不大,1M;1000个map分布在你的集群中,
一下子就耗费掉1G的内存。对性能会有什么影响呢?
不必要的内存的消耗和占用,就导致了,你在进行RDD持久化到内存,也许就没法完全在内存中放下;
就只能写入磁盘,最后导致后续的操作在磁盘IO上消耗性能;
你的task在创建对象的时候,也许会发现堆内存放不下所有对象,也许就会导致频繁的垃圾回收器的回收,
GC。GC的时候,一定是会导致工作线程停止,也就是导致Spark暂停工作那么一点时间。频繁GC的话,
对Spark作业的运行的速度会有相当可观的影响。
刚才说的这种随机抽取的map,1M,举例。还算小的。如果你是从哪个表里面读取了一些维度数据,比方说,
所有商品品类的信息,在某个算子函数中要使用到。100M。
1000个task。100G的数据,网络传输。集群瞬间因为这个原因消耗掉100G的内存。
如果说,task使用大变量(1m~100m),明知道会导致性能出现恶劣的影响。那么我们怎么来解决呢?
广播,Broadcast,将大变量广播出去。而不是直接使用。
广播变量,初始的时候,就在Drvier上有一份副本。
task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中,
尝试获取变量副本;如果本地没有,那么就从Driver远程拉取变量副本,并保存在本地的BlockManager中;
此后这个executor上的task,都会直接使用本地的BlockManager中的副本。
executor的BlockManager除了从driver上拉取,也可能从其他节点的BlockManager上拉取变量副本。
HttpBroadcast TorrentBroadcast(默认)
BlockManager
负责管理某个Executor对应的内存和磁盘上的数据,尝试在本地BlockManager中找map
广播变量的好处,不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,
就可以让变量产生的副本大大减少。
举例来说,(虽然是举例,但是基本都是用我们实际在企业中用的生产环境中的配置和经验来说明的)。
50个executor,1000个task。一个map,10M。
默认情况下,1000个task,1000份副本。10G的数据,网络传输,在集群中,耗费10G的内存资源。
如果使用了广播变量。50个execurtor,50个副本。500M的数据,网络传输,
而且不一定都是从Driver传输到每个节点,还可能是就近从最近的节点的executor的bockmanager
上拉取变量副本,网络传输速度大大增加;500M的内存消耗。
10000M,500M,20倍。20倍~以上的网络传输性能消耗的降低;20倍的内存消耗的减少。
对性能的提升和影响,还是很客观的。
虽然说,不一定会对性能产生决定性的作用。比如运行30分钟的spark作业,可能做了广播变量以后,
速度快了2分钟,或者5分钟。但是一点一滴的调优,积少成多。最后还是会有效果的。
没有经过任何调优手段的spark作业,16个小时;三板斧下来,就可以到5个小时;
然后非常重要的一个调优,影响特别大,shuffle调优,2~3个小时;应用了10个以上的性能调优的技术点
,JVM+广播,30分钟。16小时~30分钟。
那最后我们做一下,怎么做?就是把dateHourExtractMap做成广播变量Broadcast
广播变量,很简单
其实就是SparkContext的broadcast()方法,传入你要广播的变量,即可
final Broadcast<Map<String, Map<String, IntList>>> dateHourExtractMapBroadcast =
sc.broadcast(fastutilDateHourExtractMap);
使用广播变量的时候
直接调用广播变量(Broadcast类型)的value() / getValue()
可以获取到之前封装的广播变量
转载请注明原文地址: https://ju.6miu.com/read-8194.html