MyBatis Transaction Management
February 23, 2015
mybatis

    MyBatis manages transactions through its internal Transaction and TransactionFactory abstractions. This article looks at some of the details of how that works.

  • Transaction

  • MyBatis abstracts a transaction as the Transaction interface, which does its work by managing the JDBC Connection object it holds.
  • public interface Transaction {
    
        /**
         * Returns the Connection object
         */
        Connection getConnection() throws SQLException;
    
        /**
         * Commits the Connection
         */
        void commit() throws SQLException;
    
        /**
         * Rolls back the Connection
         */
        void rollback() throws SQLException;
    
        /**
         * Closes the Connection
         */
        void close() throws SQLException;
    }
        
  • MyBatis ships two concrete Transaction implementations: JdbcTransaction and ManagedTransaction.
  • JdbcTransaction

  • JdbcTransaction manages the transaction through the commit and rollback methods of the JDBC Connection object, and obtains that Connection from its internal DataSource.
  • public class JdbcTransaction implements Transaction {
    
        private static final Log log = LogFactory.getLog(JdbcTransaction.class);
    
        protected Connection connection;            // current JDBC connection
        protected DataSource dataSource;            // data source
        protected TransactionIsolationLevel level;  // transaction isolation level
        protected boolean autoCommmit;
    
        public JdbcTransaction(DataSource ds, TransactionIsolationLevel desiredLevel, boolean desiredAutoCommit) {
            dataSource = ds;
            level = desiredLevel;
            autoCommmit = desiredAutoCommit;
        }
    
        public JdbcTransaction(Connection connection) {
            this.connection = connection;
        }
    
        /**
         * Lazily opens the Connection on first use
         */
        public Connection getConnection() throws SQLException {
            if (connection == null) {
                openConnection();
            }
            return connection;
        }
    
        /**
         * Commits the transaction (skipped when the connection is in auto-commit mode)
         */
        public void commit() throws SQLException {
            if (connection != null && !connection.getAutoCommit()) {
                if (log.isDebugEnabled()) {
                    log.debug("Committing JDBC Connection [" + connection + "]");
                }
                connection.commit();
            }
        }
    
        /**
         * Rolls back the transaction (skipped when the connection is in auto-commit mode)
         */
        public void rollback() throws SQLException {
            if (connection != null && !connection.getAutoCommit()) {
                if (log.isDebugEnabled()) {
                    log.debug("Rolling back JDBC Connection [" + connection + "]");
                }
                connection.rollback();
            }
        }
    
        /**
         * Closes the transaction: restores auto-commit, then closes the connection
         */
        public void close() throws SQLException {
            if (connection != null) {
                resetAutoCommit();
                if (log.isDebugEnabled()) {
                    log.debug("Closing JDBC Connection [" + connection + "]");
                }
                connection.close();
            }
        }
    
        protected void setDesiredAutoCommit(boolean desiredAutoCommit) {
            try {
                if (connection.getAutoCommit() != desiredAutoCommit) {
                    if (log.isDebugEnabled()) {
                        log.debug("Setting autocommit to " + desiredAutoCommit + " on JDBC Connection [" + connection + "]");
                    }
                    connection.setAutoCommit(desiredAutoCommit);
                }
            } catch (SQLException e) {
                throw new TransactionException("Error configuring AutoCommit.  "
                + "Your driver may not support getAutoCommit() or setAutoCommit(). "
                + "Requested setting: " + desiredAutoCommit + ".  Cause: " + e, e);
            }
        }
    
        /**
         * Resets the connection to auto-commit mode
         */
        protected void resetAutoCommit() {
            try {
                if (!connection.getAutoCommit()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Resetting autocommit to true on JDBC Connection [" + connection + "]");
                    }
                    connection.setAutoCommit(true);
                }
            } catch (SQLException e) {
                log.debug("Error resetting autocommit to true "
                + "before closing the connection.  Cause: " + e);
            }
        }
    
        /**
         * Obtains a Connection from the data source
         */
        protected void openConnection() throws SQLException {
            if (log.isDebugEnabled()) {
                log.debug("Opening JDBC Connection");
            }
            connection = dataSource.getConnection();
            if (level != null) {
                connection.setTransactionIsolation(level.getLevel());
            }
            setDesiredAutoCommit(autoCommmit);
        }
    }
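
The behaviour described above (lazy connection acquisition, and commit being skipped while auto-commit is on) can be tried out without a database. The following is only a minimal sketch, not MyBatis code: LazyJdbcTransactionSketch and fakeConnection are hypothetical names, a Supplier stands in for DataSource.getConnection(), and the fake connection is a JDK dynamic proxy that merely records which methods were called.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class LazyJdbcTransactionSketch {

    /** Method names recorded by the fake connection, e.g. "commit". */
    static final List<String> calls = new ArrayList<>();

    /** Hypothetical stand-in for a driver connection; it only tracks the auto-commit flag. */
    static Connection fakeConnection() {
        final boolean[] autoCommit = { true }; // JDBC connections start in auto-commit mode
        return (Connection) Proxy.newProxyInstance(
            Connection.class.getClassLoader(),
            new Class<?>[] { Connection.class },
            (proxy, method, args) -> {
                String name = method.getName();
                if (name.equals("getAutoCommit")) {
                    return autoCommit[0];
                }
                if (name.equals("setAutoCommit")) {
                    autoCommit[0] = (Boolean) args[0];
                    return null;
                }
                if (name.equals("toString")) {
                    return "FakeConnection";
                }
                calls.add(name); // commit, rollback, close, ...
                return null;
            });
    }

    private final Supplier<Connection> source; // plays the role of DataSource.getConnection()
    private final boolean desiredAutoCommit;
    private Connection connection;

    LazyJdbcTransactionSketch(Supplier<Connection> source, boolean desiredAutoCommit) {
        this.source = source;
        this.desiredAutoCommit = desiredAutoCommit;
    }

    /** Opens the connection on first use and applies the desired auto-commit mode. */
    Connection getConnection() throws SQLException {
        if (connection == null) {
            connection = source.get();
            if (connection.getAutoCommit() != desiredAutoCommit) {
                connection.setAutoCommit(desiredAutoCommit);
            }
        }
        return connection;
    }

    /** Commits only if a connection was opened and it is not in auto-commit mode. */
    void commit() throws SQLException {
        if (connection != null && !connection.getAutoCommit()) {
            connection.commit();
        }
    }

    /** Restores auto-commit before closing, mirroring resetAutoCommit() above. */
    void close() throws SQLException {
        if (connection != null) {
            if (!connection.getAutoCommit()) {
                connection.setAutoCommit(true);
            }
            connection.close();
        }
    }

    public static void main(String[] args) throws SQLException {
        LazyJdbcTransactionSketch tx =
            new LazyJdbcTransactionSketch(LazyJdbcTransactionSketch::fakeConnection, false);
        tx.commit();          // no connection opened yet, so nothing is forwarded
        tx.getConnection();   // opens the connection and switches auto-commit off
        tx.commit();          // forwarded to Connection.commit()
        tx.close();           // restores auto-commit, then closes
        System.out.println(calls); // prints [commit, close]
    }
}
```

With desiredAutoCommit set to true the same sequence would record only close, because commit() is never forwarded to a connection in auto-commit mode.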
        
  • The transactions themselves are created by the factory class JdbcTransactionFactory:
  • public class JdbcTransactionFactory implements TransactionFactory {
    
        public void setProperties(Properties props) {
        }
    
        public Transaction newTransaction(Connection conn) {
            return new JdbcTransaction(conn);
        }
    
        public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) {
            return new JdbcTransaction(ds, level, autoCommit);
        }
    }
        
  • ManagedTransaction

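  • ManagedTransaction performs no transaction management itself: it ignores commit and rollback requests and leaves the transaction to the container.
  • public class ManagedTransaction implements Transaction {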
        ...
    
        public void commit() throws SQLException {
            // Does nothing
        }
    
        public void rollback() throws SQLException {
            // Does nothing
        }
    
        ...
    }
        
  • Likewise, ManagedTransaction instances are created by the ManagedTransactionFactory factory class:
  • public class ManagedTransactionFactory implements TransactionFactory {
    
        private boolean closeConnection = true;     // whether to close the connection when the transaction is closed
    
        public void setProperties(Properties props) {
            if (props != null) {
                String closeConnectionProperty = props.getProperty("closeConnection");
                if (closeConnectionProperty != null) {
                    closeConnection = Boolean.valueOf(closeConnectionProperty);
                }
            }
        }
    
        public Transaction newTransaction(Connection conn) {
            return new ManagedTransaction(conn, closeConnection);
        }
    
        public Transaction newTransaction(DataSource ds, TransactionIsolationLevel level, boolean autoCommit) {
            return new ManagedTransaction(ds, level, closeConnection);
        }
    }
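
To make the role of the closeConnection property concrete, here is a small self-contained sketch. It is an illustration under assumptions rather than the MyBatis classes themselves: ManagedTransactionSketch and parseCloseConnection are hypothetical names, and an AutoCloseable stands in for the JDBC Connection. The property parsing follows the setProperties method shown above; the close() behaviour is assumed from the meaning of the flag, since that part of ManagedTransaction is elided in the listing.

```java
import java.util.Properties;

class ManagedTransactionSketch {

    /** Same parsing rule as setProperties above: the default is true. */
    static boolean parseCloseConnection(Properties props) {
        boolean closeConnection = true;
        if (props != null) {
            String value = props.getProperty("closeConnection");
            if (value != null) {
                closeConnection = Boolean.parseBoolean(value);
            }
        }
        return closeConnection;
    }

    private final AutoCloseable connection;   // stand-in for java.sql.Connection
    private final boolean closeConnection;

    ManagedTransactionSketch(AutoCloseable connection, boolean closeConnection) {
        this.connection = connection;
        this.closeConnection = closeConnection;
    }

    void commit() {
        // Does nothing: the container decides when to commit
    }

    void rollback() {
        // Does nothing: the container decides when to roll back
    }

    /** Assumed behaviour: close the connection only when the flag allows it. */
    void close() throws Exception {
        if (closeConnection && connection != null) {
            connection.close();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] closed = { 0 };
        Properties props = new Properties();
        props.setProperty("closeConnection", "false");

        ManagedTransactionSketch tx =
            new ManagedTransactionSketch(() -> closed[0]++, parseCloseConnection(props));
        tx.commit();
        tx.close();
        System.out.println("times closed: " + closed[0]); // prints times closed: 0
    }
}
```

Leaving the connection open is useful when the container that owns the transaction also owns the connection's lifecycle.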
        
  • SqlSession

  • Every database operation in MyBatis, CRUD included, goes through the Executor object held inside a SqlSession.
  • public interface SqlSession extends Closeable {
        <T> T selectOne(String statement);
        <T> T selectOne(String statement, Object parameter);
        <E> List<E> selectList(String statement);
        <E> List<E> selectList(String statement, Object parameter);
        <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds);
        <K, V> Map<K, V> selectMap(String statement, String mapKey);
        <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey);
        <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds);
        void select(String statement, Object parameter, ResultHandler handler);
        void select(String statement, ResultHandler handler);
        void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler);
        int insert(String statement);
        int insert(String statement, Object parameter);
        int update(String statement);
        int update(String statement, Object parameter);
        int delete(String statement);
        int delete(String statement, Object parameter);
        void commit();
        void commit(boolean force);
        void rollback();
        void rollback(boolean force);
        List<BatchResult> flushStatements();
        void close();
        void clearCache();
        Configuration getConfiguration();
        <T> T getMapper(Class<T> type);
        Connection getConnection();
    }
        
  • SqlSession instances are created by the SqlSessionFactory factory class.
  • public interface SqlSessionFactory {
        SqlSession openSession();
    
        SqlSession openSession(boolean autoCommit);
        SqlSession openSession(Connection connection);
        SqlSession openSession(TransactionIsolationLevel level);
    
        SqlSession openSession(ExecutorType execType);
        SqlSession openSession(ExecutorType execType, boolean autoCommit);
        SqlSession openSession(ExecutorType execType, TransactionIsolationLevel level);
        SqlSession openSession(ExecutorType execType, Connection connection);
    
        Configuration getConfiguration();
    }
        
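  • SqlSessionFactory has two implementations, DefaultSqlSessionFactory and SqlSessionManager, which manage sessions differently. The focus here is on DefaultSqlSessionFactory.
  • Session management in DefaultSqlSessionFactory

  • DefaultSqlSessionFactory creates a session by building a Transaction either from the data source (the Connection is then fetched lazily) or from a Connection passed in directly, and then constructing the Executor on top of that Transaction.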
  • /**
     * Creates the Transaction from the data source, then creates the Executor
     */
    private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
        Transaction tx = null;
        try {
            final Environment environment = configuration.getEnvironment();
            final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
            tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
            final Executor executor = configuration.newExecutor(tx, execType); // create the Executor
            return new DefaultSqlSession(configuration, executor, autoCommit);
        } catch (Exception e) {
            closeTransaction(tx); // may have fetched a connection so lets call close()
            throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
            ErrorContext.instance().reset();
        }
    }
    /**
     * Creates the Transaction from an existing Connection, then creates the Executor
     */
    private SqlSession openSessionFromConnection(ExecutorType execType, Connection connection) {
        try {
            boolean autoCommit;
            try {
                autoCommit = connection.getAutoCommit();
            } catch (SQLException e) {
                autoCommit = true;
            }
            final Environment environment = configuration.getEnvironment();
            final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
            final Transaction tx = transactionFactory.newTransaction(connection);
            final Executor executor = configuration.newExecutor(tx, execType);
            return new DefaultSqlSession(configuration, executor, autoCommit);
        } catch (Exception e) {
            throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
            ErrorContext.instance().reset();
        }
    }
        
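  • Data source management

  • MyBatis has three kinds of data source: UNPOOLED, POOLED, and JNDI. This section focuses on POOLED, the connection-pool-backed data source.
  • The POOLED alias maps to PooledDataSource, which wraps an UnpooledDataSource and delegates to it whenever a real JDBC connection has to be created. Here is how PooledDataSource hands out a JDBC connection: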
  • private PooledConnection popConnection(String username, String password) throws SQLException {
        boolean countedWait = false;
        PooledConnection conn = null;
        long t = System.currentTimeMillis();
        int localBadConnectionCount = 0;
    
        while (conn == null) {
            synchronized (state) {
                if (state.idleConnections.size() > 0) { // an idle connection exists: take it
                    conn = state.idleConnections.remove(0);
                    if (log.isDebugEnabled()) {
                        log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
                    }
                } else {
                    if (state.activeConnections.size() < poolMaximumActiveConnections) { // below the configured maximum of active connections: create a new one
                        conn = new PooledConnection(dataSource.getConnection(), this);
                        Connection realConn = conn.getRealConnection();
                    } else {
                        // take the connection that has been checked out the longest
                        PooledConnection oldestActiveConnection = state.activeConnections.get(0);
                        long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
                        if (longestCheckoutTime > poolMaximumCheckoutTime) {    // checked out longer than the configured maximum: the connection is overdue
                            // Can claim overdue connection
                            state.claimedOverdueConnectionCount++;  // count the claimed overdue connections
                            state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;   // accumulate the checkout time of overdue connections
                            state.accumulatedCheckoutTime += longestCheckoutTime;       // accumulate the total checkout time
                            state.activeConnections.remove(oldestActiveConnection);     // remove the overdue connection
                            if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                                oldestActiveConnection.getRealConnection().rollback();
                            }
                            conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this); // wrap the same real connection in a new PooledConnection
                            oldestActiveConnection.invalidate();    // invalidate the overdue wrapper
                        } else {
                            // must wait for a connection
                            try {
                                if (!countedWait) {
                                    state.hadToWaitCount++; // count the requests that had to wait
                                    countedWait = true;
                                }
                                long wt = System.currentTimeMillis();
                                state.wait(poolTimeToWait);     // wait up to poolTimeToWait milliseconds
                                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
                            } catch (InterruptedException e) {
                                break;
                            }
                        }
                    }
                }
                if (conn != null) {
                    if (conn.isValid()) { // valid connection
                        if (!conn.getRealConnection().getAutoCommit()) {
                            conn.getRealConnection().rollback();
                        }
                        conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
                        conn.setCheckoutTimestamp(System.currentTimeMillis());  // checkout time
                        conn.setLastUsedTimestamp(System.currentTimeMillis());  // last-used time
                        state.activeConnections.add(conn);                      // add to the active connections
                        state.requestCount++;
                        state.accumulatedRequestTime += System.currentTimeMillis() - t;
                    } else {    // invalid connection
                        state.badConnectionCount++; // pool-wide count of bad connections
                        localBadConnectionCount++;  // bad connections seen by this request
                        conn = null;
                        if (localBadConnectionCount > (poolMaximumIdleConnections + 3)) {
                            throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
                        }
                    }
                }
            }
        }
    
        if (conn == null) {
            if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
            }
            throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
        }
    
        return conn;
    }
        
  • MyBatis models the state of a PooledDataSource as PoolState:
  • public class PoolState {
    
        protected PooledDataSource dataSource;
    
        protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();     // idle connections
        protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();   // active (checked-out) connections
        protected long requestCount = 0; // number of requests
        protected long accumulatedRequestTime = 0; // accumulated request time
        protected long accumulatedCheckoutTime = 0; // accumulated checkout time
        protected long claimedOverdueConnectionCount = 0; // number of overdue connections claimed
        protected long accumulatedCheckoutTimeOfOverdueConnections = 0; // accumulated checkout time of overdue connections
        protected long accumulatedWaitTime = 0; // accumulated wait time
        protected long hadToWaitCount = 0; // number of requests that had to wait
        protected long badConnectionCount = 0; // number of bad connections
        ...
    }
        
  • How does PooledDataSource release a connection, then? A PooledConnection wraps the JDBC Connection and uses a dynamic proxy to intercept the Connection.close method:
  • class PooledConnection implements InvocationHandler {
    
        private static final String CLOSE = "close";
        private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
    
        private int hashCode = 0;
        private PooledDataSource dataSource;
        private Connection realConnection;
        private Connection proxyConnection;
        private long checkoutTimestamp;
        private long createdTimestamp;
        private long lastUsedTimestamp;
        private int connectionTypeCode;
        private boolean valid;
    
        ...
    
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String methodName = method.getName();
            if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
                dataSource.pushConnection(this); // on close(), return the connection to the data source's pool
                return null;
            } else {
                try {
                    if (!Object.class.equals(method.getDeclaringClass())) {
                        checkConnection();
                    }
                    return method.invoke(realConnection, args);
                } catch (Throwable t) {
                    throw ExceptionUtil.unwrapThrowable(t);
                }
            }
        }
    }
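
The close-interception idea can be reproduced with nothing but the JDK. The sketch below is a simplified illustration, not the MyBatis implementation: CloseInterceptSketch, Resource, and wrap are hypothetical names, Resource stands in for java.sql.Connection, and the validity check and exception unwrapping from the listing above are left out.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.Deque;

class CloseInterceptSketch {

    /** Hypothetical interface standing in for java.sql.Connection. */
    interface Resource {
        String query(String sql);
        void close();
    }

    /** Resources handed back by callers, ready to be reused. */
    static final Deque<Resource> idle = new ArrayDeque<>();

    /** Returns a proxy whose close() returns the real resource to the idle queue. */
    static Resource wrap(Resource real) {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("close".equals(method.getName())) {
                idle.push(real);              // recycle instead of really closing
                return null;
            }
            return method.invoke(real, args); // every other call goes to the real object
        };
        return (Resource) Proxy.newProxyInstance(
            Resource.class.getClassLoader(),
            new Class<?>[] { Resource.class },
            handler);
    }

    public static void main(String[] args) {
        boolean[] reallyClosed = { false };
        Resource real = new Resource() {
            public String query(String sql) { return "result of " + sql; }
            public void close() { reallyClosed[0] = true; }
        };

        Resource pooled = wrap(real);
        System.out.println(pooled.query("SELECT 1")); // prints result of SELECT 1
        pooled.close();                                // intercepted by the proxy
        System.out.println(reallyClosed[0] + " " + idle.size()); // prints false 1
    }
}
```

Callers keep writing ordinary close() calls, and the pool decides what closing actually means.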
        
  • Now look at the pushConnection() method:
  • protected void pushConnection(PooledConnection conn) throws SQLException {
        synchronized (state) {
            state.activeConnections.remove(conn); // remove the connection from the active list
            if (conn.isValid()) {
                if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
                    state.accumulatedCheckoutTime += conn.getCheckoutTime();
                    if (!conn.getRealConnection().getAutoCommit()) {
                        conn.getRealConnection().rollback();
                    }
                    PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
                    state.idleConnections.add(newConn); // add it to the idle list
                    newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
                    newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
                    conn.invalidate();
                    state.notifyAll();  // wake up threads waiting for a connection in popConnection
                } else {
                    state.accumulatedCheckoutTime += conn.getCheckoutTime();
                    if (!conn.getRealConnection().getAutoCommit()) {
                        conn.getRealConnection().rollback();
                    }
                    conn.getRealConnection().close();
                    conn.invalidate();
                }
            } else {
                state.badConnectionCount++;
            }
        }
    }
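
The interplay between checking a connection out and handing it back (idle list, active list, wait and notifyAll on a shared state lock) can be condensed into a toy pool. This is a sketch under simplifying assumptions, not the PooledDataSource logic in full: MiniPoolSketch, pop, and push are hypothetical names, connections are plain strings, and overdue-connection claiming, validity checks, rollback, and statistics are all omitted.

```java
import java.util.ArrayList;
import java.util.List;

class MiniPoolSketch {

    private final Object state = new Object();             // lock shared by pop and push
    private final List<String> idle = new ArrayList<>();   // connections ready for reuse
    private final List<String> active = new ArrayList<>(); // connections checked out
    private final int maxActive;
    private final int maxIdle;
    private int created = 0;

    MiniPoolSketch(int maxActive, int maxIdle) {
        this.maxActive = maxActive;
        this.maxIdle = maxIdle;
    }

    /** Checks out a connection: reuse an idle one, create one, or wait and retry. */
    String pop(long timeToWaitMillis) throws InterruptedException {
        synchronized (state) {
            while (true) {
                String conn = null;
                if (!idle.isEmpty()) {
                    conn = idle.remove(0);
                } else if (active.size() < maxActive) {
                    conn = "conn-" + (++created);
                }
                if (conn != null) {
                    active.add(conn);
                    return conn;
                }
                state.wait(timeToWaitMillis); // releases the lock until notified or timed out
            }
        }
    }

    /** Hands a connection back: keep it idle if there is room, otherwise drop it. */
    void push(String conn) {
        synchronized (state) {
            active.remove(conn);
            if (idle.size() < maxIdle) {
                idle.add(conn);
                state.notifyAll();            // wake up threads blocked in pop
            }
            // otherwise a real pool would close the physical connection here
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniPoolSketch pool = new MiniPoolSketch(1, 1);
        String first = pool.pop(50);

        Thread waiter = new Thread(() -> {
            try {
                System.out.println("waiter got " + pool.pop(50));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        Thread.sleep(100);   // the waiter is now blocked: the only connection is checked out
        pool.push(first);    // returning it wakes the waiter
        waiter.join();       // prints waiter got conn-1
    }
}
```

As in the listing above, a connection dropped because the idle list is full triggers no notification, so a waiting thread only notices the free slot once its wait times out and it loops again.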
        