本文为《Spark大型电商项目实战》系列文章之一，主要介绍 top10 热门品类模块中第二步的代码实现：计算各品类的点击、下单和支付次数。
代码实现
/**
* 第二步:计算各品类的点击、下单和支付的次数
*/
JavaPairRDD<Long, Long> clickCategoryId2CountRDD =
getClickCategoryId2CountRDD(sessionid2detailRDD);
JavaPairRDD<Long, Long> orderCategoryId2CountRDD =
getOrderCategoryId2CountRDD(sessionid2detailRDD);
JavaPairRDD<Long, Long> payCategoryId2CountRDD =
getPayCategoryId2CountRDD(sessionid2detailRDD);
}
/**
 * Builds the per-category click count.
 *
 * <p>A detail row takes part only when its column 6 (read here as the clicked category id) is
 * non-null. Every such row adds exactly one to the count of that category id.
 *
 * @param sessionid2detailRDD pairs whose value is an action detail {@link Row}; the key
 *     (presumably the session id, per the name) is not used here
 * @return pairs of (clicked category id, number of clicks)
 */
private static JavaPairRDD<Long, Long> getClickCategoryId2CountRDD(
        JavaPairRDD<String, Row> sessionid2detailRDD) {
    // Only click actions have a category id in column 6; checking via get() (not getLong())
    // avoids unboxing a null.
    JavaPairRDD<String, Row> clickRows = sessionid2detailRDD.filter(
            new Function<Tuple2<String, Row>, Boolean>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Tuple2<String, Row> entry) throws Exception {
                    return entry._2.get(6) != null;
                }
            });

    // Emit (categoryId, 1) per click and sum the ones per category.
    return clickRows
            .mapToPair(new PairFunction<Tuple2<String, Row>, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<Long, Long> call(Tuple2<String, Row> entry) throws Exception {
                    Row detail = entry._2;
                    return new Tuple2<Long, Long>(detail.getLong(6), 1L);
                }
            })
            .reduceByKey(new Function2<Long, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Long call(Long left, Long right) throws Exception {
                    return left + right;
                }
            });
}
/**
 * Builds the per-category order count.
 *
 * <p>Column 8 of each detail row is read as a comma-separated list of ordered category ids
 * (for example {@code "1,2,3"}); rows where it is null are ignored. Every id in the list adds
 * one to that category's count.
 *
 * <p>Tokens are trimmed, and blank tokens (produced by an empty column value, a leading comma,
 * or {@code "1,,2"}) are skipped. Previously such tokens were passed directly to
 * {@link Long#valueOf(String)}, whose {@link NumberFormatException} failed the whole job. A
 * non-blank token that is not a valid long still raises {@link NumberFormatException} when
 * the RDD is evaluated.
 *
 * @param sessionid2detailRDD pairs whose value is an action detail {@link Row}; the key
 *     (presumably the session id, per the name) is not used here
 * @return pairs of (ordered category id, number of times it was ordered)
 */
private static JavaPairRDD<Long, Long> getOrderCategoryId2CountRDD(
        JavaPairRDD<String, Row> sessionid2detailRDD) {
    // Keep only rows that carry an ordered-category-id list.
    JavaPairRDD<String, Row> orderActionRDD = sessionid2detailRDD.filter(
            new Function<Tuple2<String, Row>, Boolean>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Tuple2<String, Row> tuple) throws Exception {
                    return tuple._2.getString(8) != null;
                }
            });

    // Explode the comma-separated list into one (categoryId, 1) pair per id.
    JavaPairRDD<Long, Long> orderCategoryIdRDD = orderActionRDD.flatMapToPair(
            new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterable<Tuple2<Long, Long>> call(Tuple2<String, Row> tuple)
                        throws Exception {
                    String orderCategoryIds = tuple._2.getString(8);
                    List<Tuple2<Long, Long>> list = new ArrayList<Tuple2<Long, Long>>();
                    for (String orderCategoryId : orderCategoryIds.split(",")) {
                        String trimmed = orderCategoryId.trim();
                        // Blank tokens carry no id; skipping them avoids a NumberFormatException
                        // that would abort the whole job.
                        if (trimmed.isEmpty()) {
                            continue;
                        }
                        list.add(new Tuple2<Long, Long>(Long.valueOf(trimmed), 1L));
                    }
                    return list;
                }
            });

    // Sum the ones per category id.
    JavaPairRDD<Long, Long> orderCategoryId2CountRDD = orderCategoryIdRDD.reduceByKey(
            new Function2<Long, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Long call(Long v1, Long v2) throws Exception {
                    return v1 + v2;
                }
            });
    return orderCategoryId2CountRDD;
}
/**
 * Builds the per-category payment count.
 *
 * <p>Column 10 of each detail row is read as a comma-separated list of paid category ids
 * (for example {@code "1,2,3"}); rows where it is null are ignored. Every id in the list adds
 * one to that category's count.
 *
 * <p>Tokens are trimmed, and blank tokens (produced by an empty column value, a leading comma,
 * or {@code "1,,2"}) are skipped. Previously such tokens were passed directly to
 * {@link Long#valueOf(String)}, whose {@link NumberFormatException} failed the whole job. A
 * non-blank token that is not a valid long still raises {@link NumberFormatException} when
 * the RDD is evaluated.
 *
 * @param sessionid2detailRDD pairs whose value is an action detail {@link Row}; the key
 *     (presumably the session id, per the name) is not used here
 * @return pairs of (paid category id, number of times it was paid)
 */
private static JavaPairRDD<Long, Long> getPayCategoryId2CountRDD(
        JavaPairRDD<String, Row> sessionid2detailRDD) {
    // Keep only rows that carry a paid-category-id list.
    JavaPairRDD<String, Row> payActionRDD = sessionid2detailRDD.filter(
            new Function<Tuple2<String, Row>, Boolean>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Tuple2<String, Row> tuple) throws Exception {
                    return tuple._2.getString(10) != null;
                }
            });

    // Explode the comma-separated list into one (categoryId, 1) pair per id.
    JavaPairRDD<Long, Long> payCategoryIdRDD = payActionRDD.flatMapToPair(
            new PairFlatMapFunction<Tuple2<String, Row>, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Iterable<Tuple2<Long, Long>> call(Tuple2<String, Row> tuple)
                        throws Exception {
                    String payCategoryIds = tuple._2.getString(10);
                    List<Tuple2<Long, Long>> list = new ArrayList<Tuple2<Long, Long>>();
                    for (String payCategoryId : payCategoryIds.split(",")) {
                        String trimmed = payCategoryId.trim();
                        // Blank tokens carry no id; skipping them avoids a NumberFormatException
                        // that would abort the whole job.
                        if (trimmed.isEmpty()) {
                            continue;
                        }
                        list.add(new Tuple2<Long, Long>(Long.valueOf(trimmed), 1L));
                    }
                    return list;
                }
            });

    // Sum the ones per category id.
    JavaPairRDD<Long, Long> payCategoryId2CountRDD = payCategoryIdRDD.reduceByKey(
            new Function2<Long, Long, Long>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Long call(Long v1, Long v2) throws Exception {
                    return v1 + v2;
                }
            });
    return payCategoryId2CountRDD;
}
《Spark大型电商项目实战》源码：https://github.com/Erik-ly/SprakProject
本文为《Spark大型电商项目实战》系列文章之一，更多文章请见专栏“Spark大型电商项目实战”：http://blog.csdn.net/u012318074/article/category/6744423