Mybatis通过内部的Transaction和TransactionFactory来管理事务。 本文将探究一番Mybatis事务管理的一些细节。
public interface Transaction {
/**
* 获取Connection对象
*/
Connection getConnection() throws SQLException;
/**
* 提交Connection
*/
void commit() throws SQLException;
/**
* 回滚Connection
*/
void rollback() throws SQLException;
/**
* 关闭Connection
*/
void close() throws SQLException;
}
public class JdbcTransaction implements Transaction {
private static final Log log = LogFactory.getLog(JdbcTransaction.class);
protected Connection connection; //当前JDBC连接
protected DataSource dataSource; //数据源
protected TransactionIsolationLevel level; //事务级别
protected boolean autoCommmit;
public JdbcTransaction(DataSource ds, TransactionIsolationLevel desiredLevel, boolean desiredAutoCommit) {
dataSource = ds;
level = desiredLevel;
autoCommmit = desiredAutoCommit;
}
public JdbcTransaction(Connection connection) {
this.connection = connection;
}
/**
* 延迟加载Connection对象
*/
public Connection getConnection() throws SQLException {
if (connection == null) {
openConnection();
}
return connection;
}
/**
* 提交事务
*/
public void commit() throws SQLException {
if (connection != null && !connection.getAutoCommit()) {
if (log.isDebugEnabled()) {
log.debug("Committing JDBC Connection [" + connection + "]");
}
connection.commit();
}
}
/**
* 回滚事务
*/
public void rollback() throws SQLException {
if (connection != null && !connection.getAutoCommit()) {
if (log.isDebugEnabled()) {
log.debug("Rolling back JDBC Connection [" + connection + "]");
}
connection.rollback();
}
}
/**
* 关闭事务
*/
public void close() throws SQLException {
if (connection != null) {
resetAutoCommit();
if (log.isDebugEnabled()) {
log.debug("Closing JDBC Connection [" + connection + "]");
}
connection.close();
}
}
protected void setDesiredAutoCommit(boolean desiredAutoCommit) {
try {
if (connection.getAutoCommit() != desiredAutoCommit) {
if (log.isDebugEnabled()) {
log.debug("Setting autocommit to " + desiredAutoCommit + " on JDBC Connection [" + connection + "]");
}
connection.setAutoCommit(desiredAutoCommit);
}
} catch (SQLException e) {
throw new TransactionException("Error configuring AutoCommit. "
+ "Your driver may not support getAutoCommit() or setAutoCommit(). "
+ "Requested setting: " + desiredAutoCommit + ". Cause: " + e, e);
}
}
/**
* 重置事务为自动提交
*/
protected void resetAutoCommit() {
try {
if (!connection.getAutoCommit()) {
if (log.isDebugEnabled()) {
log.debug("Resetting autocommit to true on JDBC Connection [" + connection + "]");
}
connection.setAutoCommit(true);
}
} catch (SQLException e) {
log.debug("Error resetting autocommit to true "
+ "before closing the connection. Cause: " + e);
}
}
/**
* 从数据源获取Connection对象
*/
protected void openConnection() throws SQLException {
if (log.isDebugEnabled()) {
log.debug("Opening JDBC Connection");
}
connection = dataSource.getConnection();
if (level != null) {
connection.setTransactionIsolation(level.getLevel());
}
setDesiredAutoCommit(autoCommmit);
}
}
public class JdbcTransactionFactory implements TransactionFactory {
public void setProperties(Properties props) {
}
public Transaction newTransaction(Connection conn) {
return new JdbcTransaction(conn);
}
public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) {
return new JdbcTransaction(ds, level, autoCommit);
}
}
public class ManagedTransaction {
...
public void commit() throws SQLException {
// Does nothing
}
public void rollback() throws SQLException {
// Does nothing
}
...
}
public class ManagedTransactionFactory implements TransactionFactory {
private boolean closeConnection = true; //关闭事务时是否关闭连接
public void setProperties(Properties props) {
if (props != null) {
String closeConnectionProperty = props.getProperty("closeConnection");
if (closeConnectionProperty != null) {
closeConnection = Boolean.valueOf(closeConnectionProperty);
}
}
}
public Transaction newTransaction(Connection conn) {
return new ManagedTransaction(conn, closeConnection);
}
public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) {
return new ManagedTransaction(ds, level, closeConnection);
}
}
public interface SqlSession extends Closeable {
<T> T selectOne(String statement);
<T> T selectOne(String statement, Object parameter);
<E> List<E> selectList(String statement);
<E> List<E> selectList(String statement, Object parameter);
<E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds);
<K, V> Map<K, V> selectMap(String statement, String mapKey);
<K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey);
<K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds);
void select(String statement, Object parameter, ResultHandler handler);
void select(String statement, ResultHandler handler);
void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler);
int insert(String statement);
int insert(String statement, Object parameter);
int update(String statement);
int update(String statement, Object parameter);
int delete(String statement);
int delete(String statement, Object parameter);
void commit();
void commit(boolean force);
void rollback();
void rollback(boolean force);
List<BatchResult> flushStatements();
void close();
void clearCache();
Configuration getConfiguration();
<T> T getMapper(Class<T> type);
Connection getConnection();
}
public interface SqlSessionFactory {
SqlSession openSession();
SqlSession openSession(boolean autoCommit);
SqlSession openSession(Connection connection);
SqlSession openSession(TransactionIsolationLevel level);
SqlSession openSession(ExecutorType execType);
SqlSession openSession(ExecutorType execType, boolean autoCommit);
SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level);
SqlSession openSession(ExecutorType execType, Connection connection);
Configuration getConfiguration();
}
/**
* 直接通过数据源创建Transaction,再创建executor
*/
private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
Transaction tx = null;
try {
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
final Executor executor = configuration.newExecutor(tx, execType); //创建Executor
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
closeTransaction(tx); // may have fetched a connection so lets call close()
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
/**
* 直接Connection对象创建Transaction,再创建executor
*/
private SqlSession openSessionFromConnection(ExecutorType execType, Connection connection) {
try {
boolean autoCommit;
try {
autoCommit = connection.getAutoCommit();
} catch (SQLException e) {
autoCommit = true;
}
final Environment environment = configuration.getEnvironment();
final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
final Transaction tx = transactionFactory.newTransaction(connection);
final Executor executor = configuration.newExecutor(tx, execType);
return new DefaultSqlSession(configuration, executor, autoCommit);
} catch (Exception e) {
throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e);
} finally {
ErrorContext.instance().reset();
}
}
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
while (conn == null) {
synchronized (state) {
if (state.idleConnections.size() > 0) { //若存在空闲连接,直接返回一个Connection
conn = state.idleConnections.remove(0);
if (log.isDebugEnabled()) {
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
} else {
if (state.activeConnections.size() < poolMaximumActiveConnections) { //没有超过配置的最大激活连接数,则创建一个
conn = new PooledConnection(dataSource.getConnection(), this);
Connection realConn = conn.getRealConnection();
} else {
// 获取激活最久的连接
PooledConnection oldestActiveConnection = state.activeConnections.get(0);
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
if (longestCheckoutTime > poolMaximumCheckoutTime) { // 若校验时间大于配置的最大校验时间,则该连接已经过期
// Can claim overdue connection
state.claimedOverdueConnectionCount++; // 统计过期的连接数
state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; //统计累计的过期连接的校验时间和
state.accumulatedCheckoutTime += longestCheckoutTime; // 统计累计的校验时间和
state.activeConnections.remove(oldestActiveConnection); // 移除过期的连接
if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
oldestActiveConnection.getRealConnection().rollback();
}
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this); // 创建新连接
oldestActiveConnection.invalidate(); //失效过期连接
} else {
// 必须等待连接
try {
if (!countedWait) {
state.hadToWaitCount++; // 统计等待连接的请求数
countedWait = true;
}
long wt = System.currentTimeMillis();
state.wait(poolTimeToWait); //等待poolTimeToWait时间
state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException e) {
break;
}
}
}
}
if (conn != null) {
if (conn.isValid()) { //有效连接
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis()); //校验时间
conn.setLastUsedTimestamp(System.currentTimeMillis()); //最近使用时间
state.activeConnections.add(conn); //加入到激活连接中
state.requestCount++;
state.accumulatedRequestTime += System.currentTimeMillis() - t;
} else { //无效连接
state.badConnectionCount++; //统计无效连接数
localBadConnectionCount++; //总无效连接
conn = null;
if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if (conn == null) {
if (log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
return conn;
}
public class PoolState {
protected PooledDataSource dataSource;
protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>(); //空闲连接列表
protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>(); //激活连接列表
protected long requestCount = 0; //请求数
protected long accumulatedRequestTime = 0; //累计请求事件
protected long accumulatedCheckoutTime = 0;//累计校验时间
protected long claimedOverdueConnectionCount = 0;//过期连接数
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;//过期连接的累计校验时间
protected long accumulatedWaitTime = 0; //累计等待时间
protected long hadToWaitCount = 0; //等待连接的请求数
protected long badConnectionCount = 0; //无效连接数
...
}
class PooledConnection implements InvocationHandler {
private static final String CLOSE = "close";
private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
private int hashCode = 0;
private PooledDataSource dataSource;
private Connection realConnection;
private Connection proxyConnection;
private long checkoutTimestamp;
private long createdTimestamp;
private long lastUsedTimestamp;
private int connectionTypeCode;
private boolean valid;
...
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
dataSource.pushConnection(this); //close时,将连接放入数据源连接池中
return null;
} else {
try {
if (!Object.class.equals(method.getDeclaringClass())) {
checkConnection();
}
return method.invoke(realConnection, args);
} catch (Throwable t) {
throw ExceptionUtil.unwrapThrowable(t);
}
}
}
}
protected void pushConnection(PooledConnection conn) throws SQLException {
synchronized (state) {
state.activeConnections.remove(conn); //将连接移除激活连接队列
if (conn.isValid()) {
if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
state.idleConnections.add(newConn); // 放入空闲连接
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
conn.invalidate();
state.notifyAll(); //唤醒popConnection中等待连接的请求线程
} else {
state.accumulatedCheckoutTime += conn.getCheckoutTime();
if (!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
conn.getRealConnection().close();
conn.invalidate();
}
} else {
state.badConnectionCount++;
}
}
}