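1 Horizontal database sharding
I have recently been working on an IM system. The old system never anticipated that the user base would grow this large, so the database has become a severe performance bottleneck. We urgently need to shard the database to reduce the number of users per database, spread the load, and ultimately scale the database horizontally.
Horizontal sharding uses the user id as the sharding key: all data belonging to one user lives in the same database, and every database has the same table structure. To make database access transparent to developers, the sharding framework has to solve two problems:
1. Inserting, querying, and updating data when the method parameters include a user id
2. Querying data when the method parameters do not include a user id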
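2 User id
Define the table that holds user names and passwords as the user table; the user id is the unique integer identifier in that table. If there is only one kind of user name, the id can be the hash of the user name, and the user table is sharded as well. If there are several kinds of user name, for example when users may log in with either an email address or a mobile phone number, the user id should be an auto-increment column of the user table and the user table should not be sharded. It can then be split out into a separate database, called the authentication database. Our project falls into the latter case.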
3 Solution
3.1 Overview
A simple service is a DAO. Every domain has one simple service, and simple services must not depend on each other. A complex service may depend on several simple services but must not access the database directly; all of its database operations must go through simple services.
Hibernate is the database access layer, combined with Spring AOP method interception. The simple service proxy implements the same interface as the simple service implementation. Each simple service has two instances: one references a sessionFactory that obtains database connections dynamically, and the other references the Hibernate Shards sessionFactory.
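3.2 Method parameters include a user id
Spring AOP intercepts every method of the simple service proxy. If the first parameter of the method is a userid, the userid is stored in the current thread and the simple service instance that references the dynamic-connection sessionFactory is selected. When a database connection is obtained, the connection is chosen according to the userid bound to the current thread. The flow is as follows: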
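A minimal sketch of that thread binding, assuming a plain ThreadLocal holds the userid. The class name UseridContextHolder and its setUserid/getUserid/removeUserid methods match the calls made in the key code in section 5, but the body below is an assumption, and runWithUserid is a hypothetical helper added only for illustration.

```java
import java.util.function.Supplier;

// Hypothetical sketch: binds the sharding key (userid) to the current thread.
final class UseridContextHolder
{
    private static final ThreadLocal<Long> USERID = new ThreadLocal<Long>();

    private UseridContextHolder()
    {
    }

    static void setUserid(Long userid)
    {
        USERID.set(userid);
    }

    static Long getUserid()
    {
        return USERID.get();
    }

    static void removeUserid()
    {
        USERID.remove();
    }

    // Hypothetical helper: runs the action with the userid bound and always
    // clears it afterwards, so a pooled thread never keeps a stale userid.
    static <T> T runWithUserid(Long userid, Supplier<T> action)
    {
        setUserid(userid);
        try
        {
            return action.get();
        }
        finally
        {
            removeUserid();
        }
    }
}
```

Clearing the value in a finally block matters because application servers reuse threads; a leftover userid would silently route the next request to the wrong database.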

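3.3 Method parameters do not include a user id
Spring AOP intercepts every method of the simple service proxy. If the first parameter of the method is not a userid, the simple service instance that references the Hibernate Shards sessionFactory is selected; it traverses all databases and returns the merged result. Only reads are allowed in this case, never writes. The flow is as follows: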
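A minimal sketch of the read-only fan-out, independent of Hibernate Shards. ScatterGatherReader and its type parameter S are hypothetical; S stands for whatever represents one database (a session, a DAO, and so on). In the project itself this work is delegated to Hibernate Shards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: runs one read-only query against every shard.
final class ScatterGatherReader<S>
{
    private final List<S> shards;

    ScatterGatherReader(List<S> shards)
    {
        this.shards = shards;
    }

    // Runs the same query on every shard and concatenates the rows
    <R> List<R> queryAll(Function<S, List<R>> query)
    {
        List<R> merged = new ArrayList<R>();
        for (S shard : shards)
        {
            merged.addAll(query.apply(shard));
        }
        return merged;
    }

    // Runs the same count query on every shard and sums the per-shard counts
    long countAll(Function<S, Long> countQuery)
    {
        long total = 0L;
        for (S shard : shards)
        {
            total += countQuery.apply(shard);
        }
        return total;
    }
}
```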

4 Implementation
4.1 Simple service proxy
A JDK dynamic proxy object is generated for each simple service, and complex services depend on the proxy object.
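A minimal sketch of such a proxy built directly on java.lang.reflect.Proxy. The project wires its proxy through Spring's ProxyFactoryBean and an interceptor (see section 5); the UserService interface and the RoutingHandler class below are hypothetical and only show the idea of one proxy in front of two targets.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface shared by the proxy and both targets
interface UserService
{
    String findName(Long userid);

    String findAllNames();
}

// Hypothetical handler: picks a target per call, then delegates to it
final class RoutingHandler implements InvocationHandler
{
    private final Object dynamicTarget;
    private final Object shardsTarget;

    RoutingHandler(Object dynamicTarget, Object shardsTarget)
    {
        this.dynamicTarget = dynamicTarget;
        this.shardsTarget = shardsTarget;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
        // args is null for methods without parameters
        boolean hasUserid = args != null && args.length >= 1 && args[0] instanceof Long;
        Object target = hasUserid ? dynamicTarget : shardsTarget;
        try
        {
            return method.invoke(target, args);
        }
        catch (InvocationTargetException e)
        {
            // Rethrow the exception raised by the target itself
            throw e.getCause();
        }
    }

    @SuppressWarnings("unchecked")
    static <T> T proxyFor(Class<T> iface, T dynamicTarget, T shardsTarget)
    {
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface },
                new RoutingHandler(dynamicTarget, shardsTarget));
    }
}
```

Because the proxy implements the same interface as the real services, complex services can depend on it without knowing that two instances sit behind it.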
4.2 Instantiation
Annotating a simple service class with @DetachDbService produces three instances (created by the framework):
1. The simple service proxy instance
2. The simple service instance that references the dynamic-connection sessionFactory
3. The simple service instance that references the Hibernate Shards sessionFactory
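The three instances are registered under names derived from the implementation class name. The sketch below restates the naming rule used by the annotation handler in section 5 (strip "Impl", lower-case the first letter, append "Dynamic" or "Shards"); the DetachBeanNames class and the UserServiceImpl example name are hypothetical.

```java
// Hypothetical helper that restates the bean naming rule of the annotation handler
final class DetachBeanNames
{
    static final String DYNAMIC_POSTFIX = "Dynamic";
    static final String SHARDS_POSTFIX = "Shards";

    private DetachBeanNames()
    {
    }

    // "UserServiceImpl" -> "userService": the name of the proxy bean
    static String proxyName(String simpleClassName)
    {
        if (!simpleClassName.endsWith("ServiceImpl"))
        {
            throw new IllegalArgumentException(simpleClassName + " must end with 'ServiceImpl'");
        }
        String base = simpleClassName.substring(0, simpleClassName.length() - "Impl".length());
        return base.substring(0, 1).toLowerCase() + base.substring(1);
    }

    // Name of the instance bound to the dynamic-connection sessionFactory
    static String dynamicName(String simpleClassName)
    {
        return proxyName(simpleClassName) + DYNAMIC_POSTFIX;
    }

    // Name of the instance bound to the Hibernate Shards sessionFactory
    static String shardsName(String simpleClassName)
    {
        return proxyName(simpleClassName) + SHARDS_POSTFIX;
    }
}
```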
4.3 Method parameters
To fetch data from one specific database, the first parameter of the method must be of type Long or UseridAble; the userid is taken from it.
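A minimal sketch of that rule. UseridAble and its getUserid() method appear in the key code in section 5, but the interface is redeclared here so that the example is self-contained; the UseridExtractor class is hypothetical.

```java
// Redeclared for the example; the real interface lives in the project
interface UseridAble
{
    Long getUserid();
}

// Hypothetical helper: reads the sharding key from the first argument
final class UseridExtractor
{
    private UseridExtractor()
    {
    }

    // Returns the userid carried by the first argument, or null if there is none
    static Long extract(Object[] args)
    {
        if (args == null || args.length == 0)
        {
            return null;
        }
        Object first = args[0];
        if (first instanceof UseridAble)
        {
            return ((UseridAble) first).getUserid();
        }
        if (first instanceof Long)
        {
            return (Long) first;
        }
        return null;
    }
}
```

A null result means the call cannot be pinned to one database and has to take the read-only path over all databases.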
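4.4 Mapping userids to databases
| Option | Advantages | Disadvantages |
|---|---|---|
| Split by id range | Partial migration is possible | Data is distributed unevenly |
| Modulo | Data is distributed evenly | Migration moves 1/(n+1) of the data; load cannot be allocated by server capacity |
| Store the database mapping in the authentication database | Flexible; partial migration is possible | The mapping must be fetched from the database or a cache before each query |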
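Overall, modulo is the best option. However, the servers may not all have the same capacity and we want to use each of them fully, so weights are added on top of the modulo. For example, with two database servers weighted 1:2, the user id is taken modulo 3: a remainder of 0 maps to the first server, and remainders of 1 and 2 map to the second.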
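A minimal sketch of weighted modulo selection. The slot table is built the same way as the weight maps in initWeight in section 5, but the WeightedModuloSelector class itself is hypothetical (the project's ISelectDBService implementation is not shown), and databases are numbered from 0 here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: maps userid % totalWeight to a database index
final class WeightedModuloSelector
{
    private final Map<Integer, Integer> slotToDb = new HashMap<Integer, Integer>();
    private final int totalWeight;

    // weights[i] is the weight of database i
    WeightedModuloSelector(int[] weights)
    {
        int total = 0;
        for (int db = 0; db < weights.length; db++)
        {
            // Database db owns weights[db] consecutive slots
            for (int j = 0; j < weights[db]; j++)
            {
                slotToDb.put(total + j, db);
            }
            total += weights[db];
        }
        if (total <= 0)
        {
            throw new IllegalArgumentException("total weight must be positive");
        }
        this.totalWeight = total;
    }

    int selectDb(long userid)
    {
        // floorMod keeps the slot non-negative even for a negative id
        int slot = (int) Math.floorMod(userid, (long) totalWeight);
        return slotToDb.get(slot);
    }
}
```

With weights {1, 2} the slots are 0 -> database 0 and 1, 2 -> database 1, which is the 1:2 example from the text.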
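4.5 Exact pagination
Hibernate Shards cannot restrict a query to one database or to a subset of the databases, and its pagination first pulls every matching row from all databases into memory and only then paginates, so its pagination cannot be used.
Instead, first use Hibernate Shards to query each database for the number of matching rows and a database marker (the marker is the smallest user id in the queried table). Sort the results by marker; this guarantees that the same query visits the databases in the same order on every page, which is what makes the paging exact. From this result, compute the segment to read from each database, then use the dynamic database connection to visit the databases in the sorted order. Databases whose segment is empty are skipped, and the query returns as soon as the page is full.
For example, with 3 databases and a query for users located in Shenzhen, Hibernate Shards returns the following: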
| Database | Total users in Shenzhen | Smallest id among Shenzhen users |
|---|---|---|
| DB1 | 7 | 2 |
| DB2 | 5 | 1 |
| DB3 | 30 | 3 |
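Sorted by smallest user id, the order is DB2, DB1, DB3.
Assume 10 records per page:
Page 1 takes 5 rows from DB2 and the first 5 rows from DB1; DB3 is not queried.
Page 2 takes the last 2 rows from DB1 and the first 8 rows from DB3; DB2 is not queried.
Page 3 takes rows 9 through 18 from DB3; DB1 and DB2 are not queried.
… …
Drawback: the results cannot be sorted exactly across databases.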
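The segment calculation can be sketched as follows. ShardCount, Segment, and PagePlanner are hypothetical names (the project uses a CountId class for the count and marker pair); the sketch only plans which rows to read from which database and leaves the actual queries out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: per-database count and marker (smallest matching user id)
final class ShardCount
{
    final String db;
    final long count;
    final long minUserid;

    ShardCount(String db, long count, long minUserid)
    {
        this.db = db;
        this.count = count;
        this.minUserid = minUserid;
    }
}

// Hypothetical: read `limit` rows from `db`, starting at zero-based `offset`
final class Segment
{
    final String db;
    final long offset;
    final long limit;

    Segment(String db, long offset, long limit)
    {
        this.db = db;
        this.offset = offset;
        this.limit = limit;
    }

    public String toString()
    {
        return db + "[" + offset + "," + limit + "]";
    }
}

final class PagePlanner
{
    // page is 1-based; databases with nothing to contribute are skipped
    static List<Segment> plan(List<ShardCount> counts, int page, int pageSize)
    {
        if (page < 1 || pageSize < 1)
        {
            throw new IllegalArgumentException("page and pageSize must be at least 1");
        }
        // Always visit the databases in marker order so every page sees the same sequence
        List<ShardCount> ordered = new ArrayList<ShardCount>(counts);
        ordered.sort((a, b) -> Long.compare(a.minUserid, b.minUserid));
        long skip = (long) (page - 1) * pageSize; // rows that belong to earlier pages
        long need = pageSize;                     // rows still missing on this page
        List<Segment> plan = new ArrayList<Segment>();
        for (ShardCount c : ordered)
        {
            if (need == 0)
            {
                break;
            }
            if (skip >= c.count)
            {
                skip -= c.count; // this database lies entirely before the page
                continue;
            }
            long take = Math.min(need, c.count - skip);
            plan.add(new Segment(c.db, skip, take));
            need -= take;
            skip = 0;
        }
        return plan;
    }
}
```

For the Shenzhen example, page 2 yields the plan DB1[5,2], DB3[0,8]: two rows from DB1 starting at offset 5, then eight rows from DB3 starting at offset 0.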
5 Key code
Java code

package com.konceptusa.infinet.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.beans.factory.annotation.Autowire;

/**
 * Marks a simple service class for instantiation by the framework.
 * @author Jwin
 *
 */
@Retention(RetentionPolicy.RUNTIME)
@Target( { ElementType.TYPE })
@Documented
public @interface DetachDbService
{
    boolean lazy() default false;
    Autowire autoWire() default Autowire.BY_NAME;
    String init() default "";
    String destroy() default "";
}
Java code

package com.konceptusa.infinet.annotation.handler;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;

import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.framework.annotation.spring.support.AbstractSpringListAnnotationHandler;
import com.konceptusa.framework.annotation.spring.support.SpringAnnotationUtils;
import com.konceptusa.infinet.annotation.DetachDbService;

/**
 * Registers three beans in Spring: the simple service proxy, the simple service instance that
 * uses the dynamic database connection, and the simple service instance that uses Hibernate Shards.
 * @author Jwin
 *
 */
public class DetachDbServiceAnnotationHandler extends AbstractSpringListAnnotationHandler<DetachDbService>
{
    private final static String SESSIONFACTORYNAME = "sessionFactory";
    public final static String DYNAMIC_POSTFIX = "Dynamic";
    public final static String SHARDS_POSTFIX = "Shards";
    private final static String DETACHDBINTERCEPTOR = "detachDBInterceptor";

    private final static Log LOG = LogFactory.getLog(DetachDbServiceAnnotationHandler.class);

    public Class annotation()
    {
        return DetachDbService.class;
    }

    @Override
    protected void handle(DetachDbService s, Class target)
    {
        String name = target.getSimpleName();
        if (!name.endsWith("ServiceImpl"))
        {
            throw new IllegalConfigException(target.getName()
                    + " is not a service bean.service bean 's class name must be end with 'ServiceImpl'");
        }
        name = getBeanName(name);
        String dynamicName = name + DYNAMIC_POSTFIX;
        String dynamicSessionFactory = SESSIONFACTORYNAME + DYNAMIC_POSTFIX;
        // Create the simple service instance that obtains its database connection dynamically
        createBean(s, target, dynamicName, dynamicSessionFactory);
        String shardsName = name + SHARDS_POSTFIX;
        String shardsFactory = SESSIONFACTORYNAME + SHARDS_POSTFIX;
        // Create the simple service instance that queries all databases
        createBean(s, target, shardsName, shardsFactory);
        // Create the simple service proxy
        RootBeanDefinition definition = createBeanDefinition(s, ProxyFactoryBean.class, name);
        MutablePropertyValues mpv = new MutablePropertyValues();
        mpv.addPropertyValue("target", new RuntimeBeanReference(shardsName));
        List<String> interceptorNamesList = new ArrayList<String>();
        interceptorNamesList.add(DETACHDBINTERCEPTOR);
        mpv.addPropertyValue("interceptorNames", interceptorNamesList);
        definition.setPropertyValues(mpv);
        registerBeanDefinition(name, definition);
    }

    private void createBean(DetachDbService s, Class target, String name, String sessionFactory)
    {
        RootBeanDefinition beanDefinition = createBeanDefinition(s, target, name);
        MutablePropertyValues mpv = new MutablePropertyValues();
        mpv.addPropertyValue(SESSIONFACTORYNAME, new RuntimeBeanReference(sessionFactory));
        beanDefinition.setPropertyValues(mpv);
        registerBeanDefinition(name, beanDefinition);
    }

    private RootBeanDefinition createBeanDefinition(DetachDbService s, Class target, String name)
    {
        RootBeanDefinition definition = new RootBeanDefinition();
        definition.setAbstract(false);
        definition.setBeanClass(target);
        definition.setSingleton(true);
        definition.setLazyInit(s.lazy());
        definition.setAutowireCandidate(true);
        definition.setAutowireMode(s.autoWire().value());

        if (!"".equals(s.init()))
        {
            definition.setInitMethodName(s.init().trim());
        }
        if (!"".equals(s.destroy()))
        {
            definition.setDestroyMethodName(s.destroy().trim());
        }

        if (LOG.isDebugEnabled())
        {
            LOG.debug("Reader Bean Definition[" + definition + "] with name[" + name + "]");
        }
        SpringAnnotationUtils.readProperties(target, definition);
        return definition;
    }

    // "UserServiceImpl" -> "userService"
    private String getBeanName(String name)
    {
        name = name.substring(0, name.length() - "Impl".length());
        name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
        return name;
    }

}
Java code

package com.konceptusa.infinet.detach.aop;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.MethodInvoker;

import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.framework.core.support.ObjectFactory;
import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
import com.konceptusa.infinet.detach.UseridAble;
import com.konceptusa.infinet.detach.datasource.DataSourceIdContextHolder;
import com.konceptusa.infinet.detach.datasource.UseridContextHolder;

/**
 * Interceptor behind the sharding simple service proxy.
 * @author Jwin
 *
 */
public class DetachDBInterceptor implements MethodInterceptor
{
    private final static Log LOG = LogFactory.getLog(DetachDBInterceptor.class);

    public Object invoke(MethodInvocation invoke) throws Throwable
    {
        int len = invoke.getArguments().length;
        Long id = null;
        // The userid, if any, is carried by the first argument
        if(len >= 1)
        {
            Object arg = invoke.getArguments()[0];
            if(arg instanceof UseridAble)
            {
                UseridAble useridAble = (UseridAble) arg;
                id = useridAble.getUserid();
            }
            else if(arg instanceof Long)
            {
                id = (Long) arg;
            }
        }
        if(id != null)
        {
            // Bind the userid to the current thread for the duration of the call
            UseridContextHolder.setUserid(id);
            try
            {
                return invoke(invoke, id);
            }finally
            {
                UseridContextHolder.removeUserid();
            }
        }
        else
        {
            return invoke(invoke, id);
        }
    }

    private Object invoke(MethodInvocation invoke, Long id) throws Throwable
    {
        // Relies on the default Object.toString() form "package.ClassName@hash" of the target
        String str = invoke.getThis().toString();
        int start = str.lastIndexOf(".");
        int end = str.lastIndexOf("@");
        String className = str.substring(start + 1, end);
        String postFix = DetachDbServiceAnnotationHandler.DYNAMIC_POSTFIX;
        // No userid and no explicitly chosen data source: query all databases
        if(id == null && DataSourceIdContextHolder.getDataSourceId() == null)
        {
            postFix = DetachDbServiceAnnotationHandler.SHARDS_POSTFIX;
        }
        String serviceName = className.substring(0,1).toLowerCase() + className.substring(1,className.length() - "Impl".length()) + postFix;
        if(LOG.isDebugEnabled())
            LOG.debug("select service " + serviceName + " for userid = " + id);
        Object service = ObjectFactory.getManagedObject(serviceName);
        if(service == null)
        {
            throw new IllegalConfigException("service name " + serviceName + " is not defined in spring context");
        }
        // Invoke the same method on the selected service instance
        MethodInvoker invoker = new MethodInvoker();
        invoker.setArguments(invoke.getArguments());
        invoker.setTargetObject(service);
        invoker.setTargetMethod(invoke.getMethod().getName());
        invoker.prepare();
        return invoker.invoke();
    }

}
Java code

package com.konceptusa.infinet.detach.datasource;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.util.Assert;

import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
import com.konceptusa.infinet.detach.service.ISelectDBService;

/**
 * Base class for data sources that select the database connection dynamically.
 * @author Jwin
 *
 */
public abstract class AbstractDynamicDataSource extends AbstractRoutingDataSource
{
    private final static Log LOG = LogFactory.getLog(AbstractDynamicDataSource.class);
    public final static int defaultDataSourceId = -1;
    protected MultiHibernateProperties multiHibernateProperties;
    protected ISelectDBService selectDBService;
    private String newWeights;
    private String oldWeights;
    private Map<Integer, DataSource> dataSourceMap = new HashMap<Integer, DataSource>();

    public void setSelectDBService(ISelectDBService selectDBService)
    {
        this.selectDBService = selectDBService;
    }

    public void setMultiHibernateProperties(MultiHibernateProperties multiHibernateProperties)
    {
        this.multiHibernateProperties = multiHibernateProperties;
    }

    // Chooses the data source from the userid bound to the current thread
    @Override
    protected Object determineCurrentLookupKey()
    {
        Long id = UseridContextHolder.getUserid();
        return selectDBService.selectDb(id);
    }

    @Override
    public void afterPropertiesSet()
    {
        LOG.info("init dynamic datasource start");
        Assert.notNull(multiHibernateProperties);
        Assert.notNull(selectDBService);
        List<Properties> properties = multiHibernateProperties.getShardProperties();
        Assert.notEmpty(properties);
        int dataSourceCount = 0;
        for(Properties p : properties)
        {
            dataSourceCount++;
            createDataSource(dataSourceMap, p);
        }
        createDefaultDataSource(dataSourceMap);
        selectDBService.setDefaultDataSourceId(defaultDataSourceId);
        selectDBService.setDataSourceCount(dataSourceCount);
        setTargetDataSources(dataSourceMap);
        setDefaultTargetDataSource(dataSourceMap.get(defaultDataSourceId));
        initWeight(dataSourceCount);
        super.afterPropertiesSet();
        LOG.info("init dynamic datasource success");
    }

    // Builds the slot -> data source maps; weights are given as "w0;w1;..."
    public void initWeight(int dataSourceCount)
    {
        Map<Integer, Integer> oldWeightMap = new HashMap<Integer, Integer>();
        Map<Integer, Integer> newWeightMap = new HashMap<Integer, Integer>();
        int totalOldWeight = 0;
        int totalNewWeight = 0;
        if(newWeights != null)
        {
            if(LOG.isInfoEnabled())
                LOG.info("newWeights " + newWeights);
            String[] weights = StringUtils.split(newWeights,";");
            if(weights.length > dataSourceCount)
            {
                throw new IllegalConfigException("newWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
            }
            for(int i=0;i<weights.length;i++)
            {
                int w = Integer.parseInt(weights[i]);
                for(int j=0;j<w;j++)
                {
                    newWeightMap.put(totalNewWeight + j, i);
                }
                totalNewWeight += w;
            }
        }
        else
        {
            // No weights configured: every data source gets weight 1
            totalNewWeight = dataSourceCount;
            for(int i=0;i<dataSourceCount;i++)
            {
                newWeightMap.put(i, i);
            }
        }
        if(oldWeights != null)
        {
            if(LOG.isInfoEnabled())
                LOG.info("oldWeights " + oldWeights);
            String[] weights = StringUtils.split(oldWeights,";");
            if(weights.length > dataSourceCount)
            {
                throw new IllegalConfigException("oldWeights's length ["+weights.length+"] can't be more than dataSourceCount["+dataSourceCount+"]");
            }
            for(int i=0;i<weights.length;i++)
            {
                int w = Integer.parseInt(weights[i]);
                for(int j=0;j<w;j++)
                {
                    oldWeightMap.put(totalOldWeight + j, i);
                }
                totalOldWeight += w;
            }
        }
        else
        {
            totalOldWeight = dataSourceCount;
            for(int i=0;i<dataSourceCount;i++)
            {
                oldWeightMap.put(i, i);
            }
        }
        if(LOG.isInfoEnabled())
            LOG.info("totalNewWeight " + totalNewWeight + " totalOldWeight " + totalOldWeight);
        selectDBService.setTotalNewWeight(totalNewWeight);
        selectDBService.setNewWeightIdMap(newWeightMap);
        selectDBService.setTotalOldWeight(totalOldWeight);
        selectDBService.setOldWeightIdMap(oldWeightMap);
    }

    protected abstract void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p);

    protected abstract void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap);

    public void setNewWeights(String newWeights)
    {
        this.newWeights = newWeights;
    }

    public void setOldWeights(String oldWeights)
    {
        this.oldWeights = oldWeights;
    }

    public Map<Integer, DataSource> getDataSourceMap()
    {
        return dataSourceMap;
    }

}
Java code

package com.konceptusa.infinet.detach.datasource;

import java.beans.PropertyVetoException;
import java.util.Map;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.konceptusa.framework.annotation.IllegalConfigException;
import com.konceptusa.infinet.detach.config.MultiHibernateProperties;
import com.mchange.v2.c3p0.ComboPooledDataSource;

/**
 * Dynamic data source backed by the c3p0 connection pool.
 * @author Jwin
 *
 */
public class DynamicC3p0DataSource extends AbstractDynamicDataSource
{
    private final static Log LOG = LogFactory.getLog(DynamicC3p0DataSource.class);
    private int initialSize = 1;
    private int maxActive = 1;
    private int minActive = 1;
    private int maxIdleTime = 30;
    private String automaticTestTable = "Test";
    private int acquireIncrement = 3;
    private int maxStatements = 100;
    private int maxStatementsPerConnection = 3;
    private int numHelperThreads = 3;
    private int idleConnectionTestPeriod = 30;

    // The default data source is an in-memory HSQLDB registered under defaultDataSourceId
    @Override
    protected void createDefaultDataSource(Map<Integer, DataSource> dataSourceMap)
    {
        ComboPooledDataSource dataSource = new ComboPooledDataSource();
        dataSource.setUser("sa");
        dataSource.setPassword("");
        dataSource.setJdbcUrl("jdbc:hsqldb:mem:" + getClass().getSimpleName().toLowerCase());
        try
        {
            dataSource.setDriverClass("org.hsqldb.jdbcDriver");
        } catch (PropertyVetoException e)
        {
            throw new IllegalConfigException(e);
        }
        dataSource.setInitialPoolSize(initialSize);
        dataSource.setMaxPoolSize(maxActive);
        dataSource.setMinPoolSize(minActive);
        dataSource.setMaxIdleTime(maxIdleTime);
        dataSource.setAcquireIncrement(acquireIncrement);
        dataSource.setNumHelperThreads(numHelperThreads);
        dataSource.setAutomaticTestTable(automaticTestTable);
        dataSource.setMaxStatements(maxStatements);
        dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection);
        dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod);
        dataSourceMap.put(defaultDataSourceId, dataSource);
    }

    // Creates one pooled data source per shard and registers it under its shard id
    @Override
    protected void createDataSource(Map<Integer, DataSource> dataSourceMap, Properties p)
    {
        ComboPooledDataSource dataSource = new ComboPooledDataSource();
        dataSource.setJdbcUrl(p.getProperty(MultiHibernateProperties.connectionUrlKey));
        LOG.info("init datasource url " + dataSource.getJdbcUrl());
        dataSource.setUser(p.getProperty(MultiHibernateProperties.connectionUsernameKey));
        dataSource.setPassword(p.getProperty(MultiHibernateProperties.connectionPasswordKey));
        try
        {
            dataSource.setDriverClass(p.getProperty(MultiHibernateProperties.connectionDriverClassKey));
        } catch (PropertyVetoException e)
        {
            throw new IllegalConfigException(e);
        }
        dataSource.setInitialPoolSize(initialSize);
        dataSource.setMaxPoolSize(maxActive);
        dataSource.setMinPoolSize(minActive);
        dataSource.setMaxIdleTime(maxIdleTime);
        dataSource.setAcquireIncrement(acquireIncrement);
        dataSource.setNumHelperThreads(numHelperThreads);
        dataSource.setAutomaticTestTable(automaticTestTable);
        dataSource.setMaxStatements(maxStatements);
        dataSource.setMaxStatementsPerConnection(maxStatementsPerConnection);
        dataSource.setIdleConnectionTestPeriod(idleConnectionTestPeriod);
        String id = p.getProperty(MultiHibernateProperties.shardIdKey);
        dataSourceMap.put(Integer.parseInt(id), dataSource);
    }

    public void setInitialSize(int initialSize)
    {
        this.initialSize = initialSize;
    }

    public void setMaxActive(int maxActive)
    {
        this.maxActive = maxActive;
    }

    public void setMaxIdleTime(int maxIdle)
    {
        this.maxIdleTime = maxIdle;
    }

    public void setAcquireIncrement(int acquireIncrement)
    {
        this.acquireIncrement = acquireIncrement;
    }

    public void setMaxStatements(int maxStatements)
    {
        this.maxStatements = maxStatements;
    }

    public void setMaxStatementsPerConnection(int maxStatementsPerConnection)
    {
        this.maxStatementsPerConnection = maxStatementsPerConnection;
    }

    public void setNumHelperThreads(int numHelperThreads)
    {
        this.numHelperThreads = numHelperThreads;
    }

    public void setAutomaticTestTable(String automaticTestTable)
    {
        this.automaticTestTable = automaticTestTable;
    }

    public void setMinActive(int minActive)
    {
        this.minActive = minActive;
    }

    public void setIdleConnectionTestPeriod(int idleConnectionTestPeriod)
    {
        this.idleConnectionTestPeriod = idleConnectionTestPeriod;
    }
}
Java code

- package com.konceptusa.infinet.imsupport.detach;
-
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.List;
-
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.springframework.transaction.annotation.Propagation;
- import org.springframework.transaction.annotation.Transactional;
-
- import com.konceptusa.framework.annotation.IllegalConfigException;
- import com.konceptusa.framework.core.dao.HibernateQueryListCallback;
- import com.konceptusa.framework.core.dao.hql.Hql;
- import com.konceptusa.framework.core.service.BaseServiceSupport;
- import com.konceptusa.framework.core.service.Page;
- import com.konceptusa.framework.core.support.ObjectFactory;
- import com.konceptusa.infinet.annotation.handler.DetachDbServiceAnnotationHandler;
- import com.konceptusa.infinet.detach.CountId;
- import com.konceptusa.infinet.detach.CountIdComparetor;
- import com.konceptusa.infinet.detach.MagrateAble;
- import com.konceptusa.infinet.detach.QueryListAble;
- import com.konceptusa.infinet.detach.datasource.UseridContextHolder;
-
- /**
- * Aggregated queries across multiple databases; base class for simple service classes.
- * @author Jwin
- *
- * @param <T>
- */
- @Transactional(readOnly=true, rollbackFor = Exception.class)
- public abstract class BaseServiceSupportForMulti<T> extends BaseServiceSupport<T> implements QueryListAble<T>,MagrateAble<T>
- {
- private final static Log LOG = LogFactory.getLog(BaseServiceSupportForMulti.class);
- @Override
- protected int findCountByHql(Hql hql)
- {
- List<Long> countList = (List<Long>) getHibernateTemplate().execute(
- new HibernateQueryListCallback(new Hql("select count(*) "
- + hql.getHql(), hql.getCache(), hql.getParameters())));
- Long counts = 0L;
- for(Long count : countList)
- {
- counts += count;
- }
- return counts.intValue();
- }
- @Transactional(readOnly=true, rollbackFor = Exception.class,propagation=Propagation.NOT_SUPPORTED)
- public List<T> queryList(Hql hql, int from, int offset)
- {
- return queryListByHql(hql, from, offset);
- }
-
- public List<CountId> queryCount(Hql hql)
- {
- List<Object[]> list = queryListByHql(hql);
- List<CountId> countList = new ArrayList<CountId>(list.size());
- for(Object[] l : list)
- {
- if(l[1] != null)
- {
- CountId count = new CountId((Long) l[1],(Long)l[0]);
- countList.add(count);
- }
- }
- Collections.sort(countList, new CountIdComparetor());
- return countList;
- }
- protected String getBeanName(String name)
- {
- name = name.substring(0, name.length() - "Impl".length());
- name = name.substring(0, 1).toLowerCase() + name.substring(1, name.length());
- return name;
- }
- protected Page queryPageByHql(Hql hql,String useridName, int start, int offset)
- {
- Hql countHql = new Hql("select count(*),min(" + useridName + ") "
- + hql.getHql(), hql.getCache(), hql.getParameters());
- return queryPageByHql(countHql, hql, start, offset);
- }
- // First fetch each database's row count and identifier (its minimum user id), sort by identifier,
- // then walk the databases in that order running paged queries, and return once the page is full.
- private Page queryPageByHql(Hql countHql,Hql listHql,int start, int offset)
- {
- QueryListAble<T> serviceShards = getShardsService();
- QueryListAble<T> serviceDynamic = getDynamicService();
- List<CountId> countList = serviceShards.queryCount(countHql);
- // Running total of rows in the databases visited so far, including the current one
- int totalCount = 0;
- // End offset (exclusive) across all databases
- int end = start + offset;
- // Start offset relative to the current database
- int startRelative = -1;
- List<T> queryList = new ArrayList<T>(offset);
- for(CountId count : countList)
- {
- totalCount += count.getCount();
- // Running total is still below the start offset: move on to the next database
- if(totalCount < start)
- {
- continue;
- }
- // First database whose running total reaches the start offset
- if(startRelative == -1)
- {
- startRelative = count.getCount().intValue() - (totalCount - start);
- }
- else
- {
- startRelative = 0;
- }
- int relativeCount = totalCount - end;
- if(relativeCount >= 0)
- {
- UseridContextHolder.setUserid(count.getId());
- try
- {
- // The page ends in this database: fetch only the rows up to the end offset
- int offsetRelative = count.getCount().intValue() - relativeCount - startRelative;
- LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());
- queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));
- }finally
- {
- UseridContextHolder.removeUserid();
- }
- break;
- }
- UseridContextHolder.setUserid(count.getId());
- try
- {
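- // The page continues past this database: fetch every row after startRelative.
- // Use this database's own count rather than the running total, which overstates the limit.
- int offsetRelative = count.getCount().intValue() - startRelative;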
- LOG.debug("query from " + startRelative + " offset " + offsetRelative + " for min(userid)=" + count.getId());
- queryList.addAll(serviceDynamic.queryList(listHql, startRelative, offsetRelative));
- } finally
- {
- UseridContextHolder.removeUserid();
- }
- }
- totalCount = 0;
- for(CountId count : countList)
- {
- totalCount += count.getCount();
- }
- return new Page<T>(totalCount, queryList);
- }
- protected Page queryPageByHql(String hqlstr,String useridName, int start, int offset,Object ... values)
- {
- Hql listHql = Hql.createIndexHql(