This article is part of the "Spark Large-scale E-commerce Project in Action" (《Spark大型电商项目实战》) series. It covers the user-visit session random-extraction module, specifically the step of extracting sessions according to the random indexes generated earlier.
This section implements the extraction-by-random-index step and prepares the extracted data for writing into MySQL. Following the MySQL write-path steps summarized earlier (domain class, DAO interface, DAO implementation, factory method), we proceed as follows.
In the package com.erik.sparkproject.domain, create the class SessionRandomExtract.java. Its fields correspond to the columns of the database table designed in the earlier article "29. session随机抽取之实现思路分析", each with its setX() and getX() methods:
```java
package com.erik.sparkproject.domain;

/**
 * Randomly extracted session
 * @author Erik
 */
public class SessionRandomExtract {

    private long taskid;
    private String sessionid;
    private String startTime;
    private String searchKeywords;
    private String clickCategoryIds;

    public long getTaskid() {
        return taskid;
    }
    public void setTaskid(long taskid) {
        this.taskid = taskid;
    }
    public String getSessionid() {
        return sessionid;
    }
    public void setSessionid(String sessionid) {
        this.sessionid = sessionid;
    }
    public String getStartTime() {
        return startTime;
    }
    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }
    public String getSearchKeywords() {
        return searchKeywords;
    }
    public void setSearchKeywords(String searchKeywords) {
        this.searchKeywords = searchKeywords;
    }
    public String getClickCategoryIds() {
        return clickCategoryIds;
    }
    public void setClickCategoryIds(String clickCategoryIds) {
        this.clickCategoryIds = clickCategoryIds;
    }
}
```

In the package com.erik.sparkproject.dao, create the DAO interface of the session random-extraction module, ISessionRandomExtractDAO.java:
```java
package com.erik.sparkproject.dao;

import com.erik.sparkproject.domain.SessionRandomExtract;

/**
 * DAO interface for the session random-extraction module
 * @author Erik
 */
public interface ISessionRandomExtractDAO {

    /**
     * Insert a randomly extracted session
     */
    void insert(SessionRandomExtract sessionRandomExtract);
}
```

In DAOFactory.java under the package com.erik.sparkproject.dao.factory, add:
```java
public static ISessionRandomExtractDAO getSessionRandomExtractDAO() {
    return new SessinoRandomExtractDAOImpl();
}
```

Then create the implementation class SessinoRandomExtractDAOImpl.java in the package com.erik.sparkproject.impl:
```java
package com.erik.sparkproject.impl;

import com.erik.sparkproject.dao.ISessionRandomExtractDAO;
import com.erik.sparkproject.domain.SessionRandomExtract;
import com.erik.sparkproject.jdbc.JDBCHelper;

/**
 * DAO implementation for randomly extracted sessions
 * @author Erik
 */
public class SessinoRandomExtractDAOImpl implements ISessionRandomExtractDAO {

    // Insert a randomly extracted session
    public void insert(SessionRandomExtract sessionRandomExtract) {
        String sql = "insert into session_random_extract values(?,?,?,?,?)";

        Object[] params = new Object[]{sessionRandomExtract.getTaskid(),
                sessionRandomExtract.getSessionid(),
                sessionRandomExtract.getStartTime(),
                sessionRandomExtract.getSearchKeywords(),
                sessionRandomExtract.getClickCategoryIds()};

        JDBCHelper jdbcHelper = JDBCHelper.getInstance();
        jdbcHelper.executeUpdate(sql, params);
    }
}
```
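The implementation above only relies on two calls into the JDBCHelper utility built earlier in this series: getInstance() and executeUpdate(sql, params). For readers joining at this article, the following is a minimal sketch of that contract only, not the project's actual helper: the real JDBCHelper manages a connection pool and reads its JDBC settings from a configuration file, whereas the hard-coded URL, user, and password below are placeholders.

```java
package com.erik.sparkproject.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Minimal sketch of the JDBC helper used by the DAO implementations.
 * The real JDBCHelper in the project pools connections and loads its
 * settings from a configuration file; the values below are placeholders.
 */
public class JDBCHelper {

    private static JDBCHelper instance = null;

    // Singleton accessor, matching the JDBCHelper.getInstance() call in the DAO
    public static synchronized JDBCHelper getInstance() {
        if (instance == null) {
            instance = new JDBCHelper();
        }
        return instance;
    }

    private JDBCHelper() {}

    // Execute an insert/update/delete with positional parameters,
    // matching the executeUpdate(sql, params) call in SessinoRandomExtractDAOImpl
    public int executeUpdate(String sql, Object[] params) {
        // Placeholder connection settings for illustration only
        String url = "jdbc:mysql://localhost:3306/spark_project";
        String user = "root";
        String password = "root";

        try (Connection conn = DriverManager.getConnection(url, user, password);
             PreparedStatement pstmt = conn.prepareStatement(sql)) {
            if (params != null) {
                for (int i = 0; i < params.length; i++) {
                    pstmt.setObject(i + 1, params[i]);
                }
            }
            return pstmt.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```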
Next, in the class UserVisitSessionAnalyzeSpark.java under the package com.erik.sparkproject.spark.session, continue with step 3: extracting sessions by random index.

```java
/**
 * Step 3: iterate over the sessions of each day and hour, and extract by random index
 */
// Run groupByKey to get <dateHour, Iterable(session aggrInfo)>
JavaPairRDD<String, Iterable<String>> time2sessionsRDD = time2sessionidRDD.groupByKey();

// Use flatMapToPair to walk through all <dateHour, Iterable(session aggrInfo)> records,
// i.e. through every session of each day and hour.
// If a session happens to sit on one of the random indexes generated for that day and hour,
// extract it and write it straight into the MySQL table session_random_extract.
// The extracted session ids are returned as a new RDD;
// in the final step, these ids are joined with the visit-action detail data,
// which is also written to MySQL.
JavaPairRDD<String, String> extractSessionidsRDD = time2sessionsRDD.flatMapToPair(

        new PairFlatMapFunction<Tuple2<String, Iterable<String>>, String, String>() {

            private static final long serialVersionUID = 1L;

            public Iterable<Tuple2<String, String>> call(
                    Tuple2<String, Iterable<String>> tuple) throws Exception {
                List<Tuple2<String, String>> extractSessionids =
                        new ArrayList<Tuple2<String, String>>();

                String dateHour = tuple._1;
                String date = dateHour.split("_")[0];
                String hour = dateHour.split("_")[1];
                Iterator<String> iterator = tuple._2.iterator();

                // Fetch the random indexes generated for this day and hour
                List<Integer> extractIndexList = dateHourExtractMap.get(date).get(hour);

                // The domain and DAO were built first:
                // SessionRandomExtract.java in com.erik.sparkproject.domain,
                // ISessionRandomExtractDAO.java in com.erik.sparkproject.dao,
                // SessinoRandomExtractDAOImpl.java in com.erik.sparkproject.impl,
                // and the following factory method in DAOFactory.java:
                // public static ISessionRandomExtractDAO getSessionRandomExtractDAO() {
                //     return new SessinoRandomExtractDAOImpl();
                // }
                ISessionRandomExtractDAO sessionRandomExtractDAO =
                        DAOFactory.getSessionRandomExtractDAO();

                int index = 0;
                while(iterator.hasNext()) {
                    String sessionAggrInfo = iterator.next();

                    if(extractIndexList.contains(index)) {
                        String sessionid = StringUtils.getFieldFromConcatString(
                                sessionAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                        // Write the extracted session into MySQL.
                        // Note that taskid has to be passed in, so the method signature becomes:
                        // private static void randomExtractSession(
                        //         final long taskid,
                        //         JavaPairRDD<String, String> sessionid2AggrInfoRDD)
                        SessionRandomExtract sessionRandomExtract = new SessionRandomExtract();
                        sessionRandomExtract.setTaskid(taskid);
                        sessionRandomExtract.setSessionid(sessionid);
                        sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString(
                                sessionAggrInfo, "\\|", Constants.FIELD_START_TIME));
                        sessionRandomExtract.setSearchKeywords(StringUtils.getFieldFromConcatString(
                                sessionAggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS));
                        sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString(
                                sessionAggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS));

                        sessionRandomExtractDAO.insert(sessionRandomExtract);

                        // Collect the extracted session id into the result list
                        extractSessionids.add(new Tuple2<String, String>(sessionid, sessionid));
                    }

                    index++;
                }

                return extractSessionids;
            }
        });
```
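The comments in the code above point to the last step of this module: joining the extracted session ids with their full visit-action details and writing those rows to MySQL as well. That step is implemented in a later article of the series; the snippet below is only a rough sketch of its shape, continuing from extractSessionidsRDD, and it assumes a sessionid2actionRDD of <sessionid, Row> action records is available in the method (the variable name, the Row type, and the omitted persistence code are assumptions, not the series' final code):

```java
// Continuation sketch (not the series' final code): join the extracted session ids
// with the full action details and persist them, mirroring how the aggregate info
// was written above. Requires, e.g.:
//   import org.apache.spark.sql.Row;
//   import org.apache.spark.api.java.function.VoidFunction;
// and assumes sessionid2actionRDD (<sessionid, Row> of user actions) is in scope.
JavaPairRDD<String, Tuple2<String, Row>> extractSessionDetailRDD =
        extractSessionidsRDD.join(sessionid2actionRDD);

extractSessionDetailRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<String, Row>>>() {

    private static final long serialVersionUID = 1L;

    public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
        Row row = tuple._2._2;
        // Map the joined action row to a detail domain object and insert it
        // through a DAO, in the same way SessionRandomExtract is inserted above.
    }
});
```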
Source code of the "Spark Large-scale E-commerce Project in Action" series: https://github.com/Erik-ly/SprakProject

This article is part of the "Spark Large-scale E-commerce Project in Action" series. More articles: http://blog.csdn.net/u012318074/article/category/6744423