上节说到SparkSQL中将Sort分为两部分:第一部分是基于boundary的range repartition,通过采样确定每个partition的上下界,然后将数据按照上下界重新分区;第二部分呢,就是在分区内将数据进行排序。完成这两步之后,整张表中的数据就变成有序的了。
分区内的排序是借助UnsafeExternalRowSorter来完成的,而它其中又嵌套了一个UnsafeExternalSorter,排序的主要逻辑靠后者来完成,前者主要做一些转换和适配工作。
这是一个用来存储记录数据的地址和prefix的Sorter。其实质上是一个LongArray,奇数为存储记录的地址,偶数位置存储记录的prefix。地址和prefix都是Long类型。
对记录进行排序时,首先判断两条记录的prefix是否相等,如果根据prefix就可以判断出两条记录的大小,那么直接返回结果。否则从相应的地址中拿出两条记录进行进一步的比较。相对于真实存储的记录来说,他们的地址和prefix占用的空间都比较小,在比较时遍历较小的数据结构更有利于提高cache命中率。
比较的详细过程在后文描述。
另外值得注意的一点时,其存储底层LongArray只有一部分内存实际被存储使用,另外一部分是给排序预留的,预留的空间比例是1/2(Radix Sort)或者1/3(Tim Sort)。
首先,这是一个可以独立申请内存(当然也包括释放内存和spill到磁盘)的MemoryConsumer。有一个链表用来保存其使用到的MemoryBlock (allocatedPages),另外一个链表维护spill到磁盘的信息(spillWriters),例如溢出文件路径、blockId、溢出的记录数等。并有各种数据结构跟踪当前的内存使用和分配情况。
其次,这个数据结构的主要功能用于给记录排序。主要的辅助类在上面已经提到了,是一个基于地址和prefix的内存排序器。
这里面用到的其他辅助数据结构有:
一个简单的prefix比较器。比较使用prefix computer计算出来的相对于每条记录的prefix值。其实就是比较他们相对应的signed long和unsigned long值。关于prefix computer的细节下面会介绍。
根据ordering构造,用来比较两条记录的大小。当prefix comparator不适用时,则使用此比较器对记录进行全维度比较。
数据的插入分为两个部分,一个是真实数据的插入,另外一个是其索引的更新。后者这个点很重要,每个数据的索引包含了真是数据插入的地址和其prefix值,当记录可以只用prefix来比较大小时,那么真实数据的排序就转换为了索引的排序。数据排序和指针排序哪个更快,一目了然。
一条记录的prefix值是由prefix computer计算而来。计算一个Row的prefix值,输入的是一个Row对象,输出的是一个Long值。顾名思义,这个东东的计算基准是SortOrder的第一个维度在Row对象上的映射,映射的元素根据数据类型来计算出一个Long值。具体的数据类型和计算方法如下表:
Data TypeHow to computeBooleantrue -> 1, false -> 0Integer强转LongDate/Timestamp强转LongFloat/Double先强转Double(如果是Float),Double转Long比较复杂,参考这里:http://stereopsis.com/radix.htmlString其实就是Array[Byte],会根据JVM是大/小端、4/8字节对齐分别对待,具体参考UTF8String.getPrefix()函数Binary参考ByteArray.getPrefixDecimal(小精度)参考Decimal.toUnscaledLong函数Decimal(大精度)按照Double来处理真实数据插入时调用Platform的API,这个地方也有一个优化,就是在插入数据之前先写一个length值,表示数据占用的空间大小,方便后续取数据使用。
当然每插入一条数据前,都会check一下索引页和数据页是否有足够的空间来容纳新的数据,没有的话会扩大内存。这边申请内存使用的就是Spark新引入的统一内存管理,内存不足时会挤压其他MemoryConsumer的内存,迫使其溢出到磁盘,从而获取可用内存。
注意,在判断索引数组是否有足够内存时,实际上是判断总占用内存是否达到了其1/2(使用radix sort)或者2/3(使用Tim sort)。
真实数据插入时也基本上是重复以上流程:)
索引页和数据页的内容以及对应关系如下图所示:
如果能只根据prefix将数据进行排序的话,radix sort是最好用的排序算法。
radix sort也就是俗称的基数排序,这个算法牛逼了,能达到O(w*n)的时间复杂度(其中n是需要排序的记录数,w是排序的位数),参考:Radix sort。
上面已经介绍过了,在ordering只有一个维度时,可以通过只排序索引数组的方式达到将数据数组排序的效果。具体的实现就是通过radix sort将prefix进行排序,通过prefix和address的关联,实现数据数组的排序。
radix sort的排序过程分以下几步:
统计每个位上的直方图,计算此位上每个桶的插入offset从头遍历prefix值,将其插入到指定桶的位置,并将桶的位置加1计算高一位的直方图,重复1、2步直到将所有的位排序完毕在SparkSQL的排序中,使用了位计算的技巧将计算使用的空间最小化,此处简略图示如下:
当然,这里只取了4条数据演示,实际排序的数据也只有两位数。而在实际的SparkSQL的排序中,数据的个数即记录的总个数,而prefix的位数也有64位,分为每8位一组。
另外,前面提到的radix sort的情况下,索引数组为什么要空一半的原因也揭示了。没错,就是为了排序过程中做临时存储空间使用的。
当不能简单的使用prefix对数据进行排序的时候,Radix sort就不适用了,这里使用Tim sort进行排序,其中的comparator首先比较prefix的值,当两条记录的prefix值相同时,才取两条记录,按照row comparator进行大小的比较。
Tim sort的原理在这里。它是插入排序和归并排序的结合。分为两个步骤:
首先,将待排序的数据分为几个片段分别进行插入排序,片段的长度由待排序数据的长度而定,在Spark SQL中这个数字是16-32之间。排序后的片段索引(片段开始位置和片段长度)存储在堆栈中。
其次,将排序好的n个片段两两进行归并排序,归并排序的选择方式为:
记栈顶自上而下的三个元素分别是X/Y/Z,当X长度小于等于Y+Z的长度时,取Y和X/Z中较短的那个进行归并排序,直到所有片段归并排序完成。
Tim sort的排序过程如下图所示:
前面说的排序都是基于插入时没有发生spill的情况,当插入时有数据spill到磁盘时,Spark SQL是怎么处理的呢?
其实很简单,在spill时,会将已经读取的数据进行排序(排序过程和上述一致),排序后写到磁盘内,这样每次spill的数据就都是有序的了。
形成多个spill文件后,再读取时,类似于merge sort的思路,先比较他们几个文件的第一个元素的大小,最小的那个先读取,Spark SQL中是使用priority queue来实现的。如下图所示:
可以看出,SparkSQL在排序中做了大量的优化,spill机制保证了内存不足时可以将数据spill到磁盘上,从而保证稳定性;使用Radix sort在数据仅有一维排序时,缩小排序所访问的内存空间,提高cache命中率;在多维排序时,也使用了目前对真实数据排序时间复杂度最低的Tim sort算法。
当然在整个SparkSQL中更是使用了大量的内存管理方式使得计算过程中的额外内存消耗更低,减少GC频率;在整个排序过程中也尽量使用位运算的方式来进行信息统计和收集,提高CPU指令效率。经过这些优化,SparkSQL的排序基本上可以满足业界关于在大量数据下的场景应用。
声明:本文为原创,版权归本人所有,禁止用于任何商业目的,转载请注明出处:http://blog.csdn.net/asongoficeandfire/article/details/61668186
如果你觉得我文章写的不错,可以扫描支付宝二维码打赏我:)