hadoop2.5.2学习11 — Friend Recommendation with MapReduce, Part 2

    xiaoxiao  2021-03-26

    Problem: running a Hadoop program from Eclipse fails with "Connection refused: no further information"

       Today every run failed with "Connection refused: no further information". It had worked fine the day before, so the sudden failure was puzzling. Online answers pointed at a network problem, and checking the Hadoop host confirmed its network was down; once the connection was restored, the problem went away.

    The previous post, hadoop2.5.2学习11-MR之好友推荐1, introduced the first MapReduce job, which produced the FoF (friend-of-friend) relations.

    That output is in arbitrary order and not directly usable, so we need to group and sort the data:

    Analysis

    We recommend friends to a user by ranking that user's indirect friends on their FoF counts, so two factors drive the shuffle:

    First, each comparison group must consist of one user together with that user's indirect friends, so the first factor, used for grouping, is the user name.

    Second, within each group the records must be sorted by fofNum in descending order, so the second factor, used for sorting, is fofNum.

    We therefore create a User class holding the user name and the fofNum, and use it as the output key of the second job's Mapper.

    Note: the User class implements the WritableComparable interface.

    Implementing WritableComparable makes User serializable (so Hadoop can persist it) and comparable:

    package com.chb.friend2;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    /**
     * Grouped by user name, sorted by FoF count.
     */
    public class User implements WritableComparable<User> {
        private String userName;
        private int fofNum;

        public User() {
        }

        public User(String userName, int fofNum) {
            this.userName = userName;
            this.fofNum = fofNum;
        }

        // setters and getters...

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(userName);
            out.writeInt(fofNum);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.userName = in.readUTF();
            this.fofNum = in.readInt();
        }

        @Override
        public int compareTo(User u) {
            int r1 = this.userName.compareTo(u.getUserName());
            if (r1 == 0) {
                // default ordering: ascending fofNum; the job overrides this
                // with Fof2Sort to get the descending order we want
                return Integer.compare(this.fofNum, u.getFofNum());
            }
            return r1;
        }
    }
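The two-level comparison inside compareTo above can be sketched in plain Java (a stand-in for the Writable class, not Hadoop code; the class and method names here are made up for illustration):

```java
// Sketch of User.compareTo: compare by userName first, and only on a
// tie fall back to fofNum (ascending, the default ordering).
public class CompareDemo {
    static int compare(String name1, int fof1, String name2, int fof2) {
        int r = name1.compareTo(name2);          // primary key: user name
        return (r == 0)
                ? Integer.compare(fof1, fof2)    // tie-break: FoF count
                : r;
    }

    public static void main(String[] args) {
        System.out.println(compare("a", 3, "b", 1)); // negative: "a" sorts before "b"
        System.out.println(compare("a", 1, "a", 3)); // negative: same user, smaller fofNum first
    }
}
```

The user name always wins: a large fofNum never moves a record out of its user's run of keys, which is what makes the later grouping by name safe.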

    The Mapper of the second MapReduce job

    One input line has the form:

    user	fofFriend	fofNum

    As before, KeyValueTextInputFormat splits each input line at the first tab (\t), so the mapper receives the user name as its key. The mapper's output key type is User: grouping is done by user name, so the key is a User built from that user, new User(key.toString(), fofNum). The output value type is also User: within each reduce group we rank a user's FoF friends by fofNum, but that sorting is already finished in the sort phase, so what the reducer still needs is each FoF friend's name and count. The output value is therefore a User built from the friend, new User(other, fofNum).

    package com.chb.friend2;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /**
     * The output of the previous job is the input of this mapper:
     *   user1    user2    fofNum
     *   a        b        3
     * We need to group by user1's name and sort by the FoF count,
     * so the User class carries both fields.
     */
    public class SortMapper extends Mapper<Text, Text, User, User> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] args = value.toString().split("\t");
            String other = args[0];
            int fofNum = Integer.parseInt(args[1]);
            // key groups/sorts by (user, fofNum); value carries the friend
            context.write(new User(key.toString(), fofNum), new User(other, fofNum));
        }
    }
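How a line is split between the input format and the mapper can be simulated without Hadoop at all. The sketch below mimics KeyValueTextInputFormat's split at the first tab and the mapper's split of the remaining value (the line content is made up):

```java
// Plain-Java simulation of the two splits applied to one input line.
public class SplitDemo {
    public static void main(String[] args) {
        String line = "userA\tuserB\t3";          // one line of the first job's output
        int tab = line.indexOf('\t');
        String key = line.substring(0, tab);      // "userA" -> mapper key
        String value = line.substring(tab + 1);   // "userB\t3" -> mapper value
        String[] parts = value.split("\t");
        String other = parts[0];                  // the indirect friend
        int fofNum = Integer.parseInt(parts[1]);  // number of common friends
        System.out.println(key + " " + other + " " + fofNum); // userA userB 3
    }
}
```

Note that the value still contains a tab, which is why SortMapper splits it again.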

    The sort comparator of the second MapReduce job

    Sorting is by fofNum in descending order:

    package com.chb.friend2;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    /**
     * Sort by FoF count (descending within each user).
     */
    public class Fof2Sort extends WritableComparator {
        public Fof2Sort() {
            super(User.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            User u1 = (User) a;
            User u2 = (User) b;
            int r1 = u1.getUserName().compareTo(u2.getUserName());
            if (r1 == 0) {
                // negate the comparison to sort in descending order
                return -Integer.compare(u1.getFofNum(), u2.getFofNum());
            } else {
                return r1;
            }
        }
    }
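The trick Fof2Sort uses, negating the tie-break comparison, is easy to verify in isolation. A minimal sketch (made-up numbers, plain Java, no Hadoop):

```java
import java.util.Arrays;

// Negating Integer.compare flips ascending into descending, which is
// exactly what Fof2Sort does to the fofNum comparison inside a group.
public class DescendingDemo {
    public static void main(String[] args) {
        Integer[] fofNums = {2, 5, 1, 3};
        Arrays.sort(fofNums, (x, y) -> -Integer.compare(x, y)); // descending
        System.out.println(Arrays.toString(fofNums)); // [5, 3, 2, 1]
    }
}
```

With the user-name comparison left untouched, records stay clustered per user while their FoF counts run from largest to smallest.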

    The grouping comparator of the second MapReduce job

    Grouping is by user name only:

    package com.chb.friend2;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    /**
     * Group by user name only; fofNum is ignored, so keys that differ
     * only in fofNum fall into the same reduce group.
     */
    public class Fof2Group extends WritableComparator {
        public Fof2Group() {
            super(User.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            User u1 = (User) a;
            User u2 = (User) b;
            return u1.getUserName().compareTo(u2.getUserName());
        }
    }
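What the grouping comparator achieves can be simulated without Hadoop: keys that differ only in fofNum compare equal, so consecutive sorted keys with the same user name collapse into one reduce group. A sketch with made-up keys:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simulate reduce-side grouping: walk the already-sorted (user, fofNum)
// keys and bucket them by user name alone, as Fof2Group does.
public class GroupDemo {
    public static void main(String[] args) {
        String[][] sortedKeys = {{"a", "3"}, {"a", "2"}, {"b", "5"}};
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] k : sortedKeys) {
            groups.computeIfAbsent(k[0], u -> new ArrayList<>()).add(k[1]);
        }
        System.out.println(groups); // {a=[3, 2], b=[5]}
    }
}
```

User "a" gets one reduce call covering both of its keys; sorting within the group (3 before 2) was already fixed by the sort comparator.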

    The Reducer of the second MapReduce job

    Since sorting has already been done and grouping is by user name, the reducer can simply concatenate each group and emit it:

    package com.chb.friend2;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SortReducer extends Reducer<User, User, Text, Text> {
        @Override
        protected void reduce(User key, Iterable<User> values, Context context)
                throws IOException, InterruptedException {
            // With the custom grouping and sort comparators, each reduce call
            // handles one user, and its values arrive sorted by fofNum descending.
            String user = key.getUserName();
            StringBuilder sb = new StringBuilder();
            for (User u : values) {
                // collect each FoF friend and its count
                sb.append(u.getUserName()).append(":").append(u.getFofNum()).append(";");
            }
            context.write(new Text(user), new Text(sb.toString()));
        }
    }
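The string the reducer builds for one group can be sketched on its own. Assuming a group for user "a" whose values already arrive sorted by fofNum descending (names and counts made up):

```java
// Build the "friend:count;friend:count;..." value for one reduce group.
public class ReduceDemo {
    public static void main(String[] args) {
        // values for user "a", pre-sorted by the framework: c(3), d(2), e(1)
        String[][] sortedFriends = {{"c", "3"}, {"d", "2"}, {"e", "1"}};
        StringBuilder sb = new StringBuilder();
        for (String[] f : sortedFriends) {
            sb.append(f[0]).append(":").append(f[1]).append(";");
        }
        System.out.println("a\t" + sb); // a	c:3;d:2;e:1;
    }
}
```

One output line per user, best recommendation first, which is why no further sorting is needed after the reducer.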

    Running the test:

    The second MapReduce job is started only after the first one has completed successfully:

    if (f) { // the first job finished, start the second one
        run2();
    }

    The driver code:

    package com.chb.friend2;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class RunJob {
        public static void main(String[] args) throws Exception {
            System.setProperty("HADOOP_USER_NAME", "chb");
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("Fof");
            job.setJarByClass(RunJob.class);
            // Mapper
            job.setMapperClass(FofMapper.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setJar("C:\\Users\\12285\\Desktop\\Fof.jar");
            // Reducer
            job.setReducerClass(FofReducer.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            // input
            FileInputFormat.addInputPaths(job, "/user/chb/input/friend");
            // output
            Path out = new Path("/user/chb/output/fof1");
            if (fs.exists(out)) {
                fs.delete(out, true);
            }
            FileOutputFormat.setOutputPath(job, out);
            boolean f = job.waitForCompletion(true);
            if (f) { // the first job finished, start the second one
                run2();
            }
        }

        public static void run2() throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("Fof2");
            job.setJarByClass(RunJob.class);
            // Mapper
            job.setMapperClass(SortMapper.class);
            job.setMapOutputKeyClass(User.class);
            job.setMapOutputValueClass(User.class);
            job.setJar("C:\\Users\\12285\\Desktop\\Fof.jar");
            // Reducer
            job.setReducerClass(SortReducer.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            // grouping comparator
            job.setGroupingComparatorClass(Fof2Group.class);
            // sort comparator
            job.setSortComparatorClass(Fof2Sort.class);
            // input: the output directory of the first job
            FileInputFormat.addInputPaths(job, "/user/chb/output/fof1");
            // output: a separate directory, so we do not clobber the input
            Path out = new Path("/user/chb/output/fof2");
            if (fs.exists(out)) {
                fs.delete(out, true);
            }
            FileOutputFormat.setOutputPath(job, out);
            if (job.waitForCompletion(true)) {
                System.out.println("Job finished...");
            }
        }
    }

    Test result:

    Compared against the original relationship graph, the output is correct.

