MapReduce Join

    xiaoxiao · 2023-05-27

    Map Side Join

    package MapJoin;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    /*
     * Table1 (small table: station id -> station name):
     *   011990-99999  SIHCCAJAVRI
     *   012650-99999  TYNSET-HANSMOEN
     *
     * Table2 (big table: station id, timestamp, temperature):
     *   012650-99999  194903241200  111
     *   012650-99999  194903241800  78
     *   011990-99999  195005150700  0
     *   011990-99999  195005151200  22
     *   011990-99999  195005151800  -11
     */
    public class MapJoin {

        static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

            private Map<String, String> table1Map = new HashMap<String, String>();

            // Load the small table (Table1) from the distributed cache into an in-memory HashMap.
            @Override
            protected void setup(Context context) throws IOException {
                URI[] paths = context.getCacheFiles();
                FileSystem fs = FileSystem.get(context.getConfiguration());
                FSDataInputStream in = fs.open(new Path(paths[0].toString()));
                BufferedReader reader = new BufferedReader(new InputStreamReader(in));
                try {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        String[] vals = line.split("\\t");
                        if (vals.length == 2) {
                            table1Map.put(vals[0], vals[1]);
                        }
                    }
                } finally {
                    reader.close();
                }
            }

            // Stream over the big table (Table2); probe the HashMap with the join key of every record.
            @Override
            protected void map(LongWritable key, Text val, Context context)
                    throws IOException, InterruptedException {
                String[] vals = val.toString().split("\\t");
                if (vals.length == 3) {
                    String table1Vals = table1Map.get(vals[0]);
                    table1Vals = (table1Vals == null) ? "" : table1Vals;
                    context.write(new Text(vals[0]),
                            new Text(table1Vals + "\t" + vals[1] + "\t" + vals[2]));
                }
            }
        }

        public static void main(String[] args) throws IOException,
                ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 3) {
                System.err.println("Wrong number of parameters, please enter three: "
                        + "<big table hdfs input> <small table hdfs input> <hdfs output>");
                System.exit(-1);
            }
            Job job = Job.getInstance(conf, "MapJoin");
            job.setJarByClass(MapJoin.class);
            job.setMapperClass(JoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            job.addCacheFile(new Path(otherArgs[1]).toUri());
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
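    In this map-side join the small table never goes through the shuffle: setup() loads it once per map task from the distributed cache into a HashMap, and map() then streams over the big table and probes the HashMap with each record's station id. A minimal way to run it might look like the following, where the jar name and HDFS paths are assumptions for illustration, not from the original post:

    hadoop jar mapjoin.jar MapJoin.MapJoin /data/table2 /data/table1 /out/mapjoin

    With the sample records from the comment block, each output line should be the tab-separated station id, station name, timestamp and temperature, for example:

    012650-99999    TYNSET-HANSMOEN    194903241200    111
    011990-99999    SIHCCAJAVRI        195005151200    22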

    Reduce Side Join

    package ReduceJoin;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    /*
     * user.csv:
     *   "ID","NAME","SEX"
     *   "1","user1","0"
     *   "2","user2","0"
     *   "3","user3","0"
     *   "4","user4","1"
     *   "5","user5","0"
     *   "6","user6","0"
     *   "7","user7","1"
     *   "8","user8","0"
     *   "9","user9","0"
     *
     * order.csv:
     *   "USER_ID","NAME"
     *   "1","order1"
     *   "2","order2"
     *   "3","order3"
     *   "4","order4"
     *   "7","order7"
     *   "8","order8"
     *   "9","order9"
     */
    public class ReduceJoin {

        public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

            // Reuse the output objects instead of allocating new ones for every record.
            private Text outKey = new Text();
            private Text outValue = new Text();
            private String[] keyValue = null;

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // value is the whole CSV line; the first field is the join (foreign) key,
                // and the rest of the line becomes the value.
                keyValue = value.toString().split(",", 2);
                outKey.set(keyValue[0]);
                outValue.set(keyValue[1]);
                context.write(outKey, outValue);
            }
        }

        public static class Reduce extends Reducer<Text, Text, Text, Text> {

            private Text outValue = new Text();

            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // values holds, for one key, the map outputs coming from the different
                // input files, i.e. the user fields and the matching order fields.
                StringBuilder valueStr = new StringBuilder();
                for (Text val : values) {
                    valueStr.append(val);
                    valueStr.append(",");
                }
                outValue.set(valueStr.deleteCharAt(valueStr.length() - 1).toString());
                context.write(key, outValue);
            }
        }

        public static void main(String[] args) throws IllegalArgumentException,
                IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "MyJoin");
            job.setJarByClass(ReduceJoin.class);
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);
            // job.setCombinerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            // args[0] is a directory that contains both user.csv and order.csv.
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
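    Here the join happens in the reduce phase: the same mapper reads both user.csv and order.csv and emits the first CSV field as the key, so the shuffle brings a user record and its matching order records together and the reducer simply concatenates their values. With the sample files above, key "1" produces a line like "1" followed by "user1","0","order1"; users without orders (e.g. "5" and "6") come out with only their user fields; and the two header lines ("ID" and "USER_ID") also appear in the output because nothing filters them.

    One weakness of this version is that the reducer cannot tell which value came from which file, so the field order inside a joined line depends on arrival order. A common refinement, sketched below as an assumption rather than part of the original post, is to tag each value with its source file name in the mapper (the "user" prefix test and the "#" separator are illustrative choices). This mapper is meant as a drop-in replacement for MapClass inside the ReduceJoin class above and additionally needs import org.apache.hadoop.mapreduce.lib.input.FileSplit.

    public static class TaggedMapClass extends Mapper<LongWritable, Text, Text, Text> {

        private Text outKey = new Text();
        private Text outValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // The input split carries the name of the file this line was read from.
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            String tag = fileName.startsWith("user") ? "U" : "O";
            String[] keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                return; // skip malformed lines that have no comma
            }
            outKey.set(keyValue[0]);
            outValue.set(tag + "#" + keyValue[1]); // e.g. U#"user1","0"  or  O#"order1"
            context.write(outKey, outValue);
        }
    }

    The reducer would then strip the U#/O# tag and always append the user fields before the order fields, which also makes it easy to emit only keys that have both a user record and an order record (an inner join).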
    Please credit the original source when reposting: https://ju.6miu.com/read-1260743.html