This post is a quick walkthrough of PySpark 1.6.1 using the Titanic dataset. The dataset is small: the training set has only 891 rows. The training and test data can be downloaded here (train.csv, test.csv).
Once pyspark is running, the SparkContext (sc) is already available. We use sc.textFile to read the csv file, which produces an RDD. Alternatively, we could use sqlContext.read.text to read the csv file, but that produces a DataFrame instead.
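As a small aside, a minimal sketch of the sqlContext.read.text route (not used in the rest of this post; in Spark 1.6 it returns a DataFrame with a single string column named value):

```python
# Read the same csv file as a DataFrame of raw text lines (illustrative only)
raw_df = sqlContext.read.text('/Users/chaoranliu/Desktop/github/kaggle/titanic/train.csv')
raw_df.printSchema()
# root
#  |-- value: string (nullable = true)
```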
train_path = '/Users/chaoranliu/Desktop/github/kaggle/titanic/train.csv'
test_path = '/Users/chaoranliu/Desktop/github/kaggle/titanic/test.csv'

# Load csv files as RDDs
train_rdd = sc.textFile(train_path)
test_rdd = sc.textFile(test_path)

Let's look at the first 3 rows of the RDD:
train_rdd.take(3)

The result is a Python list in which each row is a single string:
[u'PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
 u'1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S',
 u'2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C']

The Spark DataFrame was inspired by R's data frame and pandas' DataFrame in Python. It is Spark's newer data format and is intended to replace the RDD in later versions. Its syntax differs from the RDD API and is much closer to R and pandas. Here I convert the RDD to a DataFrame to make the subsequent data processing easier.
Steps:
1. Remove the header (the first row).
2. Split each row by comma and convert it to a tuple.
3. Name the columns using the header fields.

# Parse RDD to DF
def parseTrain(rdd):
    # extract the data header (first row)
    header = rdd.first()
    # remove the header
    body = rdd.filter(lambda r: r != header)

    def parseRow(row):
        # parse one text row into the data format:
        # remove double quotes, then split the row by comma
        row_list = row.replace('"','').split(",")
        # convert the python list to a tuple, which is
        # compatible with the pyspark data structure
        row_tuple = tuple(row_list)
        return row_tuple

    rdd_parsed = body.map(parseRow)
    colnames = header.split(",")
    # the Name field contains a comma, so each parsed row has one extra field;
    # add a column name for it (index 3 in train.csv, where Survived is present)
    colnames.insert(3,'FirstName')
    return rdd_parsed.toDF(colnames)

## Parse Test RDD to DF
def parseTest(rdd):
    header = rdd.first()
    body = rdd.filter(lambda r: r != header)

    def parseRow(row):
        row_list = row.replace('"','').split(",")
        row_tuple = tuple(row_list)
        return row_tuple

    rdd_parsed = body.map(parseRow)
    colnames = header.split(",")
    # test.csv has no Survived column, so the extra field sits at index 2
    colnames.insert(2,'FirstName')
    return rdd_parsed.toDF(colnames)

train_df = parseTrain(train_rdd)
test_df = parseTest(test_rdd)

Now let's look at the DataFrame:
train_df.show(3)

+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|FirstName|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|   Braund|     Mr. Owen Harris|  male| 22|    1|    0|       A/5 21171|   7.25|     |       S|
|          2|       1|     1|  Cumings| Mrs. John Bradle...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen|         Miss. Laina|female| 26|    0|    0|STON/O2. 3101282|  7.925|     |       S|
+-----------+--------+------+---------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
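To see why an extra FirstName column name is inserted (at index 3 for train.csv but index 2 for test.csv, which has no Survived column), here is a quick illustration using the first raw row shown earlier; the sample variable is only for demonstration:

```python
# The Name field itself contains a comma, so a plain split yields one extra field.
sample = '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S'
print(sample.replace('"', '').split(","))
# ['1', '0', '3', 'Braund', ' Mr. Owen Harris', 'male', '22', '1', '0',
#  'A/5 21171', '7.25', '', 'S']
# 13 fields versus the 12 header columns, hence the extra 'FirstName' name.
```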
We merge the training and test data to make the later data cleaning and feature extraction easier.
## Add Survived column to test
from pyspark.sql.functions import lit, col

train_df = train_df.withColumn('Mark',lit('train'))
test_df = (test_df.withColumn('Survived',lit(0))
                  .withColumn('Mark',lit('test')))
test_df = test_df[train_df.columns]

## Append Test data to Train data
df = train_df.unionAll(test_df)

All parsed columns are still strings at this point, so the numeric columns are cast to double; after the cast, the four variables Age, SibSp, Parch, and Fare are numeric, as the schema below shows.
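The cast is not reproduced in the code above; a minimal sketch of how it could be done, using the same numVars column list that appears in the next snippet (Column.cast is standard pyspark.sql API):

```python
# Cast the numeric columns, currently strings, to double (illustrative sketch)
for var in ['Survived', 'Age', 'SibSp', 'Parch', 'Fare']:
    df = df.withColumn(var, df[var].cast('double'))

df.printSchema()
```

The schema then looks like this: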
root
 |-- PassengerId: string (nullable = true)
 |-- Survived: double (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Mark: string (nullable = false)

Age and Fare have 263 and 1 missing values respectively; here I simply fill them with the column means.
numVars = ['Survived','Age','SibSp','Parch','Fare']

def countNull(df,var):
    return df.where(df[var].isNull()).count()

missing = {var: countNull(df,var) for var in numVars}

age_mean = df.groupBy().mean('Age').first()[0]
fare_mean = df.groupBy().mean('Fare').first()[0]
df = df.na.fill({'Age':age_mean,'Fare':fare_mean})

The number of missing values in each column:
{'Age': 263, 'Fare': 1, 'Parch': 0, 'SibSp': 0, 'Survived': 0}

The main idea here is to create a user-defined function (udf), apply it to the Name column, and extract the passenger's title.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

## create a user defined function to extract the title
getTitle = udf(lambda name: name.split('.')[0].strip(), StringType())

df = df.withColumn('Title', getTitle(df['Name']))
df.select('Name','Title').show(3)

The DataFrame df now has an extra Title column:
+--------------------+-----+
|                Name|Title|
+--------------------+-----+
|     Mr. Owen Harris|   Mr|
| Mrs. John Bradle...|  Mrs|
|         Miss. Laina| Miss|
+--------------------+-----+
only showing top 3 rows

Categorical variables usually have to be converted to numeric ones before most machine-learning algorithms can be applied. Here I simply index them, for example Sex: male => 0, female => 1. The drawback of this approach is that it implicitly introduces an artificial ordering between the category values. One-hot encoding avoids that problem, but it greatly increases the dimensionality of the data (the number of features).
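For reference, a minimal sketch of the one-hot alternative in pyspark 1.6; OneHotEncoder operates on an already indexed column, and this snippet is illustrative only and is not used in the rest of the post:

```python
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Index the category first, then expand the index into a sparse 0/1 vector
sex_indexer = StringIndexer(inputCol='Sex', outputCol='Sex_indexed')
sex_encoder = OneHotEncoder(inputCol='Sex_indexed', outputCol='Sex_onehot')

df_tmp = sex_indexer.fit(df).transform(df)
df_tmp = sex_encoder.transform(df_tmp)
df_tmp.select('Sex', 'Sex_indexed', 'Sex_onehot').show(3)
```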
catVars = ['Pclass','Sex','Embarked','Title']

## index Sex variable
from pyspark.ml.feature import StringIndexer
si = StringIndexer(inputCol='Sex', outputCol='Sex_indexed')
df_indexed = si.fit(df).transform(df).drop('Sex').withColumnRenamed('Sex_indexed','Sex')

## make use of a pipeline to index all categorical variables
def indexer(df,col):
    si = StringIndexer(inputCol=col, outputCol=col+'_indexed').fit(df)
    return si

indexers = [indexer(df,col) for col in catVars]

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df)

df_indexed.select('Embarked','Embarked_indexed').show(3)

In the resulting data, Embarked is mapped as S => 0, C => 1, Q => 2 (StringIndexer assigns indices by descending frequency, so the most common port, S, gets 0):
+--------+----------------+
|Embarked|Embarked_indexed|
+--------+----------------+
|       S|             0.0|
|       C|             1.0|
|       S|             0.0|
+--------+----------------+
only showing top 3 rows

To use the ml/mllib algorithm packages, we need to assemble the features into a single Vector.
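As a side note, pyspark.ml.feature.VectorAssembler is a more idiomatic way to build that feature vector than the manual DenseVector mapping used below; a minimal sketch, not used in the rest of the post:

```python
from pyspark.ml.feature import VectorAssembler

# Assemble the numeric and indexed categorical columns into one 'features' vector
feature_cols = ['Age', 'SibSp', 'Parch', 'Fare',
                'Pclass_indexed', 'Sex_indexed', 'Embarked_indexed', 'Title_indexed']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df_assembled = assembler.transform(df_indexed)
```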
catVarsIndexed = [i+'_indexed' for i in catVars]
featuresCol = numVars + catVarsIndexed
featuresCol.remove('Survived')
labelCol = ['Mark','Survived']

from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector

row = Row('mark','label','features')
df_indexed = df_indexed[labelCol + featuresCol]

# 0-mark, 1-label, 2-features
# map features to DenseVector
lf = (df_indexed.map(lambda r: (row(r[0], r[1], DenseVector(r[2:]))))
      .toDF())

# index label
# convert the numeric label to categorical, which is required by
# decisionTree and randomForest
lf = (StringIndexer(inputCol='label', outputCol='index')
      .fit(lf)
      .transform(lf))

lf.show(3)

+-----+-----+--------------------+-----+
| mark|label|            features|index|
+-----+-----+--------------------+-----+
|train|  0.0|[22.0,1.0,0.0,7.2...|  0.0|
|train|  1.0|[38.0,1.0,0.0,71....|  1.0|
|train|  1.0|[26.0,0.0,0.0,7.9...|  1.0|
+-----+-----+--------------------+-----+
only showing top 3 rows

The ml package works on DataFrames, while mllib works on RDDs. Next, I fit a logistic regression, a decision tree, and a random forest, and look at how the models perform.
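The model-fitting code is not included in the excerpt above; a minimal sketch of how the logistic regression step could look with pyspark 1.6 ml, assuming the labelled frame lf built above and an 80/20 train/validation split (the split ratio, seed, and maxIter are illustrative assumptions):

```python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Keep only the original training rows and split them for validation
train_lf = lf.where(lf.mark == 'train')
train_part, valid_part = train_lf.randomSplit([0.8, 0.2], seed=121)  # assumed split

# Fit logistic regression on the assembled features and the indexed label
lr = LogisticRegression(featuresCol='features', labelCol='index', maxIter=100)
lr_pred = lr.fit(train_part).transform(valid_part)

# Evaluate with area under the ROC curve
evaluator = BinaryClassificationEvaluator(labelCol='index',
                                          rawPredictionCol='rawPrediction',
                                          metricName='areaUnderROC')
print('AUC ROC of Logistic Regression model is: %s' % evaluator.evaluate(lr_pred))
```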
AUC ROC of Logistic Regression model is: 0.836952368823

The logistic regression model reaches an ROC AUC of about 0.837; next we compare it with a decision tree and a random forest.
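The tree-based models are likewise not shown in the excerpt; a minimal sketch under the same assumptions as the previous snippet (train_part, valid_part, and evaluator; maxDepth and numTrees are illustrative):

```python
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier

# Decision tree on the same features and indexed label
dt = DecisionTreeClassifier(featuresCol='features', labelCol='index', maxDepth=5)
dt_pred = dt.fit(train_part).transform(valid_part)
print('AUC ROC of Decision Tree model is: %s' % evaluator.evaluate(dt_pred))

# Random forest
rf = RandomForestClassifier(featuresCol='features', labelCol='index', numTrees=100)
rf_pred = rf.fit(train_part).transform(valid_part)
print('AUC ROC of Random Forest model is: %s' % evaluator.evaluate(rf_pred))
```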
Without any model tuning, the random forest appears to give the better predictions.
The complete Python code can be found here.