1水平分庫
最近在做一個IM系統,之前的舊系統沒有考慮到用戶量會增長得這么龐大,導致現在數據庫性能瓶頸非常嚴重,迫切需要分庫,用于減少每個庫的用戶數量,進而分攤負載,最終達到數據庫橫向擴展的目的。
數據庫水平分庫是以用戶Id為分庫的依據,同一個用戶的所有數據都在同一個庫上,每個庫有著相同的表結構。為了實現開發人員來說對數據庫的透明訪問,分庫框架需要解決二個問題:
1、 方法參數中有用戶id的數據的新增,查詢及修改
2、 方法參數中無用戶id的數據的查詢
2用戶id
把用戶名和密碼所在的表定義為用戶表,用戶id即是用戶表中的惟一性標識的整形值,如果用戶的用戶名只有一種方式,那么id可以是用戶名的hash值,此時用戶表也是分庫的;如果用戶的用戶名有多種方式,比如允許用戶使用email登陸,也允許用戶使用手機號碼登陸,那么用戶id應該是用戶表中的遞增字段值,此時用戶表應該是不分庫的,這時可以把用戶表獨立為另一個庫,稱之為認證庫。我們的項目應用是屬于后者。
1 3 解決方案
3.1 說明
簡單服務即為DAO,每個domain都對應一個簡單服務,簡單服務之間不允許互相依賴;復雜服務可以依賴多個簡單服務,但不能直接訪問數據庫,復雜服務對數據庫的操作必須通過單簡單服務。
使用hibernate作為訪問數據庫的中間層,結合Spring的Aop攔截方法,簡單服務代理與簡單服務實現相同的接口,一個簡單服務對應二個實例,一個引用動態獲取數據庫連接的sessionFactory,另一個引用Hibernate Shards的sessionFactory
3.2 方法參數中有用戶Id
Spring Aop攔截簡單服務代理的所有方法,如果方法的第一個參數為userid,則將userid
放到當前線程中,并選擇引用動態獲取數據庫連接的sessionFactory的簡單服務實例,在獲取數據庫連接時根據當前線程的userid選擇相應連接,流程如下:

3.3 方法參數中無用戶Id
Spring Aop攔截簡單服務代理的所有方法,如果方法的第一個參數為非userid,選擇引用Hibernate Shards的sessionFactory的簡單服務實例,遍歷所有數據庫,并返回匯總后的數據。這種情況下只允許讀,不允許寫。流程如下:

1 4實現
4.1 簡單服務代理
對每個簡單服務用jdk動態代理生成一個代理對像,復雜服務依賴代理對像。
4.2 實例化
在簡單服務類上標注@DetachDbService,則會產生三個實例(框架實現):
1. 簡單服務代理實例
2. 引用動態獲取數據庫連接的sessionFactory的簡單服務實例
3. 引用Hibernate Shards的sessionFactory簡單服務實例
4.3 方法參數
如果是到某個庫獲取數據,則第一個參數必須為Long或者UseridAble類型,用于獲取userid
4.4 userid與數據庫關系
可選方案
|
優點
|
缺點
|
按號段分
|
可部分遷移
|
數據分布不均
|
取模
|
數據分布均勻
|
遷移數據量是1/(n+1),不能按服務器性能分配
|
在認證庫中保存數據庫配置
|
靈活,可部分遷移
|
查詢前需要先從數據庫或緩存中獲得此配置
|
總的來說,取模是最優方案,但是考慮到服務器性能可能不一致,而又需要充分利用服務器資源,所以需要在取模的同時加上權重。比如現在有二臺數據庫,權重為1:2,那么用戶id先對3取模,0的為第一臺服務器,1,2的為第二臺服務器。
4.5精確分頁
由于hibernate shards不能到某個庫或者其中的幾個庫中去查詢,并且它的分頁是先到所有的庫中將所有符合條件的數據取回到內存中再進行分頁,所以不可能使用它的分頁。
先用hibernate shards到各個庫上查出符合條件的數目及數據庫標識(標識為查詢表中最小用戶id),返回結果后對標識進行排序(這樣確保同樣的查詢條件在翻頁的時候能夠以同樣的順序查詢數據庫,以達到精確查詢的目的)。根據這個結果計算出每個數據庫取值的段,然后用動態數據庫連接按之前排好的順序遍歷數據庫進行查找,段為0的直接跳過,找滿結果則返回。
比如現在有3個庫,要查詢所在地為深圳的用戶,通過hibernate shards查得數據如下:
|
深圳地區用戶總數
|
深圳特區用戶最小id
|
DB1
|
7
|
2
|
DB2
|
5
|
1
|
DB3
|
30
|
3
|
這時按用戶最小id排序結果是DB2,DB1,DB3
假設每頁10條記錄,
第一頁的數據是從DB2中取5條,DB1中取前5條,不需要到DB3去取
第二頁的數據是從DB1中取后2條,在DB3中取前8條,不需要到DB1中去取
第三頁數據是從DB3中取第9到第18條,不需要到DB1和DB2中去取
… …
缺點:不能精確排序
5關鍵代碼
Java代碼

