从AbstractRoutingDataSource说分库分表实现路由规则

    xiaoxiao2021-03-25  156

    本文转自  http://wely.iteye.com/blog/2275725

    很多人不知分库分表怎么实现,可能是把它想得复杂了。事实上,我们将复杂的事情分工后就简单了。如果仅仅是单库分表,那直接在代码中根据分表的维度得到表名后缀,如“0001”,然后比如在mybatis下,sql语句就可以这么写“select * from user_#tbIndex#”。程序中我们能够操作数据库中的表,是因为我们拿到了数据源DataSource,并由此getConnection(),因此对于分库分表,我们首先要实现的是动态数据源,我们根据路由规则确定要访问哪个数据源的哪个表。怎么实现数据源的切换呢?而且多个数据源的连接要怎么管理呢?

        Spring为我们提供了实现方案,核心类是AbstractRoutingDataSource,代码如下:

        

    Java代码    public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {       private Map<Object, Object> targetDataSources;       private Object defaultTargetDataSource;       private boolean lenientFallback = true;       private DataSourceLookup dataSourceLookup = new JndiDataSourceLookup();       private Map<Object, DataSource> resolvedDataSources;       private DataSource resolvedDefaultDataSource;          public AbstractRoutingDataSource() {       }          public void setTargetDataSources(Map<Object, Object> targetDataSources) {           this.targetDataSources = targetDataSources;       }          public void setDefaultTargetDataSource(Object defaultTargetDataSource) {           this.defaultTargetDataSource = defaultTargetDataSource;       }          public void setLenientFallback(boolean lenientFallback) {           this.lenientFallback = lenientFallback;       }          public void setDataSourceLookup(DataSourceLookup dataSourceLookup) {           this.dataSourceLookup = (DataSourceLookup)(dataSourceLookup != null?dataSourceLookup:new JndiDataSourceLookup());       }          public void afterPropertiesSet() {           if(this.targetDataSources == null) {               throw new IllegalArgumentException("Property \'targetDataSources\' is required");           } else {               this.resolvedDataSources = new HashMap(this.targetDataSources.size());               Iterator var2 = this.targetDataSources.entrySet().iterator();                  while(var2.hasNext()) {                   Entry entry = (Entry)var2.next();                   Object lookupKey = this.resolveSpecifiedLookupKey(entry.getKey());                   DataSource dataSource = this.resolveSpecifiedDataSource(entry.getValue());                   this.resolvedDataSources.put(lookupKey, dataSource);               }                  if(this.defaultTargetDataSource != null) {                   this.resolvedDefaultDataSource = this.resolveSpecifiedDataSource(this.defaultTargetDataSource);               }              }       }          protected DataSource resolveSpecifiedDataSource(Object dataSource) throws IllegalArgumentException {           if(dataSource instanceof DataSource) {               return (DataSource)dataSource;           } else if(dataSource instanceof String) {               return this.dataSourceLookup.getDataSource((String)dataSource);           } else {               throw new IllegalArgumentException("Illegal data source value - only [javax.sql.DataSource] and String supported: " + dataSource);           }       }          public Connection getConnection() throws SQLException {           return this.determineTargetDataSource().getConnection();       }          public Connection getConnection(String username, String password) throws SQLException {           return this.determineTargetDataSource().getConnection(username, password);       }          protected DataSource determineTargetDataSource() {           Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");           Object lookupKey = this.determineCurrentLookupKey();           DataSource dataSource = (DataSource)this.resolvedDataSources.get(lookupKey);           if(dataSource == null && (this.lenientFallback || lookupKey == null)) {               dataSource = this.resolvedDefaultDataSource;           }              if(dataSource == null) {               throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");           } else {               return dataSource;           }       }          protected Object resolveSpecifiedLookupKey(Object lookupKey) {           return lookupKey;       }          protected abstract Object determineCurrentLookupKey();   }  

         

        AbstractRoutingDataSource实现了AbstractDataSource,该抽象类又继承了javax.sql.DataSource接口。我们常用的org.apache.commons.dbcp.BasicDataSource就是实现了这个接口,该接口的核心方法是getConnection(),AbstractRoutingDataSource实现该方法如下:

     

    Java代码    public Connection getConnection() throws SQLException {       return this.determineTargetDataSource().getConnection();   }  

     

    显然我们要关注选择目标数据源的方法,该方法中两个重要的地方是determineCurrentLookupKey()方法和属性resolvedDataSources。determineCurrentLookupKey()是个抽象方法,需要我们自己去实现,返回的是当前要操作的数据源的标识。resolvedDataSources和resolvedDefaultDataSource是在bean实例化后的操作得到的,即afterPropertiesSet()。下面给出bean的配置:

     

    Xml代码    <bean id="dynamicDataSource" class="org.javared.wely.dao.db.DynamicDataSource">        <property name="targetDataSources">              <map key-type="java.lang.String">                 <entry key="db1" value-ref="dataSource1"/>                 <entry key="db2" value-ref="dataSource2"/>              </map>           </property>           <property name="defaultTargetDataSource" ref="dataSource"/>       </bean>  

     

     DynamicDataSource需实现determineCurrentLookupKey()方法,代码如下:

     

    Java代码    public class DynamicDataSource extends AbstractRoutingDataSource {       public DynamicDataSource() {       }          protected Object determineCurrentLookupKey() {           return DbContextHolder.getDbKey(); // ThreadLocal       }   }  

     

    显然,现在我们的重点是路由规则实现了,即根据某个或几个字段维度找到对应的DB和table,并把dbKey和tbIndex保存于当前线程中。

     

    Xml代码      <bean id="dbRouter" class="org.javared.wely.dao.db.DBRouterImpl">       <property name="dbRules">           <list>               <ref bean="dbRule1" />           </list>       </property>   </bean>      <bean id="dbRule1" class="org.javared.wely.db.DbRule">                  <!-- 维度字段计算得到的long值范围 -->       <property name="routeFieldStart" value="0"></property>       <property name="routeFieldEnd" value="9200000000000000000"></property>                  <!-- db个数 -->       <property name="dbNumber" value="2"></property>                  <!-- 路由规则,分表,分库,既分库又分表 -->       <property name="routeType" value="2"></property>                  <!-- 每个库里分表个数 -->       <property name="tableNumber" value="2"></property>       <property name="dbKeys">           <list>               <value>db1</value>               <value>db2</value>           </list>       </property>   </bean>  

     

    Java代码    public String route(String fieldId) {       if(StringUtils.isEmpty(fieldId)) {           throw new IllegalArgumentException("dbsCount and tablesCount must be both positive!");       } else {               // base64编码得到的字符串取hashcode           int routeFieldInt = RouteUtils.getResourceCode(fieldId);            String dbKey = getDbKey(this.dbRules, routeFieldInt);           return dbKey;       }   }      public static String getDbKey(List<DbRule> rules, int routeFieldInt) {           Object dbRule = null;           if(rules != null && rules.size() > 0) {               String dbKey = null;               Iterator<DbRule> iter = rules.iterator();               while(iter.hasNext()) {                       DbRule item = iter.next();                       if(item.getDbKeys() != null && item.getDbNumber() != 0) {                           long dbIndex = 0L;                           long tbIndex = 0L;                           long mode = (long)item.getDbNumber();                           String tableIndex;                           if(item.getRouteType() == 2 && item.getTableNumber() != 0) {                              // 分库又分表                               mode = (long)(item.getDbNumber() * item.getTableNumber());                               dbIndex = (long)routeFieldInt % mode / (long)item.getTableNumber();                               tbIndex = (long)(routeFieldInt % item.getTableNumber());                               tableIndex = getFormateTableIndex(item.getTableIndexStyle(), tbIndex);                               DbContextHolder.setTableIndex(tableIndex);                           } else if(item.getRouteType() == 0) { // 只分库                               mode = (long)item.getDbNumber();                               dbIndex = (long)routeFieldInt % mode;                           } else if(item.getRouteType() == 1) { // 只分表                               tbIndex = (long)(routeFieldInt % item.getTableNumber());                               tableIndex = getFormateTableIndex(item.getTableIndexStyle(), tbIndex);                               DbContextHolder.setTableIndex(tableIndex);                           }                              dbKey = (String)item.getDbKeys().get(Long.valueOf(dbIndex).intValue());                           log.info("resource:{}------->dbkey:{},tableIndex:{},"new Object[]{Integer.valueOf(routeFieldInt), dbKey, Long.valueOf(tbIndex)});                           DbContextHolder.setDbKey(dbKey);                       }                       break;               }                  return dbKey;           } else {               throw new IllegalArgumentException("dbsCount and tablesCount must be both positive!");           }       }  

     

    Java代码    public class RouteUtils {       private static final Logger log = LoggerFactory.getLogger(RouteUtils.class);       private static final String encode = "utf-8";       private static final int resourceMax = 10000;          public RouteUtils() {       }          public static int getHashCodeBase64(String routeValue) {           int hashCode = 0;              try {               String e = Base64Binrary.encodeBase64Binrary(routeValue.getBytes("utf-8"));               hashCode = Math.abs(e.hashCode());           } catch (Exception var3) {               log.error("hashCode 失败", var3);           }              return hashCode;       }          public static int getResourceCode(String routeValue) {           int hashCode = getHashCodeBase64(routeValue);           int resourceCode = hashCode % 10000;           return resourceCode;       }          public static void main(String[] args) {           String payid = "140331160123935469773";           String resource = payid.substring(payid.length() - 4);           int routeFieldInt = Integer.valueOf(resource).intValue();           short mode = 1200;           int dbIndex = routeFieldInt % mode / 200;           int tbIndex = routeFieldInt % 200;           System.out.println(dbIndex + "-->" + tbIndex);       }   }  

     

        应用时,先执行dbRouter.route(field),这时dynamicDataSource.getConnection()得到的就是当前线程需要对应的数据源连接,DbContextHolder.getTableIndex()得到的是当前线程需要对应的表名后缀。

     

     最后,对于dbRouter.route(field)和DbContextHolder.getTableIndex(),我们可以用注解的方式来处理,这样程序员只需在代码中加入注解即可。下面给出一种解决方案:

    Java代码    @Retention(RetentionPolicy.RUNTIME)   @Target({ElementType.METHOD})   public @interface DoRoute {       String routeField() default "userId";          String tableStyle() default "_0000";   }      @Aspect   @Component   public class DBRouterInterceptor {       private static final Logger log = LoggerFactory.getLogger(DBRouterInterceptor.class);       private DBRouter dBRouter;          public DBRouterInterceptor() {       }          @Pointcut("@annotation( com.jd.jr.baitiao.dbrouter.annotation.DoRoute)")       public void aopPoint() {       }          @Before("aopPoint()")       public Object doRoute(JoinPoint jp) throws Throwable {           long t1 = System.currentTimeMillis();           boolean result = true;           Method method = this.getMethod(jp);           DoRoute doRoute = (DoRoute)method.getAnnotation(DoRoute.class);           String routeField = doRoute.routeField();           Object[] args = jp.getArgs();           if(args != null && args.length > 0) {               for(int i = 0; i < args.length; ++i) {                   long t2 = System.currentTimeMillis();                   String routeFieldValue = BeanUtils.getProperty(args[i], routeField);                   if(StringUtils.isNotEmpty(routeFieldValue)) {                       if("userId".equals(routeField)) {                           this.dBRouter.doRouteByResource("" + RouteUtils.getResourceCode(routeFieldValue));                       } else {                           String resource = routeFieldValue.substring(routeFieldValue.length() - 4);                           this.dBRouter.doRouteByResource(resource);                       }                       break;                   }               }           }              log.info("doRouteTime{}" + (System.currentTimeMillis() - t1));           return Boolean.valueOf(result);       }          private Method getMethod(JoinPoint jp) throws NoSuchMethodException {           Signature sig = jp.getSignature();           MethodSignature msig = (MethodSignature)sig;           return this.getClass(jp).getMethod(msig.getName(), msig.getParameterTypes());       }          private Class<? extends Object> getClass(JoinPoint jp) throws NoSuchMethodException {           return jp.getTarget().getClass();       }          public DBRouter getdBRouter() {           return this.dBRouter;       }          public void setdBRouter(DBRouter dBRouter) {           this.dBRouter = dBRouter;       }   }  

     上面定义了一个切面,需要在spring配置文件中加上<aop:aspectj-autoproxy />,这样spring会发现切面并织入到匹配的目标bean中。

     

    附:生产环境配置参数参考

     

    Xml代码    sqlMapConfig配置   <settings cacheModelsEnabled="false" enhancementEnabled="true"           lazyLoadingEnabled="false" errorTracingEnabled="true" maxRequests="200"           maxSessions="60" maxTransactions="20" useStatementNamespaces="true"           defaultStatementTimeout="2" />    

     

     

    <bean id="dataSource1" class="org.apache.commons.dbcp.BasicDataSource"> <property name="driverClassName" value="${db.jdbc.driverClassName}" /> <property name="url" value="${db1.jdbc.url}" /> <property name="username" value="${db1.jdbc.username}" /> <property name="password" value="${db1.jdbc.password}" /> <property name="maxActive" value="20" /> <property name="maxIdle" value="3" /> <property name="maxWait" value="15000" /> <property name="timeBetweenEvictionRunsMillis" value="60000" /> <property name="minEvictableIdleTimeMillis" value="180000" /> </bean>

     

    <!-- MQ发消息线程池 --> <bean id="taskMqExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" > <!-- 核心线程数 --> <property name="corePoolSize" value="10" /> <!-- 最大线程数 --> <property name="maxPoolSize" value="200" /> <!-- 队列最大长度 --> <property name="queueCapacity" value="500" /> <!-- 线程池维护线程所允许的空闲时间 --> <property name="keepAliveSeconds" value="5" /> <!-- 线程池对拒绝任务(无线程可用)的处理策略 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$DiscardPolicy" /> </property> </bean>

    转载请注明原文地址: https://ju.6miu.com/read-3822.html

    最新回复(0)