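This article is part of the series 《Spark大型电商项目实战》 (Spark Large-Scale E-commerce Project in Practice). It covers one part of the random session sampling module: fetching the detail data of the sampled sessions.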
In the class UserVisitSessionAnalyzeSpark.java in package com.erik.sparkproject.spark.session, add the following line directly below JavaRDD<Row> actionRDD = getActionRDDByDateRange(sqlContext, taskParam);:

```java
JavaPairRDD<String, Row> sessionid2actionRDD = getSessionid2ActionRDD(actionRDD);
```

Then change the call

```java
randomExtractSession(task.getTaskid(), filteredSessionid2AggrInfoRDD);
```

to

```java
randomExtractSession(task.getTaskid(), filteredSessionid2AggrInfoRDD, sessionid2actionRDD);
```
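The article calls getSessionid2ActionRDD but does not show its body. Conceptually, it keys every action row by its session id. That lets the sampled session ids be joined against it to pull out each sampled session's actions.

A plausible Spark implementation is a mapToPair that returns a Tuple2 of (row.getString(2), row). This assumes session_id is column 2, which matches row.getString(2) in the step 4 code.

The plain-Java sketch below has no Spark dependency. It mimics the keying step and the inner-join semantics with in-memory lists. The class SessionJoinSketch, its methods and the sample rows are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SessionJoinSketch {

    // Mimics mapToPair(row -> (row.getString(2), row)):
    // every action row is keyed by its session id (assumed to be column 2).
    public static List<Map.Entry<String, Object[]>> keyBySessionId(List<Object[]> actionRows) {
        List<Map.Entry<String, Object[]>> pairs = new ArrayList<>();
        for (Object[] row : actionRows) {
            pairs.add(new SimpleEntry<>((String) row[2], row));
        }
        return pairs;
    }

    // Mimics an inner join with the sampled session ids, assuming each sampled id
    // appears once on the left side: only actions of sampled sessions survive,
    // one output record per matching action row.
    public static List<Map.Entry<String, Object[]>> joinWithSampled(
            Set<String> sampledSessionIds, List<Map.Entry<String, Object[]>> sessionid2action) {
        List<Map.Entry<String, Object[]>> result = new ArrayList<>();
        for (Map.Entry<String, Object[]> pair : sessionid2action) {
            if (sampledSessionIds.contains(pair.getKey())) {
                result.add(pair);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical rows; columns assumed: date, user_id, session_id, page_id
        List<Object[]> actions = Arrays.asList(
                new Object[] {"2017-03-01", 1L, "s1", 10L},
                new Object[] {"2017-03-01", 1L, "s1", 11L},
                new Object[] {"2017-03-01", 2L, "s2", 12L});
        Set<String> sampled = new HashSet<>(Arrays.asList("s1"));
        List<Map.Entry<String, Object[]>> detail =
                joinWithSampled(sampled, keyBySessionId(actions));
        System.out.println(detail.size()); // prints 2: both actions of session s1
    }
}
```

In Spark, the joined value also carries the left-side value. That is why the result type is JavaPairRDD<String, Tuple2<String, Row>>. The sketch keeps only the key and the action row.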
Next, add step 4 to randomExtractSession. This step fetches the detail data of the sampled sessions:

```java
/**
 * Step 4: fetch the detail data of the sampled sessions
 */
// Prerequisites (each is listed in full below):
//   com.erik.sparkproject.domain.SessionDetail
//   com.erik.sparkproject.dao.ISessionDetailDAO
//   com.erik.sparkproject.impl.SessionDetailDAOImpl
//   DAOFactory.getSessionDetailDAO()
// Imports needed: org.apache.spark.api.java.JavaPairRDD,
//   org.apache.spark.api.java.function.VoidFunction,
//   org.apache.spark.sql.Row, scala.Tuple2

// Inner join: keeps only the actions whose session id was sampled;
// the value is (left-side value of extractSessionidsRDD, action row)
JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =
        extractSessionidsRDD.join(sessionid2actionRDD);

extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<String, Row>>>() {

    private static final long serialVersionUID = 1L;

    public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
        Row row = tuple._2._2;

        // Wrap the action row in the SessionDetail domain object
        SessionDetail sessionDetail = new SessionDetail();
        sessionDetail.setTaskid(taskid);
        sessionDetail.setUserid(row.getLong(1));
        sessionDetail.setSessionid(row.getString(2));
        sessionDetail.setPageid(row.getLong(3));
        sessionDetail.setActionTime(row.getString(4));
        sessionDetail.setSearchKeyword(row.getString(5));
        sessionDetail.setClickCategoryId(row.getLong(6));
        sessionDetail.setClickProductId(row.getLong(7));
        sessionDetail.setOrderCategoryIds(row.getString(8));
        sessionDetail.setOrderProductIds(row.getString(9));
        sessionDetail.setPayCategoryIds(row.getString(10));
        sessionDetail.setPayProductIds(row.getString(11));

        // Write one record per action through the DAO
        ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
        sessionDetailDAO.insert(sessionDetail);
    }
});
```

Because this code uses sessionid2actionRDD, the RDD must be passed into randomExtractSession. Change its signature from
```java
private static void randomExtractSession(
        final long taskid,
        JavaPairRDD<String, String> sessionid2AggrInfoRDD)
```

to

```java
private static void randomExtractSession(
        final long taskid,
        JavaPairRDD<String, String> sessionid2AggrInfoRDD,
        JavaPairRDD<String, Row> sessionid2actionRDD)
```

Create SessionDetail.java in package com.erik.sparkproject.domain:
```java
package com.erik.sparkproject.domain;

/**
 * Session detail
 * @author Erik
 *
 */
public class SessionDetail {

    private long taskid;
    private long userid;
    private String sessionid;
    private long pageid;
    private String actionTime;
    private String searchKeyword;
    private long clickCategoryId;
    private long clickProductId;
    private String orderCategoryIds;
    private String orderProductIds;
    private String payCategoryIds;
    private String payProductIds;

    public long getTaskid() { return taskid; }
    public void setTaskid(long taskid) { this.taskid = taskid; }

    public long getUserid() { return userid; }
    public void setUserid(long userid) { this.userid = userid; }

    public String getSessionid() { return sessionid; }
    public void setSessionid(String sessionid) { this.sessionid = sessionid; }

    public long getPageid() { return pageid; }
    public void setPageid(long pageid) { this.pageid = pageid; }

    public String getActionTime() { return actionTime; }
    public void setActionTime(String actionTime) { this.actionTime = actionTime; }

    public String getSearchKeyword() { return searchKeyword; }
    public void setSearchKeyword(String searchKeyword) { this.searchKeyword = searchKeyword; }

    public long getClickCategoryId() { return clickCategoryId; }
    public void setClickCategoryId(long clickCategoryId) { this.clickCategoryId = clickCategoryId; }

    public long getClickProductId() { return clickProductId; }
    public void setClickProductId(long clickProductId) { this.clickProductId = clickProductId; }

    public String getOrderCategoryIds() { return orderCategoryIds; }
    public void setOrderCategoryIds(String orderCategoryIds) { this.orderCategoryIds = orderCategoryIds; }

    public String getOrderProductIds() { return orderProductIds; }
    public void setOrderProductIds(String orderProductIds) { this.orderProductIds = orderProductIds; }

    public String getPayCategoryIds() { return payCategoryIds; }
    public void setPayCategoryIds(String payCategoryIds) { this.payCategoryIds = payCategoryIds; }

    public String getPayProductIds() { return payProductIds; }
    public void setPayProductIds(String payProductIds) { this.payProductIds = payProductIds; }
}
```

Create the interface ISessionDetailDAO.java in package com.erik.sparkproject.dao:
```java
package com.erik.sparkproject.dao;

import com.erik.sparkproject.domain.SessionDetail;

/**
 * Session detail DAO interface
 * @author Erik
 *
 */
public interface ISessionDetailDAO {

    /**
     * Insert one session detail record
     * @param sessionDetail
     */
    void insert(SessionDetail sessionDetail);
}
```

Create SessionDetailDAOImpl.java in package com.erik.sparkproject.impl:
```java
package com.erik.sparkproject.impl;

import com.erik.sparkproject.dao.ISessionDetailDAO;
import com.erik.sparkproject.domain.SessionDetail;
import com.erik.sparkproject.jdbc.JDBCHelper;

/**
 * Session detail DAO implementation
 * @author Erik
 *
 */
public class SessionDetailDAOImpl implements ISessionDetailDAO {

    @Override
    public void insert(SessionDetail sessionDetail) {
        // 12 placeholders, one per session_detail column, in the same order as params.
        // MySQL also accepts VALUE, but VALUES is the standard spelling.
        String sql = "insert into session_detail values(?,?,?,?,?,?,?,?,?,?,?,?)";

        Object[] params = new Object[] {
                sessionDetail.getTaskid(),
                sessionDetail.getUserid(),
                sessionDetail.getSessionid(),
                sessionDetail.getPageid(),
                sessionDetail.getActionTime(),
                sessionDetail.getSearchKeyword(),
                sessionDetail.getClickCategoryId(),
                sessionDetail.getClickProductId(),
                sessionDetail.getOrderCategoryIds(),
                sessionDetail.getOrderProductIds(),
                sessionDetail.getPayCategoryIds(),
                sessionDetail.getPayProductIds()};

        JDBCHelper jdbcHelper = JDBCHelper.getInstance();
        jdbcHelper.executeUpdate(sql, params);
    }
}
```

Add the following method to DAOFactory.java:
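```java
// Requires: import com.erik.sparkproject.dao.ISessionDetailDAO;
//           import com.erik.sparkproject.impl.SessionDetailDAOImpl;
public static ISessionDetailDAO getSessionDetailDAO() {
    return new SessionDetailDAOImpl();
}
```

Source code for 《Spark大型电商项目实战》: https://github.com/Erik-ly/SprakProject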
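SessionDetailDAOImpl binds 12 parameters to a statement with 12 ? placeholders. If a column is ever added to session_detail, both must change together.

The sketch below shows one way to generate the placeholder list from the column count and check it against the params array. It uses two hypothetical helpers, buildInsertSql and countPlaceholders, which are not part of the project's JDBCHelper.

```java
import java.util.Collections;

public class InsertSqlSketch {

    // Builds "insert into <table> values(?,?,...)" with one placeholder per column.
    public static String buildInsertSql(String table, int columnCount) {
        if (columnCount <= 0) {
            throw new IllegalArgumentException("columnCount must be positive");
        }
        String placeholders = String.join(",", Collections.nCopies(columnCount, "?"));
        return "insert into " + table + " values(" + placeholders + ")";
    }

    // Counts '?' characters; naive, so it would also count '?' inside string literals.
    public static int countPlaceholders(String sql) {
        int count = 0;
        for (char c : sql.toCharArray()) {
            if (c == '?') {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // session_detail has 12 columns, taskid through pay_product_ids
        String sql = buildInsertSql("session_detail", 12);
        Object[] params = new Object[12];
        System.out.println(sql);
        System.out.println(countPlaceholders(sql) == params.length); // prints true
    }
}
```

Generating the SQL from the column count keeps the statement and the params array in step. Writing the statement by hand, as the DAO does, is equally valid as long as the two are kept aligned.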
More articles in the series 《Spark大型电商项目实战》: http://blog.csdn.net/u012318074/article/category/6744423