相信日常开发中,开发人员对数据源(DataSource)一词应该不陌生,这里的数据源着重指的是数据库。但通常开发人员面对数据源时,可能面临最多的是做一些配置优化相关的工作,或者当面临数据库连接不足,数据库连接泄露等问题时,才会开始着重关注数据源。但市面上,已经有各种数据源产品,如DBCP,C3P0,BoneCP等,到底哪一个是更优秀,值得信赖呢?本文将对这些数据源作一些简单测试,由于近期也是将业务线的数据源均换为Druid,所以会着重介绍下该数据源。
在JDBC4.1中,这样描述DataSource接口:
Using a Datasource object increases application portability by making it possible for an application to use a logical name for a data source instead of having to supply information specific to a particular driver. The DataSource increased performance and scalability through connection pooling.
大致意思就是,数据源可以为应用程序屏蔽特定的数据库驱动(Driver),并将数据库连接进行池化,以提升数据库操作的性能和可伸缩性。将数据库连接进行统一管理的同时,还可以对数据库连接进行监控,比如Druid数据源。
DBCP由Apache开发,内部使用通用的对象池化技术Commons Pool进行开发,主要有1.x和2.x两个版本,1.x版本中使用单线程进行对象分配和回收,因此高并发下,性能很差,这在2.x版本中进行了重写,提升了性能和伸缩性,并加入了一些跟踪和监控特性。
C3P0也是一个标准数据源实现,其从0.9.5之后开始完全支持JDBC4.0,其实现了传统的基于DriverManager的JDBC驱动到javax.sql.DataSource的适配,并对Connection和PreparedStatements作了透明池化。
BoneCP是一个高效的数据源实现,其号称没有使用自旋锁(Spin Lock),因此不会降低应用性能,并提供了一些特性,如通过分区提升性能,自动调整连接池大小,异步获取连接等。
Druid不仅仅是一个数据库连接池,它还包含一个ProxyDriver,一系列内置的JDBC组件库,一个SQL Parser,其提供了强大的监控特性,如SQL的执行时间,监控连接池的物理连接创建和销毁次数等,也在极高负载的生产环境中实际使用,性能方面也足以与其他数据源媲美。
Tomcat JDBC Connection Pool作为Tomcat的内置组件,旨在替代DBCP(1.x版本性能差),其比较轻量,核心类仅有8个,内部实现了异步获取连接,支持高并发和多核等场景
以下是测试时使用的代码片段,分别针对不同线程数和SQL查询次数进行对比测试:
public class DataSourcePermTest {
private static String JDBC_URL = "jdbc:mysql://localhost:3306/test?useSSL=false";
private static String USER = "root";
private static String PASS = "";
private static String DRIVER = "com.mysql.jdbc.Driver";
/**
* 初始的连接数
*/
private static int initPoolSize = 5;
/**
* 最小存活的连接数
*/
private static int minPoolSize = 5;
/**
* 最大存活的连接数, 超过该数后, 后续的请求将等待
*/
private static int maxPoolSize = 20;
/**
* 连接不被销毁的最长空闲时间
*/
private static int maxIdleTime = 100000;
/**
* 线程数
*/
private static int RUN_THREAD_COUNT = 20;
/**
* 每个线程执行的查询数
*/
private static int RUN_COUNT_PER_THREAD = 10000;
/**
* 运行N次求平均值
*/
private static int RUN_LOOP = 10;
/**
* ID
*/
private static Random ids = new Random();
@Test
public void testDruid() throws Exception {
DruidDataSource ds = new DruidDataSource();
ds.setUsername(USER);
ds.setUrl(JDBC_URL);
ds.setPassword(PASS);
ds.setDriverClassName(DRIVER);
ds.setInitialSize(initPoolSize);
ds.setMinIdle(minPoolSize);
ds.setMaxActive(maxPoolSize);
ds.setMinEvictableIdleTimeMillis(maxIdleTime);
ds.setTestWhileIdle(false);
ds.setTestOnReturn(false);
ds.setTestOnBorrow(false);
runTestQuery(ds);
}
@Test
public void testTomcatJdbcPool() throws Exception {
org.apache.tomcat.jdbc.pool.DataSource ds = new org.apache.tomcat.jdbc.pool.DataSource();
ds.setUsername(USER);
ds.setUrl(JDBC_URL);
ds.setPassword(PASS);
ds.setDriverClassName(DRIVER);
ds.setInitialSize(initPoolSize);
ds.setMaxActive(maxPoolSize);
ds.setMinIdle(minPoolSize);
ds.setMaxIdle(minPoolSize);
ds.setMinEvictableIdleTimeMillis(maxIdleTime);
ds.setTestWhileIdle(false);
ds.setTestOnReturn(false);
ds.setTestOnBorrow(false);
runTestQuery(ds);
}
@Test
public void testDBCP2() throws Exception {
BasicDataSource ds = new BasicDataSource();
ds.setUsername(USER);
ds.setUrl(JDBC_URL);
ds.setPassword(PASS);
ds.setDriverClassName(DRIVER);
ds.setInitialSize(initPoolSize);
ds.setMaxTotal(maxPoolSize);
ds.setMaxIdle(minPoolSize);
ds.setMinEvictableIdleTimeMillis(maxIdleTime);
ds.setTestWhileIdle(false);
ds.setTestOnReturn(false);
ds.setTestOnBorrow(false);
runTestQuery(ds);
}
@Test
public void testC3P0() throws Exception {
ComboPooledDataSource ds = new ComboPooledDataSource();
try {
ds.setDriverClass(DRIVER);
} catch (PropertyVetoException e) {
}
ds.setJdbcUrl(JDBC_URL);
ds.setUser(USER);
ds.setPassword(PASS);
ds.setInitialPoolSize(initPoolSize);
ds.setMinPoolSize(minPoolSize);
ds.setMaxPoolSize(maxPoolSize);
ds.setMaxIdleTime(maxIdleTime);
ds.setTestConnectionOnCheckin(false);
ds.setTestConnectionOnCheckout(false);
runTestQuery(ds);
}
@Test
public void testBoneCP() throws Exception {
BoneCPDataSource ds = new BoneCPDataSource();
ds.setUsername(USER);
ds.setPassword(PASS);
ds.setJdbcUrl(JDBC_URL);
ds.setDriverClass(DRIVER);
ds.setServiceOrder("LIFO");
ds.setMinConnectionsPerPartition(minPoolSize);
ds.setMaxConnectionsPerPartition(maxPoolSize);
ds.setIdleMaxAge(maxIdleTime, TimeUnit.MILLISECONDS);
ds.setPartitionCount(1);
runTestQuery(ds);
}
private void runTestQuery(final DataSource ds) throws Exception {
// prepare
for (int i=0; i<5; i++){
doTestQuery(ds);
}
long totalTime = 0L;
long currentTime = 0L;
for (int i = 0; i< RUN_LOOP; i++){
currentTime = doTestQuery(ds);
totalTime += currentTime;
System.out.println(i + ": " + RUN_THREAD_COUNT + " threads, every run " + RUN_COUNT_PER_THREAD + " times query, run time: " + currentTime + " ms");
}
System.out.println("Loop " + RUN_LOOP + " times, "
+ RUN_THREAD_COUNT + " threads, every run " + RUN_COUNT_PER_THREAD
+ " times query, average run time: " + totalTime / RUN_LOOP + " ms");
}
private long doTestQuery(final DataSource ds) throws InterruptedException {
long start = System.currentTimeMillis();
final CountDownLatch latch = new CountDownLatch(RUN_THREAD_COUNT);
for (int i = 0; i< RUN_THREAD_COUNT; i++){
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < RUN_COUNT_PER_THREAD; i++) {
doQuery(ds);
}
latch.countDown();
}
}).start();
}
latch.await();
return System.currentTimeMillis() - start;
}
public void doQuery(DataSource ds) {
try {
Connection conn = ds.getConnection();
String sql = "SELECT * FROM blogs where id = " + ids.nextInt(10000000);
Statement st = conn.createStatement();
ResultSet res = st.executeQuery(sql);
res.close();
st.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
运行环境为:
各数据源耗时(ms)对比图如下:
数据源/线程数*查询次数 | 1 * 1000 | 1 * 10000 | 5 * 1000 | 5 * 10000 | 10 * 1000 | 10 * 10000 | 20 * 1000 | 20 * 10000 |
Druid | 439 | 2322 | 484 | 4382 | 943 | 8851 | 2047 | 20268 |
Tomcat JDBC Pool | 342 | 2277 | 464 | 4494 | 940 | 9253 | 2016 | 20296 |
DBCP2 | 352 | 2486 | 499 | 4670 | 1142 | 8340 | 1934 | 18727 |
C3P0 | 337 | 2302 | 610 | 5352 | 1098 | 10276 | 2086 | 20801 |
BoneCP | 565 | 4557 | 898 | 8538 | 1934 | 18222 | 4219 | 41807 |
从上面的测试结果来看,Druid,Tomcat JDBC Pool和DBCP2在性能上比较有优势,官方通过对连接进行申请和归还来进行对比测试,实际情况还需自行测试。但Druid支持一些强大的监控特性,通过Druid提供的监控功能,可以清楚知道连接池和SQL的工作情况;除此外,Druid内置了一个高性能的SQL Parser,可以通过其作一些JDBC层拦截SQL做相应处理,比如说分库分表、审计等。因此下面将更多地聊聊Druid。
DruidDataSource的接口设计:
package java.sql;
/**
* 该接口可用于在JDBC类中,访问内部的代理实例
*/
public interface Wrapper {
/**
* 返回接口的实现类对象或实现类对象的代理
* 可用于访问一些非标准的方法,或一些没有被代理暴露的非标准方法
*/
<T> T unwrap(java.lang.Class<T> iface) throws java.sql.SQLException;
/**
* 对象是否实现了接口,或者直接或间接包装了实现了该接口的对象
*/
boolean isWrapperFor(java.lang.Class<?> iface) throws java.sql.SQLException;
}
/**
* 定义DataSource,XADataSource和ConnectionPoolDataSource的公共接口
*/
public interface CommonDataSource {
/**
* 获取数据源的日志写入器
*/
java.io.PrintWriter getLogWriter() throws SQLException;
/**
* 设置数据源的日志写入器
*/
void setLogWriter(java.io.PrintWriter out) throws SQLException;
/**
* 设置数据源连接数据库时,最大的等待等待时间
*/
void setLoginTimeout(int seconds) throws SQLException;
/**
* 获取数据源连接数据库时,最大的等待等待时间
*/
int getLoginTimeout() throws SQLException;
//------------------------- JDBC 4.1 -----------------------------------
/**
* 获取数据源的父日志接口
*/
public Logger getParentLogger() throws SQLFeatureNotSupportedException;
}
/**
* 数据源是一种比DriverManager获取数据库连接更好的方式
* 数据源实现者通常需要基于JNDI API来注册名字服务(Naming Service)
* 数据源需要被数据库驱动厂商实现,有三种类型的实现:
* 1. 基本实现:获取一个标准的连接对象;
* 2. 连接池实现:获取一个由连接池管理器管理的连接对象;
* 3. 分布式事务实现:获取一个用于分布式事务的连接对象,通常也由连接池管理器管理。
*/
public interface DataSource extends CommonDataSource,Wrapper {
/**
* 获取Connection对象
*/
Connection getConnection() throws SQLException;
/**
* 使用用户名和密码获取Connection对象
*/
Connection getConnection(String username, String password) throws SQLException;
}
/**
* PooledConnection工厂类
*/
public interface ConnectionPoolDataSource extends CommonDataSource {
/**
* 获取PooledConnection对象
*/
PooledConnection getPooledConnection() throws SQLException;
/**
* 使用用户名和密码获取PooledConnection对象
*/
PooledConnection getPooledConnection(String user, String password) throws SQLException;
}
/**
* 数据源代理类
*/
public interface DataSourceProxy {
/**
* 数据源状态
*/
JdbcDataSourceStat getDataSourceStat();
/**
* 数据源名称
*/
String getName();
/**
* 数据库类型
*/
String getDbType();
/**
* 内部包装的数据库驱动
*/
Driver getRawDriver();
/**
* 获取JDBC URL
*/
String getUrl();
/**
* 获取裸露的JDBC URL
*/
String getRawJdbcUrl();
/**
* 代理时使用的过滤器
*/
List<Filter> getProxyFilters();
/**
* 创建Connection ID
*/
long createConnectionId();
/**
* 创建Statement ID
*/
long createStatementId();
/**
* 创建ResultSet ID
*/
long createResultSetId();
/**
* 创建源数据ID
*/
long createMetaDataId();
/**
* 创建事务ID
*/
long createTransactionId();
/**
* 数据源连接属性
*/
Properties getConnectProperties();
}
/**
* Wrapper的简单实现
*/
public class WrapperAdapter implements Wrapper {
public WrapperAdapter(){
}
@Override
public boolean isWrapperFor(Class<?> iface) {
// 当前对象是否实现了iface接口
return iface != null && iface.isInstance(this);
}
@SuppressWarnings("unchecked")
@Override
public <T> T unwrap(Class<T> iface) {
if (iface == null) {
return null;
}
// 若当前对象是iface接口的实例,则返回当前对象
if (iface.isInstance(this)) {
return (T) this;
}
return null;
}
}
/**
* 通过MBeanServer监控数据源时,定义需要监控的基本信息,
* 包括一些进行配置信息和运行时信息
*/
public interface DruidAbstractDataSourceMBean {
/**
* 数据源连接数据库时,最大的等待时间(秒)
*/
int getLoginTimeout();
/**
* 数据库类型
*/
String getDbType();
/**
* 数据源名称
*/
String getName();
/**
* 数据源初始连接数大小
*/
int getInitialSize();
/**
* 连接用户名
*/
String getUsername();
/**
* 连接地址
*/
String getUrl();
/**
* 数据库驱动类名
*/
String getDriverClassName();
/**
* 连接数
*/
long getConnectCount();
/**
* 连接关闭数
*/
long getCloseCount();
/**
* 连接错误数
*/
long getConnectErrorCount();
/**
* 连接池大小
*/
int getPoolingCount();
/**
* 连接回收数
*/
long getRecycleCount();
/**
* 连接存活数
*/
int getActiveCount();
/**
* 连接创建数
*/
long getCreateCount();
/**
* 连接销毁数
*/
long getDestroyCount();
/**
* 创建时间间隔
*/
long getCreateTimespanMillis();
/**
* 事务提交数
*/
long getCommitCount();
/**
* 事务回滚数
*/
long getRollbackCount();
/**
* 事务开启数
*/
long getStartTransactionCount();
/**
* 查询超时时间
*/
int getQueryTimeout();
/**
* 事务查询超时时间
*/
int getTransactionQueryTimeout();
/**
* 验证连接有效性的SQL
*/
String getValidationQuery();
/**
* 验证连接的查询超时时间
*/
int getValidationQueryTimeout();
/**
* 最大等待线程数
*/
int getMaxWaitThreadCount();
/**
* 间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
*/
long getTimeBetweenEvictionRunsMillis();
/**
* 连接保持空闲而不被驱逐的最长时间
*/
long getMinEvictableIdleTimeMillis();
/**
* 是否自动回收超时连接
*/
boolean isRemoveAbandoned();
/**
* 连接超时时间
*/
long getRemoveAbandonedTimeoutMillis();
/**
* 存活连接的栈信息
*/
List<String> getActiveConnectionStackTrace();
/**
* filter类名列表
*/
List<String> getFilterClassNames();
/**
* 获取到连接时,是否检测连接有效性,会影响性能
*/
boolean isTestOnBorrow();
/**
* 设置获取到连接时,是否检测连接有效性
*/
void setTestOnBorrow(boolean testOnBorrow);
/**
* 归还连接时,是否检测连接有效性
*/
boolean isTestOnReturn();
/**
* 空闲时,是否检测连接有效性
*/
boolean isTestWhileIdle();
/**
* 设置空闲时,是否检测连接有效性
*/
void setTestWhileIdle(boolean testWhileIdle);
/**
* 是否默认提交事务
*/
boolean isDefaultAutoCommit();
/**
* 默认的是否为只读模式
*/
Boolean getDefaultReadOnly();
/**
* 默认的事务隔离级别
*/
Integer getDefaultTransactionIsolation();
/**
* 默认Catalog
*/
String getDefaultCatalog();
/**
* 是否池化PS
*/
boolean isPoolPreparedStatements();
/**
* 是否共享PS
*/
boolean isSharePreparedStatements();
/**
* 获取连接最大等待时间
*/
long getMaxWait();
/**
* 最小空闲连接
*/
int getMinIdle();
/**
* 最大空闲连接
*/
int getMaxIdle();
/**
* 创建连接错误数
*/
long getCreateErrorCount();
/**
* 最大存活连接数
*/
int getMaxActive();
/**
* 设置最大存活连接数
*/
void setMaxActive(int maxActive);
/**
* 发生错误时多久重连
*/
long getTimeBetweenConnectErrorMillis();
/**
* 最大打开的PS数
*/
int getMaxOpenPreparedStatements();
/**
* 移除的超时连接数
*/
long getRemoveAbandonedCount();
/**
* 移除超时连接时,是否日志记录
*/
boolean isLogAbandoned();
/**
* 移除超时连接时,是否日志记录
*/
void setLogAbandoned(boolean logAbandoned);
/**
* 重复关闭数
*/
long getDupCloseCount();
/**
* 数据源获取连接失败后,是否关闭数据源
*/
boolean isBreakAfterAcquireFailure();
/**
* 连接错误时,重试次数
*/
int getConnectionErrorRetryAttempts();
/**
* 每个连接池化的最大PS数
*/
int getMaxPoolPreparedStatementPerConnectionSize();
/**
* 设置每个连接池化的最大PS数
*/
void setMaxPoolPreparedStatementPerConnectionSize(int maxPoolPreparedStatementPerConnectionSize);
/**
* 获取属性
*/
String getProperties();
/**
* 数据库驱动的小版本
*/
int getRawDriverMinorVersion();
/**
* 数据库驱动的大版本
*/
int getRawDriverMajorVersion();
/**
* 数据源创建时间
*/
Date getCreatedTime();
/**
* 获取检测连接的类名
*/
String getValidConnectionCheckerClassName();
/**
* 获取事务的柱状图
*/
long[] getTransactionHistogramValues();
/**
* 设置事务超时时间
*/
void setTransactionThresholdMillis(long transactionThresholdMillis);
/**
* 事务超时时间
*/
long getTransactionThresholdMillis();
/**
* PS数
*/
long getPreparedStatementCount();
/**
* 关闭的PS数
*/
long getClosedPreparedStatementCount();
/**
* 缓存的PS数
*/
long getCachedPreparedStatementCount();
/**
* 缓存PS被删除的数量
*/
long getCachedPreparedStatementDeleteCount();
/**
* 缓存PS的被访问数
*/
long getCachedPreparedStatementAccessCount();
/**
* 缓存PS的Miss数
*/
long getCachedPreparedStatementMissCount();
/**
* 缓存PS的命中数
*/
long getCachedPreparedStatementHitCount();
/**
* 是否使用Oracle实现的Cache
*/
boolean isUseOracleImplicitCache();
/**
* 设置是否使用Oracle实现的Cache
*/
void setUseOracleImplicitCache(boolean useOracleImplicitCache);
/**
* 获取数据库驱动的大版本
*/
int getDriverMajorVersion();
/**
* 获取数据库驱动的小版本
*/
int getDriverMinorVersion();
/**
* 异常检测器的类名
*/
String getExceptionSorterClassName();
}
/**
* 在MBean注册到MBean Server(或从MBean Server取消注册)前后,执行一些特定操作
*/
public interface MBeanRegistration {
/**
* MBean注册到MBeanServer前执行
*
*/
public ObjectName preRegister(MBeanServer server, ObjectName name) throws java.lang.Exception;
/**
* MBean注册到MBeanServer后执行
*/
public void postRegister(Boolean registrationDone);
/**
* MBean取消注册前执行
*/
public void preDeregister() throws java.lang.Exception ;
/**
* MBean取消注册后执行
*/
public void postDeregister();
}
/**
* DruidDataSource实现扩展的一些监控信息
*/
public interface DruidDataSourceMBean extends DruidAbstractDataSourceMBean {
/**
* 数据源重置次数
*/
long getResetCount();
/**
* 数据源是否启用
*/
boolean isEnable();
/**
* 收缩数据源
*/
void shrink();
/**
* 移除超时连接的数量
*/
int removeAbandoned();
/**
* dump数据源信息
*/
String dump();
/**
* 等待线程数
*/
int getWaitThreadCount();
/**
* 获取Lock的等待线程数
*/
int getLockQueueLength();
/**
* 数据源非空条件的等待数
*/
long getNotEmptyWaitCount();
/**
* 非空条件的等待线程数
*/
int getNotEmptyWaitThreadCount();
/**
* 非空条件的唤醒数
*/
long getNotEmptySignalCount();
/**
* 非空条件的等待毫秒数
*/
long getNotEmptyWaitMillis();
/**
* 非空条件的等待纳秒数
*/
long getNotEmptyWaitNanos();
/**
* 重置状态
*/
void resetStat();
/**
* 是否运行重置状态
*/
boolean isResetStatEnable();
/**
* 设置是否运行重置状态
*/
void setResetStatEnable(boolean resetStatEnable);
/**
* 版本
*/
String getVersion();
/**
* 设置是否池化PS
*/
void setPoolPreparedStatements(boolean poolPreparedStatements);
/**
* 存活连接的最大值
*/
int getActivePeak();
/**
* 连接池的最大值
* @return
*/
int getPoolingPeak();
/**
* 连接存活的最大时间
* @return
*/
Date getActivePeakTime();
/**
* 连接池化的最大时间
*/
Date getPoolingPeakTime();
/**
* 错误数
*/
long getErrorCount();
/**
* 数据源MBean的ObjectName
*/
ObjectName getObjectName();
/**
* 清除Statement缓存
*/
void clearStatementCache() throws SQLException;
/**
* 丢弃的连接数
*/
long getDiscardCount();
/**
* 设置数据源状态的日志类名称
*/
void setStatLoggerClassName(String className);
/**
* 数据源状态的日志打印间隔时间
*/
long getTimeBetweenLogStatsMillis();
/**
* 设置数据源状态的日志打印间隔时间
*/
void setTimeBetweenLogStatsMillis(long timeBetweenLogStatsMillis);
/**
* 设置连接属性
*/
void setConnectionProperties(String connectionProperties);
/**
* 填充连接, 默认补充到maxActive个连接
*/
int fill() throws SQLException;
/**
* 填充连接
* @param toCount 补充到多少个连接
*/
int fill(int toCount) throws SQLException;
}
/**
* 可以被管理的数据源, 即成为一个MBean
*/
public interface ManagedDataSource extends DataSource {
/**
* 是否开启被管理
*/
boolean isEnable();
/**
* 设置是否开启被管理
*/
void setEnable(boolean value);
/**
* MBean的名称
*/
ObjectName getObjectName();
/**
* 设置MBean的名称
*/
void setObjectName(ObjectName objectName);
}
DruidDataSource的实现主要有DruidAbstractDataSource和DruidDataSource:
DruidAbstractDataSource作为DruidDataSource和DruidXADataSource数据源的父类,主要设置一些数据源的基本属性,和一些基础方法的实现,如创建物理连接,验证物理连接等:
/**
* 创建物理连接
*/
public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
// 数据库URL
String url = this.getUrl();
// 连接属性
Properties connectProperties = getConnectProperties();
// 用户名
String user;
if (getUserCallback() != null) {
// 输入用户名
user = getUserCallback().getName();
} else {
user = getUsername();
}
// 密码
String password = getPassword();
PasswordCallback passwordCallback = getPasswordCallback();
if (passwordCallback != null) {
if (passwordCallback instanceof DruidPasswordCallback) {
DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;
druidPasswordCallback.setUrl(url);
druidPasswordCallback.setProperties(connectProperties);
}
char[] chars = passwordCallback.getPassword();
if (chars != null) {
password = new String(chars);
}
}
// 连接属性
Properties physicalConnectProperties = new Properties();
if (connectProperties != null) {
physicalConnectProperties.putAll(connectProperties);
}
if (user != null && user.length() != 0) {
physicalConnectProperties.put("user", user);
}
if (password != null && password.length() != 0) {
physicalConnectProperties.put("password", password);
}
Connection conn;
// 进行连接相关操作的开始时间
long connectStartNanos = System.nanoTime();
// 成功连接后的时间, 初始化后的时间, 验证成功后的时间
long connectedNanos, initedNanos, validatedNanos;
try {
// 创建物理连接
conn = createPhysicalConnection(url, physicalConnectProperties);
connectedNanos = System.nanoTime();
if (conn == null) {
throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
}
// 初始化连接基本属性, 如是否自动提交, 事务隔离级别
initPhysicalConnection(conn);
initedNanos = System.nanoTime();
// 验证连接
validateConnection(conn);
validatedNanos = System.nanoTime();
setCreateError(null);
} catch (SQLException ex) {
setCreateError(ex);
throw ex;
} catch (RuntimeException ex) {
setCreateError(ex);
throw ex;
} catch (Error ex) {
// 创建连接错误数 + 1
createErrorCount.incrementAndGet();
throw ex;
} finally {
long nano = System.nanoTime() - connectStartNanos;
// 创建连接的时间跨度
createTimespan += nano;
}
// 物理连接信息
return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos);
}
/**
* 创建物理连接
* @param url DB URL
* @param info 连接属性
* @return 连接
* @throws SQLException
*/
public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
Connection conn;
if (getProxyFilters().size() == 0) {
// 若没有配置proxyFilter, 则直接创建连接
conn = getDriver().connect(url, info);
} else {
// 若配置了proxyFilter, 则通过FilterChainImpl来创建连接, FilterChain后面讲述
conn = new FilterChainImpl(this).connection_connect(info);
}
// 创建连接数 +1
createCount.incrementAndGet();
return conn;
}
public static class PhysicalConnectionInfo {
private Connection connection;
...
public PhysicalConnectionInfo(Connection connection //
, long connectStartNanos //
, long connectedNanos //
, long initedNanos //
, long validatedNanos) {
this.connection = connection;
...
}
...
// 获取建立连接所消耗的时间
public long getConnectNanoSpan() {
return connectedNanos - connectStartNanos;
}
}
/**
* 验证连接有效性
*/
public void validateConnection(Connection conn) throws SQLException {
// 验证的SQL语句
String query = getValidationQuery();
if (conn.isClosed()) {
throw new SQLException("validateConnection: connection closed");
}
// 若配置了验证实现类
if (validConnectionChecker != null) {
boolean result = true;
Exception error = null;
try {
// 通常使用ping命令校验,如MySQL
result = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
} catch (Exception ex) {
error = ex;
}
if (!result) {
SQLException sqlError = error != null ? //
new SQLException("validateConnection false", error) //
: new SQLException("validateConnection false");
throw sqlError;
}
return;
}
// 若未配置了验证实现类
// 直接创建Statement执行query
if (null != query) {
Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
if (getValidationQueryTimeout() > 0) {
stmt.setQueryTimeout(getValidationQueryTimeout());
}
rs = stmt.executeQuery(query);
if (!rs.next()) {
throw new SQLException("validationQuery didn't return a row");
}
} finally {
// 释放Statement, ResultSet
JdbcUtils.close(rs);
JdbcUtils.close(stmt);
}
}
}
public DruidDataSource(boolean fairLock){
// 是否以公平锁来获取数据源
super(fairLock);
// 配置数据源基本属性
configFromPropety(System.getProperties());
}
public void configFromPropety(Properties properties) {
{
// 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,
// 如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
Boolean value = getBoolean(properties, "druid.testWhileIdle");
if (value != null) {
this.setTestWhileIdle(value);
}
}
{
// 获取连接后是否检测有效性,
// 这通常可以防止使用无效的连接执行SQL时, Server抛错, 但有损性能
Boolean value = getBoolean(properties, "druid.testOnBorrow");
if (value != null) {
this.setTestOnBorrow(value);
}
}
{
// 检测语句
String property = properties.getProperty("druid.validationQuery");
if (property != null && property.length() > 0) {
this.setValidationQuery(property);
}
}
{
// 是否使用全局的数据源状态
Boolean value = getBoolean(properties, "druid.useGlobalDataSourceStat");
if (value != null) {
this.setUseGlobalDataSourceStat(value);
}
}
{
// 过滤器
String property = properties.getProperty("druid.filters");
if (property != null && property.length() > 0) {
try {
this.setFilters(property);
} catch (SQLException e) {
LOG.error("setFilters error", e);
}
}
}
{
// 数据源状态日志输出时间间隔
String property = properties.getProperty(Constants.DRUID_TIME_BETWEEN_LOG_STATS_MILLIS);
if (property != null && property.length() > 0) {
try {
long value = Long.parseLong(property);
this.setTimeBetweenLogStatsMillis(value);
} catch (NumberFormatException e) {
LOG.error("illegal property '" + Constants.DRUID_TIME_BETWEEN_LOG_STATS_MILLIS + "'", e);
}
}
}
{
// 数据源状态的最大SQL长度
String property = properties.getProperty(Constants.DRUID_STAT_SQL_MAX_SIZE);
if (property != null && property.length() > 0) {
try {
int value = Integer.parseInt(property);
if (dataSourceStat != null) {
dataSourceStat.setMaxSqlSize(value);
}
} catch (NumberFormatException e) {
LOG.error("illegal property '" + Constants.DRUID_STAT_SQL_MAX_SIZE + "'", e);
}
}
}
{
// 是否允许清除过滤器
Boolean value = getBoolean(properties, "druid.clearFiltersEnable");
if (value != null) {
this.setClearFiltersEnable(value);
}
}
{
// 是否允许重置状态
Boolean value = getBoolean(properties, "druid.resetStatEnable");
if (value != null) {
this.setResetStatEnable(value);
}
}
{
// 数据源未满条件等待超时后的重试次数
String property = properties.getProperty("druid.notFullTimeoutRetryCount");
if (property != null && property.length() > 0) {
try {
int value = Integer.parseInt(property);
this.setNotFullTimeoutRetryCount(value);
} catch (NumberFormatException e) {
LOG.error("illegal property 'druid.notFullTimeoutRetryCount'", e);
}
}
}
{
// 获取连接时, 等待的最大线程数
String property = properties.getProperty("druid.maxWaitThreadCount");
if (property != null && property.length() > 0) {
try {
int value = Integer.parseInt(property);
this.setMaxWaitThreadCount(value);
} catch (NumberFormatException e) {
LOG.error("illegal property 'druid.maxWaitThreadCount'", e);
}
}
}
{
// 是否快速失败
Boolean value = getBoolean(properties, "druid.failFast");
if (value != null) {
this.setFailFast(value);
}
}
{
// 物理超时时间
String property = properties.getProperty("druid.phyTimeoutMillis");
if (property != null && property.length() > 0) {
try {
long value = Long.parseLong(property);
this.setPhyTimeoutMillis(value);
} catch (NumberFormatException e) {
LOG.error("illegal property 'druid.phyTimeoutMillis'", e);
}
}
}
{
// 连接保持空闲而不被驱逐的最长时间
String property = properties.getProperty("druid.minEvictableIdleTimeMillis");
if (property != null && property.length() > 0) {
try {
long value = Long.parseLong(property);
this.setMinEvictableIdleTimeMillis(value);
} catch (NumberFormatException e) {
LOG.error("illegal property 'druid.minEvictableIdleTimeMillis'", e);
}
}
}
{
// 连接被驱逐前, 最大的空闲时间
String property = properties.getProperty("druid.maxEvictableIdleTimeMillis");
if (property != null && property.length() > 0) {
try {
long value = Long.parseLong(property);
this.setMaxEvictableIdleTimeMillis(value);
} catch (NumberFormatException e) {
LOG.error("illegal property 'druid.maxEvictableIdleTimeMillis'", e);
}
}
}
}
public void init() throws SQLException {
// 是否已初始化
if (inited) {
return;
}
// 初始化加锁
final ReentrantLock lock = this.lock;
try {
// 可被中断
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}
boolean init = false;
try {
// 再次检查是否已初始化
if (inited) {
return;
}
initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
// 数据源ID生成, 从1开始
this.id = DruidDriver.createDataSourceId();
if (this.id > 1) {
// 多数据源时, 以下属性ID生成相差10w, 即每个数据源的以下属性有10w个id
long delta = (this.id - 1) * 100000;
// Connection ID
this.connectionIdSeed.addAndGet(delta);
// Statement ID
this.statementIdSeed.addAndGet(delta);
// ResultSet ID
this.resultSetIdSeed.addAndGet(delta);
// transaction ID
this.transactionIdSeed.addAndGet(delta);
}
// jdbc url
if (this.jdbcUrl != null) {
this.jdbcUrl = this.jdbcUrl.trim();
initFromWrapDriverUrl();
}
// 初始化filter
for (Filter filter : filters) {
filter.init(this);
}
// db类型
if (this.dbType == null || this.dbType.length() == 0) {
this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
}
// MySQL 或 MariaDB
if (JdbcConstants.MYSQL.equals(this.dbType) || //
JdbcConstants.MARIADB.equals(this.dbType)) {
boolean cacheServerConfigurationSet = false;
// 是否缓存服务端配置
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
}
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration", "true");
}
}
// 最大存活连接数不能小于0
if (maxActive <= 0) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
// 最大存活连接数不能小于最小空闲连接数
if (maxActive < minIdle) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}
// 初始化连接数不能大于最大存活连接数
if (getInitialSize() > maxActive) {
throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActieve " + maxActive);
}
if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}
if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}
if (this.driverClass != null) {
this.driverClass = driverClass.trim();
}
// 使用ServiceLoader加载Filter, 即从META-INF/services/com.alibaba.druid.filter.Filter中获取实现类
// Filter实现类必须注解@AutoLoad, 才能被addFilter
initFromSPIServiceLoader();
if (this.driver == null) {
if (this.driverClass == null || this.driverClass.isEmpty()) {
// 从URL中解析Driver
this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
}
if (MockDriver.class.getName().equals(driverClass)) {
// MockDriver
driver = MockDriver.instance;
} else {
// 加载Driver
driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
}
} else {
if (this.driverClass == null) {
this.driverClass = driver.getClass().getName();
}
}
// 做一些特例DB检查, 如Oracle版本必须大于10
// 针对DB2的validateQuery语句
initCheck();
// 初始化异常处理器, 根据不同的DB驱动, 实例化不同的ExceptionSorter
// ExceptionSorter可用于友好地处理一些严重的数据库错误,
// 如Server不可用等, 这时可能需要将连接关闭
initExceptionSorter();
// 初始化连接验证器, 根据不同的DB驱动, 实例化不同的ValidConnectionChecker
// ValidConnectionChecker用于验证连接是否有效
initValidConnectionChecker();
// 检查validationQuery是否需要设置
validationQueryCheck();
if (isUseGlobalDataSourceStat()) {
// 使用全局的数据源状态
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.getDbType());
}
} else {
// 使用单独的数据源状态
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
}
dataSourceStat.setResetStatEnable(this.resetStatEnable);
// 预实例化连接数为最大存活连接数
connections = new DruidConnectionHolder[maxActive];
SQLException connectError = null;
try {
// 初始化连接池, 连接数为initialSize
for (int i = 0, size = getInitialSize(); i < size; ++i) {
// 创建物理连接
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
// 包装连接为DruidConnectionHolder
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
connections[poolingCount] = holder;
// poolingCount 统计数+1
incrementPoolingCount();
}
if (poolingCount > 0) {
// 记录下连接池最大值
poolingPeak = poolingCount;
// 记录下连接池最大值时的时间, 可以分析什么时刻请求连接达到最高
poolingPeakTime = System.currentTimeMillis();
}
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
connectError = ex;
}
// 开启日志记录数据源状态的线程
createAndLogThread();
// 开启创建连接的线程
createAndStartCreatorThread();
// 开启销毁连接的线程
createAndStartDestroyThread();
// 等待上面的CreateConnectionThread和DestroyConnectionThread成功启动
initedLatch.await();
init = true;
// 完成初始化的时间
initedTime = new Date();
// 注册数据源到MBean Server
registerMbean();
if (connectError != null && poolingCount == 0) {
throw connectError;
}
} catch (SQLException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} finally {
// 初始化完毕
inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
LOG.info("{dataSource-" + this.getID() + "} inited");
}
}
}
public void close() {
// 加锁
lock.lock();
try {
// 若已经关闭
if (this.closed) {
return;
}
// 若未初始化过
if (!this.inited) {
return;
}
// 中断记录状态的日志线程
if (logStatsThread != null) {
logStatsThread.interrupt();
}
// 中断创建连接的线程
if (createConnectionThread != null) {
createConnectionThread.interrupt();
}
// 中断销毁连接的线程
if (destroyConnectionThread != null) {
destroyConnectionThread.interrupt();
}
// 取消销毁调度任务
if (destroySchedulerFuture != null) {
destroySchedulerFuture.cancel(true);
}
for (int i = 0; i < poolingCount; ++i) {
try {
DruidConnectionHolder connHolder = connections[i];
// 关闭连接中的Statement
for (PreparedStatementHolder stmtHolder : connHolder.getStatementPool().getMap().values()) {
connHolder.getStatementPool().closeRemovedStatement(stmtHolder);
}
connHolder.getStatementPool().getMap().clear();
Connection physicalConnection = connHolder.getConnection();
// 关闭物理连接
physicalConnection.close();
// help gc clean
connections[i] = null;
// 销毁连接数 +1
destroyCount.incrementAndGet();
} catch (Exception ex) {
LOG.warn("close connection error", ex);
}
}
// 连接数置0
poolingCount = 0;
// 取消注册MBean
unregisterMbean();
enable = false;
notEmpty.signalAll();
notEmptySignalCount++;
// 已关闭
this.closed = true;
this.closeTimeMillis = System.currentTimeMillis();
// 销毁filter
for (Filter filter : filters) {
filter.destroy();
}
} finally {
lock.unlock();
}
if (LOG.isInfoEnabled()) {
LOG.info("{dataSource-" + this.getID() + "} closed");
}
}
public void restart() throws SQLException {
// 加锁
lock.lock();
try {
// 若还有存活的连接
if (activeCount > 0) {
throw new SQLException("can not restart, activeCount not zero. " + activeCount);
}
if (LOG.isInfoEnabled()) {
LOG.info("{dataSource-" + this.getID() + "} restart");
}
// 关闭数据源
this.close();
// 重置状态
this.resetStat();
// 未初始化标志
this.inited = false;
this.enable = true;
this.closed = false;
} finally {
lock.unlock();
}
}
public DruidPooledConnection getConnection() throws SQLException {
// 默认不设置最大等待时间
return getConnection(maxWait);
}
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
// 尝试初始化数据源
init();
if (filters.size() > 0) {
// 经过filter中获取连接
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
// 直接获取连接
return getConnectionDirect(maxWaitMillis);
}
}
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
// 请求连接
for (;;) {
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection;
try {
// 获取Connection
poolableConnection = getConnectionInternal(maxWaitMillis);
} catch (GetConnectionTimeoutException ex) {
// 获取连接超时, 并且连接池未满
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) {
notFullTimeoutRetryCnt++;
if (LOG.isWarnEnabled()) {
LOG.warn("not full timeout retry : " + notFullTimeoutRetryCnt);
}
// 再次尝试获取连接
continue;
}
throw ex;
}
// 当获取到连接后作验证
if (isTestOnBorrow()) {
// 验证连接
// 1. 优先使用ValidConnectionChecker
// 2. 否则, 直接通过Connection的Statement对象执行validateQuery
boolean validate = testConnectionInternal(poolableConnection.getConnection());
if (!validate) {
// 验证失败
// 真实的连接对象
Connection realConnection = poolableConnection.getConnection();
// 丢弃连接
discardConnection(realConnection);
continue;
}
} else {
Connection realConnection = poolableConnection.getConnection();
if (realConnection.isClosed()) {
// 连接已关闭
// 传入null,避免重复关闭
discardConnection(null);
continue;
}
// 是否在连接空闲时, 检测连接有效性
if (isTestWhileIdle()) {
// 当前时间
final long currentTimeMillis = System.currentTimeMillis();
// 连接最后激活时间
final long lastActiveTimeMillis = poolableConnection.getConnectionHolder().getLastActiveTimeMillis();
// 连接已经空闲的时间
final long idleMillis = currentTimeMillis - lastActiveTimeMillis;
// 每隔多久运行一次驱逐空闲连接的动作
long timeBetweenEvictionRunsMillis = this.getTimeBetweenEvictionRunsMillis();
if (timeBetweenEvictionRunsMillis <= 0) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}
// 该连接空闲时间 > 驱逐空闲连接的间隔时间
// 可以执行一次连接验证
if (idleMillis >= timeBetweenEvictionRunsMillis) {
// 验证同上
boolean validate = testConnectionInternal(poolableConnection.getConnection());
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}
// 丢弃连接
discardConnection(realConnection);
continue;
}
}
}
}
// 是否回收长时间不使用的连接
// 防止因应用程序未正确关闭连接而导致的连接泄露
// remove abandoned的动作会在DestroyTask中执行
// 配置removeAbandoned对性能会有一些影响,建议怀疑存在泄漏之后再打开
if (isRemoveAbandoned()) {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.setConnectStackTrace(stackTrace);
// 设置连接的连接完成时间
poolableConnection.setConnectedTimeNano();
poolableConnection.setTraceEnable(true);
// 将连接放入activeConnections
synchronized (activeConnections) {
activeConnections.put(poolableConnection, PRESENT);
}
}
// 不自动提交事务
if (!this.isDefaultAutoCommit()) {
poolableConnection.setAutoCommit(false);
}
return poolableConnection;
}
}
/**
* 丢弃连接
*/
public void discardConnection(Connection realConnection) {
// 物理关闭连接
JdbcUtils.close(realConnection);
// 加锁
lock.lock();
try {
// 存活连接数 -1
activeCount--;
// 丢弃连接数 +1
discardCount++;
// 当 存活连接数 < 最小空闲连接数时, 可以尝试增加一个用于创建连接的任务, 即CreateConnectionTask
if (activeCount <= minIdle) {
emptySignal();
}
} finally {
// 释放锁
lock.unlock();
}
}
/**
* 获取连接对象
*/
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
// 数据源是否已经关闭
if (closed) {
connectErrorCount.incrementAndGet();
throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
}
// 数据源是否可用
if (!enable) {
connectErrorCount.incrementAndGet();
throw new DataSourceDisableException();
}
// 最大等待时间
final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait);
// 最大等待线程数
final int maxWaitThreadCount = getMaxWaitThreadCount();
DruidConnectionHolder holder;
try {
// 获取连接时加锁, 并可被中断
lock.lockInterruptibly();
} catch (InterruptedException e) {
// 连接错误数 +1
connectErrorCount.incrementAndGet();
throw new SQLException("interrupt", e);
}
try {
if (maxWaitThreadCount > 0) {
// 当前等待连接的线程数 > 最大等待线程数
if (notEmptyWaitThreadCount >= maxWaitThreadCount) {
// 连接错误数 +1
connectErrorCount.incrementAndGet();
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}
}
// 连接数 +1
connectCount++;
// 开始获取连接:这里使用lock的notEmpty, notFull两个Condition实现连接的获取,类似生产者,消费者的模式
if (maxWait > 0) {
// 若设置了最大等待时间
// 获取连接, 有可能超时, 返回null
holder = pollLast(nanos);
} else {
// 一直等待到获取到连接
holder = takeLast();
}
if (holder != null) {
// 激活连接数 +1
activeCount++;
// 设置最大激活连接数和最大激活连接数的时间
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
// 连接错误数 +1
connectErrorCount.incrementAndGet();
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
// 连接错误数 +1
connectErrorCount.incrementAndGet();
throw e;
} finally {
lock.unlock();
}
if (holder == null) {
// 未获取到连接时, 输出一些错误信息, 如超时信息, 如等待时间, 存活连接数等, 当前运行的SQL
long waitNanos = waitNanosLocal.get();
StringBuilder buf = new StringBuilder();
buf.append("wait millis ")//
.append(waitNanos / (1000 * 1000))//
.append(", active " + activeCount)//
.append(", maxActive " + maxActive)//
;
List<JdbcSqlStatValue> sqlList = this.getDataSourceStat().getRuningSqlList();
for (int i = 0; i < sqlList.size(); ++i) {
if (i != 0) {
buf.append('\n');
} else {
buf.append(", ");
}
JdbcSqlStatValue sql = sqlList.get(i);
buf.append("runningSqlCount ");
buf.append(sql.getRunningCount());
buf.append(" : ");
buf.append(sql.getSql());
}
String errorMessage = buf.toString();
if (this.createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}
// 增加该连接的使用次数
holder.incrementUseCount();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder);
return poolalbeConnection;
}
/**
* 获取连接池最后的连接
*/
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
// 若池中可用的连接为0
while (poolingCount == 0) {
// 尝试增加一个用于创建连接的任务, 即CreateConnectionTask
emptySignal();
// 允许快速失败, 且在CreateConnectionTask中创建连接时发生了持续错误
if (failFast && failContinuous.get()) {
throw new DataSourceNotAvailableException(createError);
}
// 等待线程数 +1
notEmptyWaitThreadCount++;
// 统计最大等待线程数
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
// 等待, 直到有连接被回收 or 创建
notEmpty.await();
} finally {
// 等待线程数 -1
notEmptyWaitThreadCount--;
}
// 等待次数 +1
notEmptyWaitCount++;
// 数据源已经禁用
if (!enable) {
connectErrorCount.incrementAndGet();
throw new DataSourceDisableException();
}
}
} catch (InterruptedException ie) {
// 有可能当前线程获取到了连接通知, 但被中断了
// 通知其他未被中断的线程, 有机会获取连接
notEmpty.signal();
// 通知次数 +1
notEmptySignalCount++;
throw ie;
}
// 连接池可用连接数 -1
decrementPoolingCount();
// 获取最后一个连接
DruidConnectionHolder last = connections[poolingCount];
// 移除该连接的引用
connections[poolingCount] = null;
return last;
}
/**
* 超时获取连接
*/
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
long estimate = nanos;
for (;;) {
// 若池中可用连接为0
if (poolingCount == 0) {
// 尝试增加一个用于创建连接的任务, 即CreateConnectionTask
emptySignal();
// 允许快速失败, 且在CreateConnectionTask中创建连接时发生了持续错误
if (failFast && failContinuous.get()) {
throw new DataSourceNotAvailableException(createError);
}
if (estimate <= 0) {
// 等待超时了, 设置等待了多久
waitNanosLocal.set(nanos - estimate);
// 返回null
return null;
}
// 等待线程数 +1
notEmptyWaitThreadCount++;
// 统计最大等待线程数
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}
try {
long startEstimate = estimate;
// 等待, 直到超时或者被唤醒
estimate = notEmpty.awaitNanos(estimate);
// 等待次数 +1
notEmptyWaitCount++;
// 等待时间
notEmptyWaitNanos += (startEstimate - estimate);
// 数据源已关闭
if (!enable) {
connectErrorCount.incrementAndGet();
throw new DataSourceDisableException();
}
} catch (InterruptedException ie) {
// 有可能当前线程获取到了连接通知, 但被中断了
// 通知其他未被中断的线程, 有机会获取连接
notEmpty.signal();
// 通知次数 +1
notEmptySignalCount++;
throw ie;
} finally {
// 等待线程数 +1
notEmptyWaitThreadCount--;
}
if (poolingCount == 0) {
// 继续等待
if (estimate > 0) {
continue;
}
// 等待超时
waitNanosLocal.set(nanos - estimate);
return null;
}
}
// 减小池中可用连接数
decrementPoolingCount();
// 取出连接
DruidConnectionHolder last = connections[poolingCount];
connections[poolingCount] = null;
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
return last;
}
}
/**
* 连接池为空时,尝试提交一个用于创建连接的任务
*/
private void emptySignal() {
// 创建连接的调度器为null
if (createScheduler == null) {
empty.signal();
return;
}
// 创建连接的任务线程达到最大值, 默认为3
if (createTaskCount >= maxCreateTaskCount) {
return;
}
// 存活连接数 + 池中可用连接数 + 创建连接的任务数 >= 最大存活连接数
if (activeCount + poolingCount + createTaskCount >= maxActive) {
return;
}
// 创建连接的任务数 +1
createTaskCount++;
// 创建用于创建连接的任务
CreateConnectionTask task = new CreateConnectionTask();
// 提交任务
createScheduler.submit(task);
}
/**
* 用于创建连接的任务
*/
public class CreateConnectionTask implements Runnable {
/**
* 错误次数
*/
private int errorCount = 0;
@Override
public void run() {
runInternal();
}
private void runInternal() {
for (;;) {
// 加锁
lock.lock();
try {
boolean emptyWait = true;
// 已经发生过创建连接的错误, 且池中可用连接为0
if (createError != null && poolingCount == 0) {
emptyWait = false;
}
if (emptyWait) {
// 若池中的连接数 >= 等待线程数, 没必要再创建连接
if (poolingCount >= notEmptyWaitThreadCount) {
createTaskCount--;
return;
}
// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive) {
createTaskCount--;
return;
}
}
} finally {
lock.unlock();
}
PhysicalConnectionInfo physicalConnection = null;
try {
// 创建物理连接
physicalConnection = createPhysicalConnection();
// 创建成功, 则重试创建连接成功
setFailContinuous(false);
} catch (SQLException e) {
LOG.error("create connection error, url: " + jdbcUrl, e);
// 创建连接, 发生错误次数 + 1
errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// 错误次数 > 连接错误重试次数, 并设置了创建连接错误的时间间隔
// 重试创建连接失败
setFailContinuous(true);
// 快速失败
if (failFast) {
lock.lock();
try {
// 通知其他等待的线程
// 这个时候有可能数据源已经不可用, 不用再等待了
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
if (breakAfterAcquireFailure) {
lock.lock();
try {
createTaskCount--;
} finally {
lock.unlock();
}
return;
}
this.errorCount = 0; // reset errorCount
// timeBetweenConnectErrorMillis之后, 重新调度该Task
createScheduler.schedule(this, timeBetweenConnectErrorMillis, TimeUnit.MILLISECONDS);
return;
}
} catch (RuntimeException e) {
LOG.error("create connection error", e);
// unknow fatal exception
setFailContinuous(true);
continue;
} catch (Error e) {
lock.lock();
try {
createTaskCount--;
} finally {
lock.unlock();
}
LOG.error("create connection error", e);
// unknow fatal exception
setFailContinuous(true);
break;
}
if (physicalConnection == null) {
continue;
}
// 将创建的连接放入池中
boolean result = put(physicalConnection);
if (!result) {
// 未放入成功,则关闭连接
JdbcUtils.close(physicalConnection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}
break;
}
}
}
/**
* 创建物理连接
*/
public PhysicalConnectionInfo createPhysicalConnection() throws SQLException {
// 数据库URL
String url = this.getUrl();
// 连接属性
Properties connectProperties = getConnectProperties();
// 用户名
String user;
if (getUserCallback() != null) {
// 输入用户名
user = getUserCallback().getName();
} else {
user = getUsername();
}
// 密码
String password = getPassword();
PasswordCallback passwordCallback = getPasswordCallback();
if (passwordCallback != null) {
if (passwordCallback instanceof DruidPasswordCallback) {
DruidPasswordCallback druidPasswordCallback = (DruidPasswordCallback) passwordCallback;
druidPasswordCallback.setUrl(url);
druidPasswordCallback.setProperties(connectProperties);
}
char[] chars = passwordCallback.getPassword();
if (chars != null) {
password = new String(chars);
}
}
// 连接属性
Properties physicalConnectProperties = new Properties();
if (connectProperties != null) {
physicalConnectProperties.putAll(connectProperties);
}
if (user != null && user.length() != 0) {
physicalConnectProperties.put("user", user);
}
if (password != null && password.length() != 0) {
physicalConnectProperties.put("password", password);
}
Connection conn;
// 进行连接相关操作的开始时间
long connectStartNanos = System.nanoTime();
// 成功连接后的时间, 初始化后的时间, 验证成功后的时间
long connectedNanos, initedNanos, validatedNanos;
try {
// 创建物理连接
conn = createPhysicalConnection(url, physicalConnectProperties);
connectedNanos = System.nanoTime();
if (conn == null) {
throw new SQLException("connect error, url " + url + ", driverClass " + this.driverClass);
}
// 初始化连接基本属性, 如是否自动提交, 事务隔离级别
initPhysicalConnection(conn);
initedNanos = System.nanoTime();
// 验证连接
validateConnection(conn);
validatedNanos = System.nanoTime();
setCreateError(null);
} catch (SQLException ex) {
setCreateError(ex);
throw ex;
} catch (RuntimeException ex) {
setCreateError(ex);
throw ex;
} catch (Error ex) {
// 创建连接错误数 + 1
createErrorCount.incrementAndGet();
throw ex;
} finally {
long nano = System.nanoTime() - connectStartNanos;
// 创建连接的时间跨度
createTimespan += nano;
}
// 物理连接信息
return new PhysicalConnectionInfo(conn, connectStartNanos, connectedNanos, initedNanos, validatedNanos);
}
/**
* 将物理连接服务连接池中
*/
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
DruidConnectionHolder holder = null;
try {
// 包装连接为DruidConnectionHolder对象
holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
} catch (SQLException ex) {
lock.lock();
try {
if (createScheduler != null) {
createTaskCount--;
}
} finally {
lock.unlock();
}
LOG.error("create connection holder error", ex);
return false;
}
lock.lock();
try {
// 连接池中连接数 > 最大存活连接数
if (poolingCount >= maxActive) {
return false;
}
// 放入连接
connections[poolingCount] = holder;
// 增加连接池连接数
incrementPoolingCount();
// 统计最大连接池连接数
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
// 唤醒一个等待线程
notEmpty.signal();
notEmptySignalCount++;
if (createScheduler != null) {
createTaskCount--;
if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
&& activeCount + poolingCount + createTaskCount < maxActive) {
// 尝试启动创建连接的线程
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}
通过上面的代码片段,即获取到了连接对象DruidPooledConnection,继续看下DruidPooledConnection如何关闭内部Connection对象:
public void close() throws SQLException {
// 连接已经不可用
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
// 已经关闭了
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
DruidAbstractDataSource dataSource = holder.getDataSource();
// 创建该连接的线程是否与关闭线程一致
boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
if (!isSameThread) {
// 不一致, 则设置数据源可以异步关闭连接
dataSource.setAsyncCloseConnectionEnable(true);
}
if (dataSource.isAsyncCloseConnectionEnable()) {
// 异步关闭连接
syncClose();
return;
}
for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
listener.connectionClosed(new ConnectionEvent(this));
}
List<Filter> filters = dataSource.getProxyFilters();
if (filters.size() > 0) {
// 若配置了Filter, 则调用Filter链回收连接
FilterChainImpl filterChain = new FilterChainImpl(dataSource);
filterChain.dataSource_recycle(this);
} else {
// 回收连接
recycle();
}
// 连接不可用
this.disable = true;
}
public void recycle() throws SQLException {
// 连接已关闭
if (this.disable) {
return;
}
DruidConnectionHolder holder = this.holder;
if (holder == null) {
// 重复关闭
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}
// 是否直接丢弃, 默认当然是放回连接池
if (!this.abandoned) {
DruidAbstractDataSource dataSource = holder.getDataSource();
// 回收连接
dataSource.recycle(this);
}
// 置空
this.holder = null;
conn = null;
transactionInfo = null;
closed = true;
}
/**
* DruidDataSource.recycle()
*/
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.getConnectionHolder();
if (holder == null) {
LOG.warn("connectionHolder is null");
return;
}
if (logDifferentThread //
&& (!isAsyncCloseConnectionEnable()) //
&& pooledConnection.getOwnerThread() != Thread.currentThread()//
) {
// 不是同一线程获取/关闭连接
LOG.warn("get/close not same thread");
}
final Connection physicalConnection = holder.getConnection();
if (pooledConnection.isTraceEnable()) {
// 连接开启了调试
synchronized (activeConnections) {
if (pooledConnection.isTraceEnable()) {
// 从激活连接中移除连接
Object oldInfo = activeConnections.remove(pooledConnection);
if (oldInfo == null) {
if (LOG.isWarnEnabled()) {
LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size());
}
}
pooledConnection.setTraceEnable(false);
}
}
}
// 连接是否自动提交
final boolean isAutoCommit = holder.isUnderlyingAutoCommit();
// 连接是否是只读模式
final boolean isReadOnly = holder.isUnderlyingReadOnly();
// 是否在归还连接时, 检测连接有效性
final boolean testOnReturn = this.isTestOnReturn();
try {
if ((!isAutoCommit) && (!isReadOnly)) {
// 有必要回滚连接
pooledConnection.rollback();
}
// 重置holder
// 连接获取者和归还者是否为同一线程
boolean isSameThread = pooledConnection.getOwnerThread() == Thread.currentThread();
if (!isSameThread) {
// 不同线程, 需加锁
synchronized (pooledConnection) {
holder.reset();
}
} else {
// 同一线程
holder.reset();
}
// 如果线程已经标记为丢弃, 不用归还
if (holder.isDiscard()) {
return;
}
// 检测连接, 无效则关闭连接
if (testOnReturn) {
boolean validate = testConnectionInternal(physicalConnection);
if (!validate) {
JdbcUtils.close(physicalConnection);
destroyCount.incrementAndGet();
lock.lock();
try {
activeCount--;
closeCount++;
} finally {
lock.unlock();
}
return;
}
}
// 数据源已经不可用, 则丢弃连接
if (!enable) {
discardConnection(holder.getConnection());
return;
}
boolean result;
final long lastActiveTimeMillis = System.currentTimeMillis();
lock.lockInterruptibly();
try {
// 激活连接数 -1
activeCount--;
// 关闭连接数 +1
closeCount++;
// 将连接放入连接池尾部
result = putLast(holder, lastActiveTimeMillis);
// 归还连接数 +1
recycleCount++;
} finally {
lock.unlock();
}
if (!result) {
// 归还失败,则关闭连接
JdbcUtils.close(holder.getConnection());
LOG.info("connection recyle failed.");
}
} catch (Throwable e) {
// 清空PS缓存
holder.clearStatementCache();
if (!holder.isDiscard()) {
this.discardConnection(physicalConnection);
holder.setDiscard(true);
}
LOG.error("recyle error", e);
// 归还错误数 +1
recycleErrorCount.incrementAndGet();
}
}
/**
* 将连接放入连接池尾部
*/
boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
// 池中连接数已经 >= 最大激活连接数, 则不用归还
if (poolingCount >= maxActive) {
return false;
}
// 设置连接最后激活连接时间
e.setLastActiveTimeMillis(lastActiveTimeMillis);
// 归还连接
connections[poolingCount] = e;
// 池中连接数 +1
incrementPoolingCount();
// 统计池中最大连接数
if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = lastActiveTimeMillis;
}
// 唤醒一个等待的线程
notEmpty.signal();
// 通知数 +1
notEmptySignalCount++;
return true;
}
除了获取连接,归还连接,数据源中还需要自动移除一些长时间未使用的连接,这通过DestroyTask来完成:
/**
* 移除长时间未使用连接的任务
*/
public class DestroyTask implements Runnable {
@Override
public void run() {
// 收缩连接池
// 以连接的最后激活时间为准
shrink(true);
if (isRemoveAbandoned()) {
// 是以连接建立的时间为准
removeAbandoned();
}
}
}
/**
* 收缩连接池
*/
public void shrink(boolean checkTime) {
// 被驱逐的连接
final List<DruidConnectionHolder> evictList = new ArrayList<DruidConnectionHolder>();
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}
try {
// 需要检查的连接数
final int checkCount = poolingCount - minIdle;
final long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < poolingCount; ++i) {
DruidConnectionHolder connection = connections[i];
// 需要驱逐超时的连接
if (checkTime) {
// 物理超时时间
if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - connection.getTimeMillis();
if (phyConnectTimeMillis > phyTimeoutMillis) {
// 超时则驱逐
evictList.add(connection);
continue;
}
}
// 连接已经空闲的时间
long idleMillis = currentTimeMillis - connection.getLastActiveTimeMillis();
if (idleMillis < minEvictableIdleTimeMillis) {
break;
}
if (checkTime && i < checkCount) {
// 驱逐前面多余的连接
evictList.add(connection);
} else if (idleMillis > maxEvictableIdleTimeMillis) {
// 驱逐超过最大空闲时间的连接
evictList.add(connection);
}
} else {
// 仅驱逐多余的连接, 不作超时校验
if (i < checkCount) {
// 驱逐前面多余的连接
evictList.add(connection);
} else {
break;
}
}
}
int removeCount = evictList.size();
if (removeCount > 0) {
// 将保留的连接复制到连接池前面
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
// 置空连接池剩余的位置
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
// 连接池可用连接数 -removeCount
poolingCount -= removeCount;
}
} finally {
lock.unlock();
}
// 将驱逐的连接都物理关闭
for (DruidConnectionHolder item : evictList) {
Connection connection = item.getConnection();
JdbcUtils.close(connection);
destroyCount.incrementAndGet();
}
}
/**
* 关闭连接过长的连接
*/
public int removeAbandoned() {
int removeCount = 0;
long currrentNanos = System.nanoTime();
// 移除的连接
List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();
synchronized (activeConnections) {
// 仅从激活的连接中
Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();
for (; iter.hasNext();) {
DruidPooledConnection pooledConnection = iter.next();
if (pooledConnection.isRunning()) {
continue;
}
// 连接已经建立的时间
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);
if (timeMillis >= removeAbandonedTimeoutMillis) {
// 超过连接最大连接时间, 则移除
iter.remove();
pooledConnection.setTraceEnable(false);
abandonedList.add(pooledConnection);
}
}
}
if (abandonedList.size() > 0) {
for (DruidPooledConnection pooledConnection : abandonedList) {
synchronized (pooledConnection) {
// 连接已经disable
if (pooledConnection.isDisable()) {
continue;
}
}
// 直接关闭连接
JdbcUtils.close(pooledConnection);
pooledConnection.abandond();
removeAbandonedCount++;
removeCount++;
// 日志处理
if (isLogAbandoned()) {
StringBuilder buf = new StringBuilder();
buf.append("abandon connection, owner thread: ");
buf.append(pooledConnection.getOwnerThread().getName());
buf.append(", connected at : ");
buf.append(pooledConnection.getConnectedTimeMillis());
buf.append(", open stackTrace\n");
StackTraceElement[] trace = pooledConnection.getConnectStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
buf.append(trace[i].toString());
buf.append("\n");
}
buf.append("ownerThread current state is "+pooledConnection.getOwnerThread().getState() + ", current stackTrace\n");
trace = pooledConnection.getOwnerThread().getStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
buf.append(trace[i].toString());
buf.append("\n");
}
LOG.error(buf.toString());
}
}
}
return removeCount;
}
以上,则是Druid数据源内部对连接的基本管理,这也是连接池的基本功能。除此外,Druid通过Filter实现了对数据源的监控,代理等功能,这部分可以后期研究。