一、自定义数据类型 在设计的系统开发过程之中,有可能要参与分析的文件会有很多,并且文件的组成结构也可能会非常的复杂,所以来讲在整个的Hadoop里面可以针对于用户的需求实现自定义类型。 现在假如说有以下一个信息文件,文件的组成格式(购物统计):
用户名[0]、省份[1]、城市[2]、购买日期时间[3]、商品名称[4]、商品分类[5]、商品子分类[6]、商品价格[7]、商品购买价格[8]希望可以通过一种数据类型能够描述出以下几种关系: 1、保留有省份的花销(商品原始价格、成交价格、折扣的价格); 2、保留有用户的花销(商品原始价格、成交价格、折扣的价格); 3、保留有商品分类的花销(商品原始价格、成交价格、折扣的价格);
在整个的MapReduce之中并不会提供有任何的一种类型来描述这种花销的结构,那么这样的话就需要定义一个自己的数据类型,而所有的数据类型一定要实现一个接口:org.apache.hadoop.io.Writable;
/** * 实现了自定义的记录数据价格的结构 * @author mldn */ public class RecordWritable implements Writable { private double price ; // 商品的原始价格 private double deal ; // 商品的成交价格 private double discount ; // 商品的折扣价格 public RecordWritable() { // RecordWritable类需要在读取的时候执行数据反序列化操作; } // RecordWritable类要在数据创建的时候接收内容 public RecordWritable(double price, double deal, double discount) { super(); this.price = price; this.deal = deal; this.discount = discount; } @Override public void write(DataOutput output) throws IOException { output.writeDouble(this.price); output.writeDouble(this.deal); output.writeDouble(this.discount); } @Override public void readFields(DataInput input) throws IOException { this.price = input.readDouble() ; this.deal = input.readDouble() ; this.discount = input.readDouble() ; } @Override public String toString() { return "RecordWritable [price=" + price + ", deal=" + deal + ", discount=" + discount + "]"; } public double getPrice() { return price; } public double getDeal() { return deal; } public double getDiscount() { return discount; } }最终的结果一定要对数据进行MapReduce 操作统计,所以在整个的统计里面,现在希望可以实现这样的操作统计: · 可以实现根据省份数据得到的统计结果; 定义Map 数据处理
/** * 针对于每一行发送的数据进行拆分处理,将每一行的数据拆分为key与value的形式<br> * 输入类型:关注的是value的内容;<br> * 输出类型:单词名称=数量(1)<br> * @author mldn */ private static class RecordMapper extends Mapper<Object, Text, Text, RecordWritable> { @Override protected void map(Object key, Text value, Mapper<Object, Text, Text, RecordWritable>.Context context) throws IOException, InterruptedException { // 读取每一行的数据,Map是根据换行作为读取的分割符; String str = value.toString(); // 取的每一行的内容 // 所有的单词要求按照空格进行拆分 String result[] = str.split(","); // 按照“,”拆分 // 以后如果要根据不同的方式来进行统计,则此处可以取得不同的内容 String province = result[1] ; // 取得省份的信息内容 double price = Double.parseDouble(result[7]) ; // 取得原始价格 double deal = Double.parseDouble(result[8]) ; // 成交价格 RecordWritable rw = new RecordWritable(price, deal, price-deal) ; context.write(new Text(province), rw); // 将数据取出 } }定义Reduce 处理类
private static class RecordReducer extends Reducer<Text, RecordWritable, Text, RecordWritable> { @Override protected void reduce(Text key, Iterable<RecordWritable> values,Reducer<Text, RecordWritable, Text, RecordWritable>.Context context) // 计算出消费的总价格数据 double priceSum = 0.0 ; double dealSum = 0.0 ; double discountSum = 0.0 ; for (RecordWritable rw : values) { priceSum += rw.getPrice() ; dealSum += rw.getDeal() ; discountSum += rw.getDiscount() ; } RecordWritable rw = new RecordWritable(priceSum, dealSum, discountSum) ; context.write(key, rw); } }定义相关的作业进行处理
public class Record { private static final String OUTPUT_PATH = "hdfs://192.168.122.132:9000/output-" ; // 具体的输入的路径由用户自己来输入,而我们来定义一个属于自己的输出路径,格式“output-20201010” public static void main(String[] args) throws Exception { if (args.length != 1) { // 现在输入的参数个数不足,那么则应该退出程序 System.out.println("本程序的执行需要两个参数:HDFS输入路径"); System.exit(1); // 程序退出 } Configuration conf = new Configuration() ; // 此时需要进行HDFS操作 String paths [] = new GenericOptionsParser(conf,args).getRemainingArgs() ; //将输入的两个路径解析为HDFS的路径 // 需要定义一个描述作业的操作类 Job job = Job.getInstance(conf, "hadoop") ; job.setJarByClass(Record.class); // 定义本次作业执行的类的名称 job.setMapperClass(RecordMapper.class); // 定义本次作业完成所需要的Mapper程序类 job.setMapOutputKeyClass(Text.class); // 定义Map输出数据的Key的类型 job.setMapOutputValueClass(RecordWritable.class); // 定义Map输出数据的Value的类型 job.setReducerClass(RecordReducer.class); // 定义要使用的Reducer程序处理类 job.setOutputKeyClass(Text.class); // 最终输出统计结果的key的类型 job.setOutputValueClass(RecordWritable.class); // 最终输出统计结果的value的类型 // 所有的数据都在HDFS上进行操作,那么就必须使用HDFS提供的程序进行数据的输入配置 FileInputFormat.addInputPath(job, new Path(paths[0])); // 定义输入的HDFS路径 // 输出路径可以自己进行定义 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH + new SimpleDateFormat("yyyyMMdd").format(new Date())));// 定义统计结果的输出路径 System.exit(job.waitForCompletion(true) ? 0 : 1); // 程序执行完毕后要进行退出 } }