Spark中的mapPartitions

    xiaoxiao2021-03-25  128

    在使用Spark链接到外部服务过程中,比如JDBC等,如果对于RDD中每一条信息建立一个链接,会导致链接数过多,而且在这种情况下,快速且大量的简历链接和释放,会造成比较大的资源浪费。

    在这种情况下,特别是如果有batch接口的情况下,通过mapPartitions,对一个Partition中的数据来说,只建立一个链接或者通过这一个链接进行batch请求,会在很大程度上的节约现有的资源,提高资源的利用效率。

    在使用mapPartitions之后,每个mapPartions中操作的对象是Rdd中原有对象的一个Iterator。举例如下:

    var rdd1 = sc.makeRDD(1 to 5,2) //rdd1有两个分区 scala> var rdd3 = rdd1.mapPartitions{ x => { | var result = List[Int]() | var i = 0 | while(x.hasNext){ | i += x.next() | } | result.::(i).iterator | }} rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23 //rdd3将rdd1中每个分区中的数值累加 scala> rdd3.collect res65: Array[Int] = Array(3, 12) scala> rdd3.partitions.size res66: Int = 2

    转载请注明原文地址: https://ju.6miu.com/read-12678.html

    最新回复(0)