本文为《Spark大型电商项目实战》系列文章之一,主要用代码实现top10热门品类模块中的第三步:将各品类与它的点击、下单和支付次数进行join。
在代码末尾实现joinCategoryAndData方法
/**
 * Joins every category id with its click, order and pay counts.
 *
 * <p>Each count RDD is attached with a left outer join, so a category that is
 * missing from a count RDD is kept and its count is reported as 0.
 *
 * <p>The value of each returned pair is a '|'-separated list of
 * {@code field=value} entries, in this order: {@code Constants.FIELD_CATEGORY_ID},
 * {@code Constants.FIELD_CLICK_COUNT}, {@code Constants.FIELD_ORDER_COUNT},
 * {@code Constants.FIELD_PAY_COUNT}.
 *
 * @param categoryidRDD            pairs keyed by category id; only the key is used
 * @param clickCategoryId2CountRDD category id -> click count
 * @param orderCategoryId2CountRDD category id -> order count
 * @param payCategoryId2CountRDD   category id -> pay count
 * @return category id -> concatenated field string described above
 */
private static JavaPairRDD<Long, String> joinCategoryAndData(
        JavaPairRDD<Long, Long> categoryidRDD,
        JavaPairRDD<Long, Long> clickCategoryId2CountRDD,
        JavaPairRDD<Long, Long> orderCategoryId2CountRDD,
        JavaPairRDD<Long, Long> payCategoryId2CountRDD) {
    // With leftOuterJoin the right-hand RDD may have no entry for a key, which is
    // why the second element of the joined Tuple2 is an Optional<Long>: it may or
    // may not hold a value.
    JavaPairRDD<Long, Tuple2<Long, Optional<Long>>> clickJoinedRDD =
            categoryidRDD.leftOuterJoin(clickCategoryId2CountRDD);

    JavaPairRDD<Long, String> categoryId2InfoRDD = clickJoinedRDD.mapToPair(
            new PairFunction<Tuple2<Long, Tuple2<Long, Optional<Long>>>, Long, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<Long, String> call(
                        Tuple2<Long, Tuple2<Long, Optional<Long>>> tuple) throws Exception {
                    long categoryid = tuple._1;
                    Optional<Long> optional = tuple._2._2;
                    // No matching click record means a click count of 0.
                    long clickCount = optional.isPresent() ? optional.get() : 0L;

                    String value = Constants.FIELD_CATEGORY_ID + "=" + categoryid + "|"
                            + Constants.FIELD_CLICK_COUNT + "=" + clickCount;
                    return new Tuple2<Long, String>(categoryid, value);
                }
            });

    // The order and pay steps are identical apart from the field name.
    categoryId2InfoRDD = appendCountField(
            categoryId2InfoRDD, orderCategoryId2CountRDD, Constants.FIELD_ORDER_COUNT);
    categoryId2InfoRDD = appendCountField(
            categoryId2InfoRDD, payCategoryId2CountRDD, Constants.FIELD_PAY_COUNT);

    return categoryId2InfoRDD;
}

/**
 * Left-outer-joins {@code categoryId2CountRDD} onto {@code categoryId2InfoRDD} and
 * appends {@code "|" + fieldName + "=" + count} to each existing value, using 0
 * when the count RDD has no entry for the category.
 *
 * @param categoryId2InfoRDD  category id -> field string built so far
 * @param categoryId2CountRDD category id -> count to append
 * @param fieldName           field name written in front of the count
 * @return category id -> field string with the extra field appended
 */
private static JavaPairRDD<Long, String> appendCountField(
        JavaPairRDD<Long, String> categoryId2InfoRDD,
        JavaPairRDD<Long, Long> categoryId2CountRDD,
        final String fieldName) {
    return categoryId2InfoRDD.leftOuterJoin(categoryId2CountRDD).mapToPair(
            new PairFunction<Tuple2<Long, Tuple2<String, Optional<Long>>>, Long, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<Long, String> call(
                        Tuple2<Long, Tuple2<String, Optional<Long>>> tuple) throws Exception {
                    long categoryid = tuple._1;
                    String value = tuple._2._1;
                    Optional<Long> optional = tuple._2._2;
                    // Absent on the right side of the join means a count of 0.
                    long count = optional.isPresent() ? optional.get() : 0L;

                    value = value + "|" + fieldName + "=" + count;
                    return new Tuple2<Long, String>(categoryid, value);
                }
            });
}

《Spark 大型电商项目实战》源码:https://github.com/Erik-ly/SprakProject
本文为《Spark大型电商项目实战》系列文章之一, 更多文章:Spark大型电商项目实战:http://blog.csdn.net/u012318074/article/category/6744423