- package com.konceptusa.infinet.annotation;
-
- import java.lang.annotation.Documented;
- import java.lang.annotation.ElementType;
- import java.lang.annotation.Retention;
- import java.lang.annotation.RetentionPolicy;
- import java.lang.annotation.Target;
-
- import org.springframework.beans.factory.annotation.Autowire;
-
- /**
- * 簡單服務類實例化標注
- * @author Jwin
- *
- */
- @Retention(RetentionPolicy.RUNTIME)
- @Target( { ElementType.TYPE })
- @Documented
- public @interface DetachDbService
- {
- boolean lazy() default false;
- Autowire autoWire() default Autowire.BY_NAME;
- String init() default "";
- String destroy() default "";
- }
package com.konceptusa.infinet.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.beans.factory.annotation.Autowire;
/**
* 簡單服務類實例化標注
* @author Jwin
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target( { ElementType.TYPE })
@Documented
public @interface DetachDbService
{
boolean lazy() default false;
Autowire autoWire() default Autowire.BY_NAME;
String init() default "";
String destroy() default "";
}
Java代碼

- package com.konceptusa.infinet.annotation.handler;
-
- import java.util.ArrayList;
- import java.util.List;
-
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.aop.framework.ProxyFactoryBean;
- import org.springframework.beans.MutablePropertyValues;
- import org.springframework.beans.factory.config.RuntimeBeanReference;
- import org.springframework.beans.factory.support.RootBeanDefinition;
-
- import com.konceptusa.framework.annotation.IllegalConfigException;
- import com.konceptusa.framework.annotation.spring.support.AbstractSpringListAnnotationHandler;
- import com.konceptusa.framework.annotation.spring.support.SpringAnnotationUtils;
- import com.konceptusa.infinet.annotation.DetachDbService;
-
- /**
- * 向spring中注冊簡單服務代理實例,引用動態數據庫連結的簡單服務實例,引用hibernate shards的簡單服務實例
- * @author Jwin
- *
- */
- public class DetachDbServiceAnnotationHandler extends AbstractSpringListAnnotationHandler<DetachDbService>
- {
- private final static String SESSIONFACTORYNAME = "sessionFactory";
- public final static String DYNAMIC_POSTFIX = "Dynamic";
- public final static String SHARDS_POSTFIX = "Shards";
- private final static String DETACHDBINTERCEPTOR = "detachDBInterceptor";
-
- private final static Log LOG = LogFactory.getLog(DetachDbServiceAnnotationHandler.class);
-
- public Class annotation()
- {
- return DetachDbService.class;
- }
-
- @Override
- protected void handle(DetachDbService s, Class target)
- {
- String name = target.getSimpleName();
- if (!name.endsWith("ServiceImpl"))
- {
- throw new IllegalConfigException(target.getName()
- + " is not a service bean.service bean 's class name must be end with 'ServiceImpl'");
- }
- name = getBeanName(name);
- String dynamicName = name + DYNAMIC_POSTFIX;
- String dynamicSessionFactory = SESSIONFACTORYNAME + DYNAMIC_POSTFIX;
- //生成動態獲取數據庫連接的簡單服務實例
- createBean(s, target, dynamicName, dynamicSessionFactory);
- String shardsName = name + SHARDS_POSTFIX;
- String shardsFactory = SESSIONFACTORYNAME + SHARDS_POSTFIX;
- //生成查詢所有數據庫的簡單服務實例
- createBean(s, target, shardsName, shardsFactory);
- //生成簡單服務代理類
- RootBeanDefinition definition = createBeanDefinition(s, ProxyFactoryBean.class, name);
- MutablePropertyValues mpv = new MutablePropertyValues();
- mpv.addPropertyValue("target", new RuntimeBeanReference(shardsName));
- List<String> interceptorNamesList = new ArrayList<String>();
- interceptorNamesList.add(DETACHDBINTERCEPTOR);
- mpv.addPropertyValue("interceptorNames", interceptorNamesList);
- definition.setPropertyValues(mpv);
- registerBeanDefinition(name, definition);
- }
-
- private void createBean(DetachDbService s, Class target, String name, String sessionFactory)
- {
- RootBeanDefinition beanDefinition = createBeanDefinition(s, target, name);
- MutablePropertyValues mpv = new MutablePropertyValues();
- mpv.addPropertyValue(SESSIONFACTORYNAME, new RuntimeBeanReference(sessionFactory));
- beanDefinition.setPropertyValues(mpv);
- registerBeanDefinition(name, beanDefinition);
- }
-
- private RootBeanDefinition createBeanDefinition(DetachDbService s, Class target, String name)
- {
- RootBeanDefinition definition = new RootBeanDefinition();
- definition.setAbstract(false);
- definition.setBeanClass(target);
- definition.setSingleton(true);
- definition.setLazyInit(s.lazy());
- definition.setAutowireCandidate(true);
- definition.setAutowireMode(s.autoWire().value());
-
- if (!"".equals(s.init()))
- {
- definition.setInitMethodName(s.init().trim());
- }
- if (!"".equals(s.destroy()))
- {
- definition.setDestroyMethodName(s.destroy().trim());
- }
-
- if (LOG.isDebugEnabled())
- {
- LOG.debug("Reader Bean Definition[" + definition + "] with name[" + name + "]");
- }
- SpringAnnotationUtils.readProperties(target, definition);
- return definition;
- }
-
- private String getBeanName(String name)
- {
- name = name.substring(0, name.length() - "Impl".length());
- name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
- return name;
- }
-
- }
package com.konceptusa.infinet.annotation.handler;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;
import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.framework.annotation.spring.support.AbstractSpringListAnnotationHandler;
import com.konceptusa.framework.annotation.spring.support.SpringAnnotationUtils;
import com.konceptusa.infinet.annotation.DetachDbService;
/**
* 向spring中注冊簡單服務代理實例,引用動態數據庫連結的簡單服務實例,引用hibernate shards的簡單服務實例
* @author Jwin
*
*/
public class DetachDbServiceAnnotationHandler extends AbstractSpringListAnnotationHandler<DetachDbService>
{
private final static String SESSIONFACTORYNAME = "sessionFactory";
public final static String DYNAMIC_POSTFIX = "Dynamic";
public final static String SHARDS_POSTFIX = "Shards";
private final static String DETACHDBINTERCEPTOR = "detachDBInterceptor";
private final static Log LOG = LogFactory.getLog(DetachDbServiceAnnotationHandler.class);
public Class annotation()
{
return DetachDbService.class;
}
@Override
protected void handle(DetachDbService s, Class target)
{
String name = target.getSimpleName();
if (!name.endsWith("ServiceImpl"))
{
throw new IllegalConfigException(target.getName()
+ " is not a service bean.service bean 's class name must be end with 'ServiceImpl'");
}
name = getBeanName(name);
String dynamicName = name + DYNAMIC_POSTFIX;
String dynamicSessionFactory = SESSIONFACTORYNAME + DYNAMIC_POSTFIX;
//生成動態獲取數據庫連接的簡單服務實例
createBean(s, target, dynamicName, dynamicSessionFactory);
String shardsName = name + SHARDS_POSTFIX;
String shardsFactory = SESSIONFACTORYNAME + SHARDS_POSTFIX;
//生成查詢所有數據庫的簡單服務實例
createBean(s, target, shardsName, shardsFactory);
//生成簡單服務代理類
RootBeanDefinition definition = createBeanDefinition(s, ProxyFactoryBean.class, name);
MutablePropertyValues mpv = new MutablePropertyValues();
mpv.addPropertyValue("target", new RuntimeBeanReference(shardsName));
List<String> interceptorNamesList = new ArrayList<String>();
interceptorNamesList.add(DETACHDBINTERCEPTOR);
mpv.addPropertyValue("interceptorNames", interceptorNamesList);
definition.setPropertyValues(mpv);
registerBeanDefinition(name, definition);
}
private void createBean(DetachDbService s, Class target, String name, String sessionFactory)
{
RootBeanDefinition beanDefinition = createBeanDefinition(s, target, name);
MutablePropertyValues mpv = new MutablePropertyValues();
mpv.addPropertyValue(SESSIONFACTORYNAME, new RuntimeBeanReference(sessionFactory));
beanDefinition.setPropertyValues(mpv);
registerBeanDefinition(name, beanDefinition);
}
private RootBeanDefinition createBeanDefinition(DetachDbService s, Class target, String name)
{
RootBeanDefinition definition = new RootBeanDefinition();
definition.setAbstract(false);
definition.setBeanClass(target);
definition.setSingleton(true);
definition.setLazyInit(s.lazy());
definition.setAutowireCandidate(true);
definition.setAutowireMode(s.autoWire().value());
if (!"".equals(s.init()))
{
definition.setInitMethodName(s.init().trim());
}
if (!"".equals(s.destroy()))
{
definition.setDestroyMethodName(s.destroy().trim());
}
if (LOG.isDebugEnabled())
{
LOG.debug("Reader Bean Definition[" + definition + "] with name[" + name + "]");
}
SpringAnnotationUtils.readProperties(target, definition);
return definition;
}
private String getBeanName(String name)
{
name = name.substring(0, name.length() - "Impl".length());
name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
return name;
}
}
Java代碼

