本文介绍动态数据源实现原理与源码讲解。
基本原理
多数据源即一个项目中同时存在多个不同的数据库连接池。
比如 127.0.0.1:3306/test、127.0.0.1:3307/test、127.0.0.1:3308/test。
总之项目存在需要操作多个库的需求。
具体在编码方面呢,具体就是一个service 中,方法1使用库1查询,方法2使用库2查询,或者同一个方法需要查询两个库的数据。
多数据源实现原理是什么呢?可分为两大关键部分。
1、使用 AOP 拦截,方法执行前获取到当前方法要用的数据源
可以使用自定义注解实现,注解参数带数据源名称,然后自己解析
2、实现自定义 DataSource 接口,实现 DataSource 接口的 getConnect 方法做动态处理
动态处理,就是拿到 AOP 那一步获取到的数据源,直接返回该数据源
基本原理,可看这个简易图:
源码介绍
源码最好自己完整看一遍,本篇博客只展示部分关键源码。
使用 AOP 拦截,方法执行前获取到当前方法要用的数据源
@DS注解代表定义当前方法、当前类使用哪个数据源
value指定当前类、方法使用的数据源名称
数据源名称也是在配置文件中定义的
注解处理切面 DynamicDataSourceAnnotationAdvisor
切面 advice 由外部传过来,要处理的注解也从外面传过来。
DynamicDataSourceAnnotationInterceptor 负责处理 DS 注解
接着看 DynamicDataSourceAnnotationInterceptor 如何处理
1、将@DS注解的value值压入ThreadLocal当前线程的栈
2、调用实际的方法
3、清空当前线程数据源,如果当前线程是连续切换数据源 只会移除掉当前线程的数据源名称
ThreadLocal 中存储的是 Deque 类,也就是一个双端队列(两头都可以插入的队列) ,使用的是 ArrayDeque 双端队列,内部是一个数组。
为什么使用队列,而不是简单一个字符串,注释已经写的很清楚了,看注释即可。
这段话翻译为大家都能懂得的意思就是“可以同时操控多个数据源”。
/**
* 核心基于ThreadLocal的切换数据源工具类
*
* @author TaoYu Kanyuxia
* @since 1.0.0
*/
public final class DynamicDataSourceContextHolder {
/**
* 为什么要用链表存储(准确的是栈)
* <pre>
* 为了支持嵌套切换,如ABC三个service都是不同的数据源
* 其中A的某个业务要调B的方法,B的方法需要调用C的方法。一级一级调用切换,形成了链。
* 传统的只设置当前线程的方式不能满足此业务需求,必须使用栈,后进先出。
* </pre>
*/
private static final ThreadLocal<Deque<String>> LOOKUP_KEY_HOLDER = new NamedThreadLocal<Deque<String>>("dynamic-datasource") {
@Override
protected Deque<String> initialValue() {
return new ArrayDeque<>();
}
};
private DynamicDataSourceContextHolder() {
}
/**
* 获得当前线程数据源
*
* @return 数据源名称
*/
public static String peek() {
return LOOKUP_KEY_HOLDER.get().peek();
}
/**
* 设置当前线程数据源
* <p>
* 如非必要不要手动调用,调用后确保最终清除
* </p>
*
* @param ds 数据源名称
*/
public static String push(String ds) {
String dataSourceStr = StringUtils.isEmpty(ds) ? "" : ds;
LOOKUP_KEY_HOLDER.get().push(dataSourceStr);
return dataSourceStr;
}
/**
* 清空当前线程数据源
* <p>
* 如果当前线程是连续切换数据源 只会移除掉当前线程的数据源名称
* </p>
*/
public static void poll() {
Deque<String> deque = LOOKUP_KEY_HOLDER.get();
deque.poll();
if (deque.isEmpty()) {
LOOKUP_KEY_HOLDER.remove();
}
}
/**
* 强制清空本地线程
* <p>
* 防止内存泄漏,如手动调用了push可调用此方法确保清除
* </p>
*/
public static void clear() {
LOOKUP_KEY_HOLDER.remove();
}
}
实现自定义 DataSource 接口,实现 DataSource 接口的 getConnection 方法做动态处理
DynamicRoutingDataSource 代表动态路由数据源
做的事情就是运行时动态路由出一个当前需要的数据源
接着看源码,首先与 SpringBoot 整合时 自动配置出当前 Bean
DynamicRoutingDataSource 类图
DataSource
代表一个数据源,由 javax 扩展定义
AbstractDataSource
抽象实现,将一些对数据源的配置操作都实现为不支持操作抛出异常 UnsupportedOperationException
(动态数据源相当于一个代理,不需要给动态数据源本身设置相关配置)
AbstractRoutingDataSource
抽象实现,路由动态配置源,实现了关键方法 getConnection ,完成了路由操作
看看源码 getConnection()
getConnection() 何时调用呢,也就是之前切面中的第 2 步中,invocation.proceed()
,执行业务逻辑的过程中,遇到的数据库层的操作时,就会到这里了。
这里直接看简单的非事务的获取数据源这里。
关键代码 determineDataSource().getConnection()
determineDataSource()方法是个抽象方法,由子类实现,也就是下面的 DynamicRoutingDataSource
类
DynamicRoutingDataSource
动态路由数据源核心实现
,完成 数据源的维护
(添加删除数据源)、数据源的选择
接着上面的源码流程,子类的 determineDataSource 方法最终调用了 getDataSource
getDataSource 源码如下:
/**
* 获取数据源
*
* @param ds 数据源名称
* @return 数据源
*/
public DataSource getDataSource(String ds) {
if (StringUtils.isEmpty(ds)) {
// 没有指定数据源名称,直接使用默认的数据源
return determinePrimaryDataSource();
} else if (!groupDataSources.isEmpty() && groupDataSources.containsKey(ds)) {
// 从分组数据源中找一个数据源
return groupDataSources.get(ds).determineDataSource();
} else if (dataSourceMap.containsKey(ds)) {
// 直接根据名称找一个数据源
return dataSourceMap.get(ds);
}
if (strict) {
// 开启了严格模式时,如果没有找到数据源,就抛出异常
throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
}
// 使用默认的数据源
return determinePrimaryDataSource();
}
下面分别讲解关键之处
1、没有指定数据源名称,直接使用默认的数据源
没有指定代表的是没有加 @DS 注解,或者加了注解,但是 value 值没有写
此时就是用默认的数据源,默认的数据源是什么呢?
也就是配置文件中的 primary 中指定的数据源名称,如果不配置的话默认值就是 master
2、实现自定义 DataSource 接口,实现 DataSource 接口的 getConnection 方法做动态处理
从分组找一个数据源 groupDataSources.get(ds).determineDataSource();
分组是什么意思?
分组定义的规则是 group_xxx,也就是数据源名称以下划线分割,下划线前面的就是组名。
分组的作用是什么呢?本质用于实现一个名称对应多数据源。
比如一主多从,可以将从数据源都分到 slave 组里面,用的时候就是 @DS("slave") // 组名
在实际决定数据源的时候,就会按照一定的策略从这个组里的数据源挑选一个了。
接着看源码,如何 从分组数据源中找一个数据源:
最后到了策略的选择,DynamicDataSourceStrategy
DynamicDataSourceStrategy 有两个实现类
- LoadBalanceDynamicDataSourceStrategy 负载均衡动态数据源策略,按顺序一个一个来
- RandomDynamicDataSourceStrategy 随机动态数据源策略,纯随机选一个
3、 直接根据名称找一个数据源
如果走到了这里,说明这个数据源名称没有配置分组,那就直接根据名称取这单个数据源了,直接纯 get 了
数据源何时初始化的
还是在 DynamicRoutingDataSource,这个类实现了 Spring InitializingBean
接口回调方法 afterPropertiesSet,当当前 Bean 内部的属性都初始化完毕了后就回调这个方法
看看 afterPropertiesSet 回调方法内容
@Override
public void afterPropertiesSet() throws Exception {
// 检查开启了配置但没有相关依赖
checkEnv();
// 添加并分组数据源
Map<String, DataSource> dataSources = new HashMap<>(16);
for (DynamicDataSourceProvider provider : providers) {
dataSources.putAll(provider.loadDataSources());
}
for (Map.Entry<String, DataSource> dsItem : dataSources.entrySet()) {
addDataSource(dsItem.getKey(), dsItem.getValue());
}
// 检测默认数据源是否设置
if (groupDataSources.containsKey(primary)) {
log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", dataSources.size(), primary);
} else if (dataSourceMap.containsKey(primary)) {
log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", dataSources.size(), primary);
} else {
log.warn("dynamic-datasource initial loaded [{}] datasource,Please add your primary datasource or check your configuration", dataSources.size());
}
}
这里只看关键代码
1、dataSources.putAll(provider.loadDataSources());
@Autowired private List<DynamicDataSourceProvider> providers;
providers 是什么呢 ?
providers 代表 动态数据源配置的来源,默认实现就是从 yml 中来,也就是 SpringBoot 的 application.yml 配置
默认实现:
传进去的参数配置类
/*
* Copyright © 2018 organization baomidou
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.baomidou.dynamic.datasource.spring.boot.autoconfigure;
import com.baomidou.dynamic.datasource.enums.SeataMode;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.beecp.BeeCpConfig;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.dbcp2.Dbcp2Config;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.druid.DruidConfig;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.hikari.HikariCpConfig;
import com.baomidou.dynamic.datasource.strategy.DynamicDataSourceStrategy;
import com.baomidou.dynamic.datasource.strategy.LoadBalanceDynamicDataSourceStrategy;
import com.baomidou.dynamic.datasource.toolkit.CryptoUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.NestedConfigurationProperty;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* DynamicDataSourceProperties
*
* @author TaoYu Kanyuxia
* @see DataSourceProperties
* @since 1.0.0
*/
@Slf4j
@Getter
@Setter
@ConfigurationProperties(prefix = DynamicDataSourceProperties.PREFIX)
public class DynamicDataSourceProperties {
public static final String PREFIX = "spring.datasource.dynamic";
/**
* 必须设置默认的库,默认master
*/
private String primary = "master";
/**
* 是否启用严格模式,默认不启动. 严格模式下未匹配到数据源直接报错, 非严格模式下则使用默认数据源primary所设置的数据源
*/
private Boolean strict = false;
/**
* 是否使用p6spy输出,默认不输出
*/
private Boolean p6spy = false;
/**
* 是否使用开启seata,默认不开启
*/
private Boolean seata = false;
/**
* 是否懒加载数据源
*/
private Boolean lazy = false;
/**
* seata使用模式,默认AT
*/
private SeataMode seataMode = SeataMode.AT;
/**
* 全局默认publicKey
*/
private String publicKey = CryptoUtils.DEFAULT_PUBLIC_KEY_STRING;
/**
* 每一个数据源
*/
private Map<String, DataSourceProperty> datasource = new LinkedHashMap<>();
/**
* 多数据源选择算法clazz,默认负载均衡算法
*/
private Class<? extends DynamicDataSourceStrategy> strategy = LoadBalanceDynamicDataSourceStrategy.class;
/**
* Druid全局参数配置
*/
@NestedConfigurationProperty
private DruidConfig druid = new DruidConfig();
/**
* HikariCp全局参数配置
*/
@NestedConfigurationProperty
private HikariCpConfig hikari = new HikariCpConfig();
/**
* BeeCp全局参数配置
*/
@NestedConfigurationProperty
private BeeCpConfig beecp = new BeeCpConfig();
/**
* DBCP2全局参数配置
*/
@NestedConfigurationProperty
private Dbcp2Config dbcp2 = new Dbcp2Config();
/**
* aop with default ds annotation
*/
@NestedConfigurationProperty
private DynamicDatasourceAopProperties aop = new DynamicDatasourceAopProperties();
}
DynamicDataSourceProvider 也就是解析了这些配置来获取到所有配置
拿到配置后,就要解析这些配置了 ,这里委托了父类处理
/**
* YML数据源提供者
*
* @author TaoYu Kanyuxia
* @since 1.0.0
*/
@Slf4j
@AllArgsConstructor
public class YmlDynamicDataSourceProvider extends AbstractDataSourceProvider {
/**
* 所有数据源
*/
private final Map<String, DataSourceProperty> dataSourcePropertiesMap;
@Override
public Map<String, DataSource> loadDataSources() {
return createDataSourceMap(dataSourcePropertiesMap);
}
}
这里完成创建数据源,然后将结果封装成了 Map<String, DataSource> dataSourceMap 返回
看看如何创建数据源的 defaultDataSourceCreator.createDataSource(dataSourceProperty)
,不同版本可能不同,高版本后面新加了好多东西。
/**
* 数据源创建器
*
* @author TaoYu
* @since 2.3.0
*/
@Slf4j
@Setter
public class DefaultDataSourceCreator {
private List<DataSourceCreator> creators;
public DataSource createDataSource(DataSourceProperty dataSourceProperty) {
DataSourceCreator dataSourceCreator = null;
for (DataSourceCreator creator : this.creators) {
if (creator.support(dataSourceProperty)) {
dataSourceCreator = creator;
break;
}
}
if (dataSourceCreator == null) {
throw new IllegalStateException("creator must not be null,please check the DataSourceCreator");
}
return dataSourceCreator.createDataSource(dataSourceProperty);
}
}
这里介绍一下 creators
dynamic-datasource-creator 模块下定义了单独数据源创建的代码。
DataSourceCreator 代表一个数据源创建器,用于创建一个数据源。
每种数据源类型都有自己的创建器,比如这里常见的 Druid、Hikar
这里就举例其中一个 HikariDataSourceCreator,其他的都差不多
HikariDataSourceCreator
调用这些创造器的创建的时候默认直接就启动了,除非配置了懒加载。
到现在,数据源就已经创建完了。
再次说一下这是在 Spring 的 afterPropertiesSet 回调里完成创建的。(afterPropertiesSet 即当前 Bean 的所有属性 Spring 都填充完毕后回调)
2、 addDataSource(dsItem.getKey(), dsItem.getValue());
上一步的 provider.loadDataSources() 讲解完毕了,这次看看下面的 addDataSource(dsItem.getKey(), dsItem.getValue());
@Override
public void afterPropertiesSet() throws Exception {
// 检查开启了配置但没有相关依赖
checkEnv();
// 添加并分组数据源
Map<String, DataSource> dataSources = new HashMap<>(16);
for (DynamicDataSourceProvider provider : providers) {
dataSources.putAll(provider.loadDataSources());
}
for (Map.Entry<String, DataSource> dsItem : dataSources.entrySet()) {
addDataSource(dsItem.getKey(), dsItem.getValue());
}
// 检测默认数据源是否设置
if (groupDataSources.containsKey(primary)) {
log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", dataSources.size(), primary);
} else if (dataSourceMap.containsKey(primary)) {
log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", dataSources.size(), primary);
} else {
log.warn("dynamic-datasource initial loaded [{}] datasource,Please add your primary datasource or check your configuration", dataSources.size());
}
}
addDataSource 方法
/**
* 添加数据源
*
* @param ds 数据源名称
* @param dataSource 数据源
*/
public synchronized void addDataSource(String ds, DataSource dataSource) {
DataSource oldDataSource = dataSourceMap.put(ds, dataSource);
// 新数据源添加到分组
this.addGroupDataSource(ds, dataSource);
// 关闭老的数据源
if (oldDataSource != null) {
closeDataSource(ds, oldDataSource);
}
log.info("dynamic-datasource - add a datasource named [{}] success", ds);
}
首先是先给 dataSourceMap 放进去了。这里会返回旧的数据源(如果是第一次加入,则返回null),所以下面判断了如果返回有值旧关闭掉旧的数据源,关闭就是调用数据源的 close 方法。
然后是 addGroupDataSource
/**
* 新数据源添加到分组
*
* @param ds 新数据源的名字
* @param dataSource 新数据源
*/
private void addGroupDataSource(String ds, DataSource dataSource) {
if (ds.contains(UNDERLINE)) { // 包含了下划线就代表是分组的配置
String group = ds.split(UNDERLINE)[0];
GroupDataSource groupDataSource = groupDataSources.get(group); // 拿到组
if (groupDataSource == null) {
try {
// 没有组则创建
groupDataSource = new GroupDataSource(group, strategy.getDeclaredConstructor().newInstance());
groupDataSources.put(group, groupDataSource);
} catch (Exception e) {
throw new RuntimeException("dynamic-datasource - add the datasource named " + ds + " error", e);
}
}
groupDataSource.addDatasource(ds, dataSource); // 向当前组添加数据源
}
}
这里数据源就完成了添加,这个整体步骤都是在启动的时候添加的。
添加好后,数据源就可以自主编程使用了。
具体操作最好看官方文档。
评论区