import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
public class Score
{
static class ScoreMapper extends Mapper<Object, Text, Text, Text>
{//class
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
StringTokenizer itr=new StringTokenizer(value.toString());
String s;
while(itr.hasMoreTokens())
{
System.out.println("Map "+(s=itr.nextToken()));
context.write(new Text(s),new Text(itr.nextToken()+",1"));
}
}
}
static class ScoreCombine extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key,Iterable<Text>values,Context context)
throws IOException,InterruptedException{
int sum=0,cnt=0;
for(Text val:values)
{
String[] s1=val.toString().split(",");
sum+=Integer.parseInt(s1[0]);
cnt+=Integer.parseInt(s1[1]);
}
String s;
System.out.println("Combine"+(s=new String(sum+","+cnt)));
context.write(key,new Text(new String(sum+","+cnt)));
}
}
static class ScoreReducer extends Reducer<Text, Text, Text, DoubleWritable>
{
public void reduce(Text key,Iterable<Text>values,Context context)
throws IOException,InterruptedException{
int sum=0,cnt=0;
for(Text val:values)
{
String[]s=val.toString().split(",");
sum+=Integer.parseInt(s[0]);
cnt+=Integer.parseInt(s[1]);
}
String s;
System.out.println("reduce"+(s=new String(key+","+(sum*1.0/cnt))));
context.write(key,new DoubleWritable(sum*1.0/cnt));
}
}
public static void main(String args[])throws Exception
{
Configuration conf=new Configuration();
if(args.length!=2)
{
System.out.print("Usage: Score <in> <out>");
System.exit(2);
}
Job job=new Job(conf,"Score");
job.setJarByClass(Score.class);
job.setMapperClass(ScoreMapper.class);
job.setCombinerClass(ScoreCombine.class);
job.setReducerClass(ScoreReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
转载请注明原文地址: https://ju.6miu.com/read-600350.html