- package com.konceptusa.infinet.detach.aop;
-
- import org.aopalliance.intercept.MethodInterceptor;
- import org.aopalliance.intercept.MethodInvocation;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.util.MethodInvoker;
-
- import com.konceptusa.framework.annotation.IllegalConfigException;
- import com.konceptusa.framework.core.support.ObjectFactory;
- import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
- import com.konceptusa.infinet.detach.UseridAble;
- import com.konceptusa.infinet.detach.datasource.DataSourceIdContextHolder;
- import com.konceptusa.infinet.detach.datasource.UseridContextHolder;
-
- /**
- * 分庫簡單服務代理
- * @author Jwin
- *
- */
- public class DetachDBInterceptor implements MethodInterceptor
- {
- private final static Log LOG = LogFactory.getLog(DetachDBInterceptor.class);
- public Object invoke(MethodInvocation invoke) throws Throwable
- {
- int len = invoke.getArguments().length;
- Long id = null;
- if(len >= 1)
- {
- Object arg = invoke.getArguments()[0];
- if(arg instanceof UseridAble)
- {
- UseridAble useridAble = (UseridAble) arg;
- id = useridAble.getUserid();
- }
- else if(arg instanceof Long)
- {
- id = (Long) arg;
- }
- }
- if(id != null)
- {
- UseridContextHolder.setUserid(id);
- try
- {
- return invoke(invoke, id);
- }finally
- {
- UseridContextHolder.removeUserid();
- }
- }
- else
- {
- return invoke(invoke, id);
- }
- }
- private Object invoke(MethodInvocation invoke, Long id) throws Throwable
- {
- String str = invoke.getThis().toString();
- int start = str.lastIndexOf(".");
- int end = str.lastIndexOf("@");
- String className = str.substring(start + 1, end);
- String postFix = DetachDbServiceAnnotationHandler.DYNAMIC_POSTFIX;
- if(id == null && DataSourceIdContextHolder.getDataSourceId() == null)
- {
- postFix = DetachDbServiceAnnotationHandler.SHARDS_POSTFIX;
- }
- String serviceName = className.substring(0,1).toLowerCase() + className.substring(1,className.length() - "Impl".length()) + postFix;
- if(LOG.isDebugEnabled())
- LOG.debug("select service " + serviceName + " for userid = " + id);
- Object service = ObjectFactory.getManagedObject(serviceName);
- if(service == null)
- {
- throw new IllegalConfigException("service name " + serviceName + " is not defined in spring context");
- }
- MethodInvoker invoker = new MethodInvoker();
- invoker.setArguments(invoke.getArguments());
- invoker.setTargetObject(service);
- invoker.setTargetMethod(invoke.getMethod().getName());
- invoker.prepare();
- return invoker.invoke();
- }
-
- }
package com.konceptusa.infinet.detach.aop;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.MethodInvoker;
import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.framework.core.support.ObjectFactory;
import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
import com.konceptusa.infinet.detach.UseridAble;
import com.konceptusa.infinet.detach.datasource.DataSourceIdContextHolder;
import com.konceptusa.infinet.detach.datasource.UseridContextHolder;
/**
* 分庫簡單服務代理
* @author Jwin
*
*/
public class DetachDBInterceptor implements MethodInterceptor
{
private final static Log LOG = LogFactory.getLog(DetachDBInterceptor.class);
public Object invoke(MethodInvocation invoke) throws Throwable
{
int len = invoke.getArguments().length;
Long id = null;
if(len >= 1)
{
Object arg = invoke.getArguments()[0];
if(arg instanceof UseridAble)
{
UseridAble useridAble = (UseridAble) arg;
id = useridAble.getUserid();
}
else if(arg instanceof Long)
{
id = (Long) arg;
}
}
if(id != null)
{
UseridContextHolder.setUserid(id);
try
{
return invoke(invoke, id);
}finally
{
UseridContextHolder.removeUserid();
}
}
else
{
return invoke(invoke, id);
}
}
private Object invoke(MethodInvocation invoke, Long id) throws Throwable
{
String str = invoke.getThis().toString();
int start = str.lastIndexOf(".");
int end = str.lastIndexOf("@");
String className = str.substring(start + 1, end);
String postFix = DetachDbServiceAnnotationHandler.DYNAMIC_POSTFIX;
if(id == null && DataSourceIdContextHolder.getDataSourceId() == null)
{
postFix = DetachDbServiceAnnotationHandler.SHARDS_POSTFIX;
}
String serviceName = className.substring(0,1).toLowerCase() + className.substring(1,className.length() - "Impl".length()) + postFix;
if(LOG.isDebugEnabled())
LOG.debug("select service " + serviceName + " for userid = " + id);
Object service = ObjectFactory.getManagedObject(serviceName);
if(service == null)
{
throw new IllegalConfigException("service name " + serviceName + " is not defined in spring context");
}
MethodInvoker invoker = new MethodInvoker();
invoker.setArguments(invoke.getArguments());
invoker.setTargetObject(service);
invoker.setTargetMethod(invoke.getMethod().getName());
invoker.prepare();
return invoker.invoke();
}
}
Java代碼

