Step1
/** *第一步去重 * @author Administrator * */ public class Step1 { static class Step1_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if(key.get()!=0){ context.write(value,NullWritable.get()); } } } static class Step1_Reducer extends Reducer<Text, IntWritable, Text, NullWritable>{ protected void reduce(Text key, Iterable<IntWritable> i, Context context) throws IOException, InterruptedException { context.write(key,NullWritable.get()); } } }
Step2
/** * 按用户分组,计算所有物品出现的组合列表,得到用户对物品的喜爱度得分矩阵 * @author Administrator *u13 i160:1, *u14 i25:1,i223:1, *u16 i252:1, *u21 i266:1, *u24 i64:1,i218:1,i185:1, *u26 i276:1,i201:1,i348:1,i321:1,i136:1, */ public class Step2 { //按用户分组,计算所有物品出现的组合列表,得到用户对物品的喜爱度得分矩阵 static class Step2_Mapper extends Mapper<LongWritable, Text, Text, Text>{ //如果使用:用户+物品,同时作为输出key,更优 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] tokens=value.toString().split(","); String user=tokens[1]; String action =tokens[2]; Text k=new Text(user); Integer rv=StartRun.R.get(action); Text v=new Text(item+":"+rv.intValue()); //key:user v :item:action context.write(k,v); } } //u1 i1 3 //按照key user 进行分组 /* i100:i100 3 i100:i105 1 i100:i106 1 i100:i109 1 */ //按用户分组,计算所有物品出现的组合列表,得到用户对物品的喜爱度得分矩阵 static class Step2_Reducer extends Reducer<Text, Text, Text, Text>{ protected void reduce(Text key, Iterable<Text> i, Context context) throws IOException, InterruptedException { Map<String,String> r=new HashMap<String,Integer>(); for(Text value:i){ String [] vs=value.toString().split(":"); String item=vs[0]; Integer action=Integer.parseInt(vs[1]); //得到喜爱度 action=(r.get(item)==null?0:r.get(item)).intvalue()+action; r.put(item,action); } //得到同现组合列表 StringBuffer sb=new StringBuffer(); for(Entry<String,Integer> entry:r.entrySet()){ sb.append(entry.getKey()+":"+entry.getValue().intValue()+","); } //key? value item:action,item:action //key 是用户 //u1 item:action , item,action 这是物品出现的组合列表 context.write(key,new Text(sb.toString())); } } }
Step3
/** * 对物品组合列表进行计数,建立物品的同现矩阵 * @author Administrator * */ public class Step3 { //i100:i100 3 static class Step3_Mapper extends Mapper<LongWritable, Text, Text, IntWritable>{ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] tokens=value.toString().split("\t"); String[] items=tokens[1].split(","); for(int i = 0; i < items.length; i++){ String itemA=items[i].split(":")[0]; for(int j = 0; j < items.length; j++){ String itemB=items[j].split(":"); //相同的key在此时会进行分组 K.set(itemA+":"+itemB); context.write(K,V); } } } } //进行 统计 计算 同现 次数 sum static class Step3_Reducer extends Reducer<Text, IntWritable, Text, IntWritable>{ protected void reduce(Text key, Iterable<IntWritable> i, Context context) throws IOException, InterruptedException { int sum=0; for(IntWritable v:i){ sum=sum+v.get(); } V.set(sum); context.write(key,V); } } }
Step4
/** * 把同现矩阵和得分矩阵相乘 * @author Administrator * */ public class Step4 { static class Step4_Mapper extends Mapper<LongWritable, Text, Text, Text> { private String flag;//A同现矩阵 or B得分矩阵 //每个maptask,初始化时调用一次 protected void setup(Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); flag = split.getPath().getParent().getName();// 判断读的数据集 System.out.println(flag + "**********************"); } protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String [] tokens=Pattern.compile("[\t]").split(value.toString()); if(flag.equals("step3")){//同现矩阵 String [] v1=tokens[0].split(":"); String itemID1=v1[0]; String itemID2 = v1[1]; String num = tokens[1]; Text k=new Text(itemID1);//以前一个物品为key 比如i100 Text v=new Text("A"+itemID2+","+num);//A:i109,1 context.write(k,v); } } else if (flag.equals("step2")) {// 用户对物品喜爱得分矩阵 String userID = tokens[0]; for (int i = 1; i < tokens.length; i++) { String[] vector = tokens[i].split(":"); String itemID = vector[0];// 物品id String pref = vector[1];// 喜爱分数 Text k = new Text(itemID); // 以物品为key 比如:i100 Text v = new Text("B:" + userID + "," + pref); // B:u401,2 context.write(k, v); } } } static class Step4_Reducer extends Reducer<Text, Text, Text, Text> { protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // A同现矩阵 or B得分矩阵 //某一个物品,针对它和其他所有物品的同现次数,都在mapA集合中 //和该物品同现的 其他物品同现集合 key中的itemID Map<String,Integer> mapA=new HashMap<String,Integer>(); //物品的推荐权重 物品对所有用户 可以说是 喜爱指数key中的itemID Map<String, Integer> mapB = new HashMap<String, Integer>(); for(Text line:values){ String val=line.toString(); if(val.startsWith("A:"){// 表示物品同现数字 //正则表达式 String [] kv=Pattern.compile("[\t,]").split( val.substring(2)); mapA.put(kv[0], Integer.parseInt(kv[1])); }else if (val.startsWith("B:")) { String[] kv = Pattern.compile("[\t,]").split( val.substring(2)); try { mapB.put(kv[0], Integer.parseInt(kv[1])); } catch (Exception e) { e.printStackTrace(); } } } double result=0; Iterator<String> iter=mapA.keySet().iterator(); while(iter.hasNext()){ String mapk=iter.next();//itemID int num = mapA.get(mapk).intValue(); Iterator<String> iterb = mapB.keySet().iterator(); while (iterb.hasNext()) { String mapkb = iterb.next();// userID int pref = mapB.get(mapkb).intValue(); result = num * pref;// 矩阵乘法相乘计算 Text k = new Text(mapkb); Text v = new Text(mapk + "," + result); context.write(k, v); //得到的key userid value } } } } }
Step5
/** *把相乘之后的矩阵相加获得结果矩阵 * @author Administrator * */ public class Step5 { static class Step5_Mapper extends Mapper<LongWritable, Text, Text, Text> { /** * 原封不动输出 */ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] tokens = Pattern.compile("[\t,]").split(value.toString()); Text k = new Text(tokens[0]);// 用户为key Text v = new Text(tokens[1] + "," + tokens[2]); context.write(k, v); } } static class Step5_Reducer extends Reducer<Text, Text, Text, Text> { protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Map<String,Double> map=new HashMap<String,Double>();//结果 for(Text line:values){ String[] tokens = line.toString().split(","); String itemID = tokens[0]; Double score = Double.parseDouble(tokens[1]); //求和计算 if(map.containsKey(itemID)){ map.put(itemID,map.get(itemID)+score); } else { map.put(itemID, score); } } Iterator<String> iter = map.keySet().iterator(); while (iter.hasNext()) { String itemID = iter.next(); double score = map.get(itemID); Text v = new Text(itemID + "," + score); context.write(key, v); } } } }
Step6
/** * 按照推荐得分降序排序,每个用户列出10个推荐物品 * @author Administrator * */ public class Step6 { static class Step6_Mapper extends Mapper<LongWritable, Text, PairWritable, Text> { protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] tokens = Pattern.compile("[\t,]").split(value.toString()); String u = tokens[0]; String item = tokens[1]; String num = tokens[2]; PairWritable k =new PairWritable(); k.setUid(u); k.setNum(Double.parseDouble(num)); V.set(item+":"+num); context.write(k, V); } } static class Step6_Reducer extends Reducer<PairWritable, Text, Text, Text> { protected void reduce(PairWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { int i=0; StringBuffer sb =new StringBuffer(); for(Text v :values){ if(i==10) break; sb.append(v.toString()+","); i++; } K.set(key.getUid()); V.set(sb.toString()); context.write(K, V); } } }
