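This article is part of the series "Spark Large-Scale E-commerce Project in Practice" (《Spark大型电商项目实战》). It covers step six of the top-10 hot categories module: taking the top-10 categories and writing them to MySQL.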
In the package com.erik.sparkproject.domain, create the domain class for a top-10 category, Top10Category.java:
package com.erik.sparkproject.domain;

/**
 * Top-10 category
 * @author Erik
 */
public class Top10Category {

    private long taskid;
    private long categoryid;
    private long clickCount;
    private long orderCount;
    private long payCount;

    public long getTaskid() {
        return taskid;
    }
    public void setTaskid(long taskid) {
        this.taskid = taskid;
    }
    public long getCategoryid() {
        return categoryid;
    }
    public void setCategoryid(long categoryid) {
        this.categoryid = categoryid;
    }
    public long getClickCount() {
        return clickCount;
    }
    public void setClickCount(long clickCount) {
        this.clickCount = clickCount;
    }
    public long getOrderCount() {
        return orderCount;
    }
    public void setOrderCount(long orderCount) {
        this.orderCount = orderCount;
    }
    public long getPayCount() {
        return payCount;
    }
    public void setPayCount(long payCount) {
        this.payCount = payCount;
    }
}

In the package com.erik.sparkproject.dao, create the top-10 category DAO interface, ITop10CategoryDAO.java:
package com.erik.sparkproject.dao;

import com.erik.sparkproject.domain.Top10Category;

/**
 * DAO interface for top-10 categories
 * @author Erik
 */
public interface ITop10CategoryDAO {

    void insert(Top10Category category);

}

In the package com.erik.sparkproject.impl, create the DAO implementation class, Top10CategoryDAOImpl.java:
package com.erik.sparkproject.impl;

import com.erik.sparkproject.dao.ITop10CategoryDAO;
import com.erik.sparkproject.domain.Top10Category;
import com.erik.sparkproject.jdbc.JDBCHelper;

/**
 * DAO implementation for top-10 categories
 * @author Erik
 */
public class Top10CategoryDAOImpl implements ITop10CategoryDAO {

    public void insert(Top10Category category) {
        // One placeholder per column of top10_category, in table column order
        String sql = "insert into top10_category values(?,?,?,?,?)";

        Object[] params = new Object[]{
                category.getTaskid(),
                category.getCategoryid(),
                category.getClickCount(),
                category.getOrderCount(),
                category.getPayCount()};

        JDBCHelper jdbcHelper = JDBCHelper.getInstance();
        jdbcHelper.executeUpdate(sql, params);
    }

}

Move DAOFactory.java into the package com.erik.sparkproject.dao.factory, then add the following method to it:
public static ITop10CategoryDAO getTop10CategoryDAO() {
    return new Top10CategoryDAOImpl();
}

Add a taskid parameter to the following method, and pass the task ID where it is called:
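Because the factory method returns the interface type rather than the concrete class, the calling code does not depend on MySQL directly. The following is only a minimal sketch of that idea, not code from the project: every name in it (Top10CategorySketch, Top10CategoryDAOSketch, InMemoryTop10CategoryDAO, DAOFactorySketch) is hypothetical, and the domain class is cut down to two fields. It shows an in-memory implementation being swapped in behind the same kind of factory method, which could be handy for trying the logic out without a database.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down stand-in for the article's Top10Category (two fields only).
class Top10CategorySketch {
    final long taskid;
    final long categoryid;

    Top10CategorySketch(long taskid, long categoryid) {
        this.taskid = taskid;
        this.categoryid = categoryid;
    }
}

// Hypothetical stand-in for ITop10CategoryDAO.
interface Top10CategoryDAOSketch {
    void insert(Top10CategorySketch category);
}

// Assumption: an in-memory implementation that collects rows in a list
// instead of issuing an INSERT through JDBC.
class InMemoryTop10CategoryDAO implements Top10CategoryDAOSketch {
    final List<Top10CategorySketch> rows = new ArrayList<>();

    @Override
    public void insert(Top10CategorySketch category) {
        rows.add(category);
    }
}

// Hypothetical factory: callers only ever see the interface type.
class DAOFactorySketch {
    static Top10CategoryDAOSketch getTop10CategoryDAO() {
        return new InMemoryTop10CategoryDAO();
    }
}
```

Callers would obtain the DAO with DAOFactorySketch.getTop10CategoryDAO() and call insert(...) on it; changing the storage backend then means changing only the factory method body.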
private static void getTop10Category(
        long taskid,
        JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD,
        JavaPairRDD<String, Row> sessionid2actionRDD)

// Call site: pass the current task's ID through
getTop10Category(task.getTaskid(), filteredSessionid2AggrInfoRDD, sessionid2actionRDD);

In UserVisitSessionAnalyzeSpark.java, add:
/**
 * Step 6: use take(10) to fetch the top-10 hot categories and write them to MySQL
 */
ITop10CategoryDAO top10CategoryDAO = DAOFactory.getTop10CategoryDAO();

// take(10) returns the first 10 elements of the sorted RDD to the driver
List<Tuple2<CategorySortKey, String>> top10CategoryList =
        sortedCategoryCountRDD.take(10);

for(Tuple2<CategorySortKey, String> tuple : top10CategoryList) {
    String countInfo = tuple._2;
    long categoryid = Long.valueOf(StringUtils.getFieldFromConcatString(
            countInfo, "\\|", Constants.FIELD_CATEGORY_ID));
    long clickCount = Long.valueOf(StringUtils.getFieldFromConcatString(
            countInfo, "\\|", Constants.FIELD_CLICK_COUNT));
    long orderCount = Long.valueOf(StringUtils.getFieldFromConcatString(
            countInfo, "\\|", Constants.FIELD_ORDER_COUNT));
    long payCount = Long.valueOf(StringUtils.getFieldFromConcatString(
            countInfo, "\\|", Constants.FIELD_PAY_COUNT));

    // Wrap the values in a domain object
    Top10Category category = new Top10Category();
    category.setTaskid(taskid);
    category.setCategoryid(categoryid);
    category.setClickCount(clickCount);
    category.setOrderCount(orderCount);
    category.setPayCount(payCount);

    top10CategoryDAO.insert(category);
}

Source code for "Spark Large-Scale E-commerce Project in Practice" (《Spark 大型电商项目实战》): https://github.com/Erik-ly/SprakProject
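The parsing and take(10) logic in step six can be tried out locally without Spark. The sketch below is an illustration under several assumptions, not the project's code. It assumes each count string is a list of key=value pairs joined by "|". The field names categoryid, clickCount, orderCount and payCount are placeholders for whatever the Constants.FIELD_* values actually are. It also assumes the sort order is descending by click, then order, then pay count. The helper getField is a hypothetical stand-in for StringUtils.getFieldFromConcatString, and topN mimics calling take(n) on an already sorted RDD.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class Top10ParseSketch {

    // Hypothetical stand-in for StringUtils.getFieldFromConcatString.
    // Assumes "key=value" pairs separated by a delimiter given as a regex.
    static String getField(String concat, String delimiterRegex, String field) {
        for (String pair : concat.split(delimiterRegex)) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2 && kv[0].equals(field)) {
                return kv[1];
            }
        }
        return null; // field not present
    }

    // Reads a numeric field, treating a missing field as 0 (an assumption of this sketch).
    static long getLongField(String concat, String field) {
        String value = getField(concat, "\\|", field);
        return value == null ? 0L : Long.parseLong(value);
    }

    // Local equivalent of "sort descending, then take(n)".
    // Assumed ordering: click count, then order count, then pay count.
    static List<String> topN(List<String> countInfos, int n) {
        List<String> sorted = new ArrayList<>(countInfos);
        sorted.sort(Comparator
                .comparingLong((String s) -> getLongField(s, "clickCount"))
                .thenComparingLong(s -> getLongField(s, "orderCount"))
                .thenComparingLong(s -> getLongField(s, "payCount"))
                .reversed());
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }
}
```

Note that the delimiter is passed as "\\|" because String.split takes a regular expression, and an unescaped "|" would be read as alternation rather than a literal pipe character.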
More articles in this series, "Spark Large-Scale E-commerce Project in Practice" (Spark大型电商项目实战): http://blog.csdn.net/u012318074/article/category/6744423