Mycat为了最高效的利用后端的MySQL连接,采取了不同于Cobar也不同于传统JDBC连接池的做法,传统的做法是基于Database的连接池,即一个MySQL服务器上有5个Database,则每个Database独占最大200个连接。这种模式的最大问题在于,将一个数据库所具备的最大1000个连接,隔离成了更新小的连接池,于是可能产生一个应用的连接不够,但其他应用的连接却很空闲的资源浪费情况,而对于分片这种场景,这个缺陷则几乎是致命的,因为每个分片所对应的Database的连接数量被限制在了一个很小的范围内,从而导致系统并发能力的大幅降低。而Mycat则采用了基于MySQL实例的连接池模式,每个Database都可以用现有的1000个连接中的空闲连接。
代码解读 在Mycat的连接池里,当前可用的MySQL连接是放到一个HashMap的数据结构里,Key为当前连接对应的Database,另外还有二级分类,即按照连接是自动提交还是手动提交模式进行区分,这个设计是为了高效的查询匹配的可用连接,具体逻辑如下: 当某个用户会话需要一个自动提交的,到分片dn1(对应db1)的SQL连接的时候,连接池首先找是否有db1上的可用连接,如果有,看是否有自动提交模式的连接,找到就返回,否则返回db1上的手动提交模式的连接,若没有db1的可用连接,则随机返回一个其他db对应的可用连接,若没有可用连接,并且连接池还没达到上限,则创建一个新连接并返回,这个逻辑过程,我们会发现,用户会话得到的连接可能不是他原先想要的,比如Database不对应,或者事务模式不匹配,因此在执行具体的SQL之前,还有一个自动同步数据库连接的过程,包括事务隔离级别、事务模式、字符集、Database等四个指标,同步完成以后,才会执行具体的SQL指令。 org.opencloudb.backend目录下包括连接池相关的代码,其中: PhysicalDBNode 是Mycat分片(Datanode)的对应,引用一个连接池对象PhysicalDBPool,PhysicalDBPool里面引用了真正的连接池对象PhysicalDatasource,并且按照读节点和写节点分开引用,实现读写分类和节点切换的功能,其中activedIndex属性表明了当前是哪个写节点的数据源在生效。连接池对象连接池对象PhysicalDatasource里最重要的数据结构是 ConMap,它里面存储有当前的可用连接,它的关键代码如下:
public class ConMap { // key -schema private final ConcurrentHashMap<String, ConQueue> items = new ConcurrentHashMap<String, ConQueue>(); public ConQueue getSchemaConQueue(String schema) { ConQueue queue = items.get(schema); if (queue == null) { ConQueue newQueue = new ConQueue(); queue = items.putIfAbsent(schema, newQueue); return (queue == null) ? newQueue : queue; } return queue; } public BackendConnection tryTakeCon(final String schema, boolean autoCommit) { final ConQueue queue = items.get(schema); BackendConnection con = tryTakeCon(queue, autoCommit); if (con != null) { return con; } else { for (ConQueue queue2 : items.values()) { if (queue != queue2) { con = tryTakeCon(queue2, autoCommit); if (con != null) { return con; } } } } return null; } private BackendConnection tryTakeCon(ConQueue queue, boolean autoCommit) { BackendConnection con = null; if (queue != null && ((con = queue.takeIdleCon(autoCommit)) != null)) { return con; } else { return null; } } }tryTakeCon是获取一个可用连接,代码的逻辑中,首先看对应的Database上是否有可用连接,如果有就立即返回,否则从其他的Dabase上找一个可用连接返回。 MySQLConnection类为具体的MySQL Native连接对象,synAndDoExecute方法则判断获取到的连接是否符合要求,若不符合要求,先同步状态,然后执行具体的SQL。
private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn, int clientCharSetIndex, int clientTxIsoLation, boolean clientAutoCommit) { String xaCmd = null; boolean conAutoComit = this.autocommit; String conSchema = this.schema; // never executed modify sql,so auto commit boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB() || clientAutoCommit; if (expectAutocommit == false && xaTxID != null && xaStatus == 0) { clientTxIsoLation = Isolations.SERIALIZABLE; xaCmd = "XA START " + xaTxID + ';'; } int schemaSyn = conSchema.equals(oldSchema) ? 0 : 1; int charsetSyn = (this.charsetIndex == clientCharSetIndex) ? 0 : 1; int txIsoLationSyn = (txIsolation == clientTxIsoLation) ? 0 : 1; int autoCommitSyn = (conAutoComit == expectAutocommit) ? 0 : 1; int synCount = schemaSyn + charsetSyn + txIsoLationSyn + autoCommitSyn; if (synCount == 0) { // not need syn connection sendQueryCmd(rrn.getStatement()); return; } CommandPacket schemaCmd = null; StringBuilder sb = new StringBuilder(); if (schemaSyn == 1) { schemaCmd = getChangeSchemaCommand(conSchema); // getChangeSchemaCommand(sb, conSchema); } if (charsetSyn == 1) { getCharsetCommand(sb, clientCharSetIndex); } if (txIsoLationSyn == 1) { getTxIsolationCommand(sb, clientTxIsoLation); } if (autoCommitSyn == 1) { getAutocommitCommand(sb, expectAutocommit); } if (xaCmd != null) { sb.append(xaCmd); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("con need syn ,total syn cmd " + synCount + " commands " + sb.toString() + "schema change:" + (schemaCmd != null) + " con:" + this); } metaDataSyned = false; statusSync = new StatusSync(xaCmd != null, conSchema, clientCharSetIndex, clientTxIsoLation, expectAutocommit, synCount); // syn schema if (schemaCmd != null) { schemaCmd.write(this); } // and our query sql to multi command at last sb.append(rrn.getStatement()); // syn and execute others this.sendQueryCmd(sb.toString()); // waiting syn result... }通过共享一个MySQL上的所有物理连接,并结合连接状态同步的特性,MyCAT的连接池做到了最佳的吞吐量,也在一定程度上提升了整个系统的并发支撑能力。