spark udf实战

    xiaoxiao2025-08-04  17

    package com.dt.sparksql import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types._ import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkContext, SparkConf} /** * Created by fengli_pt on 2016/6/29. * * 通过案例实战Spark sql下的UDF和UDAF的具体使用 * UDF:用户自定义的函数,函数的输入是一条具体的数据记录,实现上讲就是普通的Scala函数 * UDAF:用户自定义的聚合函数,函数本身作用于数据集合,能够在聚合操作的基础上自定义操作 * * 实质上讲,例如说UDF会被Spark SQL中的catalyst封装成为Expression,最终会通过eval方法来计算输入的数据row(此处的row和DataFrame中的Row没有任何关系) * */ object SparkSQLUDF { def main(args: Array[String]) { val conf = new SparkConf().setAppName("SparkSQLUDF").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc); val bigData = Array("spark","hadoop","kafka","hive","flume","hadoop","kafka","lfeng"); /** * 基于数据创建DataFrame * */ val bigDataRDD = sc.parallelize(bigData); val bigDataRDDRow = bigDataRDD.map(item => Row(item)) val structType = StructType(Array(StructField("word",StringType,true))); val bigDataDF = sqlContext.createDataFrame(bigDataRDDRow,structType); bigDataDF.registerTempTable("bigDataTable"); //注册成临时表 /** *通过sqlcontext注册UDF,在scala 2.10.x版本UDF函数最多可以接收22个输入参数 * */ sqlContext.udf.register("computerLength",(input:String)=>input.length) //直接在SQL语句中使用UDF,就像使用SQL自带的内部函数一样 sqlContext.sql("select word,computerLength(word) from bigDataTable").show(); sqlContext.udf.register("wordCount",new MyUDAF) sqlContext.sql("select word,computerLength(word) as length,wordCount(word)as count from bigDataTable group by word").show(); while(true){} } } /** * 按照模板实现自定义UDAF,按ctrl+i来加载需要实现的方法 * */ class MyUDAF extends UserDefinedAggregateFunction{ /** * 该方法指定具体输入数据的类型 * */ override def inputSchema: StructType = StructType(Array(StructField("input",StringType,true))) /** * 在进行聚合操作的时候所要处理的数据的结果的类型 * */ override def bufferSchema: StructType = StructType(Array(StructField("count",IntegerType,true))) /** * 指定UDAF函数计算后最终结果返回的数据类型 * */ override def dataType: DataType = IntegerType /** * 确保一致性的,一般设置为true * */ override def deterministic: Boolean = true /** * 在(聚合)Aggregate之前每组数据的初始化结果 * */ override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) =0 /** * 在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算 * 本地的聚合操作,相当于hadoop mapreduce模型中的Combiner * */ override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { buffer(0) = buffer.getAs[Int](0) + 1; } /** * 最后再分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作 * */ override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0); } /** * 返回UDAF最终的计算结果 * */ override def evaluate(buffer: Row): Any = buffer.getAs[Int](0) }
    转载请注明原文地址: https://ju.6miu.com/read-1301408.html
    最新回复(0)