For background on the ideas behind MapReduce, see http://blog.csdn.net/mrbcy/article/details/60139191
In the table above, -1 stands for in-degree and 1 for out-degree.
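To make the encoding concrete: for an input edge line such as "10457 104594" (user 10457 follows user 104594), the mapper emits the pair (10457, 1) for the follower's out-degree and (104594, -1) for the followee's in-degree. When the reducer later iterates over all values for one user, the positive values sum to that user's out-degree and the negative values, with the sign flipped, to the in-degree.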
The code is as follows:
package tech.mrbcy.bigdata.weiborelation;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WeiboRelation {

    public static class RelationMapper extends Mapper<Object, Text, Text, IntWritable> {

        // Used to print the first parsed record once per mapper, for debugging.
        private boolean flag = true;

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is a space-separated edge, e.g. "10457 104594",
            // meaning the first user follows the second.
            String valStr = value.toString().replace(" ", ",");
            String[] users = valStr.split(",");

            if (flag) {
                System.out.println(users[0] + "," + users[1]);
                flag = false;
            }

            // Encode the degree type in the sign: +1 for out-degree, -1 for in-degree.
            IntWritable inDegree = new IntWritable(-1);
            IntWritable outDegree = new IntWritable(1);
            // The follower gains one out-degree, the followee one in-degree.
            context.write(new Text(users[0]), outDegree);
            context.write(new Text(users[1]), inDegree);
        }
    }

    public static class RelationReducer extends Reducer<Text, IntWritable, Text, Text> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int inDegree = 0;
            int outDegree = 0;
            for (IntWritable val : values) {
                if (val.get() > 0) {
                    // Positive values count toward the out-degree.
                    outDegree += val.get();
                } else {
                    // Negative values count toward the in-degree (flip the sign back).
                    inDegree += -val.get();
                }
            }
            String outString = String.format("%d %d", inDegree, outDegree);
            context.write(key, new Text(outString));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "statistic weibo user relation");
        job.setJarByClass(WeiboRelation.class);
        job.setMapperClass(RelationMapper.class);
        job.setReducerClass(RelationReducer.class);
        // The mapper and reducer emit different value types,
        // so the map output types must be declared separately.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

First, package the project into a jar and export it. I did this step with MyEclipse.
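If you are not using MyEclipse, a rough command-line sketch of the same packaging step is below, assuming the hadoop command is on your PATH and WeiboRelation.java sits in the usual package directory layout:

mkdir -p classes
javac -classpath "$(hadoop classpath)" -d classes tech/mrbcy/bigdata/weiborelation/WeiboRelation.java
jar cvf weiborelation.jar -C classes .

The jar name weiborelation.jar here matches the one used in the run command further down.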
Then copy relation.txt to /root/homework/week2.
Then use the following commands to create the input directory in HDFS and make sure the output directory does not exist:
hadoop fs -mkdir -p /wbrelation/input
hadoop fs -rm -r /wbrelation/output

Upload relation.txt to HDFS:
hadoop fs -put /root/homework/week2/relation.txt /wbrelation/input
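Optionally, verify the upload before launching the job; hadoop fs -ls /wbrelation/input should list relation.txt.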
Run the MapReduce program with the following command:

hadoop jar /root/homework/week2/weiborelation.jar tech.mrbcy.bigdata.weiborelation.WeiboRelation /wbrelation/input /wbrelation/output

After the job finishes, use the following commands to view the results:
hadoop fs -get /wbrelation/output/part-r-00000
mv part-r-00000 wbrelation.out
more wbrelation.out

The output is as follows (each line: user ID, in-degree, out-degree):
1000003374 1 2
1000060787 0 4
1000079167 1 1
1000085444 2 0
1000092382 1 0
1000094052 0 1
1000095500 1 0
1000097914 2 8
1000098257 0 1
1000106075 3 0
1000117663 3 1
1000131684 0 1
1000136955 0 1
1000144332 0 2
1000148680 2 0
1000158915 0 1
1000166111 4 30
1000172520 0 1
1000195387 3 1
1000196715 0 1
1000213720 0 1
1000216284 0 1
1000240153 0 1
1000242321 0 1
1000256983 0 1
1000258991 1 0
1000269641 0 1
1000273720 0 1
1000281111 1 1
1000284465 0 1
1000305443 6 2
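One way to sanity-check the full output: every edge contributes exactly one in-degree and one out-degree, so the two degree columns must each sum to the number of lines in relation.txt. A one-liner such as

awk '{in_sum += $2; out_sum += $3} END {print in_sum, out_sum}' wbrelation.out

should print two equal numbers, both matching wc -l < relation.txt.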