- package com.konceptusa.infinet.detach.datasource;
-
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
-
- import javax.sql.DataSource;
-
- import org.apache.commons.lang.StringUtils;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
- import org.springframework.util.Assert;
-
- import com.konceptusa.framework.annotation.IllegalConfigException;
- import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
- import com.konceptusa.infinet.detach.service.ISelectDBService;
-
- /**
- * 動態獲取數據庫連接基類
- * @author Jwin
- *
- */
- public abstract class AbstractDynamicDataSource extends AbstractRoutingDataSource
- {
- private final static Log LOG = LogFactory.getLog(AbstractDynamicDataSource.class);
- public final static int defaultDataSourceId = -1;
- protected MultiHibernateProperties multiHibernateProperties;
- protected ISelectDBService selectDBService;
- private String newWeights;
- private String oldWeights;
- private Map<Integer, DataSource> dataSourceMap = new HashMap<Integer, DataSource>();
- public void setSelectDBService(ISelectDBService selectDBService)
- {
- this.selectDBService = selectDBService;
- }
- public void setMultiHibernateProperties(MultiHibernateProperties multiHibernateProperties)
- {
- this.multiHibernateProperties = multiHibernateProperties;
- }
- @Override
- protected Object determineCurrentLookupKey()
- {
- Long id = UseridContextHolder.getUserid();
- return selectDBService.selectDb(id);
- }
-
- @Override
- public void afterPropertiesSet()
- {
- LOG.info("init dynamic datasource start");
- Assert.notNull(multiHibernateProperties);
- Assert.notNull(selectDBService);
- List<Properties> properties = multiHibernateProperties.getShardProperties();
- Assert.notEmpty(properties);
- int dataSourceCount = 0;
- for(Properties p : properties)
- {
- dataSourceCount++;
- createDataSource(dataSourceMap, p);
- }
- createDefaultDataSource(dataSourceMap);
- selectDBService.setDefaultDataSourceId(defaultDataSourceId);
- selectDBService.setDataSourceCount(dataSourceCount);
- setTargetDataSources(dataSourceMap);
- setDefaultTargetDataSource(dataSourceMap.get(defaultDataSourceId));
- initWeight(dataSourceCount);
- super.afterPropertiesSet();
- LOG.info("init dynamic datasource success");
- }
- public void initWeight(int dataSourceCount)
- {
- Map<Integer, Integer> oldWeightMap = new HashMap<Integer, Integer>();
- Map<Integer, Integer> newWeightMap = new HashMap<Integer, Integer>();
- int totalOldWeight = 0;
- int totalNewWeight = 0;
- if(newWeights != null)
- {
- if(LOG.isInfoEnabled())
- LOG.info("newWeights " + newWeights);
- String[] weights = StringUtils.split(newWeights,";");
- if(weights.length > dataSourceCount)
- {
- throw new IllegalConfigException("newWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
- }
- for(int i=0;i<weights.length;i++)
- {
- int w = Integer.parseInt(weights[i]);
- for(int j=0;j<w;j++)
- {
- newWeightMap.put(totalNewWeight + j, i);
- }
- totalNewWeight += w;
- }
- }
- else
- {
- totalNewWeight = dataSourceCount;
- for(int i=0;i<dataSourceCount;i++)
- {
- newWeightMap.put(i, i);
- }
- }
- if(oldWeights != null)
- {
- if(LOG.isInfoEnabled())
- LOG.info("oldWeights " + oldWeights);
- String[] weights = StringUtils.split(oldWeights,";");
- if(weights.length > dataSourceCount)
- {
- throw new IllegalConfigException("oldWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
- }
- for(int i=0;i<weights.length;i++)
- {
- int w = Integer.parseInt(weights[i]);
- for(int j=0;j<w;j++)
- {
- oldWeightMap.put(totalOldWeight + j, i);
- }
- totalOldWeight += w;
- }
- }
- else
- {
- totalOldWeight = dataSourceCount;
- for(int i=0;i<dataSourceCount;i++)
- {
- oldWeightMap.put(i, i);
- }
- }
- if(LOG.isInfoEnabled())
- LOG.info("totalNewWeight " + totalNewWeight + " totalOldWeight " + totalOldWeight);
- selectDBService.setTotalNewWeight(totalNewWeight);
- selectDBService.setNewWeightIdMap(newWeightMap);
- selectDBService.setTotalOldWeight(totalOldWeight);
- selectDBService.setOldWeightIdMap(oldWeightMap);
- }
- protected abstract void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p);
- protected abstract void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap);
- public void setNewWeights(String newWeights)
- {
- this.newWeights = newWeights;
- }
- public void setOldWeights(String oldWeights)
- {
- this.oldWeights = oldWeights;
- }
- public Map<Integer, DataSource> getDataSourceMap()
- {
- return dataSourceMap;
- }
-
- }
package com.konceptusa.infinet.detach.datasource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.Assert;
import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
import com.konceptusa.infinet.detach.service.ISelectDBService;
/**
* 動態獲取數據庫連接基類
* @author Jwin
*
*/
public abstract class AbstractDynamicDataSource extends AbstractRoutingDataSource
{
private final static Log LOG = LogFactory.getLog(AbstractDynamicDataSource.class);
public final static int defaultDataSourceId = -1;
protected MultiHibernateProperties multiHibernateProperties;
protected ISelectDBService selectDBService;
private String newWeights;
private String oldWeights;
private Map<Integer, DataSource> dataSourceMap = new HashMap<Integer, DataSource>();
public void setSelectDBService(ISelectDBService selectDBService)
{
this.selectDBService = selectDBService;
}
public void setMultiHibernateProperties(MultiHibernateProperties multiHibernateProperties)
{
this.multiHibernateProperties = multiHibernateProperties;
}
@Override
protected Object determineCurrentLookupKey()
{
Long id = UseridContextHolder.getUserid();
return selectDBService.selectDb(id);
}
@Override
public void afterPropertiesSet()
{
LOG.info("init dynamic datasource start");
Assert.notNull(multiHibernateProperties);
Assert.notNull(selectDBService);
List<Properties> properties = multiHibernateProperties.getShardProperties();
Assert.notEmpty(properties);
int dataSourceCount = 0;
for(Properties p : properties)
{
dataSourceCount++;
createDataSource(dataSourceMap, p);
}
createDefaultDataSource(dataSourceMap);
selectDBService.setDefaultDataSourceId(defaultDataSourceId);
selectDBService.setDataSourceCount(dataSourceCount);
setTargetDataSources(dataSourceMap);
setDefaultTargetDataSource(dataSourceMap.get(defaultDataSourceId));
initWeight(dataSourceCount);
super.afterPropertiesSet();
LOG.info("init dynamic datasource success");
}
public void initWeight(int dataSourceCount)
{
Map<Integer, Integer> oldWeightMap = new HashMap<Integer, Integer>();
Map<Integer, Integer> newWeightMap = new HashMap<Integer, Integer>();
int totalOldWeight = 0;
int totalNewWeight = 0;
if(newWeights != null)
{
if(LOG.isInfoEnabled())
LOG.info("newWeights " + newWeights);
String[] weights = StringUtils.split(newWeights,";");
if(weights.length > dataSourceCount)
{
throw new IllegalConfigException("newWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
}
for(int i=0;i<weights.length;i++)
{
int w = Integer.parseInt(weights[i]);
for(int j=0;j<w;j++)
{
newWeightMap.put(totalNewWeight + j, i);
}
totalNewWeight += w;
}
}
else
{
totalNewWeight = dataSourceCount;
for(int i=0;i<dataSourceCount;i++)
{
newWeightMap.put(i, i);
}
}
if(oldWeights != null)
{
if(LOG.isInfoEnabled())
LOG.info("oldWeights " + oldWeights);
String[] weights = StringUtils.split(oldWeights,";");
if(weights.length > dataSourceCount)
{
throw new IllegalConfigException("oldWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
}
for(int i=0;i<weights.length;i++)
{
int w = Integer.parseInt(weights[i]);
for(int j=0;j<w;j++)
{
oldWeightMap.put(totalOldWeight + j, i);
}
totalOldWeight += w;
}
}
else
{
totalOldWeight = dataSourceCount;
for(int i=0;i<dataSourceCount;i++)
{
oldWeightMap.put(i, i);
}
}
if(LOG.isInfoEnabled())
LOG.info("totalNewWeight " + totalNewWeight + " totalOldWeight " + totalOldWeight);
selectDBService.setTotalNewWeight(totalNewWeight);
selectDBService.setNewWeightIdMap(newWeightMap);
selectDBService.setTotalOldWeight(totalOldWeight);
selectDBService.setOldWeightIdMap(oldWeightMap);
}
protected abstract void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p);
protected abstract void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap);
public void setNewWeights(String newWeights)
{
this.newWeights = newWeights;
}
public void setOldWeights(String oldWeights)
{
this.oldWeights = oldWeights;
}
public Map<Integer, DataSource> getDataSourceMap()
{
return dataSourceMap;
}
}
package com.konceptusa.infinet.detach.datasource;
import java.beans.PropertyVetoException;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
import com.mchange.v2.c3p0.ComboPooledDataSource;
/**
* 基于c3p0連接池的動態獲取連接類
* @author Jwin
*
*/
public class DynamicC3p0DataSource extends AbstractDynamicDataSource
{
private final static Log LOG = LogFactory.getLog(DynamicC3p0DataSource.class);
private int initialSize = 1;
private int maxActive = 1;
private int minActive = 1;
private int maxIdleTime = 30;
private String automaticTestTable = "Test";
private int acquireIncrement = 3;
private int maxStatements = 100;
private int maxStatementsPerConnection = 3;
private int numHelperThreads = 3;
private int idleConnectionTestPeriod = 30;
protected void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap)
{
ComboPooledDataSource dataSource = new ComboPooledDataSource();
dataSource.setUser("sa");
dataSource.setPassword("");
dataSource.setJdbcUrl("jdbc:hsqldb:mem:" + getClass().getSimpleName().toLowerCase());
try
{
dataSource.setDriverClass("org.hsqldb.jdbcDriver");
} catch (PropertyVetoException e)
{
throw new IllegalConfigException(e);
}
dataSource.setInitialPoolSize(initialSize);
dataSource.setMaxPoolSize(maxActive);
dataSource.setMinPoolSize(minActive);
dataSource.setMaxIdleTime(maxIdleTime);
dataSource.setAcquireIncrement(acquireIncrement);
dataSource.setNumHelperThreads(numHelperThreads);
dataSource.setAutomaticTestTable(automaticTestTable);
dataSource.setMaxStatements(maxStatements);
dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection);
dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod);
dataSourceMap.put(defaultDataSourceId, dataSource);
}
@Override
protected void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p)
{
ComboPooledDataSource dataSource = new ComboPooledDataSource();
dataSource.setJdbcUrl(p.getProperty(MultiHibernateProperties.connectionUrlKey));
LOG.info("init datasource url " + dataSource.getJdbcUrl());
dataSource.setUser(p.getProperty(MultiHibernateProperties.connectionUsernameKey));
dataSource.setPassword(p.getProperty(MultiHibernateProperties.connectionPasswordKey));
try
{
dataSource.setDriverClass(p.getProperty(MultiHibernateProperties.connectionDriverClassKey));
} catch (PropertyVetoException e)
{
throw new IllegalConfigException(e);
}
dataSource.setInitialPoolSize(initialSize);
dataSource.setMaxPoolSize(maxActive);
dataSource.setMinPoolSize(minActive);
dataSource.setMaxIdleTime(maxIdleTime);
dataSource.setAcquireIncrement(acquireIncrement);
dataSource.setNumHelperThreads(numHelperThreads);
dataSource.setAutomaticTestTable(automaticTestTable);
dataSource.setMaxStatements(maxStatements);
dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection);
dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod);
String id = p.getProperty(MultiHibernateProperties.shardIdKey);
dataSourceMap.put(Integer.parseInt(id), dataSource);
}
public void setInitialSize(int initialSize)
{
this.initialSize = initialSize;
}
public void setMaxActive(int maxActive)
{
this.maxActive = maxActive;
}
public void setMaxIdleTime(int maxIdle)
{
this.maxIdleTime = maxIdle;
}
public void setAcquireIncrement(int acquireIncrement)
{
this.acquireIncrement = acquireIncrement;
}
public void setMaxStatements(int maxStatements)
{
this.maxStatements = maxStatements;
}
public void setMaxStatementsPerConnection(int maxStatementsPerConnection)
{
this.maxStatementsPerConnection = maxStatementsPerConnection;
}
public void setNumHelperThreads(int numHelperThreads)
{
this.numHelperThreads = numHelperThreads;
}
public void setAutomaticTestTable(String automaticTestTable)
{
this.automaticTestTable = automaticTestTable;
}
public void setMinActive(int minActive)
{
this.minActive = minActive;
}
public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod)
{
this.idleConnectionTestPeriod = idleConnectionTestPeriod;
}
}
Java代碼

