java自定义hive sparksqlthriftServer连接池

    xiaoxiao2021-12-12  6

    由于公司需要做hive和sparksql查询功能,且都通过thrift server进行查询,那么如何有效的创建链接并缓存链接就是比较关键的步骤了。

    thrift connection类似于jdbc connection只是引入的driver class不同罢了。由于公司集群开了kerbrose,所以需要使用代理用户进行通信,每一次登录的用户都会有一个clusterName(代理用户的名称,集群名)。 所以每一个connection就可能都不一样了。  这里需要一个动态的连接池来保存已经创建和正在使用的链接,并需要有效的释放链接的机制。

    废话不多说,直接上关键代码:

    1.基础类  ObjectPool 

    public abstract class ObjectPool { static Logger logger = org.slf4j.LoggerFactory.getLogger(ObjectPool.class); private static long expirationTime; private static HashMap<String, ConcurrentHashMap<String, ConnectionBean>> locked; private static HashMap<String, ConcurrentLinkedQueue<ConnectionBean>> unlocked; public ObjectPool() { if (locked == null) { locked = new HashMap<>(); } if (unlocked == null) { unlocked = new HashMap<>(); } expirationTime = 30 * 60 * 1000; // 30 minute 过期 } protected abstract ConnectionBean create(); public abstract boolean validate(ConnectionBean o); public abstract void expire(ConnectionBean o); public ConnectionBean get(String clusterName) { synchronized (locked) { String key = Thread.currentThread().getName() + clusterName; logger.info("【POOL】 lock the LOCKED map, the clusterName is {}", clusterName); long now = System.currentTimeMillis(); ConcurrentLinkedQueue<ConnectionBean> beans; if (!unlocked.isEmpty()) { beans = unlocked.get(clusterName); if (beans != null) { while (!beans.isEmpty()) { // 获取头元素,并在资源队列中删除头元素 ConnectionBean bean = beans.poll(); // 如果头元素的时间过期了,那么关闭连接 if (now - bean.getUpdateTime() > expirationTime) { logger.info("【POOL】 the connection is out of time ,bean time is {}", bean.getUpdateTime()); // 释放 expire(bean); bean = null; } else { if (validate(bean)) { logger.info("【POOL】 get the connection from poll and the clusterName is {}", clusterName); bean.setUpdateTime(now); // 放入锁定的队列中并返回 锁定队列需要 locked.get(clusterName).put(key, bean); return bean; } else { // 如果链接已经关闭 unlocked.remove(clusterName); expire(bean); bean = null; } } } } } // 由于unlock可能为空,所以初始化对应的clusterName unlocked.put(clusterName, new ConcurrentLinkedQueue<ConnectionBean>()); // 如果没有链接则新建一个操作 ConnectionBean bean = create(); logger.info("【POOL】 the pool could not provide a connection, so create it,clusterName is {}", clusterName); if (locked.get(clusterName) == null) { logger.info("【POOL】 the clusterName in pool is null, create a new Map in LOCKED, clusterName is {}", clusterName); locked.put(clusterName, new ConcurrentHashMap<String, ConnectionBean>()); } locked.get(clusterName).put(key, bean); return bean; } } public void release(String clusterName) { synchronized (locked) { String key = Thread.currentThread().getName() + clusterName; ConcurrentHashMap<String, ConnectionBean> connectionBeans = locked.get(clusterName); ConnectionBean bean = connectionBeans.get(key); connectionBeans.remove(key); bean.setUpdateTime(System.currentTimeMillis()); unlocked.get(clusterName).add(bean); System.out.println("......................................................" + Thread.currentThread().getName()); } } } 在这个连接池中 包含了locked 和 unlocked 两个连接队列和concurrentHashMap(释放链接主要为了对应key的释放,不使用队列出列)。

    使用get 获取指定的connection,首先 如果连接池中有,且存活时间没有 超过 expirationTime 默认30min 则会为用户使用,但是超过30min则会回收。当获取到链接会,会和当前ThreadName以及集群名称作为key值存储至locked map中。

    2.  ConnectionBeanPool 这个对象继承了抽象类 并实现相应操作

    public class ConnectionBeanPool extends ObjectPool { private String url; // 链接url private String usr; // 账户名 private String pwd; // 密码 public ConnectionBeanPool(String driver, String url, String usr, String pwd) { super(); try { Class.forName(driver).newInstance(); } catch (Exception e) { e.printStackTrace(); } this.url = url; this.usr = usr; this.pwd = pwd; } @Override protected ConnectionBean create() { try { ConnectionBean connectionBean = new ConnectionBean(); Connection connection = DriverManager.getConnection(url, usr, pwd); if (connection == null) { System.out.print("null connection"); } connectionBean.setConnection(connection); connectionBean.setUpdateTime(new Date().getTime()); return connectionBean; } catch (SQLException e) { e.printStackTrace(); return null; } } @Override public void expire(ConnectionBean o) { try { o.getConnection().close(); } catch (SQLException e) { e.printStackTrace(); } } @Override public boolean validate(ConnectionBean o) { try { return (!(o.getConnection()).isClosed()); } catch (SQLException e) { e.printStackTrace(); return false; } } }

    3. bean

    public class ConnectionBean { private Connection connection; // 链接信息 private long updateTime; // 更新时间 public Connection getConnection() { return connection; } public void setConnection(Connection connection) { this.connection = connection; } public long getUpdateTime() { return updateTime; } public void setUpdateTime(long updateTime) { this.updateTime = updateTime; } }

    目前还没对链接数量进行限定。未来需要设置一个最大连接量,如果超过则让请求等待或者返回错误信息。

    转载请注明原文地址: https://ju.6miu.com/read-900193.html

    最新回复(0)