win7+PySpark1.5.0下的pyspark crash for large dataset和bound method PipelinedRDD.count of PythonRDD[218]

    xiaoxiao2021-03-25  17

    win7环境PySpark1.5.0下的pyspark crash for large dataset和

    问题描述

    在学习《Spark机器学习》第四章时,参考风雪夜归子写的用Spark Python构建推荐系统 学习的过程中,发现执行如下代码段:

    from pyspark import SparkContext sc = SparkContext(appName="recommendation engine") #生成用户数据的RDD rawData=sc.textFile("D:/myPython/ML_Spark/Datasets/ml-100k/u.data") #显示第一行数据 print rawData.first() #分割用户数据,只取用户、电影和评分数据 rawRatings=rawData.map(lambda line: line.split('\t')[0:3]) #显示前5组数据 print rawRatings.take(5) #导入Spark Mlib的ALS模型,Rating类是对用户ID、影片ID(通常称物品Product)和实际星级封装成RDD from pyspark.mllib.recommendation import Rating,ALS #ratings = rawRatings.map(lambda (user,movie,rating):Rating(int(user),int(movie),float(rating))) ratings = rawRatings.map(lambda x:Rating(int(x[0]),int(x[1]),float(x[2]))) #显示Rating RDD的前5个数据。 print ratings.take(5) #现在可以开始训练模型了,所需的其他参数有以下几个。 #rank:对应ALS模型中的因子个数,也就是在低阶近似矩阵中的隐含特征个数。因子个数一般越多越好。但它也会直接影响模型训练和保存时所需的内存开销,尤其是在用户和物品很多的时候。因此实践中该参数常作为训练效果与系统开销之间的调节参数。通常,其合理取值为10到200。 #iterations:对应运行时的迭代次数。ALS能确保每次迭代都能降低评级矩阵的重建误差,但一般经少数次迭代后ALS模型便已能收敛为一个比较合理的好模型。这样,大部分情况下都没必要迭代太多次(10次左右一般就挺好)。 #lambda:该参数控制模型的正则化过程,从而控制模型的过拟合情况。其值越高,正则化越严厉。该参数的赋值与实际数据的大小、特征和稀疏程度有关。和其他的机器学习模型一样,正则参数应该通过用非样本的测试数据进行交叉验证来调整。 #作为示例,这里将使用的rank、iterations和lambda参数的值分别为50、10和0.01: model=ALS.train(ratings,50,10,0.01) #上述代码返回一个MatrixFactorizationModel对象,该对象将用户因子和物品因子保存在一个(id,factor)对的类型RDD中,分别称作: #userFeatures和productFeatures userFeatures = model.userFeatures() print userFeatures.take(2) print userFeatures.count #print model.userFeatures().count() print model.productFeatures().count()

    无论在Ecplise4.5.2PyDev中、Pthon自带的IDLE还是win7命令行,都频频报如下错误: 错误A:

    【图一】 错误A关键的错误信息就是图中有底色的部分,关键错误信息是: Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost): java.net.SocketException: Connection reset by peer: socket write error 上网去百度,看了一些资料,可以明确就是pyspark无法处理大数据集的错误,即“pyspark crash for large dataset”,尤其在执行first()、take(n)等命令时频繁发生 错误B: 【图二】 关键的错误信息就是图中有底色的部分 错误是:“bound method PipelinedRDD.count of PythonRDD[218] at RDD at PythonRDD.scala:43>” 上网百度不到,具体原因不详。 错误A和错误B都是是随机性出现的,执行十几次,偶尔会成功一次。

    解决过程

    过程一、不知其所以然。 在学习《Spark机器学习》第三章、第四章特征向量提取等章节时,遇到first()、take(n)因为在命令行正常,在Ecplise偶尔正常偶然不正常,没有当回事。为了学习,发现一个规律,就是在Ecplise中不同时运行两条及以上的first()、take(n),通常都能正常运行,于是每次就是注释掉相应的代码,保留一行代码,勉强继续学习。 过程二、从Spark核心参数入手 当遇到上述代码,尤其是

    ratings = rawRatings.map(lambda x:Rating(int(x[0]),int(x[1]),float(x[2]))) print ratings.take(5)

    这两行代码,无论注释还是修改代码写法,比如用 ratings = rawRatings.map(lambda (user,movie,rating):Rating(int(user),int(movie),float(rating))) 但是无济于事,用“不知其所以然”的态度学不下去了,百度了半天,最有价值的就是Spark官网的issues-Spark12980-pyspark crash for large dataset - clone 里面关键的信息是: On Windows, PySpark textfile method, followed by take(1), does not work on a file of 13M. If I set numpartitions to 2000 or take a smaller file, the method works well. The Pyspark is set with all RAM memory of the computer thanks to the command –conf spark.driver.memory=5g in local mode. 我是懒人,首先想到的是调整Spark参数,用的第二句话,没有用numpartitions,从而走了弯路,耽误了半周时间,当然对自己了解Spark还是有帮助的,我就不班门弄斧了,可以看Spark Core配置参数说明。或SparkCore程序优化总结 我尝试了 调整了spark.executor.memory、spark.driver.memory、spark.cores.max等参数,甚至spark.shuffle.service.enabled:false、spark.dynamicAllocation.enabled: false这些动态资源分配参数,但是除了减少了报错概率,没有根本性的改善。相关代码(截取一部分)如下:

    cf=SparkConf().setMaster("local[2]").setAppName("recommendation engine").set("spark.executor.memory", "4g").set("spark.driver.memory","2g").set("spark.default.parallelism",2000) sc = SparkContext(conf=cf)

    既然增加内存,无法解决问题,就考虑RDD分区了,尝试设置spark.default.parallelism等参数,发现执行的时候不生效。NumPartitions始终是2,相关代码如下:

    ratings.getNumPartitions()

    过程三、设置minPartitions参数解决了问题 改Spark参数没有解决问题,回到了第一句话“If I set numpartitions to 2000 or take a smaller file, the method works well.” 方法一、用如下代码:

    ratings = rawRatings.map(lambda x:Rating(int(x[0]),int(x[1]),float(x[2]))) ratings=ratings_default.partitionBy(2000) print ratings.getNumPartitions() print ratings.take(5)

    采用这种生成Rating RDD对象后再设置分区的命令没有其作用,错误A和错误B依旧。 方案二、在textFile()中直接设置minPartitions参数,核心代码就是:

    rawData=sc.textFile("D:/myPython/ML_Spark/Datasets/ml-100k/u.data",minPartitions=50)

    尝试结果如下:

    minPartitions数目执行次数平均执行时间错误A错误B1011分钟出错出错5032分钟成功出错20029分钟成功出错500320分钟成功出错1000248分钟成功出错

    当minPartitions>=50以后,错误A解决了 无论如何调minPartitions,发现错误B依旧,说明错误A和错误B无关,根据日志找到出错位置,将相关代码摘出逐个在IDLE中执行,发现是代码

    print userFeatures.count

    问题,上PySpark官网,去浏览,改了代码写法就OK了。 【结论】 从错误A得出的结论:u.data数据集合是10万条,按照100000/2000=50得出minPartitions=50。能达到处理性能和有效性的最佳平衡点。当然实际工程不要这么冒险,还是取1000~1500之间为好。 从错误B得出的结论:就是

    userFeatures = model.userFeatures(). print userFeatures.take(2) print userFeatures.count

    这个写法在win7+pthon2.7.5+Spark1.5.0环境下不行,要改成

    print model.userFeatures().count()

    小结

    小结: 1、还是要深刻理解Spark的原理。minPartitions开始就要设置,因为RDD是最后调用执行算子的时候才执行,如果RDD生成后再设置是没有作用的,之前学Spark的时候,对这个原理体会不深,这次算是有深刻体会了,也就是不能在生成Rating对象之后才设置分区数目。 2、2000是个神奇的数字。建议每个RDD分区处理的数据不要超过2000条,2000对Java好像是个神秘的数字,2006年使用普元EOS,其XML总线返回结果集超过2000就OVER,要求限制SQL语句写法;2007年用WebLogic ESB的时候,因为异常超过2000,开了Case到美国,也没解决;2008年做集中告警系统,大面积告警同一个时刻超过2000的时候,双方系统的WebServie都挂了。这次又是2000问题。我大胆猜测应该和Java的底层实现有关。因为这部分的实现也是Python通过Py4调用Java实现的。 3、分区多少要合理设置。少了,会溢出无法执行,多了性能太差,执行时间过长。本文的例子是在一个笔记本电脑上,配置是Inteli7-4510U,2核CPU,8G内存,win7,64位,16G的SSD硬盘。真正在生产环境上,如何设置,肯定是不同的,需要去实践才行。 4、不同的版本,代码写法不一样,这个只能靠实践了,遇到了上官网上百度上、ITPUB了。 解决这两个问题我用了一周时间才搞定,尤其是错误A,甚至差点放弃了用Python学习《Spark机器学习》这本书的努力,一度怀疑自己的环境配置错了或者准备去学习Scala了或者改用Java去学习了,好在一直没有放弃,终于搞清楚了问题本质。而相关的问题,基本都是英文版的,而且都是Spark issue上的内容,版本也是针对linux、Ubuntu、Spark1.6.0、Python2.7.8的环境,故写下这个博客,分享下经验,希望使用Python+Spark搞机器学习的青蛙们少走些弯路!

    转载请注明原文地址: https://ju.6miu.com/read-149644.html

    最新回复(0)