- package com.konceptusa.infinet.imsupport.detach;
-
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.List;
-
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.transaction.annotation.Propagation;
- import org.springframework.transaction.annotation.Transactional;
-
- import com.konceptusa.framework.annotation.IllegalConfigException;
- import com.konceptusa.framework.core.dao.HibernateQueryListCallback;
- import com.konceptusa.framework.core.dao.hql.Hql;
- import com.konceptusa.framework.core.service.BaseServiceSupport;
- import com.konceptusa.framework.core.service.Page;
- import com.konceptusa.framework.core.support.ObjectFactory;
- import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
- import com.konceptusa.infinet.detach.CountId;
- import com.konceptusa.infinet.detach.CountIdComparetor;
- import com.konceptusa.infinet.detach.MagrateAble;
- import com.konceptusa.infinet.detach.QueryListAble;
- import com.konceptusa.infinet.detach.datasource.UseridContextHolder;
-
- /**
- * 多個數據庫綜合查詢,簡單服務類父類
- * @author Jwin
- *
- * @param <T>
- */
- @Transactional(readOnly=true, rollbackFor = Exception.class)
- public abstract class BaseServiceSupportForMulti<T> extends BaseServiceSupport<T> implements QueryListAble<T>,MagrateAble<T>
- {
- private final static Log LOG = LogFactory.getLog(BaseServiceSupportForMulti.class);
- @Override
- protected int findCountByHql(Hql hql)
- {
- List<Long> countList = (List<Long>) getHibernateTemplate().execute(
- new HibernateQueryListCallback(new Hql("select count(*) "
- + hql.getHql(), hql.getCache(), hql.getParameters())));
- Long counts = 0L;
- for(Long count : countList)
- {
- counts += count;
- }
- return counts.intValue();
- }
- @Transactional(readOnly=true, rollbackFor = Exception.class,propagation=Propagation.NOT_SUPPORTED)
- public List<T> queryList(Hql hql, int from, int offset)
- {
- return queryListByHql(hql, from, offset);
- }
-
- public List<CountId> queryCount(Hql hql)
- {
- List<Object[]> list = queryListByHql(hql);
- List<CountId> countList = new ArrayList<CountId>(list.size());
- for(Object[] l : list)
- {
- if(l[1] != null)
- {
- CountId count = new CountId((Long) l[1],(Long)l[0]);
- countList.add(count);
- }
- }
- Collections.sort(countList, new CountIdComparetor());
- return countList;
- }
- protected String getBeanName(String name)
- {
- name = name.substring(0, name.length() - "Impl".length());
- name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
- return name;
- }
- protected Page queryPageByHql(Hql hql,String useridName, int start, int offset)
- {
- Hql countHql = new Hql("select count(*),min(" + useridName + ") "
- + hql.getHql(), hql.getCache(), hql.getParameters());
- return queryPageByHql(countHql, hql, start, offset);
- }
- //先查出各個數據庫的總數及標識,然后對標識進行排序,最后根據這個結果遍歷數據庫進行分頁查找,找滿結果則返回。
- private Page queryPageByHql(Hql countHql,Hql listHql,int start, int offset)
- {
- QueryListAble<T> serviceShards = getShardsService();
- QueryListAble<T> serviceDynamic = getDynamicService();
- List<CountId> countList = serviceShards.queryCount(countHql);
- //相對于當前之前所有數據庫的總數偏移
- int totalCount = 0;
- //相對于所有數據庫的結束偏移
- int end = start + offset;
- //相對于當前數據庫的開始偏移量
- int startRelative = -1;
- List<T> queryList = new ArrayList<T>(offset);
- for(CountId count : countList)
- {
- totalCount += count.getCount();
- //之前所有庫總數小于開始偏移量,繼續下一個數據庫
- if(totalCount < start)
- {
- continue;
- }
- //之前所有庫總數第一次大于開始偏移量
- if(startRelative == -1)
- {
- startRelative = count.getCount().intValue() - (totalCount - start);
- }
- else
- {
- startRelative = 0;
- }
- int relativeCount = totalCount - end;
- if(relativeCount >= 0)
- {
- UseridContextHolder.setUserid(count.getId());
- try
- {
- //計算相對于當前庫的偏移
- int offsetRelative = count.getCount().intValue() - relativeCount - startRelative;
- LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());
- queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));
- }finally
- {
- UseridContextHolder.removeUserid();
- }
- break;
- }
- UseridContextHolder.setUserid(count.getId());
- try
- {
- //計算相對于當前庫的偏移
- int offsetRelative = totalCount - startRelative;
- LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());
- queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));
- } finally
- {
- UseridContextHolder.removeUserid();
- }
- }
- totalCount = 0;
- for(CountId count : countList)
- {
- totalCount += count.getCount();
- }
- return new Page<T>(totalCount, queryList);
- }
- protected Page queryPageByHql(String hqlstr,String useridName, int start, int offset,Object ... values)
- {
- Hql listHql = Hql.createIndexHql(