I. Background
The required data is crawled with Java, processed with Spark Streaming, and written to a database (MySQL in my case, which I don't recommend; it causes a lot of trouble later on), and then displayed again through a web front end.
II. Code
1. The original idea and implementation
Definition of the database connection pool (the timeout values can be customized):

```java
package org.com.wh;

import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Created by root on 8/2/16.
 */
public class ConnectionPoolTitle {
    private static Log log = LogFactory.getLog(ConnectionPoolTitle.class);
    private static BasicDataSource bs = null;

    public static BasicDataSource getDataSource() throws Exception {
        if (bs == null) {
            bs = new BasicDataSource();
            bs.setDriverClassName("com.mysql.jdbc.Driver");
            bs.setUrl("jdbc:mysql://master:3306/spark");
            bs.setUsername("xxxxx");
            bs.setPassword("xxxxx");
            bs.setMaxActive(500);        // maximum number of concurrent connections
            bs.setInitialSize(50);       // connections created when the pool is initialized
            bs.setMinIdle(50);           // minimum number of idle connections
            bs.setMaxIdle(500);          // maximum number of idle connections
            bs.setMaxWait(1000);         // wait at most 1 s for a free connection
            bs.setMinEvictableIdleTimeMillis(6);            // release idle connections after 6 ms
            bs.setTimeBetweenEvictionRunsMillis(60 * 1000); // check for dead connections
            bs.setTestOnBorrow(true);
        }
        return bs;
    }

    public static void shutDownDataSource() throws Exception {
        if (bs != null) {
            bs.close();
        }
    }

    public static Connection getConnection() {
        Connection connection = null;
        try {
            if (bs != null) {
                connection = bs.getConnection();
            } else {
                connection = getDataSource().getConnection();
            }
        } catch (SQLException e) {
            log.error(e.getMessage(), e);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return connection;
    }

    public static void closeConnection(ResultSet rs, Statement ps, Connection con) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                log.error("rs is Exception" + e.getMessage(), e);
            }
        }
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException e) {
                log.error("ps is Exception" + e.getMessage(), e);
            }
        }
        if (con != null) {
            try {
                con.close();
            } catch (SQLException e) {
                log.error("con is Exception" + e.getMessage(), e);
            }
        }
    }
}
```

Definition of the thread pool:

```java
package org.com.wh;

import scala.Tuple2;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by root on 8/6/16.
 */
public class ThreadPool {
    // a fixed-size thread pool with 20 threads
    private static ExecutorService pool = Executors.newFixedThreadPool(20);
    private Tuple2<String, Integer> tuple;

    public void setTuple(Tuple2<String, Integer> tuple) {
        this.tuple = tuple;
    }

    // submit a task to the pool, which starts it
    public void putThreadurl() {
        pool.execute(new UrlThread(tuple));
    }

    // shut the pool down
    public void shutdownPool() {
        if (!pool.isShutdown()) {
            pool.shutdown();
        }
    }
}
```

Processing code:

```java
package org.com.wh.test;

/**
 * Created by root on 7/30/16.
 */

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.com.wh.ConnectionPool;
import org.com.wh.ConnectionPoolTitle;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");
    private static Statement ps;
    private static boolean flag = false;
    private static String zk = "192.168.1.6:2181,192.168.1.5:2181,192.168.1.7:2181";

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) throws Exception {
        StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("test", 2);
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, zk, "test", topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // split the incoming data and write it to the database
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                if (x != null && SPACE.split(x).length >= 2) {
                    try {
                        Connection conn = ConnectionPoolTitle.getConnection();
                        conn.setAutoCommit(false);
                        String sql = "insert into num(key,value) values (?,?)";
                        PreparedStatement ps = conn.prepareStatement(sql);
                        ps.setString(1, SPACE.split(x)[0]);
                        ps.setString(2, SPACE.split(x)[1]);
                        ps.executeUpdate();
                        conn.commit();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
                return Arrays.asList(SPACE.split(x));
            }
        });

        // filter the data
        JavaDStream<String> filters = words.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.startsWith("http");
            }
        });

        // map the data into <key, value> pairs and reduce by key
        JavaPairDStream<String, Integer> wordCounts = filters.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) throws Exception {
                        return new Tuple2<>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) throws Exception {
                        return i1 + i2;
                    }
                });

        // iterate over every record in the DStream and store it in the database
        wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
                stringIntegerJavaPairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        // ConnectionPool is defined the same way as ConnectionPoolTitle
                        Connection conn = ConnectionPool.getConnection();
                        conn.setAutoCommit(false);
                        String sql = "insert into user(key,num) values (?,?)";
                        PreparedStatement ps = conn.prepareStatement(sql);
                        ps.setString(1, stringIntegerTuple2._1);
                        ps.setInt(2, stringIntegerTuple2._2);
                        ps.executeUpdate();
                        conn.commit();
                    }
                });
                return null;
            }
        });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
```
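For reference (this is not part of the original attempt): the foreachRDD step above opens a connection and issues one insert per record. The pattern usually suggested for writing a DStream to a database is one connection per partition plus JDBC batching. A rough sketch, reusing the ConnectionPool helper and the table and column names from the code above, and assuming the pool actually hands back a live connection:

```java
// Drop-in variant of the foreachRDD block above; additionally needs: import java.util.Iterator;
wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, Integer> rdd) throws Exception {
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
            @Override
            public void call(Iterator<Tuple2<String, Integer>> records) throws Exception {
                // one connection per partition instead of one per record
                Connection conn = ConnectionPool.getConnection();
                conn.setAutoCommit(false);
                PreparedStatement ps = conn.prepareStatement("insert into user(key,num) values (?,?)");
                while (records.hasNext()) {
                    Tuple2<String, Integer> record = records.next();
                    ps.setString(1, record._1());
                    ps.setInt(2, record._2());
                    ps.addBatch();            // accumulate rows instead of executing one by one
                }
                ps.executeBatch();            // send the partition's inserts together
                conn.commit();
                ConnectionPool.closeConnection(null, ps, conn);  // give the connection back
            }
        });
        return null;
    }
});
```

This keeps the number of simultaneous connections close to the number of partitions per batch rather than the number of records, which is the usual way to stop a small pool from being exhausted.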
Problems encountered (still unresolved):

- Getting a database connection: ConnectionPool(Title).getConnection() throws a NullPointerException. The rough cause is that under concurrency the pool runs out of connections while the program keeps going. (I seem to have a misunderstanding here: the number of connections is fixed when the pool is configured, so once that ceiling is exceeded, can connections still be obtained? Shouldn't callers have to wait for other connections to be released?) Each batch handles roughly 60 records, and the error usually appears after about 30 of them have been written to the database.
- Data processing: the program collects and counts data every 2 s, but the result I actually need is the total over n collection intervals; the per-batch results are not carried forward and reused (see the updateStateByKey sketch below).
- Threading: I tried using a thread pool to limit database connections and avoid the NullPointerException, but the approach turned out to be flawed: after the pool reaches its size limit, tasks that grab database connections keep being created and sit waiting in the work queue, so the database still takes a huge amount of traffic and falls over (see the bounded-pool sketch below).
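For the second problem (needing a running total across batches rather than independent 2-second counts), Spark Streaming's updateStateByKey is the standard tool in the 1.x Java API. A minimal sketch, not from the original code: the checkpoint path is a placeholder, and Optional here is Guava's, as in the Spark 1.x Java examples.

```java
// needs: import java.util.List; import com.google.common.base.Optional;
jssc.checkpoint("/tmp/spark-checkpoint");  // placeholder path; stateful streams must be checkpointed

JavaPairDStream<String, Integer> runningCounts = wordCounts.updateStateByKey(
        new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> newValues, Optional<Integer> state) {
                int sum = state.or(0);           // total carried over from earlier batches
                for (Integer v : newValues) {
                    sum += v;                    // add this batch's counts
                }
                return Optional.of(sum);         // new running total for this key
            }
        });
```

runningCounts then holds the accumulated count per key over the whole run, so the per-batch results no longer have to be merged by hand in the database.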
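On the third problem: Executors.newFixedThreadPool(20) caps the number of threads at 20 but puts any further tasks into an unbounded queue, so submissions never block and pressure just piles up. One common way to add backpressure is a ThreadPoolExecutor with a bounded queue and a caller-runs rejection policy. A sketch only; the class name and the sizes are illustrative, not from the post.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedPool {
    // 20 worker threads and at most 100 queued tasks; when the queue is full, the
    // submitting thread runs the task itself, which naturally slows the producer down.
    private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(
            20, 20, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.CallerRunsPolicy());

    public static void submit(Runnable task) {
        pool.execute(task);
    }
}
```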
2. The improved implementation

Setting up the static variables:

```java
package org.com.wh.test;

import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by root on 8/12/16.
 */
// static variables that buffer the data; ConcurrentHashMap is used mainly for
// de-duplication and thread safety
public class StaticMember {
    private static ConcurrentHashMap<String, String> listtitle = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, Integer> listnum = new ConcurrentHashMap<>();

    public ConcurrentHashMap<String, String> getTitle() {
        return listtitle;
    }

    public ConcurrentHashMap<String, Integer> getNum() {
        return listnum;
    }
}
```

Database writes (UrlThread is set up in much the same way):

```java
package org.com.wh;

import com.mysql.jdbc.exceptions.MySQLIntegrityConstraintViolationException;
import org.com.wh.test.StaticMember;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

/**
 * Created by root on 8/6/16.
 */
public class MysqlThread extends Thread {
    private ConcurrentHashMap<String, String> list;

    public MysqlThread(ConcurrentHashMap<String, String> list) {
        super();
        this.list = list;
    }

    public void run() {
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/spark", "root", "spark");
            String sql = "insert into num(key,value) values (?,?)";
            PreparedStatement ps = conn.prepareStatement(sql);
            Iterator<Map.Entry<String, String>> ite = list.entrySet().iterator();
            while (ite.hasNext()) {
                Map.Entry<String, String> ent = ite.next();
                ps.setString(1, ent.getKey());
                ps.setString(2, ent.getValue());
                ps.executeUpdate();
            }
            new StaticMember().getTitle().clear();
            conn.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

Processing program:

```java
package org.com.wh.test;

/**
 * Created by root on 7/30/16.
 */

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

import groovy.lang.Tuple;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.com.wh.*;
import org.mortbay.log.Log;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");
    private static Statement ps;
    private static boolean flag = false;
    private static ThreadPool threadpool = new ThreadPool();
    private static StaticMember sm = new StaticMember();
    private static String zk = "192.168.1.6:2181,192.168.1.5:2181,192.168.1.7:2181";

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) throws Exception {
        StreamingExamples.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("test", 2);
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, zk, "test", topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        /* Each incoming record has 3 fields separated by SPACE. The useful pieces are
         * collected into the static variables and written to the database once a
         * threshold is reached; a Tuple2<> of field [0] and field [2] is returned.
         */
        JavaDStream<Tuple2<String, Integer>> key = lines.flatMap(
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Iterable<Tuple2<String, Integer>> call(String s) throws Exception {
                        sm.getTitle().put(SPACE.split(s)[0], SPACE.split(s)[1]);
                        if (sm.getTitle().size() >= 150) {
                            new MysqlThread(sm.getTitle()).run();
                        }
                        return Arrays.asList(new Tuple2<>(SPACE.split(s)[0],
                                Integer.parseInt(SPACE.split(s)[2])));
                    }
                });

        JavaDStream<Tuple2<String, Integer>> filterkey = key.filter(
                new Function<Tuple2<String, Integer>, Boolean>() {
                    @Override
                    public Boolean call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2._1().startsWith("http");
                    }
                });

        // map to <key, value> pairs keyed by Tuple2._1() with value Tuple2._2(), then reduce
        JavaPairDStream<String, Integer> count = filterkey.flatMapToPair(
                new PairFlatMapFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Iterable<Tuple2<String, Integer>> call(Tuple2<String, Integer> stringIntegerTuple2)
                            throws Exception {
                        return Arrays.asList(new Tuple2<>(stringIntegerTuple2._1(), stringIntegerTuple2._2()));
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                });

        // count every record, buffer it in the static variable, and write to the
        // database once a threshold is reached
        count.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRDD) throws Exception {
                stringIntegerJavaPairRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    @Override
                    public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        sm.getNum().put(stringIntegerTuple2._1(), stringIntegerTuple2._2());
                        if (sm.getNum().size() >= 150) {
                            new UrlThread(sm.getNum()).run();
                        }
                    }
                });
                return null;
            }
        });

        count.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
```
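One thing worth noting about the size >= 150 flush above (and about the static-variable question in the problem summary below) is how to clear the shared map without dropping entries that arrive while the flush is running. A possible sketch, not what the post uses: keep the buffer behind an AtomicReference and swap in a fresh map, handing the old one to the writer thread. The class name is hypothetical.

```java
package org.com.wh.test;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical alternative to StaticMember: instead of clear(), swap the whole
// buffer out atomically before writing it to the database.
public class SwappableBuffer {
    private static final AtomicReference<ConcurrentHashMap<String, String>> titles =
            new AtomicReference<>(new ConcurrentHashMap<String, String>());

    public static void putTitle(String key, String value) {
        titles.get().put(key, value);
    }

    // Called by the writer thread: take the current map and leave an empty one behind,
    // so later records go into the new map instead of being wiped by clear().
    public static ConcurrentHashMap<String, String> drainTitles() {
        return titles.getAndSet(new ConcurrentHashMap<String, String>());
    }
}
```

A writer that grabbed the old map just before the swap can still slip a record into it, so the window is narrow rather than zero; the important difference from clear() is that nothing already buffered is silently discarded.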
Problem summary (new issues):

- Data collection: data is collected until a certain amount has accumulated and only then sent through Kafka to Spark Streaming for processing. Since crawling takes time, this introduces a lot of latency. There is also no good criterion for deciding when the processed data should be written to the database (the current criterion is checking whether the record counts in the two places are equal).
- Controlling the static variables: the static variables are hard to manage. After a stretch of data has been received, by what criterion should they be cleared without losing data?
- Database strategy: a single connection is used to write all of the data to the database, which takes a long time (very bad). And if the database is queried before the write has finished, the results are inaccurate.
- Threads: no threads are explicitly created in the web application, yet more than 30 threads are already present once it starts.

III. Summary
For now, with small amounts of data, this produces the results I originally wanted, although it takes a bit long (mostly the time the front end spends querying the database, and I don't know why). Also, both approaches described above share a serious bug around data storage: there is no fault-tolerance mechanism, so data can be stored incorrectly or go missing, which is more or less the central problem with using MySQL to store Spark Streaming output. The concurrency handling in the program is also inadequate and full of holes. I will keep revising and improving these points later on. Of course, if you have good ideas, I'd be happy to exchange and learn from each other; after all, I'm just getting started.