We illustrate this with a simple text-document workflow.
In the figure above, the top row represents a Pipeline with three stages. The first two (Tokenizer and HashingTF) are Transformers (blue), and the third (LogisticRegression) is an Estimator (red). The bottom row represents data flowing through the pipeline, where the cylinders are DataFrames. The Pipeline.fit() method is called on the original DataFrame, which holds the raw text documents and their labels. The Tokenizer.transform() method splits the raw text documents into words, adding a new words column to the DataFrame. The HashingTF.transform() method converts the words column into feature vectors, adding a new column with these vectors to the DataFrame. Now, since LogisticRegression is an Estimator, the Pipeline first calls LogisticRegression.fit() to produce a LogisticRegressionModel. If the Pipeline had more stages, it would call the LogisticRegressionModel's transform() method on the DataFrame before passing the DataFrame to the next stage.

A Pipeline is itself an Estimator. Thus, after a Pipeline's fit() method runs, it produces a PipelineModel, which is a Transformer.
This PipelineModel is used at test time; the figure below illustrates this usage.
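To make the two flows concrete, below is a minimal code-level sketch of what Pipeline.fit() and the resulting PipelineModel's transform() do internally for this three-stage pipeline. The stage and DataFrame names (tokenizer, hashingTF, lr, training, test) are assumptions chosen to mirror the Example: Pipeline code later in this section.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// Assumed labeled training documents and unlabeled test documents,
// mirroring the Example: Pipeline data below.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0)
)).toDF("id", "text", "label")
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n")
)).toDF("id", "text")

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)

// Fit time: each Transformer's transform() runs in order, then the final
// Estimator's fit() produces a Model (here a LogisticRegressionModel).
val words = tokenizer.transform(training)      // adds the "words" column
val featurized = hashingTF.transform(words)    // adds the "features" column
val lrModel = lr.fit(featurized)

// Test time: the fitted PipelineModel replays the same transform() calls,
// ending with the fitted model's transform().
val testWords = tokenizer.transform(test)
val testFeaturized = hashingTF.transform(testWords)
val predictions = lrModel.transform(testFeaturized)  // adds prediction columns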
Details

DAG Pipelines: A Pipeline's stages are specified as an ordered array. The examples given here are all linear Pipelines, in which each stage uses data produced by the previous stage. It is possible to create non-linear Pipelines as long as the data flow graph forms a directed acyclic graph (DAG). This graph is generally specified implicitly, based on the input and output column names of each stage (generally specified as parameters). If the Pipeline forms a DAG, the stages must be specified in topological order; a hypothetical sketch of such a non-linear Pipeline is given below, just before the code examples.

Runtime checking: Since Pipelines can operate on DataFrames with varied data types, they cannot use compile-time type checking. Pipelines and PipelineModels instead perform runtime checking before actually running the Pipeline, using the DataFrame schema, which describes the data types of the DataFrame's columns.

Unique Pipeline stages: A Pipeline's stages must be unique instances. For example, the same instance myHashingTF cannot be inserted into a Pipeline twice, since Pipeline stages must have unique IDs. However, two different instances, myHashingTF1 and myHashingTF2, can be put into the same Pipeline, since different instances are created with different IDs.

Parameters

MLlib Estimators and Transformers use a uniform API for specifying parameters. A Param is a named parameter with self-contained documentation, and a ParamMap is a set of (parameter, value) pairs. There are two main ways to pass parameters to an algorithm:
1. Set parameters on an instance. For example, if lr is a LogisticRegression instance, calling lr.setMaxIter(10) makes lr.fit() use at most 10 iterations.
2. Pass a ParamMap to fit() or transform(). Any parameter in the ParamMap overrides parameters previously specified via setter methods.

Saving and Loading Pipelines

It is often worthwhile to save a pipeline or a model to disk for later use. Model import/export functionality was added to the Pipeline API in Spark 1.6; most Transformers and some basic ML models are supported.
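As referenced under DAG Pipelines above, here is a hypothetical sketch of a non-linear Pipeline. The column names title and body and all stage names are assumptions, not part of the examples in this article: two text columns are processed by independent Tokenizer/HashingTF branches, a VectorAssembler merges the resulting feature columns, and a LogisticRegression consumes the merged features. The stages are listed in topological order, with the DAG implied by the input/output column names; note also that the two HashingTF stages are distinct instances, as the unique-stages rule requires.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, VectorAssembler}

// Branch 1: title -> titleWords -> titleFeatures
val titleTokenizer = new Tokenizer().setInputCol("title").setOutputCol("titleWords")
val titleHashingTF = new HashingTF().setInputCol("titleWords").setOutputCol("titleFeatures")

// Branch 2: body -> bodyWords -> bodyFeatures
val bodyTokenizer = new Tokenizer().setInputCol("body").setOutputCol("bodyWords")
val bodyHashingTF = new HashingTF().setInputCol("bodyWords").setOutputCol("bodyFeatures")

// Join point: merge both branches into a single "features" column.
val assembler = new VectorAssembler()
  .setInputCols(Array("titleFeatures", "bodyFeatures"))
  .setOutputCol("features")

val lr = new LogisticRegression()

// Stages are given in topological order; the DAG structure is implied
// by each stage's input and output column names.
val dagPipeline = new Pipeline()
  .setStages(Array(titleTokenizer, titleHashingTF, bodyTokenizer, bodyHashingTF, assembler, lr))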
Code Examples
Example: Estimator, Transformer, and Param
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
Example: Pipeline
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "mapreduce spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
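As a small follow-up sketch (not part of the original example), the reloaded PipelineModel can be used exactly like the fitted model it was saved from, continuing with the sameModel and test values defined above:

// sameModel was loaded from disk above; it behaves like the original fitted pipeline model.
sameModel.transform(test)
  .select("id", "text", "probability", "prediction")
  .show()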