如何在MyBatis中實現DataSource
一、DataSource
首先大傢要清楚DataSource屬於MyBatis三層架構設計的基礎層
然後我們來看看具體的實現。
在數據持久層中,數據源是一個非常重要的組件,其性能直接關系到整個數據持久層的性能,在實際開發中我們常用的數據源有 Apache Common DBCP,C3P0,Druid 等,MyBatis不僅可以集成第三方數據源,還提供的有自己實現的數據源。
在MyBatis中提供瞭兩個 javax.sql.DataSource 接口的實現,分別是 PooledDataSource 和 UnpooledDataSource .
二、DataSourceFactory
DataSourceFactory是用來創建DataSource對象的,接口中聲明瞭兩個方法,作用如下
public interface DataSourceFactory { // 設置 DataSource 的相關屬性,一般緊跟在初始化完成之後 void setProperties(Properties props); // 獲取 DataSource 對象 DataSource getDataSource(); }
DataSourceFactory接口的兩個具體實現是 UnpooledDataSourceFactory 和 PooledDataSourceFactory 這兩個工廠對象的作用通過名稱我們也能發現是用來創建不帶連接池的數據源對象和創建帶連接池的數據源對象,先來看下 UnpooledDataSourceFactory 中的方法
/** * 完成對 UnpooledDataSource 的配置 * @param properties 封裝的有 DataSource 所需要的相關屬性信息 */ @Override public void setProperties(Properties properties) { Properties driverProperties = new Properties(); // 創建 DataSource 對應的 MetaObject 對象 MetaObject metaDataSource = SystemMetaObject.forObject(dataSource); // 遍歷 Properties 集合,該集合中配置瞭數據源需要的信息 for (Object key : properties.keySet()) { String propertyName = (String) key; // 獲取屬性名稱 if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) { // 以 "driver." 開頭的配置項是對 DataSource 的配置 String value = properties.getProperty(propertyName); driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value); } else if (metaDataSource.hasSetter(propertyName)) { // 有該屬性的 setter 方法 String value = (String) properties.get(propertyName); Object convertedValue = convertValue(metaDataSource, propertyName, value); // 設置 DataSource 的相關屬性值 metaDataSource.setValue(propertyName, convertedValue); } else { throw new DataSourceException("Unknown DataSource property: " + propertyName); } } if (driverProperties.size() > 0) { // 設置 DataSource.driverProperties 的屬性值 metaDataSource.setValue("driverProperties", driverProperties); } }
UnpooledDataSourceFactory的getDataSource方法實現比較簡單,直接返回DataSource屬性記錄的 UnpooledDataSource 對象
三、UnpooledDataSource
UnpooledDataSource 是 DataSource接口的其中一個實現,但是 UnpooledDataSource 並沒有提供數據庫連接池的支持,我們來看下他的具體實現吧
聲明的相關屬性信息
private ClassLoader driverClassLoader; // 加載Driver的類加載器 private Properties driverProperties; // 數據庫連接驅動的相關信息 // 緩存所有已註冊的數據庫連接驅動 private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>(); private String driver; // 驅動 private String url; // 數據庫 url private String username; // 賬號 private String password; // 密碼 private Boolean autoCommit; // 是否自動提交 private Integer defaultTransactionIsolationLevel; // 事務隔離級別 private Integer defaultNetworkTimeout;
然後在靜態代碼塊中完成瞭 Driver的復制
static { // 從 DriverManager 中獲取 Drivers Enumeration<Driver> drivers = DriverManager.getDrivers(); while (drivers.hasMoreElements()) { Driver driver = drivers.nextElement(); // 將獲取的 Driver 記錄到 Map 集合中 registeredDrivers.put(driver.getClass().getName(), driver); } }
UnpooledDataSource 中獲取Connection的方法最終都會調用 doGetConnection() 方法。
private Connection doGetConnection(Properties properties) throws SQLException { // 初始化數據庫驅動 initializeDriver(); // 創建真正的數據庫連接 Connection connection = DriverManager.getConnection(url, properties); // 配置Connection的自動提交和事務隔離級別 configureConnection(connection); return connection; }
四、PooledDataSource
有開發經驗的小夥伴都知道,在操作數據庫的時候數據庫連接的創建過程是非常耗時的,數據庫能夠建立的連接數量也是非常有限的,所以數據庫連接池的使用是非常重要的,使用數據庫連接池會給我們帶來很多好處,比如可以實現數據庫連接的重用,提高響應速度,防止數據庫連接過多造成數據庫假死,避免數據庫連接泄漏等等。
首先來看下聲明的相關的屬性
// 管理狀態 private final PoolState state = new PoolState(this); // 記錄UnpooledDataSource,用於生成真實的數據庫連接對象 private final UnpooledDataSource dataSource; // OPTIONAL CONFIGURATION FIELDS protected int poolMaximumActiveConnections = 10; // 最大活躍連接數 protected int poolMaximumIdleConnections = 5; // 最大空閑連接數 protected int poolMaximumCheckoutTime = 20000; // 最大checkout時間 protected int poolTimeToWait = 20000; // 無法獲取連接的線程需要等待的時長 protected int poolMaximumLocalBadConnectionTolerance = 3; // protected String poolPingQuery = "NO PING QUERY SET"; // 測試的SQL語句 protected boolean poolPingEnabled; // 是否允許發送測試SQL語句 // 當連接超過 poolPingConnectionsNotUsedFor毫秒未使用時,會發送一次測試SQL語句,檢測連接是否正常 protected int poolPingConnectionsNotUsedFor; // 根據數據庫URL,用戶名和密碼生成的一個hash值。 private int expectedConnectionTypeCode;
然後重點來看下 getConnection 方法,該方法是用來給調用者提供 Connection 對象的。
@Override public Connection getConnection() throws SQLException { return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection(); }
我們會發現其中調用瞭 popConnection 方法,在該方法中 返回的是 PooledConnection 對象,而 PooledConnection 對象實現瞭 InvocationHandler 接口,所以會使用到Java的動態代理,其中相關的屬性為
private static final String CLOSE = "close"; private static final Class<?>[] IFACES = new Class<?>[] { Connection.class }; private final int hashCode; private final PooledDataSource dataSource; // 真正的數據庫連接 private final Connection realConnection; // 數據庫連接的代理對象 private final Connection proxyConnection; private long checkoutTimestamp; // 從連接池中取出該連接的時間戳 private long createdTimestamp; // 該連接創建的時間戳 private long lastUsedTimestamp; // 最後一次被使用的時間戳 private int connectionTypeCode; // 又數據庫URL、用戶名和密碼計算出來的hash值,可用於標識該連接所在的連接池 // 連接是否有效的標志 private boolean valid;
重點關註下invoke 方法
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if (CLOSE.equals(methodName)) { // 如果是 close 方法被執行則將連接放回連接池中,而不是真正的關閉數據庫連接 dataSource.pushConnection(this); return null; } try { if (!Object.class.equals(method.getDeclaringClass())) { // issue #579 toString() should never fail // throw an SQLException instead of a Runtime // 通過上面的 valid 字段來檢測 連接是否有效 checkConnection(); } // 調用真正數據庫連接對象的對應方法 return method.invoke(realConnection, args); } catch (Throwable t) { throw ExceptionUtil.unwrapThrowable(t); } }
還有就是前面提到的 PoolState 對象,它主要是用來管理 PooledConnection 對象狀態的組件,通過兩個 ArrayList 集合分別管理空閑狀態的連接和活躍狀態的連接,定義如下:
protected PooledDataSource dataSource; // 空閑的連接 protected final List<PooledConnection> idleConnections = new ArrayList<>(); // 活躍的連接 protected final List<PooledConnection> activeConnections = new ArrayList<>(); protected long requestCount = 0; // 請求數據庫連接的次數 protected long accumulatedRequestTime = 0; // 獲取連接累計的時間 // CheckoutTime 表示應用從連接池中取出來,到歸還連接的時長 // accumulatedCheckoutTime 記錄瞭所有連接累計的CheckoutTime時長 protected long accumulatedCheckoutTime = 0; // 當連接長時間沒有歸還連接時,會被認為該連接超時 // claimedOverdueConnectionCount 記錄連接超時的個數 protected long claimedOverdueConnectionCount = 0; // 累計超時時間 protected long accumulatedCheckoutTimeOfOverdueConnections = 0; // 累計等待時間 protected long accumulatedWaitTime = 0; // 等待次數 protected long hadToWaitCount = 0; // 無效連接數 protected long badConnectionCount = 0;
再回到 popConnection 方法中來看
private PooledConnection popConnection(String username, String password) throws SQLException { boolean countedWait = false; PooledConnection conn = null; long t = System.currentTimeMillis(); int localBadConnectionCount = 0; while (conn == null) { synchronized (state) { // 同步 if (!state.idleConnections.isEmpty()) { // 檢測空閑連接 // Pool has available connection 連接池中有空閑的連接 conn = state.idleConnections.remove(0); // 獲取連接 if (log.isDebugEnabled()) { log.debug("Checked out connection " + conn.getRealHashCode() + " from pool."); } } else {// 當前連接池 沒有空閑連接 // Pool does not have available connection if (state.activeConnections.size() < poolMaximumActiveConnections) { // 活躍數沒有達到最大連接數 可以創建新的連接 // Can create new connection 創建新的數據庫連接 conn = new PooledConnection(dataSource.getConnection(), this); if (log.isDebugEnabled()) { log.debug("Created connection " + conn.getRealHashCode() + "."); } } else { // 活躍數已經達到瞭最大數 不能創建新的連接 // Cannot create new connection 獲取最先創建的活躍連接 PooledConnection oldestActiveConnection = state.activeConnections.get(0); // 獲取該連接的超時時間 long longestCheckoutTime = oldestActiveConnection.getCheckoutTime(); // 檢查是否超時 if (longestCheckoutTime > poolMaximumCheckoutTime) { // Can claim overdue connection 對超時連接的信息進行統計 state.claimedOverdueConnectionCount++; state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime; state.accumulatedCheckoutTime += longestCheckoutTime; // 將超時連接移除 activeConnections state.activeConnections.remove(oldestActiveConnection); if (!oldestActiveConnection.getRealConnection().getAutoCommit()) { // 如果超時連接沒有提交 則自動回滾 try { oldestActiveConnection.getRealConnection().rollback(); } catch (SQLException e) { /* Just log a message for debug and continue to execute the following statement like nothing happened. Wrap the bad connection with a new PooledConnection, this will help to not interrupt current executing thread and give current thread a chance to join the next competition for another valid/good database connection. At the end of this loop, bad {@link @conn} will be set as null. */ log.debug("Bad connection. Could not roll back"); } } // 創建 PooledConnection,但是數據庫中的真正連接並沒有創建 conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this); conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp()); conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp()); // 將超時的 PooledConnection 設置為無效 oldestActiveConnection.invalidate(); if (log.isDebugEnabled()) { log.debug("Claimed overdue connection " + conn.getRealHashCode() + "."); } } else { // Must wait 無空閑連接,無法創建新連接和無超時連接 那就隻能等待 try { if (!countedWait) { state.hadToWaitCount++; // 統計等待次數 countedWait = true; } if (log.isDebugEnabled()) { log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection."); } long wt = System.currentTimeMillis(); state.wait(poolTimeToWait); // 阻塞等待 // 統計累計的等待時間 state.accumulatedWaitTime += System.currentTimeMillis() - wt; } catch (InterruptedException e) { break; } } } } if (conn != null) { // ping to server and check the connection is valid or not // 檢查 PooledConnection 是否有效 if (conn.isValid()) { if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } // 配置 PooledConnection 的相關屬性 conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password)); conn.setCheckoutTimestamp(System.currentTimeMillis()); conn.setLastUsedTimestamp(System.currentTimeMillis()); state.activeConnections.add(conn); state.requestCount++; // 進行相關的統計 state.accumulatedRequestTime += System.currentTimeMillis() - t; } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection."); } state.badConnectionCount++; localBadConnectionCount++; conn = null; if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) { if (log.isDebugEnabled()) { log.debug("PooledDataSource: Could not get a good connection to the database."); } throw new SQLException("PooledDataSource: Could not get a good connection to the database."); } } } } }
為瞭更好的理解代碼的含義,我們繪制瞭對應的流程圖
然後我們來看下當我們從連接池中使用完成瞭數據庫的相關操作後,是如何來關閉連接的呢?通過前面的 invoke 方法的介紹其實我們能夠發現,當我們執行代理對象的 close 方法的時候其實是執行的 pushConnection 方法。
具體的實現代碼為
protected void pushConnection(PooledConnection conn) throws SQLException { synchronized (state) { // 從 activeConnections 中移除 PooledConnection 對象 state.activeConnections.remove(conn); if (conn.isValid()) { // 檢測 連接是否有效 if (state.idleConnections.size() < poolMaximumIdleConnections // 是否達到上限 && conn.getConnectionTypeCode() == expectedConnectionTypeCode // 該 PooledConnection 是否為該連接池的連接 ) { state.accumulatedCheckoutTime += conn.getCheckoutTime(); // 累計 checkout 時長 if (!conn.getRealConnection().getAutoCommit()) { // 回滾未提交的事務 conn.getRealConnection().rollback(); } // 為返還連接創建新的 PooledConnection 對象 PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this); // 添加到 空閑連接集合中 state.idleConnections.add(newConn); newConn.setCreatedTimestamp(conn.getCreatedTimestamp()); newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp()); conn.invalidate(); // 將原來的 PooledConnection 連接設置為無效 if (log.isDebugEnabled()) { log.debug("Returned connection " + newConn.getRealHashCode() + " to pool."); } // 喚醒阻塞等待的線程 state.notifyAll(); } else { // 空閑連接達到上限或者 PooledConnection不屬於當前的連接池 state.accumulatedCheckoutTime += conn.getCheckoutTime(); // 累計 checkout 時長 if (!conn.getRealConnection().getAutoCommit()) { conn.getRealConnection().rollback(); } conn.getRealConnection().close(); // 關閉真正的數據庫連接 if (log.isDebugEnabled()) { log.debug("Closed connection " + conn.getRealHashCode() + "."); } conn.invalidate(); // 設置 PooledConnection 無線 } } else { if (log.isDebugEnabled()) { log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection."); } state.badConnectionCount++; // 統計無效的 PooledConnection 對象個數 } } }
為瞭便於理解,我們同樣的來繪制對應的流程圖:
還有就是我們在源碼中多處有看到 conn.isValid方法來檢測連接是否有效
public boolean isValid() { return valid && realConnection != null && dataSource.pingConnection(this); }
dataSource.pingConnection(this)中會真正的實現數據庫的SQL執行操作
最後一點要註意的是在我們修改瞭任意的PooledDataSource中的屬性的時候都會執行forceCloseAll來強制關閉所有的連接。
/** * Closes all active and idle connections in the pool. */ public void forceCloseAll() { synchronized (state) { // 更新 當前的 連接池 標識 expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword()); for (int i = state.activeConnections.size(); i > 0; i--) {// 處理全部的活躍連接 try { // 獲取 獲取的連接 PooledConnection conn = state.activeConnections.remove(i - 1); conn.invalidate(); // 標識為無效連接 // 獲取真實的 數據庫連接 Connection realConn = conn.getRealConnection(); if (!realConn.getAutoCommit()) { realConn.rollback(); // 回滾未處理的事務 } realConn.close(); // 關閉真正的數據庫連接 } catch (Exception e) { // ignore } } // 同樣的邏輯處理空閑的連接 for (int i = state.idleConnections.size(); i > 0; i--) { try { PooledConnection conn = state.idleConnections.remove(i - 1); conn.invalidate(); Connection realConn = conn.getRealConnection(); if (!realConn.getAutoCommit()) { realConn.rollback(); } realConn.close(); } catch (Exception e) { // ignore } } } if (log.isDebugEnabled()) { log.debug("PooledDataSource forcefully closed/removed all connections."); } }
到此這篇關於如何在MyBatis中實現DataSource的文章就介紹到這瞭,更多相關DataSource的實現內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Data Source與數據庫連接池簡介(JDBC簡介)
- SpringBoot默認使用HikariDataSource數據源方式
- MySQL使用ReplicationConnection導致連接失效解決
- springBoot下實現java自動創建數據庫表
- Java JDBC自定義封裝工具類的步驟和完整代碼