Python封裝數據庫連接池詳解
前言:
線程安全問題:當2個線程同時用到線程池時,會同時創建2個線程池。如果多個線程,錯開用到線程池,就隻會創建一個線程池,會共用一個線程池。我用的註解方式的單例模式,感覺就是這個註解的單例方式,解決瞭多線程問題,但是沒解決線程安全問題,需要優化這個單例模式。
主要通過 PooledDB 模塊實現。
一、數據庫封裝
1.1數據庫基本配置
db_config.py
# -*- coding: UTF-8 -*- import pymysql # 數據庫信息 DB_TEST_HOST = "127.0.0.1" DB_TEST_PORT = 3308 DB_TEST_DBNAME = "bt" DB_TEST_USER = "root" DB_TEST_PASSWORD = "123456" # 數據庫連接編碼 DB_CHARSET = "utf8" # mincached : 啟動時開啟的閑置連接數量(缺省值 0 開始時不創建連接) DB_MIN_CACHED = 5 # maxcached : 連接池中允許的閑置的最多連接數量(缺省值 0 代表不閑置連接池大小) DB_MAX_CACHED = 0 # maxshared : 共享連接數允許的最大數量(缺省值 0 代表所有連接都是專用的)如果達到瞭最大數量,被請求為共享的連接將會被共享使用 DB_MAX_SHARED = 5 # maxconnecyions : 創建連接池的最大數量(缺省值 0 代表不限制) DB_MAX_CONNECYIONS = 300 # blocking : 設置在連接池達到最大數量時的行為(缺省值 0 或 False 代表返回一個錯誤<toMany......> 其他代表阻塞直到連接數減少,連接被分配) DB_BLOCKING = True # maxusage : 單個連接的最大允許復用次數(缺省值 0 或 False 代表不限制的復用).當達到最大數時,連接會自動重新連接(關閉和重新打開) DB_MAX_USAGE = 0 # setsession : 一個可選的SQL命令列表用於準備每個會話,如["set datestyle to german", ...] DB_SET_SESSION = None # creator : 使用連接數據庫的模塊 DB_CREATOR = pymysql
設置連接池最大最小為5個。則啟動連接池時,就會建立5個連接。
1.2 編寫單例模式註解
singleton.py
#單例模式函數,用來修飾類 def singleton(cls,*args,**kw): instances = {} def _singleton(): if cls not in instances: instances[cls] = cls(*args,**kw) return instances[cls] return _singleton
1.3 構建連接池
db_dbutils_init.py
from dbutils.pooled_db import PooledDB import db_config as config # import random from singleton import singleton """ @功能:創建數據庫連接池 """ class MyConnectionPool(object): # 私有屬性 # 能通過對象直接訪問,但是可以在本類內部訪問; __pool = None # def __init__(self): # self.conn = self.__getConn() # self.cursor = self.conn.cursor() # 創建數據庫連接conn和遊標cursor def __enter__(self): self.conn = self.__getconn() self.cursor = self.conn.cursor() # 創建數據庫連接池 def __getconn(self): if self.__pool is None: # i = random.randint(1, 100) # print("創建線程池的數量"+str(i)) self.__pool = PooledDB( creator=config.DB_CREATOR, mincached=config.DB_MIN_CACHED, maxcached=config.DB_MAX_CACHED, maxshared=config.DB_MAX_SHARED, maxconnections=config.DB_MAX_CONNECYIONS, blocking=config.DB_BLOCKING, maxusage=config.DB_MAX_USAGE, setsession=config.DB_SET_SESSION, host=config.DB_TEST_HOST, port=config.DB_TEST_PORT, user=config.DB_TEST_USER, passwd=config.DB_TEST_PASSWORD, db=config.DB_TEST_DBNAME, use_unicode=False, charset=config.DB_CHARSET ) return self.__pool.connection() # 釋放連接池資源 def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() # 關閉連接歸還給鏈接池 # def close(self): # self.cursor.close() # self.conn.close() # 從連接池中取出一個連接 def getconn(self): conn = self.__getconn() cursor = conn.cursor() return cursor, conn # 獲取連接池,實例化 @singleton def get_my_connection(): return MyConnectionPool()
1.4 封裝Python操作MYSQL的代碼
mysqlhelper.py
import time from db_dbutils_init import get_my_connection """執行語句查詢有結果返回結果沒有返回0;增/刪/改返回變更數據條數,沒有返回0""" class MySqLHelper(object): def __init__(self): self.db = get_my_connection() # 從數據池中獲取連接 # # def __new__(cls, *args, **kwargs): # if not hasattr(cls, 'inst'): # 單例 # cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs) # return cls.inst # 封裝執行命令 def execute(self, sql, param=None, autoclose=False): """ 【主要判斷是否有參數和是否執行完就釋放連接】 :param sql: 字符串類型,sql語句 :param param: sql語句中要替換的參數"select %s from tab where id=%s" 其中的%s就是參數 :param autoclose: 是否關閉連接 :return: 返回連接conn和遊標cursor """ cursor, conn = self.db.getconn() # 從連接池獲取連接 count = 0 try: # count : 為改變的數據條數 if param: count = cursor.execute(sql, param) else: count = cursor.execute(sql) conn.commit() if autoclose: self.close(cursor, conn) except Exception as e: pass return cursor, conn, count # 釋放連接 def close(self, cursor, conn): """釋放連接歸還給連接池""" cursor.close() conn.close() # 查詢所有 def selectall(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) res = cursor.fetchall() return res except Exception as e: print(e) self.close(cursor, conn) return count # 查詢單條 def selectone(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) res = cursor.fetchone() self.close(cursor, conn) return res except Exception as e: print("error_msg:", e.args) self.close(cursor, conn) return count # 增加 def insertone(self, sql, param): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) # _id = cursor.lastrowid() # 獲取當前插入數據的主鍵id,該id應該為自動生成為好 conn.commit() self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 增加多行 def insertmany(self, sql, param): """ :param sql: :param param: 必須是元組或列表[(),()]或((),()) :return: """ cursor, conn, count = self.db.getconn() try: cursor.executemany(sql, param) conn.commit() return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 刪除 def delete(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # 更新 def update(self, sql, param=None): cursor = None conn = None count = None try: cursor, conn, count = self.execute(sql, param) conn.commit() self.close(cursor, conn) return count except Exception as e: print(e) conn.rollback() self.close(cursor, conn) return count # if __name__ == '__main__': # db = MySqLHelper() # sql = "SELECT SLEEP(10)" # db.execute(sql) # time.sleep(20) # TODO 查詢單條 # sql1 = 'select * from userinfo where name=%s' # args = 'python' # ret = db.selectone(sql=sql1, param=args) # print(ret) # (None, b'python', b'123456', b'0') # TODO 增加單條 # sql2 = 'insert into hotel_urls(cname,hname,cid,hid,url) values(%s,%s,%s,%s,%s)' # ret = db.insertone(sql2, ('1', '2', '1', '2', '2')) # print(ret) # TODO 增加多條 # sql3 = 'insert into userinfo (name,password) VALUES (%s,%s)' # li = li = [ # ('分省', '123'), # ('到達','456') # ] # ret = db.insertmany(sql3,li) # print(ret) # TODO 刪除 # sql4 = 'delete from userinfo WHERE name=%s' # args = 'xxxx' # ret = db.delete(sql4, args) # print(ret) # TODO 更新 # sql5 = r'update userinfo set password=%s WHERE name LIKE %s' # args = ('993333993', '%old%') # ret = db.update(sql5, args) # print(ret)
二、連接池測試
修改 db_dbutils_init.py 文件,在創建連接池def __getconn(self):方法下,加一個打印隨機數,方便將來我們定位是否時單例的線程池。
修改後的db_dbutils_init.py 文件:
from dbutils.pooled_db import PooledDB import db_config as config import random from singleton import singleton """ @功能:創建數據庫連接池 """ class MyConnectionPool(object): # 私有屬性 # 能通過對象直接訪問,但是可以在本類內部訪問; __pool = None # def __init__(self): # self.conn = self.__getConn() # self.cursor = self.conn.cursor() # 創建數據庫連接conn和遊標cursor def __enter__(self): self.conn = self.__getconn() self.cursor = self.conn.cursor() # 創建數據庫連接池 def __getconn(self): if self.__pool is None: i = random.randint(1, 100) print("線程池的隨機數"+str(i)) self.__pool = PooledDB( creator=config.DB_CREATOR, mincached=config.DB_MIN_CACHED, maxcached=config.DB_MAX_CACHED, maxshared=config.DB_MAX_SHARED, maxconnections=config.DB_MAX_CONNECYIONS, blocking=config.DB_BLOCKING, maxusage=config.DB_MAX_USAGE, setsession=config.DB_SET_SESSION, host=config.DB_TEST_HOST, port=config.DB_TEST_PORT, user=config.DB_TEST_USER, passwd=config.DB_TEST_PASSWORD, db=config.DB_TEST_DBNAME, use_unicode=False, charset=config.DB_CHARSET ) return self.__pool.connection() # 釋放連接池資源 def __exit__(self, exc_type, exc_val, exc_tb): self.cursor.close() self.conn.close() # 關閉連接歸還給鏈接池 # def close(self): # self.cursor.close() # self.conn.close() # 從連接池中取出一個連接 def getconn(self): conn = self.__getconn() cursor = conn.cursor() return cursor, conn # 獲取連接池,實例化 @singleton def get_my_connection(): return MyConnectionPool()
開始測試:
場景一:同一個實例,執行2次sql
from mysqlhelper import MySqLHelper import time if __name__ == '__main__': sql = "SELECT SLEEP(10)" sql1 = "SELECT SLEEP(15)" db = MySqLHelper() db.execute(sql) db.execute(sql1) time.sleep(20)
在數據庫中,使用 show processlist;
show processlist;
當執行第一個sql時。數據庫連接顯示。
當執行第二個sql時。數據庫連接顯示:
當執行完sql,程序sleep時。數據庫連接顯示:
程序打印結果:
線程池的隨機數43
由以上可以得出結論:
線程池啟動後,生成瞭5個連接。執行第一個sql時,使用瞭1個連接。執行完第一個sql後,使用瞭另外1個連接。 這是一個線性的,線程池中一共5個連接,但是每次執行,隻使用瞭其中一個。
有個疑問,連接池如果不支持並發是不是就毫無意義?
如上,雖然開瞭線程池5個連接,但是每次執行sql,隻用到瞭一個連接。那為何不設置線程池大小為1呢?設置線程池大小的意義何在呢?(如果在非並發的場景下,是不是設置大小無意義?)
相比於不用線程池的優點:
如果不用線程池,則每次執行一個sql都要創建、斷開連接。 像我們這樣使用連接池,不用反復創建、斷開連接,拿現成的連接直接用就好瞭。
場景二:依次創建2個實例,各自執行sql
from mysqlhelper import MySqLHelper import time if __name__ == '__main__': db = MySqLHelper() db1 = MySqLHelper() sql = "SELECT SLEEP(10)" sql1 = "SELECT SLEEP(15)" db.execute(sql) db1.execute(sql1) time.sleep(20)
第一個實例db,執行sql。線程池啟動瞭5個連接
第二個實例db1,執行sql:
程序睡眠時,一共5個線程池:
打印結果:
結果證明:
雖然我們依次創建瞭2個實例,但是(1)創建線程池的打印結果,隻打印1次,且從始至終,線程池一共隻啟動瞭5個連接,且連接的id沒有發生改變,說明一直是這5個連接。
證明,我們雖然創建瞭2個實例,但是這2個實例其實是一個實例。(單例模式是生效的)
場景三:啟動2個線程,但是線程在創建連接池實例時,有時間間隔
import threading from mysqlhelper import MySqLHelper import time def sl1(): time.sleep(2) db = MySqLHelper() sql = "SELECT SLEEP(6)" db.execute(sql) def sl2(): time.sleep(4) db = MySqLHelper() sql = "SELECT SLEEP(15)" db.execute(sql) if __name__ == '__main__': threads = [] t1 = threading.Thread(target=sl1) threads.append(t1) t2 = threading.Thread(target=sl2) threads.append(t2) for t in threads: t.setDaemon(True) t.start() time.sleep(20)
2個線程間隔瞭2秒。
觀察數據庫的連接數量:
打印結果:
在並發執行2個sql時,共用瞭這5個連接,且打印結果隻打印瞭一次,說明雖然並發創建瞭2次實例,但真正隻創建瞭一個連接池。
場景四:啟動2個線程,線程在創建連接池實例時,沒有時間間隔
import threading from mysqlhelper import MySqLHelper import time if __name__ == '__main__': db = MySqLHelper() sql = "SELECT SLEEP(6)" sql1 = "SELECT SLEEP(15)" threads = [] t1 = threading.Thread(target=db.execute, args=(sql,)) threads.append(t1) t2 = threading.Thread(target=db.execute, args=(sql1,)) threads.append(t2) for t in threads: t.setDaemon(True) t.start() time.sleep(20)
觀察數據庫連接 :
打印結果:
結果表明:
終端打印瞭2次,數據庫建立瞭10個連接,說明創建瞭2個線程池。這樣的單例模式,存在線程安全問題。
到此這篇關於Python封裝數據庫連接池詳解的文章就介紹到這瞭,更多相關Python連接池內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Python 中創建 PostgreSQL 數據庫連接池
- python 操作sqlite數據庫的方法
- python執行數據庫的查詢操作實例講解
- 詳解Python如何利用pymysql封裝項目通用的連接和查詢
- 利用Python連接Oracle數據庫的基本操作指南