MapReduce 计算均值：使用 Combiner 加速

    xiaoxiao2021-03-26  9

    import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.io.Text; public class Score { static class ScoreMapper extends Mapper<Object, Text, Text, Text> {//class public void map(Object key,Text value,Context context) throws IOException,InterruptedException{ StringTokenizer itr=new StringTokenizer(value.toString()); String s; while(itr.hasMoreTokens()) { System.out.println("Map "+(s=itr.nextToken())); context.write(new Text(s),new Text(itr.nextToken()+",1")); } } } static class ScoreCombine extends Reducer<Text, Text, Text, Text> { public void reduce(Text key,Iterable<Text>values,Context context) throws IOException,InterruptedException{ int sum=0,cnt=0; for(Text val:values) { String[] s1=val.toString().split(","); sum+=Integer.parseInt(s1[0]); cnt+=Integer.parseInt(s1[1]); } String s; System.out.println("Combine"+(s=new String(sum+","+cnt))); context.write(key,new Text(new String(sum+","+cnt))); } } static class ScoreReducer extends Reducer<Text, Text, Text, DoubleWritable> { public void reduce(Text key,Iterable<Text>values,Context context) throws IOException,InterruptedException{ int sum=0,cnt=0; for(Text val:values) { String[]s=val.toString().split(","); sum+=Integer.parseInt(s[0]); cnt+=Integer.parseInt(s[1]); } String s; System.out.println("reduce"+(s=new String(key+","+(sum*1.0/cnt)))); context.write(key,new DoubleWritable(sum*1.0/cnt)); } } public static void main(String args[])throws Exception { Configuration conf=new Configuration(); if(args.length!=2) { System.out.print("Usage: Score <in> <out>"); System.exit(2); } Job job=new Job(conf,"Score"); 
job.setJarByClass(Score.class); job.setMapperClass(ScoreMapper.class); job.setCombinerClass(ScoreCombine.class); job.setReducerClass(ScoreReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } }
    转载请注明原文地址: https://ju.6miu.com/read-600350.html

    最新回复(0)