hive 中udf,udaf,udtf

    xiaoxiao  2021-03-25

    UDF步骤: 1.继承org.apache.hadoop.hive.ql.exec.UDF 2.实现evaluate函数,evaluate函数支持重载 [java]  view plain  copy package cn.sina.stat.hive.udf;   import java.util.Arrays;   import org.apache.hadoop.hive.ql.exec.UDF;   public final class SortFieldContent extends UDF {           public String evaluate( final String str, String delimiter) {                  if (str == null ) {                         return null ;                 }                  if (delimiter == null) {                        delimiter = "," ;                 }                 String[] strs = str.split(delimiter);                 Arrays. sort(strs);                 String result = "" ;                  for (int i = 0; i < strs. length; i++) {                         if (result.length() > 0) {                              result.concat(delimiter);                        }                        result.concat(strs[i]);                 }                  return result;          }              public String evaluate( final String str, String delimiter, String order) {                  if (str == null ) {                         return null ;                 }                  if (delimiter == null) {                        delimiter = "," ;                 }                  if (order != null && order.toUpperCase().equals( "ASC" )) {                         return evaluate(str, delimiter);                 } else {                        String[] strs = str.split(delimiter);                        Arrays. sort(strs);                        String result = "" ;                         for (int i = strs. length - 1; i >= 0; i--) {                               if (result.length() > 0) {                                     result.concat(delimiter);                              }                              result.concat(strs[i]);                        }                         return result;                 }          }   }  
    UDAF步骤: 1.函数类继承org.apache.hadoop.hive.ql.exec.UDAF    内部类实现接口org.apache.hadoop.hive.ql.exec.UDAFEvaluator 2.Evaluator需要实现 init、iterate、terminatePartial、merge、terminate这几个函数    具体执行过程如图: [java]  view plain  copy package cn.sina.stat.hive.udaf;   import java.util.Arrays;   import org.apache.hadoop.hive.ql.exec.UDAF;   import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;      public class ConcatClumnGroupByKeyWithOrder extends UDAF {        public static class ConcatUDAFEvaluator implements UDAFEvaluator {             public static class PartialResult {                  String result;                  String delimiter;                  String order;             }                private PartialResult partial;                public void init() {                  partial = null;             }                public boolean iterate(String value, String delimiter, String order) {                     if (value == null) {                       return true;                  }                  if (partial == null) {                       partial = new PartialResult();                       partial.result = new String("");                       if (delimiter == null || delimiter.equals("")) {                            partial.delimiter = new String(",");                       } else {                            partial.delimiter = new String(delimiter);                       }                       if (order != null                                 && (order.toUpperCase().equals("ASC") || order                                           .toUpperCase().equals("DESC"))) {                            partial.order = new String(order);                       } else {                            partial.order = new String("ASC");                       }                     }                  if (partial.result.length() > 0) {                       partial.result = partial.result.concat(partial.delimiter);                  }                     partial.result = 
partial.result.concat(value);                     return true;             }                public PartialResult terminatePartial() {                  return partial;             }                public boolean merge(PartialResult other) {                  if (other == null) {                       return true;                  }                  if (partial == null) {                       partial = new PartialResult();                       partial.result = new String(other.result);                       partial.delimiter = new String(other.delimiter);                       partial.order = new String(other.order);                  } else {                       if (partial.result.length() > 0) {                            partial.result = partial.result.concat(partial.delimiter);                       }                       partial.result = partial.result.concat(other.result);                  }                  return true;             }                public String terminate() {                  String[] strs = partial.result.split(partial.delimiter);                  Arrays.sort(strs);                  String result = new String("");                  if (partial.order.equals("DESC")) {                       for (int i = strs.length - 1; i >= 0; i--) {                            if (result.length() > 0) {                                 result.concat(partial.delimiter);                            }                            result.concat(strs[i]);                       }                  } else {                       for (int i = 0; i < strs.length; i++) {                            if (result.length() > 0) {                                 result.concat(partial.delimiter);                            }                            result.concat(strs[i]);                       }                  }                  return new String(result);             }        }   }  
    UDTF步骤: 1.继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF 2.实现initialize, process, close三个方法      a.initialize初始化验证,返回字段名和字段类型      b.初始化完成后,调用process方法,对传入的参数进行处理,通过forword()方法把结果返回      c.最后调用close()方法进行清理工作

package cn.sina.stat.hive.udtf;

import java.util.ArrayList;
import java.util.Arrays;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

/**
 * Hive UDTF that splits a delimited string, sorts the fields, and emits one
 * row per adjacent pair of fields joined by the delimiter ("a,b", "b,c", ...),
 * walking the sorted array forward for ASC and backward for DESC.
 */
public class SortFieldExplodeToPair extends GenericUDTF {

    @Override
    public void close() throws HiveException {
        // No per-query resources to release.
    }

    /**
     * Validates the three arguments (string, delimiter, order) and declares a
     * single string output column named "col1".
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args)
            throws UDFArgumentException {
        if (args.length != 3) {
            throw new UDFArgumentLengthException(
                    "SortFieldExplodeToPair takes only three arguments");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException(
                    "SortFieldExplodeToPair takes string as first parameter");
        }
        if (args[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException(
                    "SortFieldExplodeToPair takes string as second parameter");
        }
        if (args[2].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException(
                    "SortFieldExplodeToPair takes string as third parameter");
        }
        // BUG FIX: the original compared args[2].toString() — the
        // ObjectInspector's string form (a type description), never the
        // runtime value — against "ASC"/"DESC", so the check could not
        // validate the actual argument. The order value is now handled in
        // process(), where the runtime value is available; anything other
        // than "DESC" falls back to ascending.

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(
                fieldNames, fieldOIs);
    }

    // Reused single-column row buffer for forward(); GenericUDTF serializes
    // the row before process() returns, so reuse is safe here.
    private final String[] forwardStr = new String[1];

    /**
     * Splits args[0] by args[1], sorts, and forwards each adjacent pair in
     * the direction given by args[2] ("DESC" descending, otherwise
     * ascending). A single-field input is forwarded as-is.
     */
    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        String delimiter = args[1].toString();
        // Robustness: a null order defaults to ascending instead of NPE'ing.
        String order = (args[2] == null) ? "ASC" : args[2].toString();
        String[] strList = input.split(delimiter);
        Arrays.sort(strList);
        if (strList.length > 1) {
            if (order.equalsIgnoreCase("DESC")) {
                for (int i = strList.length - 1; i > 0; i--) {
                    forwardStr[0] = strList[i].concat(delimiter).concat(strList[i - 1]);
                    forward(forwardStr);
                }
            } else {
                for (int i = 0; i < strList.length - 1; i++) {
                    forwardStr[0] = strList[i].concat(delimiter).concat(strList[i + 1]);
                    forward(forwardStr);
                }
            }
        } else {
            // Single field: forward the lone value as the one-column row.
            forward(strList);
        }
    }
}
    转载请注明原文地址: https://ju.6miu.com/read-580.html

    最新回复(0)