参见我的git项目:https://github.com/jimingkang/StormTwo/tree/master/src/user_visit
项目文件:
1)package user_visit; import cloudy.spout.OrderBaseSpout; import com.ibf.base.spout.SourceSpout; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; public class UVTopo { public static void main(String[] args) { // TODO Auto-generated method stub TopologyBuilder builder = new TopologyBuilder(); // builder.setSpout("spout", new SourceSpout(), 1); builder.setSpout("spout", new OrderBaseSpout("log"), 3); builder.setBolt("FmtLogBolt", new FmtLogBolt(), 4).shuffleGrouping("spout"); builder.setBolt("sumBolt", new DeepVisitBolt(),4).fieldsGrouping("FmtLogBolt", new Fields("date","session_id")); builder.setBolt("UvSum", new UVSumBolt(), 1).shuffleGrouping("sumBolt") ; Config conf = new Config() ; conf.setDebug(true); if (args.length > 0) { try { conf.setNumWorkers(4); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } catch (Exception e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology()); } } }
2)OrderBaseSpout文件,从kafka的消费者(OrderConsumer)里取数据出来
package cloudy.spout; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import kafka.consumers.OrderConsumer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class OrderBaseSpout implements IRichSpout { String topic = null; public OrderBaseSpout(String topic) { this.topic = topic ; } /** * 公共基类spout */ private static final long serialVersionUID = 1L; Integer TaskId = null; SpoutOutputCollector collector = null; Queue<String> queue = new ConcurrentLinkedQueue<String>() ; // String aaString = null; @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("order")) ; } @Override public void nextTuple() { // TODO Auto-generated method stub // if (aaString != null) { if (queue.size() > 0) { String str = queue.poll() ; //消费数据 //进行数据过滤 System.err.println("spout nextTuple: str="+str); collector.emit(new Values(str)) ; } } @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { // TODO Auto-generated method stub this.collector = collector ; TaskId = context.getThisTaskId() ; // Thread.currentThread().getId() OrderConsumer consumer = new OrderConsumer(topic) ; consumer.start() ; queue = consumer.getQueue() ; //重要,取得consumer里的消息队列数据 // aaString = consumer.getString(); } }
3)DeepVisitBolt 文件,(按照sessionid统计访问量,即按照sessionid分组)
package user_visit; import java.util.HashMap; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class DeepVisitBolt implements IBasicBolt{ private static final long serialVersionUID = 1L; @Override public void cleanup() { // TODO Auto-generated method stub } Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple input, BasicOutputCollector collector) { // TODO Auto-generated method stub String dateString = input.getStringByField("date"); String session_id = input.getStringByField("session_id"); Integer count = counts.get(dateString+"_"+session_id); if (count == null) { count = 0; } count ++ ; counts.put(dateString+"_"+session_id,count) ; 日期_sessionid, pv collector.emit(new Values(dateString+"_"+session_id,count)) ; } }
4)UVSumBolt 文件,UV统计
package user_visit; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import tools.DateFmt; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.FailedException; import backtype.storm.topology.IBasicBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class UVSumBolt implements IBasicBolt { /** * */ private static final long serialVersionUID = 1L; Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void cleanup() { // TODO Auto-generated method stub } long beginTime = System.currentTimeMillis() ; long endTime = 0; String cur_date = null; @Override public void execute(Tuple input, BasicOutputCollector collector) { // TODO Auto-generated method stub try { endTime = System.currentTimeMillis() ; long PV = 0;// 总数 long UV = 0; // 个数,去重后 String dateSession_id = input.getString(0); Integer count = input.getInteger(1); // 跨天的时候清空map if (!dateSession_id.startsWith(cur_date) && DateFmt.parseDate(dateSession_id.split("_")[0]).after( DateFmt.parseDate(cur_date))) { cur_date = dateSession_id.split("_")[0]; counts.clear(); } counts.put(dateSession_id, count); if (endTime - beginTime >= 2000) { // 获取word去重个数,遍历counts 的keySet,取count Iterator<String> i2 = counts.keySet().iterator(); while (i2.hasNext()) { String key = i2.next(); if (key != null) { if (key.startsWith(cur_date)) { UV++; PV += counts.get(key); } } } System.err.println("PV=" + PV + "; UV="+ UV); } } catch (Exception e) { throw new FailedException("SumBolt fail!"); } } }