Python 中創建 PostgreSQL 數據庫連接池
習慣於使用數據庫之前都必須創建一個連接池,即使是單線程的應用,隻要有多個方法中需用到數據庫連接,建立一兩個連接的也會考慮先池化他們。連接池的好處多多,
- 1) 如果反復創建連接相當耗時,
- 2) 對於單個連接一路用到底的應用,有連接池時避免瞭數據庫連接對象傳來傳去,
- 3) 忘記關連接瞭,連接池幸許還能幫忙在一定時長後關掉,當然密集取連接的應用勢將耗盡連接,
- 4) 一個應用打開連接的數量是可控的
接觸到 Python
後,在使用 PostgreSQL
也自然而然的考慮創建連接池,使用時從池中取,用完後還回去,而不是每次需要連接時創建一個物理的。Python
連接 PostgreSQL
是主要有兩個包, py-postgresql
和 psycopg2
, 而本文的實例將使用後者。
Psycopg
在 psycopg2.pool
模塊中提供瞭兩個連接池的實現在,它們都繼承自 psycopg2.pool.AbstractConnectionPool,
該抽象類的基本方法是
getconn(key=None):
獲取連接putconn(conn, key=None, close=False):
歸還連接closeall():
關閉連接池中的所有連接
兩個連接池的實現類是
psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwars)
: 給單線程應用用的psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwars)
: 多線程時更安全,其實就是在getconn()
和putconn()
時加瞭鎖來控制
所以最安全保險的做法還是使用 ThreadedConnectionPool
, 在單線程應用中, SimpleConnectionPool
也不見得比 ThreadedConnectionPool
效率高多少。
下面來看一個具體的連接池實現,其中用到瞭 Context Manager
, 使用時結合 with
鍵字更方便,用完後不用顯式的調用 putconn()
歸還連接
db_helper.py
from psycopg2 import pool from psycopg2.extras import RealDictCursor from contextlib import contextmanager import atexit class DBHelper: def __init__(self): self._connection_pool = None def initialize_connection_pool(self): db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5' self._connection_pool = pool.ThreadedConnectionPool(1, 3,db_dsn) @contextmanager def get_resource(self, autocommit=True): if self._connection_pool is None: self.initialize_connection_pool() conn = self._connection_pool.getconn() conn.autocommit = autocommit cursor = conn.cursor(cursor_factory=RealDictCursor) try: yield cursor, conn finally: cursor.close() self._connection_pool.putconn(conn) def shutdown_connection_pool(self): if self._connection_pool is not None: self._connection_pool.closeall() db_helper = DBHelper() @atexit.register def shutdown_connection_pool(): db_helper.shutdown_connection_pool() from psycopg2 import pool from psycopg2 . extras import RealDictCursor from contextlib import contextmanager import atexit class DBHelper : def __init__ ( self ) : self . _connection_pool = None def initialize_connection_pool ( self ) : db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5' self . _connection_pool = pool . ThreadedConnectionPool ( 1 , 3 , db_dsn ) @ contextmanager def get_resource ( self , autocommit = True ) : if self . _connection_pool is None : self . initialize_connection_pool ( ) conn = self . _connection_pool . getconn ( ) conn . autocommit = autocommit cursor = conn . cursor ( cursor_factory = RealDictCursor ) try : yield cursor , conn finally : cursor . close ( ) self . _connection_pool . putconn ( conn ) def shutdown_connection_pool ( self ) : if self . _connection_pool is not None : self . _connection_pool . closeall ( ) db_helper = DBHelper ( ) @ atexit . register def shutdown_connection_pool ( ) : db_helper . shutdown_connection_pool ( )
幾點說明:
- 隻在第一次調用
get_resource()
時創建連接池,而不是在from db_helper import db_helper
引用時就創建連接池 Context Manager
返回瞭兩個對象,cursor
和connection
, 需要用connection
管理事物時用它- 默認時
cursor
返回的記錄是字典,而非數組 - 默認時連接為自動提交
- 最後的
@atexit.register
那個ShutdownHook
可能有點多餘,在進程退出時連接也被關閉,TIME_WAIT
時間應該會稍長些
使用方式:
如果不用事物
from db_helper import db_helper with db_helper.get_resource() as (cursor, _): cursor.execute('select * from users') for record in cursor.fetchall(): ... process record, record['name'] ... from db_helper import db_helper with db_helper . get_resource ( ) as ( cursor , _ ) : cursor . execute ( 'select * from users' ) for record in cursor . fetchall ( ) : . . . process record , record [ 'name' ] . . .
如果需要用到事物
with db_helper.get_resource(autocommit=False) as (cursor, _): try: cursor.execute('update users set name = %s where id = %s', ('new_name', 1)) cursor.execute('delete from orders where user_id = %s', (1,)) conn.commit() except: conn.rollback() with db_helper . get_resource ( autocommit = False ) as ( cursor , _ ) : try : cursor . execute ( 'update users set name = %s where id = %s' , ( 'new_name' , 1 ) ) cursor . execute ( 'delete from orders where user_id = %s' , ( 1 , ) ) conn . commit ( ) except : conn . rollback ( )
在寫作本文時,查看 psycopg 的官網時,發現 Psycopg 3.0
正式版在 2021-10-13 日發佈瞭( Psycopg 3.0 released ), 更好的支持 async。在 Psycopg2 2.2 版本時就開始支持異步瞭。而且還註意到 Psycopg
的主要部分是用 C 實現的,才使得它效率比較高,也難怪經常用 pip install psycopg2
安裝不成功,而要用 pip install psycopg2-binary
來安裝的原因。
在創建連接池時加上參數 keepalivesXxx
能讓服務器及時斷掉死鏈接,否則在 Linux
下默認要 2 個小時後才斷開。死鏈接的情況發生在客戶端異常退出(如斷電)時先前建立的鏈接就變為死鏈接瞭。
pool.ThreadedConnectionPool(1, 3, db_dsn, keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)
PostgreSQL
服務端會對連接在空閑 tcp_keepalives_idle
秒後,主動發送tcp_keepalives_count 個 tcp_keeplive
偵測包,每個偵探包在 tcp_keepalives_interval
秒內都沒有回應,就認為是死連接,於是切斷它。
到此這篇關於Python
中創建 PostgreSQL
數據庫連接池的文章就介紹到這瞭,更多相關PostgreSQL Python
內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!
推薦閱讀:
- Python操作PostgreSql數據庫的方法(基本的增刪改查)
- 淺析python連接數據庫的重要事項
- python調用pymssql包操作SqlServer數據庫的實現
- 安裝python依賴包psycopg2來調用postgresql的操作
- python 操作sqlite數據庫的方法