相信日常开发中,开发人员对数据源(DataSource) 一词应该不陌生,这里的数据源着重指的是数据库 。但通常开发人员面对数据源时,可能面临最多的是做一些配置优化相关的工作,或者当面临数据库连接不足 ,数据库连接泄露 等问题时,才会开始着重关注数据源。但市面上,已经有各种数据源产品,如DBCP ,C3P0 ,BoneCP 等,到底哪一个是更优秀,值得信赖呢?本文将对这些数据源作一些简单测试,由于近期也是将业务线的数据源均换为Druid ,所以会着重介绍下该数据源。
什么是数据源(DataSource)?
在JDBC4.1 中,这样描述DataSource 接口:
Using a Datasource object increases application portability by making it possible for an application to use a logical name for a data source instead of having to supply information specific to a particular driver. The DataSource increased performance and scalability through connection pooling.
大致意思就是,数据源 可以为应用程序屏蔽特定的数据库驱动(Driver) ,并将数据库连接进行池化 ,以提升数据库操作的性能 和可伸缩性 。将数据库连接进行统一管理的同时,还可以对数据库连接进行监控,比如Druid 数据源。
各种数据源产品
DBCP
DBCP 由Apache开发,内部使用通用的对象池化技术Commons Pool 进行开发,主要有1.x 和2.x 两个版本,1.x 版本中使用单线程 进行对象分配和回收,因此高并发下,性能很差,这在2.x 版本中进行了重写,提升了性能和伸缩性,并加入了一些跟踪和监控特性。
C3P0
C3P0 也是一个标准数据源实现,其从0.9.5之后开始完全支持JDBC4.0,其实现了传统的基于DriverManager的JDBC驱动 到javax.sql.DataSource 的适配,并对Connection 和PreparedStatements 作了透明池化。
BoneCP
BoneCP 是一个高效的数据源实现,其号称没有使用自旋锁(Spin Lock) ,因此不会降低应用性能,并提供了一些特性,如通过分区提升性能 ,自动调整连接池大小 ,异步获取连接 等。
Druid
Druid 不仅仅是一个数据库连接池,它还包含一个ProxyDriver,一系列内置的JDBC组件库,一个SQL Parser,其提供了强大的监控特性,如SQL的执行时间 ,监控连接池的物理连接创建和销毁次数 等,也在极高负载的生产环境中实际使用,性能方面也足以与其他数据源媲美。
Tomcat JDBC Connection Pool
Tomcat JDBC Connection Pool 作为Tomcat 的内置组件,旨在替代DBCP (1.x版本性能差 ),其比较轻量,核心类仅有8个,内部实现了异步获取连接 ,支持高并发 和多核 等场景
各数据源的一些性能对比测试
以下是测试时使用的代码片段,分别针对不同线程数 和SQL查询次数 进行对比测试:
public class DataSourcePermTest {
private static String JDBC_URL = "jdbc:mysql://localhost:3306/test?useSSL=false" ;
private static String USER = "root" ;
private static String PASS = "" ;
private static String DRIVER = "com.mysql.jdbc.Driver" ;
/**
* 初始的连接数
*/
private static int initPoolSize = 5 ;
/**
* 最小存活的连接数
*/
private static int minPoolSize = 5 ;
/**
* 最大存活的连接数, 超过该数后, 后续的请求将等待
*/
private static int maxPoolSize = 20 ;
/**
* 连接不被销毁的最长空闲时间
*/
private static int maxIdleTime = 100000 ;
/**
* 线程数
*/
private static int RUN_THREAD_COUNT = 20 ;
/**
* 每个线程执行的查询数
*/
private static int RUN_COUNT_PER_THREAD = 10000 ;
/**
* 运行N次求平均值
*/
private static int RUN_LOOP = 10 ;
/**
* ID
*/
private static Random ids = new Random ();
@Test
public void testDruid () throws Exception {
DruidDataSource ds = new DruidDataSource ();
ds . setUsername ( USER );
ds . setUrl ( JDBC_URL );
ds . setPassword ( PASS );
ds . setDriverClassName ( DRIVER );
ds . setInitialSize ( initPoolSize );
ds . setMinIdle ( minPoolSize );
ds . setMaxActive ( maxPoolSize );
ds . setMinEvictableIdleTimeMillis ( maxIdleTime );
ds . setTestWhileIdle ( false );
ds . setTestOnReturn ( false );
ds . setTestOnBorrow ( false );
runTestQuery ( ds );
}
@Test
public void testTomcatJdbcPool () throws Exception {
org . apache . tomcat . jdbc . pool . DataSource ds = new org . apache . tomcat . jdbc . pool . DataSource ();
ds . setUsername ( USER );
ds . setUrl ( JDBC_URL );
ds . setPassword ( PASS );
ds . setDriverClassName ( DRIVER );
ds . setInitialSize ( initPoolSize );
ds . setMaxActive ( maxPoolSize );
ds . setMinIdle ( minPoolSize );
ds . setMaxIdle ( minPoolSize );
ds . setMinEvictableIdleTimeMillis ( maxIdleTime );
ds . setTestWhileIdle ( false );
ds . setTestOnReturn ( false );
ds . setTestOnBorrow ( false );
runTestQuery ( ds );
}
@Test
public void testDBCP2 () throws Exception {
BasicDataSource ds = new BasicDataSource ();
ds . setUsername ( USER );
ds . setUrl ( JDBC_URL );
ds . setPassword ( PASS );
ds . setDriverClassName ( DRIVER );
ds . setInitialSize ( initPoolSize );
ds . setMaxTotal ( maxPoolSize );
ds . setMaxIdle ( minPoolSize );
ds . setMinEvictableIdleTimeMillis ( maxIdleTime );
ds . setTestWhileIdle ( false );
ds . setTestOnReturn ( false );
ds . setTestOnBorrow ( false );
runTestQuery ( ds );
}
@Test
public void testC3P0 () throws Exception {
ComboPooledDataSource ds = new ComboPooledDataSource ();
try {
ds . setDriverClass ( DRIVER );
} catch ( PropertyVetoException e ) {
}
ds . setJdbcUrl ( JDBC_URL );
ds . setUser ( USER );
ds . setPassword ( PASS );
ds . setInitialPoolSize ( initPoolSize );
ds . setMinPoolSize ( minPoolSize );
ds . setMaxPoolSize ( maxPoolSize );
ds . setMaxIdleTime ( maxIdleTime );
ds . setTestConnectionOnCheckin ( false );
ds . setTestConnectionOnCheckout ( false );
runTestQuery ( ds );
}
@Test
public void testBoneCP () throws Exception {
BoneCPDataSource ds = new BoneCPDataSource ();
ds . setUsername ( USER );
ds . setPassword ( PASS );
ds . setJdbcUrl ( JDBC_URL );
ds . setDriverClass ( DRIVER );
ds . setServiceOrder ( "LIFO" );
ds . setMinConnectionsPerPartition ( minPoolSize );
ds . setMaxConnectionsPerPartition ( maxPoolSize );
ds . setIdleMaxAge ( maxIdleTime , TimeUnit . MILLISECONDS );
ds . setPartitionCount ( 1 );
runTestQuery ( ds );
}
private void runTestQuery ( final DataSource ds ) throws Exception {
// prepare
for ( int i = 0 ; i < 5 ; i ++){
doTestQuery ( ds );
}
long totalTime = 0L ;
long currentTime = 0L ;
for ( int i = 0 ; i < RUN_LOOP ; i ++){
currentTime = doTestQuery ( ds );
totalTime += currentTime ;
System . out . println ( i + ": " + RUN_THREAD_COUNT + " threads, every run " + RUN_COUNT_PER_THREAD + " times query, run time: " + currentTime + " ms" );
}
System . out . println ( "Loop " + RUN_LOOP + " times, "
+ RUN_THREAD_COUNT + " threads, every run " + RUN_COUNT_PER_THREAD
+ " times query, average run time: " + totalTime / RUN_LOOP + " ms" );
}
private long doTestQuery ( final DataSource ds ) throws InterruptedException {
long start = System . currentTimeMillis ();
final CountDownLatch latch = new CountDownLatch ( RUN_THREAD_COUNT );
for ( int i = 0 ; i < RUN_THREAD_COUNT ; i ++){
new Thread ( new Runnable () {
@Override
public void run () {
for ( int i = 0 ; i < RUN_COUNT_PER_THREAD ; i ++) {
doQuery ( ds );
}
latch . countDown ();
}
}). start ();
}
latch . await ();
return System . currentTimeMillis () - start ;
}
public void doQuery ( DataSource ds ) {
try {
Connection conn = ds . getConnection ();
String sql = "SELECT * FROM blogs where id = " + ids . nextInt ( 10000000 );
Statement st = conn . createStatement ();
ResultSet res = st . executeQuery ( sql );
res . close ();
st . close ();
conn . close ();
} catch ( SQLException e ) {
e . printStackTrace ();
}
}
}
运行环境为:
硬件配置 :2.5 GHz Intel Core i5 / 1600MHz DDR3;
操作系统 :Mac OSX 10.11.5;
JDK :jdk1.7.0_80。
各数据源耗时(ms)对比图如下:
数据源/线程数*查询次数
1 * 1000
1 * 10000
5 * 1000
5 * 10000
10 * 1000
10 * 10000
20 * 1000
20 * 10000
Druid
439
2322
484
4382
943
8851
2047
20268
Tomcat JDBC Pool
342
2277
464
4494
940
9253
2016
20296
DBCP2
352
2486
499
4670
1142
8340
1934
18727
C3P0
337
2302
610
5352
1098
10276
2086
20801
BoneCP
565
4557
898
8538
1934
18222
4219
41807
从上面的测试结果来看,Druid ,Tomcat JDBC Pool 和DBCP2 在性能上比较有优势,官方 通过对连接进行申请 和归还 来进行对比测试,实际情况还需自行测试。但Druid 支持一些强大的监控特性,通过Druid提供的监控功能,可以清楚知道连接池和SQL的工作情况;除此外,Druid 内置了一个高性能的SQL Parser ,可以通过其作一些JDBC层拦截SQL做相应处理,比如说分库分表 、审计 等。因此下面将更多地聊聊Druid 。
Druid 设计 & 实现
DruidDataSource设计
DruidDataSource 的接口设计:
Wrapper
package java.sql ;
/**
* 该接口可用于在JDBC类中,访问内部的代理实例
*/
public interface Wrapper {
/**
* 返回接口的实现类对象或实现类对象的代理
* 可用于访问一些非标准的方法,或一些没有被代理暴露的非标准方法
*/
< T > T unwrap ( java . lang . Class < T > iface ) throws java . sql . SQLException ;
/**
* 对象是否实现了接口,或者直接或间接包装了实现了该接口的对象
*/
boolean isWrapperFor ( java . lang . Class <?> iface ) throws java . sql . SQLException ;
}
CommonDataSource
/**
* 定义DataSource,XADataSource和ConnectionPoolDataSource的公共接口
*/
public interface CommonDataSource {
/**
* 获取数据源的日志写入器
*/
java . io . PrintWriter getLogWriter () throws SQLException ;
/**
* 设置数据源的日志写入器
*/
void setLogWriter ( java . io . PrintWriter out ) throws SQLException ;
/**
* 设置数据源连接数据库时,最大的等待等待时间
*/
void setLoginTimeout ( int seconds ) throws SQLException ;
/**
* 获取数据源连接数据库时,最大的等待等待时间
*/
int getLoginTimeout () throws SQLException ;
//------------------------- JDBC 4.1 -----------------------------------
/**
* 获取数据源的父日志接口
*/
public Logger getParentLogger () throws SQLFeatureNotSupportedException ;
}
DataSource
/**
* 数据源是一种比DriverManager获取数据库连接更好的方式
* 数据源实现者通常需要基于JNDI API来注册名字服务(Naming Service)
* 数据源需要被数据库驱动厂商实现,有三种类型的实现:
* 1. 基本实现:获取一个标准的连接对象;
* 2. 连接池实现:获取一个由连接池管理器管理的连接对象;
* 3. 分布式事务实现:获取一个用于分布式事务的连接对象,通常也由连接池管理器管理。
*/
public interface DataSource extends CommonDataSource , Wrapper {
/**
* 获取Connection对象
*/
Connection getConnection () throws SQLException ;
/**
* 使用用户名和密码获取Connection对象
*/
Connection getConnection ( String username , String password ) throws SQLException ;
}
ConnectionPoolDataSource
/**
* PooledConnection工厂类
*/
public interface ConnectionPoolDataSource extends CommonDataSource {
/**
* 获取PooledConnection对象
*/
PooledConnection getPooledConnection () throws SQLException ;
/**
* 使用用户名和密码获取PooledConnection对象
*/
PooledConnection getPooledConnection ( String user , String password ) throws SQLException ;
}
DataSourceProxy
/**
* 数据源代理类
*/
public interface DataSourceProxy {
/**
* 数据源状态
*/
JdbcDataSourceStat getDataSourceStat ();
/**
* 数据源名称
*/
String getName ();
/**
* 数据库类型
*/
String getDbType ();
/**
* 内部包装的数据库驱动
*/
Driver getRawDriver ();
/**
* 获取JDBC URL
*/
String getUrl ();
/**
* 获取裸露的JDBC URL
*/
String getRawJdbcUrl ();
/**
* 代理时使用的过滤器
*/
List < Filter > getProxyFilters ();
/**
* 创建Connection ID
*/
long createConnectionId ();
/**
* 创建Statement ID
*/
long createStatementId ();
/**
* 创建ResultSet ID
*/
long createResultSetId ();
/**
* 创建源数据ID
*/
long createMetaDataId ();
/**
* 创建事务ID
*/
long createTransactionId ();
/**
* 数据源连接属性
*/
Properties getConnectProperties ();
}
WrapperAdapter
/**
* Wrapper的简单实现
*/
public class WrapperAdapter implements Wrapper {
public WrapperAdapter (){
}
@Override
public boolean isWrapperFor ( Class <?> iface ) {
// 当前对象是否实现了iface接口
return iface != null && iface . isInstance ( this );
}
@SuppressWarnings ( "unchecked" )
@Override
public < T > T unwrap ( Class < T > iface ) {
if ( iface == null ) {
return null ;
}
// 若当前对象是iface接口的实例,则返回当前对象
if ( iface . isInstance ( this )) {
return ( T ) this ;
}
return null ;
}
}
DruidAbstractDataSourceMBean
/**
* 通过MBeanServer监控数据源时,定义需要监控的基本信息,
* 包括一些进行配置信息和运行时信息
*/
public interface DruidAbstractDataSourceMBean {
/**
* 数据源连接数据库时,最大的等待时间(秒)
*/
int getLoginTimeout ();
/**
* 数据库类型
*/
String getDbType ();
/**
* 数据源名称
*/
String getName ();
/**
* 数据源初始连接数大小
*/
int getInitialSize ();
/**
* 连接用户名
*/
String getUsername ();
/**
* 连接地址
*/
String getUrl ();
/**
* 数据库驱动类名
*/
String getDriverClassName ();
/**
* 连接数
*/
long getConnectCount ();
/**
* 连接关闭数
*/
long getCloseCount ();
/**
* 连接错误数
*/
long getConnectErrorCount ();
/**
* 连接池大小
*/
int getPoolingCount ();
/**
* 连接回收数
*/
long getRecycleCount ();
/**
* 连接存活数
*/
int getActiveCount ();
/**
* 连接创建数
*/
long getCreateCount ();
/**
* 连接销毁数
*/
long getDestroyCount ();
/**
* 创建时间间隔
*/
long getCreateTimespanMillis ();
/**
* 事务提交数
*/
long getCommitCount ();
/**
* 事务回滚数
*/
long getRollbackCount ();
/**
* 事务开启数
*/
long getStartTransactionCount ();
/**
* 查询超时时间
*/
int getQueryTimeout ();
/**
* 事务查询超时时间
*/
int getTransactionQueryTimeout ();
/**
* 验证连接有效性的SQL
*/
String getValidationQuery ();
/**
* 验证连接的查询超时时间
*/
int getValidationQueryTimeout ();
/**
* 最大等待线程数
*/
int getMaxWaitThreadCount ();
/**
* 间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
*/
long getTimeBetweenEvictionRunsMillis ();
/**
* 连接保持空闲而不被驱逐的最长时间
*/
long getMinEvictableIdleTimeMillis ();
/**
* 是否自动回收超时连接
*/
boolean isRemoveAbandoned ();
/**
* 连接超时时间
*/
long getRemoveAbandonedTimeoutMillis ();
/**
* 存活连接的栈信息
*/
List < String > getActiveConnectionStackTrace ();
/**
* filter类名列表
*/
List < String > getFilterClassNames ();
/**
* 获取到连接时,是否检测连接有效性,会影响性能
*/
boolean isTestOnBorrow ();
/**
* 设置获取到连接时,是否检测连接有效性
*/
void setTestOnBorrow ( boolean testOnBorrow );
/**
* 归还连接时,是否检测连接有效性
*/
boolean isTestOnReturn ();
/**
* 空闲时,是否检测连接有效性
*/
boolean isTestWhileIdle ();
/**
* 设置空闲时,是否检测连接有效性
*/
void setTestWhileIdle ( boolean testWhileIdle );
/**
* 是否默认提交事务
*/
boolean isDefaultAutoCommit ();
/**
* 默认的是否为只读模式
*/
Boolean getDefaultReadOnly ();
/**
* 默认的事务隔离级别
*/
Integer getDefaultTransactionIsolation ();
/**
* 默认Catalog
*/
String getDefaultCatalog ();
/**
* 是否池化PS
*/
boolean isPoolPreparedStatements ();
/**
* 是否共享PS
*/
boolean isSharePreparedStatements ();
/**
* 获取连接最大等待时间
*/
long getMaxWait ();
/**
* 最小空闲连接
*/
int getMinIdle ();
/**
* 最大空闲连接
*/
int getMaxIdle ();
/**
* 创建连接错误数
*/
long getCreateErrorCount ();
/**
* 最大存活连接数
*/
int getMaxActive ();
/**
* 设置最大存活连接数
*/
void setMaxActive ( int maxActive );
/**
* 发生错误时多久重连
*/
long getTimeBetweenConnectErrorMillis ();
/**
* 最大打开的PS数
*/
int getMaxOpenPreparedStatements ();
/**
* 移除的超时连接数
*/
long getRemoveAbandonedCount ();
/**
* 移除超时连接时,是否日志记录
*/
boolean isLogAbandoned ();
/**
* 移除超时连接时,是否日志记录
*/
void setLogAbandoned ( boolean logAbandoned );
/**
* 重复关闭数
*/
long getDupCloseCount ();
/**
* 数据源获取连接失败后,是否关闭数据源
*/
boolean isBreakAfterAcquireFailure ();
/**
* 连接错误时,重试次数
*/
int getConnectionErrorRetryAttempts ();
/**
* 每个连接池化的最大PS数
*/
int getMaxPoolPreparedStatementPerConnectionSize ();
/**
* 设置每个连接池化的最大PS数
*/
void setMaxPoolPreparedStatementPerConnectionSize ( int maxPoolPreparedStatementPerConnectionSize );
/**
* 获取属性
*/
String getProperties ();
/**
* 数据库驱动的小版本
*/
int getRawDriverMinorVersion ();
/**
* 数据库驱动的大版本
*/
int getRawDriverMajorVersion ();
/**
* 数据源创建时间
*/
Date getCreatedTime ();
/**
* 获取检测连接的类名
*/
String getValidConnectionCheckerClassName ();
/**
* 获取事务的柱状图
*/
long [] getTransactionHistogramValues ();
/**
* 设置事务超时时间
*/
void setTransactionThresholdMillis ( long transactionThresholdMillis );
/**
* 事务超时时间
*/
long getTransactionThresholdMillis ();
/**
* PS数
*/
long getPreparedStatementCount ();
/**
* 关闭的PS数
*/
long getClosedPreparedStatementCount ();
/**
* 缓存的PS数
*/
long getCachedPreparedStatementCount ();
/**
* 缓存PS被删除的数量
*/
long getCachedPreparedStatementDeleteCount ();
/**
* 缓存PS的被访问数
*/
long getCachedPreparedStatementAccessCount ();
/**
* 缓存PS的Miss数
*/
long getCachedPreparedStatementMissCount ();
/**
* 缓存PS的命中数
*/
long getCachedPreparedStatementHitCount ();
/**
* 是否使用Oracle实现的Cache
*/
boolean isUseOracleImplicitCache ();
/**
* 设置是否使用Oracle实现的Cache
*/
void setUseOracleImplicitCache ( boolean useOracleImplicitCache );
/**
* 获取数据库驱动的大版本
*/
int getDriverMajorVersion ();
/**
* 获取数据库驱动的小版本
*/
int getDriverMinorVersion ();
/**
* 异常检测器的类名
*/
String getExceptionSorterClassName ();
}
MBeanRegistration
/**
* 在MBean注册到MBean Server(或从MBean Server取消注册)前后,执行一些特定操作
*/
public interface MBeanRegistration {
/**
* MBean注册到MBeanServer前执行
*
*/
public ObjectName preRegister ( MBeanServer server , ObjectName name ) throws java . lang . Exception ;
/**
* MBean注册到MBeanServer后执行
*/
public void postRegister ( Boolean registrationDone );
/**
* MBean取消注册前执行
*/
public void preDeregister () throws java . lang . Exception ;
/**
* MBean取消注册后执行
*/
public void postDeregister ();
}
DruidDataSourceMBean
/**
* DruidDataSource实现扩展的一些监控信息
*/
public interface DruidDataSourceMBean extends DruidAbstractDataSourceMBean {
/**
* 数据源重置次数
*/
long getResetCount ();
/**
* 数据源是否启用
*/
boolean isEnable ();
/**
* 收缩数据源
*/
void shrink ();
/**
* 移除超时连接的数量
*/
int removeAbandoned ();
/**
* dump数据源信息
*/
String dump ();
/**
* 等待线程数
*/
int getWaitThreadCount ();
/**
* 获取Lock的等待线程数
*/
int getLockQueueLength ();
/**
* 数据源非空条件的等待数
*/
long getNotEmptyWaitCount ();
/**
* 非空条件的等待线程数
*/
int getNotEmptyWaitThreadCount ();
/**
* 非空条件的唤醒数
*/
long getNotEmptySignalCount ();
/**
* 非空条件的等待毫秒数
*/
long getNotEmptyWaitMillis ();
/**
* 非空条件的等待纳秒数
*/
long getNotEmptyWaitNanos ();
/**
* 重置状态
*/
void resetStat ();
/**
* 是否运行重置状态
*/
boolean isResetStatEnable ();
/**
* 设置是否运行重置状态
*/
void setResetStatEnable ( boolean resetStatEnable );
/**
* 版本
*/
String getVersion ();
/**
* 设置是否池化PS
*/
void setPoolPreparedStatements ( boolean poolPreparedStatements );
/**
* 存活连接的最大值
*/
int getActivePeak ();
/**
* 连接池的最大值
* @return
*/
int getPoolingPeak ();
/**
* 连接存活的最大时间
* @return
*/
Date getActivePeakTime ();
/**
* 连接池化的最大时间
*/
Date getPoolingPeakTime ();
/**
* 错误数
*/
long getErrorCount ();
/**
* 数据源MBean的ObjectName
*/
ObjectName getObjectName ();
/**
* 清除Statement缓存
*/
void clearStatementCache () throws SQLException ;
/**
* 丢弃的连接数
*/
long getDiscardCount ();
/**
* 设置数据源状态的日志类名称
*/
void setStatLoggerClassName ( String className );
/**
* 数据源状态的日志打印间隔时间
*/
long getTimeBetweenLogStatsMillis ();
/**
* 设置数据源状态的日志打印间隔时间
*/
void setTimeBetweenLogStatsMillis ( long timeBetweenLogStatsMillis );
/**
* 设置连接属性
*/
void setConnectionProperties ( String connectionProperties );
/**
* 填充连接, 默认补充到maxActive个连接
*/
int fill () throws SQLException ;
/**
* 填充连接
* @param toCount 补充到多少个连接
*/
int fill ( int toCount ) throws SQLException ;
}
ManagedDataSource
/**
* 可以被管理的数据源, 即成为一个MBean
*/
public interface ManagedDataSource extends DataSource {
/**
* 是否开启被管理
*/
boolean isEnable ();
/**
* 设置是否开启被管理
*/
void setEnable ( boolean value );
/**
* MBean的名称
*/
ObjectName getObjectName ();
/**
* 设置MBean的名称
*/
void setObjectName ( ObjectName objectName );
}
DruidDataSource实现
DruidDataSource 的实现主要有DruidAbstractDataSource 和DruidDataSource :
DruidAbstractDataSource
DruidAbstractDataSource 作为DruidDataSource 和DruidXADataSource 数据源的父类,主要设置一些数据源的基本属性,和一些基础方法的实现,如创建物理连接 ,验证物理连接 等:
/**
* 创建物理连接
*/
public PhysicalConnectionInfo createPhysicalConnection () throws SQLException {
// 数据库URL
String url = this . getUrl ();
// 连接属性
Properties connectProperties = getConnectProperties ();
// 用户名
String user ;
if ( getUserCallback () != null ) {
// 输入用户名
user = getUserCallback (). getName ();
} else {
user = getUsername ();
}
// 密码
String password = getPassword ();
PasswordCallback passwordCallback = getPasswordCallback ();
if ( passwordCallback != null ) {
if ( passwordCallback instanceof DruidPasswordCallback ) {
DruidPasswordCallback druidPasswordCallback = ( DruidPasswordCallback ) passwordCallback ;
druidPasswordCallback . setUrl ( url );
druidPasswordCallback . setProperties ( connectProperties );
}
char [] chars = passwordCallback . getPassword ();
if ( chars != null ) {
password = new String ( chars );
}
}
// 连接属性
Properties physicalConnectProperties = new Properties ();
if ( connectProperties != null ) {
physicalConnectProperties . putAll ( connectProperties );
}
if ( user != null && user . length () != 0 ) {
physicalConnectProperties . put ( "user" , user );
}
if ( password != null && password . length () != 0 ) {
physicalConnectProperties . put ( "password" , password );
}
Connection conn ;
// 进行连接相关操作的开始时间
long connectStartNanos = System . nanoTime ();
// 成功连接后的时间, 初始化后的时间, 验证成功后的时间
long connectedNanos , initedNanos , validatedNanos ;
try {
// 创建物理连接
conn = createPhysicalConnection ( url , physicalConnectProperties );
connectedNanos = System . nanoTime ();
if ( conn == null ) {
throw new SQLException ( "connect error, url " + url + ", driverClass " + this . driverClass );
}
// 初始化连接基本属性, 如是否自动提交, 事务隔离级别
initPhysicalConnection ( conn );
initedNanos = System . nanoTime ();
// 验证连接
validateConnection ( conn );
validatedNanos = System . nanoTime ();
setCreateError ( null );
} catch ( SQLException ex ) {
setCreateError ( ex );
throw ex ;
} catch ( RuntimeException ex ) {
setCreateError ( ex );
throw ex ;
} catch ( Error ex ) {
// 创建连接错误数 + 1
createErrorCount . incrementAndGet ();
throw ex ;
} finally {
long nano = System . nanoTime () - connectStartNanos ;
// 创建连接的时间跨度
createTimespan += nano ;
}
// 物理连接信息
return new PhysicalConnectionInfo ( conn , connectStartNanos , connectedNanos , initedNanos , validatedNanos );
}
/**
* 创建物理连接
* @param url DB URL
* @param info 连接属性
* @return 连接
* @throws SQLException
*/
public Connection createPhysicalConnection ( String url , Properties info ) throws SQLException {
Connection conn ;
if ( getProxyFilters (). size () == 0 ) {
// 若没有配置proxyFilter, 则直接创建连接
conn = getDriver (). connect ( url , info );
} else {
// 若配置了proxyFilter, 则通过FilterChainImpl来创建连接, FilterChain后面讲述
conn = new FilterChainImpl ( this ). connection_connect ( info );
}
// 创建连接数 +1
createCount . incrementAndGet ();
return conn ;
}
public static class PhysicalConnectionInfo {
private Connection connection ;
...
public PhysicalConnectionInfo ( Connection connection //
, long connectStartNanos //
, long connectedNanos //
, long initedNanos //
, long validatedNanos ) {
this . connection = connection ;
...
}
...
// 获取建立连接所消耗的时间
public long getConnectNanoSpan () {
return connectedNanos - connectStartNanos ;
}
}
/**
* 验证连接有效性
*/
public void validateConnection ( Connection conn ) throws SQLException {
// 验证的SQL语句
String query = getValidationQuery ();
if ( conn . isClosed ()) {
throw new SQLException ( "validateConnection: connection closed" );
}
// 若配置了验证实现类
if ( validConnectionChecker != null ) {
boolean result = true ;
Exception error = null ;
try {
// 通常使用ping命令校验,如MySQL
result = validConnectionChecker . isValidConnection ( conn , validationQuery , validationQueryTimeout );
} catch ( Exception ex ) {
error = ex ;
}
if (! result ) {
SQLException sqlError = error != null ? //
new SQLException ( "validateConnection false" , error ) //
: new SQLException ( "validateConnection false" );
throw sqlError ;
}
return ;
}
// 若未配置了验证实现类
// 直接创建Statement执行query
if ( null != query ) {
Statement stmt = null ;
ResultSet rs = null ;
try {
stmt = conn . createStatement ();
if ( getValidationQueryTimeout () > 0 ) {
stmt . setQueryTimeout ( getValidationQueryTimeout ());
}
rs = stmt . executeQuery ( query );
if (! rs . next ()) {
throw new SQLException ( "validationQuery didn't return a row" );
}
} finally {
// 释放Statement, ResultSet
JdbcUtils . close ( rs );
JdbcUtils . close ( stmt );
}
}
}
DruidDataSource
public DruidDataSource ( boolean fairLock ){
// 是否以公平锁来获取数据源
super ( fairLock );
// 配置数据源基本属性
configFromPropety ( System . getProperties ());
}
public void configFromPropety ( Properties properties ) {
{
// 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,
// 如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。
Boolean value = getBoolean ( properties , "druid.testWhileIdle" );
if ( value != null ) {
this . setTestWhileIdle ( value );
}
}
{
// 获取连接后是否检测有效性,
// 这通常可以防止使用无效的连接执行SQL时, Server抛错, 但有损性能
Boolean value = getBoolean ( properties , "druid.testOnBorrow" );
if ( value != null ) {
this . setTestOnBorrow ( value );
}
}
{
// 检测语句
String property = properties . getProperty ( "druid.validationQuery" );
if ( property != null && property . length () > 0 ) {
this . setValidationQuery ( property );
}
}
{
// 是否使用全局的数据源状态
Boolean value = getBoolean ( properties , "druid.useGlobalDataSourceStat" );
if ( value != null ) {
this . setUseGlobalDataSourceStat ( value );
}
}
{
// 过滤器
String property = properties . getProperty ( "druid.filters" );
if ( property != null && property . length () > 0 ) {
try {
this . setFilters ( property );
} catch ( SQLException e ) {
LOG . error ( "setFilters error" , e );
}
}
}
{
// 数据源状态日志输出时间间隔
String property = properties . getProperty ( Constants . DRUID_TIME_BETWEEN_LOG_STATS_MILLIS );
if ( property != null && property . length () > 0 ) {
try {
long value = Long . parseLong ( property );
this . setTimeBetweenLogStatsMillis ( value );
} catch ( NumberFormatException e ) {
LOG . error ( "illegal property '" + Constants . DRUID_TIME_BETWEEN_LOG_STATS_MILLIS + "'" , e );
}
}
}
{
// 数据源状态的最大SQL长度
String property = properties . getProperty ( Constants . DRUID_STAT_SQL_MAX_SIZE );
if ( property != null && property . length () > 0 ) {
try {
int value = Integer . parseInt ( property );
if ( dataSourceStat != null ) {
dataSourceStat . setMaxSqlSize ( value );
}
} catch ( NumberFormatException e ) {
LOG . error ( "illegal property '" + Constants . DRUID_STAT_SQL_MAX_SIZE + "'" , e );
}
}
}
{
// 是否允许清除过滤器
Boolean value = getBoolean ( properties , "druid.clearFiltersEnable" );
if ( value != null ) {
this . setClearFiltersEnable ( value );
}
}
{
// 是否允许重置状态
Boolean value = getBoolean ( properties , "druid.resetStatEnable" );
if ( value != null ) {
this . setResetStatEnable ( value );
}
}
{
// 数据源未满条件等待超时后的重试次数
String property = properties . getProperty ( "druid.notFullTimeoutRetryCount" );
if ( property != null && property . length () > 0 ) {
try {
int value = Integer . parseInt ( property );
this . setNotFullTimeoutRetryCount ( value );
} catch ( NumberFormatException e ) {
LOG . error ( "illegal property 'druid.notFullTimeoutRetryCount'" , e );
}
}
}
{
// 获取连接时, 等待的最大线程数
String property = properties . getProperty ( "druid.maxWaitThreadCount" );
if ( property != null && property . length () > 0 ) {
try {
int value = Integer . parseInt ( property );
this . setMaxWaitThreadCount ( value );
} catch ( NumberFormatException e ) {
LOG . error ( "illegal property 'druid.maxWaitThreadCount'" , e );
}
}
}
{
// 是否快速失败
Boolean value = getBoolean ( properties , "druid.failFast" );
if ( value != null ) {
this . setFailFast ( value );
}
}
{
// 物理超时时间
String property = properties . getProperty ( "druid.phyTimeoutMillis" );
if ( property != null && property . length () > 0 ) {
try {
long value = Long . parseLong ( property );
this . setPhyTimeoutMillis ( value );
} catch ( NumberFormatException e ) {
LOG . error ( "illegal property 'druid.phyTimeoutMillis'" , e );
}
}
}
{
// 连接保持空闲而不被驱逐的最长时间
String property = properties . getProperty ( "druid.minEvictableIdleTimeMillis" );
if ( property != null && property . length () > 0 ) {
try {
long value = Long . parseLong ( property );
this . setMinEvictableIdleTimeMillis ( value );
} catch ( NumberFormatException e ) {
LOG . error ( "illegal property 'druid.minEvictableIdleTimeMillis'" , e );
}
}
}
{
// 连接被驱逐前, 最大的空闲时间
String property = properties . getProperty ( "druid.maxEvictableIdleTimeMillis" );
if ( property != null && property . length () > 0 ) {
try {
long value = Long . parseLong ( property );
this . setMaxEvictableIdleTimeMillis ( value );
} catch ( NumberFormatException e ) {
LOG . error ( "illegal property 'druid.maxEvictableIdleTimeMillis'" , e );
}
}
}
}
public void init () throws SQLException {
// 是否已初始化
if ( inited ) {
return ;
}
// 初始化加锁
final ReentrantLock lock = this . lock ;
try {
// 可被中断
lock . lockInterruptibly ();
} catch ( InterruptedException e ) {
throw new SQLException ( "interrupt" , e );
}
boolean init = false ;
try {
// 再次检查是否已初始化
if ( inited ) {
return ;
}
initStackTrace = Utils . toString ( Thread . currentThread (). getStackTrace ());
// 数据源ID生成, 从1开始
this . id = DruidDriver . createDataSourceId ();
if ( this . id > 1 ) {
// 多数据源时, 以下属性ID生成相差10w, 即每个数据源的以下属性有10w个id
long delta = ( this . id - 1 ) * 100000 ;
// Connection ID
this . connectionIdSeed . addAndGet ( delta );
// Statement ID
this . statementIdSeed . addAndGet ( delta );
// ResultSet ID
this . resultSetIdSeed . addAndGet ( delta );
// transaction ID
this . transactionIdSeed . addAndGet ( delta );
}
// jdbc url
if ( this . jdbcUrl != null ) {
this . jdbcUrl = this . jdbcUrl . trim ();
initFromWrapDriverUrl ();
}
// 初始化filter
for ( Filter filter : filters ) {
filter . init ( this );
}
// db类型
if ( this . dbType == null || this . dbType . length () == 0 ) {
this . dbType = JdbcUtils . getDbType ( jdbcUrl , null );
}
// MySQL 或 MariaDB
if ( JdbcConstants . MYSQL . equals ( this . dbType ) || //
JdbcConstants . MARIADB . equals ( this . dbType )) {
boolean cacheServerConfigurationSet = false ;
// 是否缓存服务端配置
if ( this . connectProperties . containsKey ( "cacheServerConfiguration" )) {
cacheServerConfigurationSet = true ;
} else if ( this . jdbcUrl . indexOf ( "cacheServerConfiguration" ) != - 1 ) {
cacheServerConfigurationSet = true ;
}
if ( cacheServerConfigurationSet ) {
this . connectProperties . put ( "cacheServerConfiguration" , "true" );
}
}
// 最大存活连接数不能小于0
if ( maxActive <= 0 ) {
throw new IllegalArgumentException ( "illegal maxActive " + maxActive );
}
// 最大存活连接数不能小于最小空闲连接数
if ( maxActive < minIdle ) {
throw new IllegalArgumentException ( "illegal maxActive " + maxActive );
}
// 初始化连接数不能大于最大存活连接数
if ( getInitialSize () > maxActive ) {
throw new IllegalArgumentException ( "illegal initialSize " + this . initialSize + ", maxActieve " + maxActive );
}
if ( timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat ) {
throw new IllegalArgumentException ( "timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true" );
}
if ( maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis ) {
throw new SQLException ( "maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis" );
}
if ( this . driverClass != null ) {
this . driverClass = driverClass . trim ();
}
// 使用ServiceLoader加载Filter, 即从META-INF/services/com.alibaba.druid.filter.Filter中获取实现类
// Filter实现类必须注解@AutoLoad, 才能被addFilter
initFromSPIServiceLoader ();
if ( this . driver == null ) {
if ( this . driverClass == null || this . driverClass . isEmpty ()) {
// 从URL中解析Driver
this . driverClass = JdbcUtils . getDriverClassName ( this . jdbcUrl );
}
if ( MockDriver . class . getName (). equals ( driverClass )) {
// MockDriver
driver = MockDriver . instance ;
} else {
// 加载Driver
driver = JdbcUtils . createDriver ( driverClassLoader , driverClass );
}
} else {
if ( this . driverClass == null ) {
this . driverClass = driver . getClass (). getName ();
}
}
// 做一些特例DB检查, 如Oracle版本必须大于10
// 针对DB2的validateQuery语句
initCheck ();
// 初始化异常处理器, 根据不同的DB驱动, 实例化不同的ExceptionSorter
// ExceptionSorter可用于友好地处理一些严重的数据库错误,
// 如Server不可用等, 这时可能需要将连接关闭
initExceptionSorter ();
// 初始化连接验证器, 根据不同的DB驱动, 实例化不同的ValidConnectionChecker
// ValidConnectionChecker用于验证连接是否有效
initValidConnectionChecker ();
// 检查validationQuery是否需要设置
validationQueryCheck ();
if ( isUseGlobalDataSourceStat ()) {
// 使用全局的数据源状态
dataSourceStat = JdbcDataSourceStat . getGlobal ();
if ( dataSourceStat == null ) {
dataSourceStat = new JdbcDataSourceStat ( "Global" , "Global" , this . dbType );
JdbcDataSourceStat . setGlobal ( dataSourceStat );
}
if ( dataSourceStat . getDbType () == null ) {
dataSourceStat . setDbType ( this . getDbType ());
}
} else {
// 使用单独的数据源状态
dataSourceStat = new JdbcDataSourceStat ( this . name , this . jdbcUrl , this . dbType , this . connectProperties );
}
dataSourceStat . setResetStatEnable ( this . resetStatEnable );
// 预实例化连接数为最大存活连接数
connections = new DruidConnectionHolder [ maxActive ];
SQLException connectError = null ;
try {
// 初始化连接池, 连接数为initialSize
for ( int i = 0 , size = getInitialSize (); i < size ; ++ i ) {
// 创建物理连接
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection ();
// 包装连接为DruidConnectionHolder
DruidConnectionHolder holder = new DruidConnectionHolder ( this , pyConnectInfo );
connections [ poolingCount ] = holder ;
// poolingCount 统计数+1
incrementPoolingCount ();
}
if ( poolingCount > 0 ) {
// 记录下连接池最大值
poolingPeak = poolingCount ;
// 记录下连接池最大值时的时间, 可以分析什么时刻请求连接达到最高
poolingPeakTime = System . currentTimeMillis ();
}
} catch ( SQLException ex ) {
LOG . error ( "init datasource error, url: " + this . getUrl (), ex );
connectError = ex ;
}
// 开启日志记录数据源状态的线程
createAndLogThread ();
// 开启创建连接的线程
createAndStartCreatorThread ();
// 开启销毁连接的线程
createAndStartDestroyThread ();
// 等待上面的CreateConnectionThread和DestroyConnectionThread成功启动
initedLatch . await ();
init = true ;
// 完成初始化的时间
initedTime = new Date ();
// 注册数据源到MBean Server
registerMbean ();
if ( connectError != null && poolingCount == 0 ) {
throw connectError ;
}
} catch ( SQLException e ) {
LOG . error ( "{dataSource-" + this . getID () + "} init error" , e );
throw e ;
} catch ( InterruptedException e ) {
throw new SQLException ( e . getMessage (), e );
} finally {
// 初始化完毕
inited = true ;
lock . unlock ();
if ( init && LOG . isInfoEnabled ()) {
LOG . info ( "{dataSource-" + this . getID () + "} inited" );
}
}
}
public void close () {
// 加锁
lock . lock ();
try {
// 若已经关闭
if ( this . closed ) {
return ;
}
// 若未初始化过
if (! this . inited ) {
return ;
}
// 中断记录状态的日志线程
if ( logStatsThread != null ) {
logStatsThread . interrupt ();
}
// 中断创建连接的线程
if ( createConnectionThread != null ) {
createConnectionThread . interrupt ();
}
// 中断销毁连接的线程
if ( destroyConnectionThread != null ) {
destroyConnectionThread . interrupt ();
}
// 取消销毁调度任务
if ( destroySchedulerFuture != null ) {
destroySchedulerFuture . cancel ( true );
}
for ( int i = 0 ; i < poolingCount ; ++ i ) {
try {
DruidConnectionHolder connHolder = connections [ i ];
// 关闭连接中的Statement
for ( PreparedStatementHolder stmtHolder : connHolder . getStatementPool (). getMap (). values ()) {
connHolder . getStatementPool (). closeRemovedStatement ( stmtHolder );
}
connHolder . getStatementPool (). getMap (). clear ();
Connection physicalConnection = connHolder . getConnection ();
// 关闭物理连接
physicalConnection . close ();
// help gc clean
connections [ i ] = null ;
// 销毁连接数 +1
destroyCount . incrementAndGet ();
} catch ( Exception ex ) {
LOG . warn ( "close connection error" , ex );
}
}
// 连接数置0
poolingCount = 0 ;
// 取消注册MBean
unregisterMbean ();
enable = false ;
notEmpty . signalAll ();
notEmptySignalCount ++;
// 已关闭
this . closed = true ;
this . closeTimeMillis = System . currentTimeMillis ();
// 销毁filter
for ( Filter filter : filters ) {
filter . destroy ();
}
} finally {
lock . unlock ();
}
if ( LOG . isInfoEnabled ()) {
LOG . info ( "{dataSource-" + this . getID () + "} closed" );
}
}
public void restart () throws SQLException {
// 加锁
lock . lock ();
try {
// 若还有存活的连接
if ( activeCount > 0 ) {
throw new SQLException ( "can not restart, activeCount not zero. " + activeCount );
}
if ( LOG . isInfoEnabled ()) {
LOG . info ( "{dataSource-" + this . getID () + "} restart" );
}
// 关闭数据源
this . close ();
// 重置状态
this . resetStat ();
// 未初始化标志
this . inited = false ;
this . enable = true ;
this . closed = false ;
} finally {
lock . unlock ();
}
}
public DruidPooledConnection getConnection () throws SQLException {
// 默认不设置最大等待时间
return getConnection ( maxWait );
}
public DruidPooledConnection getConnection ( long maxWaitMillis ) throws SQLException {
// 尝试初始化数据源
init ();
if ( filters . size () > 0 ) {
// 经过filter中获取连接
FilterChainImpl filterChain = new FilterChainImpl ( this );
return filterChain . dataSource_connect ( this , maxWaitMillis );
} else {
// 直接获取连接
return getConnectionDirect ( maxWaitMillis );
}
}
public DruidPooledConnection getConnectionDirect ( long maxWaitMillis ) throws SQLException {
int notFullTimeoutRetryCnt = 0 ;
// 请求连接
for (;;) {
// handle notFullTimeoutRetry
DruidPooledConnection poolableConnection ;
try {
// 获取Connection
poolableConnection = getConnectionInternal ( maxWaitMillis );
} catch ( GetConnectionTimeoutException ex ) {
// 获取连接超时, 并且连接池未满
if ( notFullTimeoutRetryCnt <= this . notFullTimeoutRetryCount && ! isFull ()) {
notFullTimeoutRetryCnt ++;
if ( LOG . isWarnEnabled ()) {
LOG . warn ( "not full timeout retry : " + notFullTimeoutRetryCnt );
}
// 再次尝试获取连接
continue ;
}
throw ex ;
}
// 当获取到连接后作验证
if ( isTestOnBorrow ()) {
// 验证连接
// 1. 优先使用ValidConnectionChecker
// 2. 否则, 直接通过Connection的Statement对象执行validateQuery
boolean validate = testConnectionInternal ( poolableConnection . getConnection ());
if (! validate ) {
// 验证失败
// 真实的连接对象
Connection realConnection = poolableConnection . getConnection ();
// 丢弃连接
discardConnection ( realConnection );
continue ;
}
} else {
Connection realConnection = poolableConnection . getConnection ();
if ( realConnection . isClosed ()) {
// 连接已关闭
// 传入null,避免重复关闭
discardConnection ( null );
continue ;
}
// 是否在连接空闲时, 检测连接有效性
if ( isTestWhileIdle ()) {
// 当前时间
final long currentTimeMillis = System . currentTimeMillis ();
// 连接最后激活时间
final long lastActiveTimeMillis = poolableConnection . getConnectionHolder (). getLastActiveTimeMillis ();
// 连接已经空闲的时间
final long idleMillis = currentTimeMillis - lastActiveTimeMillis ;
// 每隔多久运行一次驱逐空闲连接的动作
long timeBetweenEvictionRunsMillis = this . getTimeBetweenEvictionRunsMillis ();
if ( timeBetweenEvictionRunsMillis <= 0 ) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS ;
}
// 该连接空闲时间 > 驱逐空闲连接的间隔时间
// 可以执行一次连接验证
if ( idleMillis >= timeBetweenEvictionRunsMillis ) {
// 验证同上
boolean validate = testConnectionInternal ( poolableConnection . getConnection ());
if (! validate ) {
if ( LOG . isDebugEnabled ()) {
LOG . debug ( "skip not validate connection." );
}
// 丢弃连接
discardConnection ( realConnection );
continue ;
}
}
}
}
// 是否回收长时间不使用的连接
// 防止因应用程序未正确关闭连接而导致的连接泄露
// remove abandoned的动作会在DestroyTask中执行
// 配置removeAbandoned对性能会有一些影响,建议怀疑存在泄漏之后再打开
if ( isRemoveAbandoned ()) {
StackTraceElement [] stackTrace = Thread . currentThread (). getStackTrace ();
poolableConnection . setConnectStackTrace ( stackTrace );
// 设置连接的连接完成时间
poolableConnection . setConnectedTimeNano ();
poolableConnection . setTraceEnable ( true );
// 将连接放入activeConnections
synchronized ( activeConnections ) {
activeConnections . put ( poolableConnection , PRESENT );
}
}
// 不自动提交事务
if (! this . isDefaultAutoCommit ()) {
poolableConnection . setAutoCommit ( false );
}
return poolableConnection ;
}
}
/**
* 丢弃连接
*/
public void discardConnection ( Connection realConnection ) {
// 物理关闭连接
JdbcUtils . close ( realConnection );
// 加锁
lock . lock ();
try {
// 存活连接数 -1
activeCount --;
// 丢弃连接数 +1
discardCount ++;
// 当 存活连接数 < 最小空闲连接数时, 可以尝试增加一个用于创建连接的任务, 即CreateConnectionTask
if ( activeCount <= minIdle ) {
emptySignal ();
}
} finally {
// 释放锁
lock . unlock ();
}
}
/**
* 获取连接对象
*/
private DruidPooledConnection getConnectionInternal ( long maxWait ) throws SQLException {
// 数据源是否已经关闭
if ( closed ) {
connectErrorCount . incrementAndGet ();
throw new DataSourceClosedException ( "dataSource already closed at " + new Date ( closeTimeMillis ));
}
// 数据源是否可用
if (! enable ) {
connectErrorCount . incrementAndGet ();
throw new DataSourceDisableException ();
}
// 最大等待时间
final long nanos = TimeUnit . MILLISECONDS . toNanos ( maxWait );
// 最大等待线程数
final int maxWaitThreadCount = getMaxWaitThreadCount ();
DruidConnectionHolder holder ;
try {
// 获取连接时加锁, 并可被中断
lock . lockInterruptibly ();
} catch ( InterruptedException e ) {
// 连接错误数 +1
connectErrorCount . incrementAndGet ();
throw new SQLException ( "interrupt" , e );
}
try {
if ( maxWaitThreadCount > 0 ) {
// 当前等待连接的线程数 > 最大等待线程数
if ( notEmptyWaitThreadCount >= maxWaitThreadCount ) {
// 连接错误数 +1
connectErrorCount . incrementAndGet ();
throw new SQLException ( "maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock . getQueueLength ());
}
}
// 连接数 +1
connectCount ++;
// 开始获取连接:这里使用lock的notEmpty, notFull两个Condition实现连接的获取,类似生产者,消费者的模式
if ( maxWait > 0 ) {
// 若设置了最大等待时间
// 获取连接, 有可能超时, 返回null
holder = pollLast ( nanos );
} else {
// 一直等待到获取到连接
holder = takeLast ();
}
if ( holder != null ) {
// 激活连接数 +1
activeCount ++;
// 设置最大激活连接数和最大激活连接数的时间
if ( activeCount > activePeak ) {
activePeak = activeCount ;
activePeakTime = System . currentTimeMillis ();
}
}
} catch ( InterruptedException e ) {
// 连接错误数 +1
connectErrorCount . incrementAndGet ();
throw new SQLException ( e . getMessage (), e );
} catch ( SQLException e ) {
// 连接错误数 +1
connectErrorCount . incrementAndGet ();
throw e ;
} finally {
lock . unlock ();
}
if ( holder == null ) {
// 未获取到连接时, 输出一些错误信息, 如超时信息, 如等待时间, 存活连接数等, 当前运行的SQL
long waitNanos = waitNanosLocal . get ();
StringBuilder buf = new StringBuilder ();
buf . append ( "wait millis " ) //
. append ( waitNanos / ( 1000 * 1000 )) //
. append ( ", active " + activeCount ) //
. append ( ", maxActive " + maxActive ) //
;
List < JdbcSqlStatValue > sqlList = this . getDataSourceStat (). getRuningSqlList ();
for ( int i = 0 ; i < sqlList . size (); ++ i ) {
if ( i != 0 ) {
buf . append ( '\n' );
} else {
buf . append ( ", " );
}
JdbcSqlStatValue sql = sqlList . get ( i );
buf . append ( "runningSqlCount " );
buf . append ( sql . getRunningCount ());
buf . append ( " : " );
buf . append ( sql . getSql ());
}
String errorMessage = buf . toString ();
if ( this . createError != null ) {
throw new GetConnectionTimeoutException ( errorMessage , createError );
} else {
throw new GetConnectionTimeoutException ( errorMessage );
}
}
// 增加该连接的使用次数
holder . incrementUseCount ();
DruidPooledConnection poolalbeConnection = new DruidPooledConnection ( holder );
return poolalbeConnection ;
}
/**
* 获取连接池最后的连接
*/
DruidConnectionHolder takeLast () throws InterruptedException , SQLException {
try {
// 若池中可用的连接为0
while ( poolingCount == 0 ) {
// 尝试增加一个用于创建连接的任务, 即CreateConnectionTask
emptySignal ();
// 允许快速失败, 且在CreateConnectionTask中创建连接时发生了持续错误
if ( failFast && failContinuous . get ()) {
throw new DataSourceNotAvailableException ( createError );
}
// 等待线程数 +1
notEmptyWaitThreadCount ++;
// 统计最大等待线程数
if ( notEmptyWaitThreadCount > notEmptyWaitThreadPeak ) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount ;
}
try {
// 等待, 直到有连接被回收 or 创建
notEmpty . await ();
} finally {
// 等待线程数 -1
notEmptyWaitThreadCount --;
}
// 等待次数 +1
notEmptyWaitCount ++;
// 数据源已经禁用
if (! enable ) {
connectErrorCount . incrementAndGet ();
throw new DataSourceDisableException ();
}
}
} catch ( InterruptedException ie ) {
// 有可能当前线程获取到了连接通知, 但被中断了
// 通知其他未被中断的线程, 有机会获取连接
notEmpty . signal ();
// 通知次数 +1
notEmptySignalCount ++;
throw ie ;
}
// 连接池可用连接数 -1
decrementPoolingCount ();
// 获取最后一个连接
DruidConnectionHolder last = connections [ poolingCount ];
// 移除该连接的引用
connections [ poolingCount ] = null ;
return last ;
}
/**
* 超时获取连接
*/
private DruidConnectionHolder pollLast ( long nanos ) throws InterruptedException , SQLException {
long estimate = nanos ;
for (;;) {
// 若池中可用连接为0
if ( poolingCount == 0 ) {
// 尝试增加一个用于创建连接的任务, 即CreateConnectionTask
emptySignal ();
// 允许快速失败, 且在CreateConnectionTask中创建连接时发生了持续错误
if ( failFast && failContinuous . get ()) {
throw new DataSourceNotAvailableException ( createError );
}
if ( estimate <= 0 ) {
// 等待超时了, 设置等待了多久
waitNanosLocal . set ( nanos - estimate );
// 返回null
return null ;
}
// 等待线程数 +1
notEmptyWaitThreadCount ++;
// 统计最大等待线程数
if ( notEmptyWaitThreadCount > notEmptyWaitThreadPeak ) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount ;
}
try {
long startEstimate = estimate ;
// 等待, 直到超时或者被唤醒
estimate = notEmpty . awaitNanos ( estimate );
// 等待次数 +1
notEmptyWaitCount ++;
// 等待时间
notEmptyWaitNanos += ( startEstimate - estimate );
// 数据源已关闭
if (! enable ) {
connectErrorCount . incrementAndGet ();
throw new DataSourceDisableException ();
}
} catch ( InterruptedException ie ) {
// 有可能当前线程获取到了连接通知, 但被中断了
// 通知其他未被中断的线程, 有机会获取连接
notEmpty . signal ();
// 通知次数 +1
notEmptySignalCount ++;
throw ie ;
} finally {
// 等待线程数 +1
notEmptyWaitThreadCount --;
}
if ( poolingCount == 0 ) {
// 继续等待
if ( estimate > 0 ) {
continue ;
}
// 等待超时
waitNanosLocal . set ( nanos - estimate );
return null ;
}
}
// 减小池中可用连接数
decrementPoolingCount ();
// 取出连接
DruidConnectionHolder last = connections [ poolingCount ];
connections [ poolingCount ] = null ;
long waitNanos = nanos - estimate ;
last . setLastNotEmptyWaitNanos ( waitNanos );
return last ;
}
}
/**
* 连接池为空时,尝试提交一个用于创建连接的任务
*/
private void emptySignal () {
// 创建连接的调度器为null
if ( createScheduler == null ) {
empty . signal ();
return ;
}
// 创建连接的任务线程达到最大值, 默认为3
if ( createTaskCount >= maxCreateTaskCount ) {
return ;
}
// 存活连接数 + 池中可用连接数 + 创建连接的任务数 >= 最大存活连接数
if ( activeCount + poolingCount + createTaskCount >= maxActive ) {
return ;
}
// 创建连接的任务数 +1
createTaskCount ++;
// 创建用于创建连接的任务
CreateConnectionTask task = new CreateConnectionTask ();
// 提交任务
createScheduler . submit ( task );
}
/**
* 用于创建连接的任务
*/
public class CreateConnectionTask implements Runnable {
/**
* 错误次数
*/
private int errorCount = 0 ;
@Override
public void run () {
runInternal ();
}
private void runInternal () {
for (;;) {
// 加锁
lock . lock ();
try {
boolean emptyWait = true ;
// 已经发生过创建连接的错误, 且池中可用连接为0
if ( createError != null && poolingCount == 0 ) {
emptyWait = false ;
}
if ( emptyWait ) {
// 若池中的连接数 >= 等待线程数, 没必要再创建连接
if ( poolingCount >= notEmptyWaitThreadCount ) {
createTaskCount --;
return ;
}
// 防止创建超过maxActive数量的连接
if ( activeCount + poolingCount >= maxActive ) {
createTaskCount --;
return ;
}
}
} finally {
lock . unlock ();
}
PhysicalConnectionInfo physicalConnection = null ;
try {
// 创建物理连接
physicalConnection = createPhysicalConnection ();
// 创建成功, 则重试创建连接成功
setFailContinuous ( false );
} catch ( SQLException e ) {
LOG . error ( "create connection error, url: " + jdbcUrl , e );
// 创建连接, 发生错误次数 + 1
errorCount ++;
if ( errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0 ) {
// 错误次数 > 连接错误重试次数, 并设置了创建连接错误的时间间隔
// 重试创建连接失败
setFailContinuous ( true );
// 快速失败
if ( failFast ) {
lock . lock ();
try {
// 通知其他等待的线程
// 这个时候有可能数据源已经不可用, 不用再等待了
notEmpty . signalAll ();
} finally {
lock . unlock ();
}
}
if ( breakAfterAcquireFailure ) {
lock . lock ();
try {
createTaskCount --;
} finally {
lock . unlock ();
}
return ;
}
this . errorCount = 0 ; // reset errorCount
// timeBetweenConnectErrorMillis之后, 重新调度该Task
createScheduler . schedule ( this , timeBetweenConnectErrorMillis , TimeUnit . MILLISECONDS );
return ;
}
} catch ( RuntimeException e ) {
LOG . error ( "create connection error" , e );
// unknow fatal exception
setFailContinuous ( true );
continue ;
} catch ( Error e ) {
lock . lock ();
try {
createTaskCount --;
} finally {
lock . unlock ();
}
LOG . error ( "create connection error" , e );
// unknow fatal exception
setFailContinuous ( true );
break ;
}
if ( physicalConnection == null ) {
continue ;
}
// 将创建的连接放入池中
boolean result = put ( physicalConnection );
if (! result ) {
// 未放入成功,则关闭连接
JdbcUtils . close ( physicalConnection . getPhysicalConnection ());
LOG . info ( "put physical connection to pool failed." );
}
break ;
}
}
}
/**
* 创建物理连接
*/
public PhysicalConnectionInfo createPhysicalConnection () throws SQLException {
// 数据库URL
String url = this . getUrl ();
// 连接属性
Properties connectProperties = getConnectProperties ();
// 用户名
String user ;
if ( getUserCallback () != null ) {
// 输入用户名
user = getUserCallback (). getName ();
} else {
user = getUsername ();
}
// 密码
String password = getPassword ();
PasswordCallback passwordCallback = getPasswordCallback ();
if ( passwordCallback != null ) {
if ( passwordCallback instanceof DruidPasswordCallback ) {
DruidPasswordCallback druidPasswordCallback = ( DruidPasswordCallback ) passwordCallback ;
druidPasswordCallback . setUrl ( url );
druidPasswordCallback . setProperties ( connectProperties );
}
char [] chars = passwordCallback . getPassword ();
if ( chars != null ) {
password = new String ( chars );
}
}
// 连接属性
Properties physicalConnectProperties = new Properties ();
if ( connectProperties != null ) {
physicalConnectProperties . putAll ( connectProperties );
}
if ( user != null && user . length () != 0 ) {
physicalConnectProperties . put ( "user" , user );
}
if ( password != null && password . length () != 0 ) {
physicalConnectProperties . put ( "password" , password );
}
Connection conn ;
// 进行连接相关操作的开始时间
long connectStartNanos = System . nanoTime ();
// 成功连接后的时间, 初始化后的时间, 验证成功后的时间
long connectedNanos , initedNanos , validatedNanos ;
try {
// 创建物理连接
conn = createPhysicalConnection ( url , physicalConnectProperties );
connectedNanos = System . nanoTime ();
if ( conn == null ) {
throw new SQLException ( "connect error, url " + url + ", driverClass " + this . driverClass );
}
// 初始化连接基本属性, 如是否自动提交, 事务隔离级别
initPhysicalConnection ( conn );
initedNanos = System . nanoTime ();
// 验证连接
validateConnection ( conn );
validatedNanos = System . nanoTime ();
setCreateError ( null );
} catch ( SQLException ex ) {
setCreateError ( ex );
throw ex ;
} catch ( RuntimeException ex ) {
setCreateError ( ex );
throw ex ;
} catch ( Error ex ) {
// 创建连接错误数 + 1
createErrorCount . incrementAndGet ();
throw ex ;
} finally {
long nano = System . nanoTime () - connectStartNanos ;
// 创建连接的时间跨度
createTimespan += nano ;
}
// 物理连接信息
return new PhysicalConnectionInfo ( conn , connectStartNanos , connectedNanos , initedNanos , validatedNanos );
}
/**
* 将物理连接服务连接池中
*/
protected boolean put ( PhysicalConnectionInfo physicalConnectionInfo ) {
DruidConnectionHolder holder = null ;
try {
// 包装连接为DruidConnectionHolder对象
holder = new DruidConnectionHolder ( DruidDataSource . this , physicalConnectionInfo );
} catch ( SQLException ex ) {
lock . lock ();
try {
if ( createScheduler != null ) {
createTaskCount --;
}
} finally {
lock . unlock ();
}
LOG . error ( "create connection holder error" , ex );
return false ;
}
lock . lock ();
try {
// 连接池中连接数 > 最大存活连接数
if ( poolingCount >= maxActive ) {
return false ;
}
// 放入连接
connections [ poolingCount ] = holder ;
// 增加连接池连接数
incrementPoolingCount ();
// 统计最大连接池连接数
if ( poolingCount > poolingPeak ) {
poolingPeak = poolingCount ;
poolingPeakTime = System . currentTimeMillis ();
}
// 唤醒一个等待线程
notEmpty . signal ();
notEmptySignalCount ++;
if ( createScheduler != null ) {
createTaskCount --;
if ( poolingCount + createTaskCount < notEmptyWaitThreadCount //
&& activeCount + poolingCount + createTaskCount < maxActive ) {
// 尝试启动创建连接的线程
emptySignal ();
}
}
} finally {
lock . unlock ();
}
return true ;
}
通过上面的代码片段,即获取到了连接对象DruidPooledConnection ,继续看下DruidPooledConnection 如何关闭内部Connection 对象:
public void close () throws SQLException {
// 连接已经不可用
if ( this . disable ) {
return ;
}
DruidConnectionHolder holder = this . holder ;
if ( holder == null ) {
// 已经关闭了
if ( dupCloseLogEnable ) {
LOG . error ( "dup close" );
}
return ;
}
DruidAbstractDataSource dataSource = holder . getDataSource ();
// 创建该连接的线程是否与关闭线程一致
boolean isSameThread = this . getOwnerThread () == Thread . currentThread ();
if (! isSameThread ) {
// 不一致, 则设置数据源可以异步关闭连接
dataSource . setAsyncCloseConnectionEnable ( true );
}
if ( dataSource . isAsyncCloseConnectionEnable ()) {
// 异步关闭连接
syncClose ();
return ;
}
for ( ConnectionEventListener listener : holder . getConnectionEventListeners ()) {
listener . connectionClosed ( new ConnectionEvent ( this ));
}
List < Filter > filters = dataSource . getProxyFilters ();
if ( filters . size () > 0 ) {
// 若配置了Filter, 则调用Filter链回收连接
FilterChainImpl filterChain = new FilterChainImpl ( dataSource );
filterChain . dataSource_recycle ( this );
} else {
// 回收连接
recycle ();
}
// 连接不可用
this . disable = true ;
}
public void recycle () throws SQLException {
// 连接已关闭
if ( this . disable ) {
return ;
}
DruidConnectionHolder holder = this . holder ;
if ( holder == null ) {
// 重复关闭
if ( dupCloseLogEnable ) {
LOG . error ( "dup close" );
}
return ;
}
// 是否直接丢弃, 默认当然是放回连接池
if (! this . abandoned ) {
DruidAbstractDataSource dataSource = holder . getDataSource ();
// 回收连接
dataSource . recycle ( this );
}
// 置空
this . holder = null ;
conn = null ;
transactionInfo = null ;
closed = true ;
}
/**
* DruidDataSource.recycle()
*/
protected void recycle ( DruidPooledConnection pooledConnection ) throws SQLException {
final DruidConnectionHolder holder = pooledConnection . getConnectionHolder ();
if ( holder == null ) {
LOG . warn ( "connectionHolder is null" );
return ;
}
if ( logDifferentThread //
&& (! isAsyncCloseConnectionEnable ()) //
&& pooledConnection . getOwnerThread () != Thread . currentThread () //
) {
// 不是同一线程获取/关闭连接
LOG . warn ( "get/close not same thread" );
}
final Connection physicalConnection = holder . getConnection ();
if ( pooledConnection . isTraceEnable ()) {
// 连接开启了调试
synchronized ( activeConnections ) {
if ( pooledConnection . isTraceEnable ()) {
// 从激活连接中移除连接
Object oldInfo = activeConnections . remove ( pooledConnection );
if ( oldInfo == null ) {
if ( LOG . isWarnEnabled ()) {
LOG . warn ( "remove abandonded failed. activeConnections.size " + activeConnections . size ());
}
}
pooledConnection . setTraceEnable ( false );
}
}
}
// 连接是否自动提交
final boolean isAutoCommit = holder . isUnderlyingAutoCommit ();
// 连接是否是只读模式
final boolean isReadOnly = holder . isUnderlyingReadOnly ();
// 是否在归还连接时, 检测连接有效性
final boolean testOnReturn = this . isTestOnReturn ();
try {
if ((! isAutoCommit ) && (! isReadOnly )) {
// 有必要回滚连接
pooledConnection . rollback ();
}
// 重置holder
// 连接获取者和归还者是否为同一线程
boolean isSameThread = pooledConnection . getOwnerThread () == Thread . currentThread ();
if (! isSameThread ) {
// 不同线程, 需加锁
synchronized ( pooledConnection ) {
holder . reset ();
}
} else {
// 同一线程
holder . reset ();
}
// 如果线程已经标记为丢弃, 不用归还
if ( holder . isDiscard ()) {
return ;
}
// 检测连接, 无效则关闭连接
if ( testOnReturn ) {
boolean validate = testConnectionInternal ( physicalConnection );
if (! validate ) {
JdbcUtils . close ( physicalConnection );
destroyCount . incrementAndGet ();
lock . lock ();
try {
activeCount --;
closeCount ++;
} finally {
lock . unlock ();
}
return ;
}
}
// 数据源已经不可用, 则丢弃连接
if (! enable ) {
discardConnection ( holder . getConnection ());
return ;
}
boolean result ;
final long lastActiveTimeMillis = System . currentTimeMillis ();
lock . lockInterruptibly ();
try {
// 激活连接数 -1
activeCount --;
// 关闭连接数 +1
closeCount ++;
// 将连接放入连接池尾部
result = putLast ( holder , lastActiveTimeMillis );
// 归还连接数 +1
recycleCount ++;
} finally {
lock . unlock ();
}
if (! result ) {
// 归还失败,则关闭连接
JdbcUtils . close ( holder . getConnection ());
LOG . info ( "connection recyle failed." );
}
} catch ( Throwable e ) {
// 清空PS缓存
holder . clearStatementCache ();
if (! holder . isDiscard ()) {
this . discardConnection ( physicalConnection );
holder . setDiscard ( true );
}
LOG . error ( "recyle error" , e );
// 归还错误数 +1
recycleErrorCount . incrementAndGet ();
}
}
/**
* 将连接放入连接池尾部
*/
boolean putLast ( DruidConnectionHolder e , long lastActiveTimeMillis ) {
// 池中连接数已经 >= 最大激活连接数, 则不用归还
if ( poolingCount >= maxActive ) {
return false ;
}
// 设置连接最后激活连接时间
e . setLastActiveTimeMillis ( lastActiveTimeMillis );
// 归还连接
connections [ poolingCount ] = e ;
// 池中连接数 +1
incrementPoolingCount ();
// 统计池中最大连接数
if ( poolingCount > poolingPeak ) {
poolingPeak = poolingCount ;
poolingPeakTime = lastActiveTimeMillis ;
}
// 唤醒一个等待的线程
notEmpty . signal ();
// 通知数 +1
notEmptySignalCount ++;
return true ;
}
除了获取连接 ,归还连接 ,数据源中还需要自动移除一些长时间未使用的连接,这通过DestroyTask 来完成:
/**
* 移除长时间未使用连接的任务
*/
public class DestroyTask implements Runnable {
@Override
public void run () {
// 收缩连接池
// 以连接的最后激活时间为准
shrink ( true );
if ( isRemoveAbandoned ()) {
// 是以连接建立的时间为准
removeAbandoned ();
}
}
}
/**
* 收缩连接池
*/
public void shrink ( boolean checkTime ) {
// 被驱逐的连接
final List < DruidConnectionHolder > evictList = new ArrayList < DruidConnectionHolder >();
try {
lock . lockInterruptibly ();
} catch ( InterruptedException e ) {
return ;
}
try {
// 需要检查的连接数
final int checkCount = poolingCount - minIdle ;
final long currentTimeMillis = System . currentTimeMillis ();
for ( int i = 0 ; i < poolingCount ; ++ i ) {
DruidConnectionHolder connection = connections [ i ];
// 需要驱逐超时的连接
if ( checkTime ) {
// 物理超时时间
if ( phyTimeoutMillis > 0 ) {
long phyConnectTimeMillis = currentTimeMillis - connection . getTimeMillis ();
if ( phyConnectTimeMillis > phyTimeoutMillis ) {
// 超时则驱逐
evictList . add ( connection );
continue ;
}
}
// 连接已经空闲的时间
long idleMillis = currentTimeMillis - connection . getLastActiveTimeMillis ();
if ( idleMillis < minEvictableIdleTimeMillis ) {
break ;
}
if ( checkTime && i < checkCount ) {
// 驱逐前面多余的连接
evictList . add ( connection );
} else if ( idleMillis > maxEvictableIdleTimeMillis ) {
// 驱逐超过最大空闲时间的连接
evictList . add ( connection );
}
} else {
// 仅驱逐多余的连接, 不作超时校验
if ( i < checkCount ) {
// 驱逐前面多余的连接
evictList . add ( connection );
} else {
break ;
}
}
}
int removeCount = evictList . size ();
if ( removeCount > 0 ) {
// 将保留的连接复制到连接池前面
System . arraycopy ( connections , removeCount , connections , 0 , poolingCount - removeCount );
// 置空连接池剩余的位置
Arrays . fill ( connections , poolingCount - removeCount , poolingCount , null );
// 连接池可用连接数 -removeCount
poolingCount -= removeCount ;
}
} finally {
lock . unlock ();
}
// 将驱逐的连接都物理关闭
for ( DruidConnectionHolder item : evictList ) {
Connection connection = item . getConnection ();
JdbcUtils . close ( connection );
destroyCount . incrementAndGet ();
}
}
/**
* 关闭连接过长的连接
*/
public int removeAbandoned () {
int removeCount = 0 ;
long currrentNanos = System . nanoTime ();
// 移除的连接
List < DruidPooledConnection > abandonedList = new ArrayList < DruidPooledConnection >();
synchronized ( activeConnections ) {
// 仅从激活的连接中
Iterator < DruidPooledConnection > iter = activeConnections . keySet (). iterator ();
for (; iter . hasNext ();) {
DruidPooledConnection pooledConnection = iter . next ();
if ( pooledConnection . isRunning ()) {
continue ;
}
// 连接已经建立的时间
long timeMillis = ( currrentNanos - pooledConnection . getConnectedTimeNano ()) / ( 1000 * 1000 );
if ( timeMillis >= removeAbandonedTimeoutMillis ) {
// 超过连接最大连接时间, 则移除
iter . remove ();
pooledConnection . setTraceEnable ( false );
abandonedList . add ( pooledConnection );
}
}
}
if ( abandonedList . size () > 0 ) {
for ( DruidPooledConnection pooledConnection : abandonedList ) {
synchronized ( pooledConnection ) {
// 连接已经disable
if ( pooledConnection . isDisable ()) {
continue ;
}
}
// 直接关闭连接
JdbcUtils . close ( pooledConnection );
pooledConnection . abandond ();
removeAbandonedCount ++;
removeCount ++;
// 日志处理
if ( isLogAbandoned ()) {
StringBuilder buf = new StringBuilder ();
buf . append ( "abandon connection, owner thread: " );
buf . append ( pooledConnection . getOwnerThread (). getName ());
buf . append ( ", connected at : " );
buf . append ( pooledConnection . getConnectedTimeMillis ());
buf . append ( ", open stackTrace\n" );
StackTraceElement [] trace = pooledConnection . getConnectStackTrace ();
for ( int i = 0 ; i < trace . length ; i ++) {
buf . append ( "\tat " );
buf . append ( trace [ i ]. toString ());
buf . append ( "\n" );
}
buf . append ( "ownerThread current state is " + pooledConnection . getOwnerThread (). getState () + ", current stackTrace\n" );
trace = pooledConnection . getOwnerThread (). getStackTrace ();
for ( int i = 0 ; i < trace . length ; i ++) {
buf . append ( "\tat " );
buf . append ( trace [ i ]. toString ());
buf . append ( "\n" );
}
LOG . error ( buf . toString ());
}
}
}
return removeCount ;
}
总结
以上,则是Druid 数据源内部对连接的基本管理,这也是连接池的基本功能。除此外,Druid 通过Filter 实现了对数据源的监控 ,代理 等功能,这部分可以后期研究。