有几种创建初始 RDD 的方式:
1. 通过Scala 集合创建 RDD
使用SparkContext的parallelize方法,为Scala集合的数据指定分片数,存储到内存中。例如:sc.parallelize(List(1,2,3), 2); //对List(1, 2, 3)进行并行化, 并行度为2(把scala的Seq序列,分为2片)2. 通过读取本地文件或HDFS上的文件创建RDD
1)文本文件( TextInputFormat)
sc.textFile(“file://path/file.txt”) //将本地文本文件加载成RDD sc.textFile(“directory/*.txt”) //将某类文本文件加载成RDDsc.textFile(“hdfs://nn:9000/path/file”) //hdfs文件或目录
textFile()方法创建RDD:将文本文件加载成RDD,一条记录就代表了 text file里的一行文本
通过这种方法创建的RDD,RDD的分区是怎么分的呢?
①对于在HDFS上存放的文本文件来说:
文件有多少个block,通过textFile创建的初始输入RDD就有几个partition
如果一个文件不足一个block,那么它也是根据1个block来进行分区的,分区数为 1
②对于读取本地的文本文件来说:
需要同一个文件存在所有的worker node上面,在读取的时候每个worker node的task会去读取本文件的一部分。
spark app所运行的节点也需要有这个file,因为需要用到file进行Partition划分。默认分片为 2,可以通过 textFile(path, minPartitions)指定分片数目。
2)sequenceFile文件( SequenceFileInputFormat) sc.sequenceFile(“file.txt”) //将本地二进制文件加载成RDD sc.sequenceFile[String, Int] (“hdfs://nn:9000/path/file”) //hdfs文件或目录
3)使用任意自定义的Hadoop InputFormat
sc.hadoopFile(path, inputFormat, keyClass, valClass) // 与
例如:
val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
